Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17/// Execute a state machine definition with the given input.
18/// Updates the execution record in shared state as it progresses.
19pub async fn execute_state_machine(
20    state: SharedStepFunctionsState,
21    execution_arn: String,
22    definition: String,
23    input: Option<String>,
24    delivery: Option<Arc<DeliveryBus>>,
25    dynamodb_state: Option<SharedDynamoDbState>,
26) {
27    let def: Value = match serde_json::from_str(&definition) {
28        Ok(v) => v,
29        Err(e) => {
30            fail_execution(
31                &state,
32                &execution_arn,
33                "States.Runtime",
34                &format!("Failed to parse definition: {e}"),
35            );
36            return;
37        }
38    };
39
40    let raw_input: Value = input
41        .as_deref()
42        .and_then(|s| serde_json::from_str(s).ok())
43        .unwrap_or(json!({}));
44
45    // Record ExecutionStarted event
46    add_event(
47        &state,
48        &execution_arn,
49        "ExecutionStarted",
50        0,
51        json!({
52            "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
53            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54        }),
55    );
56
57    // Run the state machine inside an inner tokio::spawn so that any panic
58    // bubbles up as a JoinError instead of tearing down the caller. Without
59    // this the panic propagates through the outer spawn in `start_execution`
60    // which leaves the execution stuck in Running and leaks the panic to
61    // tokio's default hook.
62    let def_owned = def;
63    let state_clone = state.clone();
64    let execution_arn_clone = execution_arn.clone();
65    let delivery_clone = delivery.clone();
66    let dynamodb_state_clone = dynamodb_state.clone();
67    let handle = tokio::spawn(async move {
68        run_states(
69            &def_owned,
70            raw_input,
71            &delivery_clone,
72            &dynamodb_state_clone,
73            &state_clone,
74            &execution_arn_clone,
75        )
76        .await
77    });
78
79    match handle.await {
80        Ok(Ok(output)) => {
81            succeed_execution(&state, &execution_arn, &output);
82        }
83        Ok(Err((error, cause))) => {
84            fail_execution(&state, &execution_arn, &error, &cause);
85        }
86        Err(join_err) => {
87            let msg = if join_err.is_panic() {
88                let payload = join_err.into_panic();
89                if let Some(s) = payload.downcast_ref::<String>() {
90                    s.clone()
91                } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92                    (*s).to_string()
93                } else {
94                    "execution task panicked".to_string()
95                }
96            } else {
97                format!("execution task cancelled: {join_err}")
98            };
99            tracing::error!(
100                execution_arn = %execution_arn,
101                panic = %msg,
102                "Step Functions execution panicked"
103            );
104            fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105        }
106    }
107}
108
109type StatesResult<'a> = std::pin::Pin<
110    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
111>;
112
113/// Core execution loop: runs through states in a definition and returns the output.
114/// Used by the top-level executor, Parallel branches, and Map iterations.
115fn run_states<'a>(
116    def: &'a Value,
117    input: Value,
118    delivery: &'a Option<Arc<DeliveryBus>>,
119    dynamodb_state: &'a Option<SharedDynamoDbState>,
120    shared_state: &'a SharedStepFunctionsState,
121    execution_arn: &'a str,
122) -> StatesResult<'a> {
123    Box::pin(async move {
124        let start_at = def["StartAt"]
125            .as_str()
126            .ok_or_else(|| {
127                (
128                    "States.Runtime".to_string(),
129                    "Missing StartAt in definition".to_string(),
130                )
131            })?
132            .to_string();
133
134        let states = def.get("States").ok_or_else(|| {
135            (
136                "States.Runtime".to_string(),
137                "Missing States in definition".to_string(),
138            )
139        })?;
140
141        let mut current_state = start_at;
142        let mut effective_input = input;
143        let mut iteration = 0;
144        let max_iterations = 500;
145
146        loop {
147            iteration += 1;
148            if iteration > max_iterations {
149                return Err((
150                    "States.Runtime".to_string(),
151                    "Maximum number of state transitions exceeded".to_string(),
152                ));
153            }
154
155            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
156                (
157                    "States.Runtime".to_string(),
158                    format!("State '{current_state}' not found in definition"),
159                )
160            })?;
161
162            let state_type = state_def["Type"]
163                .as_str()
164                .ok_or_else(|| {
165                    (
166                        "States.Runtime".to_string(),
167                        format!("State '{current_state}' missing Type field"),
168                    )
169                })?
170                .to_string();
171
172            debug!(
173                execution_arn = %execution_arn,
174                state = %current_state,
175                state_type = %state_type,
176                "Executing state"
177            );
178
179            let advance = match state_type.as_str() {
180                "Pass" => run_pass_state(
181                    &current_state,
182                    &state_def,
183                    effective_input,
184                    shared_state,
185                    execution_arn,
186                ),
187                "Succeed" => run_succeed_state(
188                    &current_state,
189                    &state_def,
190                    effective_input,
191                    shared_state,
192                    execution_arn,
193                ),
194                "Fail" => run_fail_state(
195                    &current_state,
196                    &state_def,
197                    effective_input,
198                    shared_state,
199                    execution_arn,
200                ),
201                "Choice" => run_choice_state(
202                    &current_state,
203                    &state_def,
204                    effective_input,
205                    shared_state,
206                    execution_arn,
207                ),
208                "Wait" => {
209                    run_wait_state(
210                        &current_state,
211                        &state_def,
212                        effective_input,
213                        shared_state,
214                        execution_arn,
215                    )
216                    .await
217                }
218                "Task" => {
219                    run_task_state(
220                        &current_state,
221                        &state_def,
222                        effective_input,
223                        delivery,
224                        dynamodb_state,
225                        shared_state,
226                        execution_arn,
227                    )
228                    .await
229                }
230                "Parallel" => {
231                    run_parallel_state(
232                        &current_state,
233                        &state_def,
234                        effective_input,
235                        delivery,
236                        dynamodb_state,
237                        shared_state,
238                        execution_arn,
239                    )
240                    .await
241                }
242                "Map" => {
243                    run_map_state(
244                        &current_state,
245                        &state_def,
246                        effective_input,
247                        delivery,
248                        dynamodb_state,
249                        shared_state,
250                        execution_arn,
251                    )
252                    .await
253                }
254                other => Advance::Fail(
255                    "States.Runtime".to_string(),
256                    format!("Unsupported state type: '{other}'"),
257                ),
258            };
259
260            match advance {
261                Advance::Next(next, new_input) => {
262                    effective_input = new_input;
263                    current_state = next;
264                }
265                Advance::End(output) => return Ok(output),
266                Advance::Fail(error, cause) => return Err((error, cause)),
267            }
268        }
269    })
270}
271
272/// Result of executing a single state in the state machine.
273enum Advance {
274    /// Continue to the given state with the given input.
275    Next(String, Value),
276    /// Terminate the state machine with the given output.
277    End(Value),
278    /// Fail the state machine with the given error and cause.
279    Fail(String, String),
280}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283    match next_state(state_def) {
284        NextState::Name(next) => Advance::Next(next, input),
285        NextState::End => Advance::End(input),
286        NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287    }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291    match apply_state_catcher(state_def, input, &error, &cause) {
292        Some((next, new_input)) => Advance::Next(next, new_input),
293        None => Advance::Fail(error, cause),
294    }
295}
296
297fn run_pass_state(
298    name: &str,
299    state_def: &Value,
300    input: Value,
301    shared_state: &SharedStepFunctionsState,
302    execution_arn: &str,
303) -> Advance {
304    let entered_event_id = add_event(
305        shared_state,
306        execution_arn,
307        "PassStateEntered",
308        0,
309        json!({
310            "name": name,
311            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
312        }),
313    );
314
315    let result = execute_pass_state(state_def, &input);
316
317    add_event(
318        shared_state,
319        execution_arn,
320        "PassStateExited",
321        entered_event_id,
322        json!({
323            "name": name,
324            "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
325        }),
326    );
327
328    advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332    name: &str,
333    state_def: &Value,
334    input: Value,
335    shared_state: &SharedStepFunctionsState,
336    execution_arn: &str,
337) -> Advance {
338    add_event(
339        shared_state,
340        execution_arn,
341        "SucceedStateEntered",
342        0,
343        json!({
344            "name": name,
345            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
346        }),
347    );
348
349    let input_path = state_def["InputPath"].as_str();
350    let output_path = state_def["OutputPath"].as_str();
351
352    let processed = if input_path == Some("null") {
353        json!({})
354    } else {
355        apply_input_path(&input, input_path)
356    };
357
358    let output = if output_path == Some("null") {
359        json!({})
360    } else {
361        apply_output_path(&processed, output_path)
362    };
363
364    add_event(
365        shared_state,
366        execution_arn,
367        "SucceedStateExited",
368        0,
369        json!({
370            "name": name,
371            "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
372        }),
373    );
374
375    Advance::End(output)
376}
377
378fn run_fail_state(
379    name: &str,
380    state_def: &Value,
381    input: Value,
382    shared_state: &SharedStepFunctionsState,
383    execution_arn: &str,
384) -> Advance {
385    let error = state_def["Error"]
386        .as_str()
387        .unwrap_or("States.Fail")
388        .to_string();
389    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391    add_event(
392        shared_state,
393        execution_arn,
394        "FailStateEntered",
395        0,
396        json!({
397            "name": name,
398            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
399        }),
400    );
401
402    Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406    name: &str,
407    state_def: &Value,
408    input: Value,
409    shared_state: &SharedStepFunctionsState,
410    execution_arn: &str,
411) -> Advance {
412    let entered_event_id = add_event(
413        shared_state,
414        execution_arn,
415        "ChoiceStateEntered",
416        0,
417        json!({
418            "name": name,
419            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
420        }),
421    );
422
423    let input_path = state_def["InputPath"].as_str();
424    let processed_input = if input_path == Some("null") {
425        json!({})
426    } else {
427        apply_input_path(&input, input_path)
428    };
429
430    match evaluate_choice(state_def, &processed_input) {
431        Some(next) => {
432            add_event(
433                shared_state,
434                execution_arn,
435                "ChoiceStateExited",
436                entered_event_id,
437                json!({
438                    "name": name,
439                    "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
440                }),
441            );
442            Advance::Next(next, input)
443        }
444        None => Advance::Fail(
445            "States.NoChoiceMatched".to_string(),
446            format!("No choice rule matched and no Default in state '{name}'"),
447        ),
448    }
449}
450
451async fn run_wait_state(
452    name: &str,
453    state_def: &Value,
454    input: Value,
455    shared_state: &SharedStepFunctionsState,
456    execution_arn: &str,
457) -> Advance {
458    let entered_event_id = add_event(
459        shared_state,
460        execution_arn,
461        "WaitStateEntered",
462        0,
463        json!({
464            "name": name,
465            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
466        }),
467    );
468
469    execute_wait_state(state_def, &input).await;
470
471    add_event(
472        shared_state,
473        execution_arn,
474        "WaitStateExited",
475        entered_event_id,
476        json!({
477            "name": name,
478            "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
479        }),
480    );
481
482    advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487    name: &str,
488    state_def: &Value,
489    input: Value,
490    delivery: &Option<Arc<DeliveryBus>>,
491    dynamodb_state: &Option<SharedDynamoDbState>,
492    shared_state: &SharedStepFunctionsState,
493    execution_arn: &str,
494) -> Advance {
495    let entered_event_id = add_event(
496        shared_state,
497        execution_arn,
498        "TaskStateEntered",
499        0,
500        json!({
501            "name": name,
502            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
503        }),
504    );
505
506    let result = execute_task_state(
507        state_def,
508        &input,
509        delivery,
510        dynamodb_state,
511        shared_state,
512        execution_arn,
513        entered_event_id,
514    )
515    .await;
516
517    match result {
518        Ok(output) => {
519            add_event(
520                shared_state,
521                execution_arn,
522                "TaskStateExited",
523                entered_event_id,
524                json!({
525                    "name": name,
526                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
527                }),
528            );
529            advance_from_next(state_def, output)
530        }
531        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532    }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537    name: &str,
538    state_def: &Value,
539    input: Value,
540    delivery: &Option<Arc<DeliveryBus>>,
541    dynamodb_state: &Option<SharedDynamoDbState>,
542    shared_state: &SharedStepFunctionsState,
543    execution_arn: &str,
544) -> Advance {
545    let entered_event_id = add_event(
546        shared_state,
547        execution_arn,
548        "ParallelStateEntered",
549        0,
550        json!({
551            "name": name,
552            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
553        }),
554    );
555
556    let result = execute_parallel_state(
557        state_def,
558        &input,
559        delivery,
560        dynamodb_state,
561        shared_state,
562        execution_arn,
563    )
564    .await;
565
566    match result {
567        Ok(output) => {
568            add_event(
569                shared_state,
570                execution_arn,
571                "ParallelStateExited",
572                entered_event_id,
573                json!({
574                    "name": name,
575                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
576                }),
577            );
578            advance_from_next(state_def, output)
579        }
580        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581    }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586    name: &str,
587    state_def: &Value,
588    input: Value,
589    delivery: &Option<Arc<DeliveryBus>>,
590    dynamodb_state: &Option<SharedDynamoDbState>,
591    shared_state: &SharedStepFunctionsState,
592    execution_arn: &str,
593) -> Advance {
594    let entered_event_id = add_event(
595        shared_state,
596        execution_arn,
597        "MapStateEntered",
598        0,
599        json!({
600            "name": name,
601            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
602        }),
603    );
604
605    let result = execute_map_state(
606        state_def,
607        &input,
608        delivery,
609        dynamodb_state,
610        shared_state,
611        execution_arn,
612    )
613    .await;
614
615    match result {
616        Ok(output) => {
617            add_event(
618                shared_state,
619                execution_arn,
620                "MapStateExited",
621                entered_event_id,
622                json!({
623                    "name": name,
624                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
625                }),
626            );
627            advance_from_next(state_def, output)
628        }
629        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630    }
631}
632
633/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
634async fn execute_wait_state(state_def: &Value, input: &Value) {
635    if let Some(seconds) = state_def["Seconds"].as_u64() {
636        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637        return;
638    }
639
640    if let Some(path) = state_def["SecondsPath"].as_str() {
641        let val = crate::io_processing::resolve_path(input, path);
642        if let Some(seconds) = val.as_u64() {
643            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644        }
645        return;
646    }
647
648    if let Some(ts_str) = state_def["Timestamp"].as_str() {
649        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650            let now = Utc::now();
651            let target_utc = target.with_timezone(&chrono::Utc);
652            if target_utc > now {
653                let duration = (target_utc - now).to_std().unwrap_or_default();
654                tokio::time::sleep(duration).await;
655            }
656        }
657        return;
658    }
659
660    if let Some(path) = state_def["TimestampPath"].as_str() {
661        let val = crate::io_processing::resolve_path(input, path);
662        if let Some(ts_str) = val.as_str() {
663            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664                let now = Utc::now();
665                let target_utc = target.with_timezone(&chrono::Utc);
666                if target_utc > now {
667                    let duration = (target_utc - now).to_std().unwrap_or_default();
668                    tokio::time::sleep(duration).await;
669                }
670            }
671        }
672        return;
673    }
674
675    warn!(
676        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677    );
678}
679
680/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
681fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682    let input_path = state_def["InputPath"].as_str();
683    let result_path = state_def["ResultPath"].as_str();
684    let output_path = state_def["OutputPath"].as_str();
685
686    let effective_input = if input_path == Some("null") {
687        json!({})
688    } else {
689        apply_input_path(input, input_path)
690    };
691
692    let result = if let Some(r) = state_def.get("Result") {
693        r.clone()
694    } else {
695        effective_input.clone()
696    };
697
698    let after_result = if result_path == Some("null") {
699        input.clone()
700    } else {
701        apply_result_path(input, &result, result_path)
702    };
703
704    if output_path == Some("null") {
705        json!({})
706    } else {
707        apply_output_path(&after_result, output_path)
708    }
709}
710
711/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
712/// apply I/O processing, handle Retry.
713async fn execute_task_state(
714    state_def: &Value,
715    input: &Value,
716    delivery: &Option<Arc<DeliveryBus>>,
717    dynamodb_state: &Option<SharedDynamoDbState>,
718    shared_state: &SharedStepFunctionsState,
719    execution_arn: &str,
720    entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724    let input_path = state_def["InputPath"].as_str();
725    let result_path = state_def["ResultPath"].as_str();
726    let output_path = state_def["OutputPath"].as_str();
727
728    let effective_input = if input_path == Some("null") {
729        json!({})
730    } else {
731        apply_input_path(input, input_path)
732    };
733
734    let task_input = if let Some(params) = state_def.get("Parameters") {
735        apply_parameters(params, &effective_input)
736    } else {
737        effective_input
738    };
739
740    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742    let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
743    let mut attempt = 0u32;
744
745    loop {
746        add_event(
747            shared_state,
748            execution_arn,
749            "TaskScheduled",
750            entered_event_id,
751            json!({
752                "resource": resource,
753                "region": "us-east-1",
754                "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
755            }),
756        );
757
758        add_event(
759            shared_state,
760            execution_arn,
761            "TaskStarted",
762            entered_event_id,
763            json!({ "resource": resource }),
764        );
765
766        let invoke_result = invoke_resource(
767            &resource,
768            &task_input,
769            delivery,
770            dynamodb_state,
771            timeout_seconds,
772            heartbeat_seconds,
773            shared_state,
774        )
775        .await;
776
777        match invoke_result {
778            Ok(result) => {
779                add_event(
780                    shared_state,
781                    execution_arn,
782                    "TaskSucceeded",
783                    entered_event_id,
784                    json!({
785                        "resource": resource,
786                        "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
787                    }),
788                );
789
790                let selected = if let Some(selector) = state_def.get("ResultSelector") {
791                    apply_parameters(selector, &result)
792                } else {
793                    result
794                };
795
796                let after_result = if result_path == Some("null") {
797                    input.clone()
798                } else {
799                    apply_result_path(input, &selected, result_path)
800                };
801
802                let output = if output_path == Some("null") {
803                    json!({})
804                } else {
805                    apply_output_path(&after_result, output_path)
806                };
807
808                return Ok(output);
809            }
810            Err((error, cause)) => {
811                add_event(
812                    shared_state,
813                    execution_arn,
814                    "TaskFailed",
815                    entered_event_id,
816                    json!({ "error": error, "cause": cause }),
817                );
818
819                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
820                    attempt += 1;
821                    let actual_delay = delay_ms.min(5000);
822                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
823                    continue;
824                }
825
826                return Err((error, cause));
827            }
828        }
829    }
830}
831
832/// Execute a Parallel state: run all branches concurrently, collect results into an array.
833async fn execute_parallel_state(
834    state_def: &Value,
835    input: &Value,
836    delivery: &Option<Arc<DeliveryBus>>,
837    dynamodb_state: &Option<SharedDynamoDbState>,
838    shared_state: &SharedStepFunctionsState,
839    execution_arn: &str,
840) -> Result<Value, (String, String)> {
841    let input_path = state_def["InputPath"].as_str();
842    let result_path = state_def["ResultPath"].as_str();
843    let output_path = state_def["OutputPath"].as_str();
844
845    let effective_input = if input_path == Some("null") {
846        json!({})
847    } else {
848        apply_input_path(input, input_path)
849    };
850
851    let branches = state_def["Branches"]
852        .as_array()
853        .cloned()
854        .unwrap_or_default();
855
856    if branches.is_empty() {
857        return Err((
858            "States.Runtime".to_string(),
859            "Parallel state has no Branches".to_string(),
860        ));
861    }
862
863    // Spawn all branches concurrently
864    let mut handles = Vec::new();
865    for branch_def in &branches {
866        let branch = branch_def.clone();
867        let branch_input = effective_input.clone();
868        let delivery = delivery.clone();
869        let ddb = dynamodb_state.clone();
870        let state = shared_state.clone();
871        let arn = execution_arn.to_string();
872
873        handles.push(tokio::spawn(async move {
874            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
875        }));
876    }
877
878    // Collect results in order
879    let mut results = Vec::with_capacity(handles.len());
880    for handle in handles {
881        let result = handle.await.map_err(|e| {
882            (
883                "States.Runtime".to_string(),
884                format!("Branch execution panicked: {e}"),
885            )
886        })??;
887        results.push(result);
888    }
889
890    let branch_output = Value::Array(results);
891
892    // Apply ResultSelector if present
893    let selected = if let Some(selector) = state_def.get("ResultSelector") {
894        apply_parameters(selector, &branch_output)
895    } else {
896        branch_output
897    };
898
899    // Apply ResultPath
900    let after_result = if result_path == Some("null") {
901        input.clone()
902    } else {
903        apply_result_path(input, &selected, result_path)
904    };
905
906    // Apply OutputPath
907    let output = if output_path == Some("null") {
908        json!({})
909    } else {
910        apply_output_path(&after_result, output_path)
911    };
912
913    Ok(output)
914}
915
916/// Execute a Map state: iterate over an array and run a sub-state machine per item.
917async fn execute_map_state(
918    state_def: &Value,
919    input: &Value,
920    delivery: &Option<Arc<DeliveryBus>>,
921    dynamodb_state: &Option<SharedDynamoDbState>,
922    shared_state: &SharedStepFunctionsState,
923    execution_arn: &str,
924) -> Result<Value, (String, String)> {
925    let input_path = state_def["InputPath"].as_str();
926    let result_path = state_def["ResultPath"].as_str();
927    let output_path = state_def["OutputPath"].as_str();
928
929    let effective_input = if input_path == Some("null") {
930        json!({})
931    } else {
932        apply_input_path(input, input_path)
933    };
934
935    // Get the items to iterate over
936    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
937    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
938    let items = items_value.as_array().cloned().unwrap_or_default();
939
940    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
941    let iterator_def = state_def
942        .get("ItemProcessor")
943        .or_else(|| state_def.get("Iterator"))
944        .cloned()
945        .ok_or_else(|| {
946            (
947                "States.Runtime".to_string(),
948                "Map state has no ItemProcessor or Iterator".to_string(),
949            )
950        })?;
951
952    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
953    let effective_concurrency = if max_concurrency == 0 {
954        40
955    } else {
956        max_concurrency as usize
957    };
958
959    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
960
961    // Process all items
962    let mut handles = Vec::new();
963    for (index, item) in items.into_iter().enumerate() {
964        let iter_def = iterator_def.clone();
965        let delivery = delivery.clone();
966        let ddb = dynamodb_state.clone();
967        let state = shared_state.clone();
968        let arn = execution_arn.to_string();
969        let sem = semaphore.clone();
970
971        // Apply ItemSelector if present
972        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
973            let mut ctx = serde_json::Map::new();
974            ctx.insert("value".to_string(), item.clone());
975            ctx.insert("index".to_string(), json!(index));
976            apply_parameters(selector, &Value::Object(ctx))
977        } else {
978            item
979        };
980
981        add_event(
982            shared_state,
983            execution_arn,
984            "MapIterationStarted",
985            0,
986            json!({ "index": index }),
987        );
988
989        handles.push(tokio::spawn(async move {
990            let _permit = sem
991                .acquire()
992                .await
993                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
994            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
995            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
996        }));
997    }
998
999    // Collect results in order
1000    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
1001    for handle in handles {
1002        let (index, result) = handle.await.map_err(|e| {
1003            (
1004                "States.Runtime".to_string(),
1005                format!("Map iteration panicked: {e}"),
1006            )
1007        })??;
1008
1009        match result {
1010            Ok(output) => {
1011                add_event(
1012                    shared_state,
1013                    execution_arn,
1014                    "MapIterationSucceeded",
1015                    0,
1016                    json!({ "index": index }),
1017                );
1018                results.push((index, output));
1019            }
1020            Err((error, cause)) => {
1021                add_event(
1022                    shared_state,
1023                    execution_arn,
1024                    "MapIterationFailed",
1025                    0,
1026                    json!({ "index": index, "error": error }),
1027                );
1028                return Err((error, cause));
1029            }
1030        }
1031    }
1032
1033    // Sort by index to maintain order
1034    results.sort_by_key(|(i, _)| *i);
1035    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1036
1037    // Apply ResultSelector if present
1038    let selected = if let Some(selector) = state_def.get("ResultSelector") {
1039        apply_parameters(selector, &map_output)
1040    } else {
1041        map_output
1042    };
1043
1044    // Apply ResultPath
1045    let after_result = if result_path == Some("null") {
1046        input.clone()
1047    } else {
1048        apply_result_path(input, &selected, result_path)
1049    };
1050
1051    // Apply OutputPath
1052    let output = if output_path == Some("null") {
1053        json!({})
1054    } else {
1055        apply_output_path(&after_result, output_path)
1056    };
1057
1058    Ok(output)
1059}
1060
1061/// Invoke a resource (Lambda function or SDK integration).
1062#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064    resource: &str,
1065    input: &Value,
1066    delivery: &Option<Arc<DeliveryBus>>,
1067    dynamodb_state: &Option<SharedDynamoDbState>,
1068    timeout_seconds: Option<u64>,
1069    heartbeat_seconds: Option<u64>,
1070    shared_state: &SharedStepFunctionsState,
1071) -> Result<Value, (String, String)> {
1072    // Direct activity ARN: arn:aws:states:<region>:<account>:activity:<name>
1073    if resource.contains(":states:") && resource.contains(":activity:") {
1074        return invoke_activity(
1075            resource,
1076            input,
1077            shared_state,
1078            timeout_seconds,
1079            heartbeat_seconds,
1080        )
1081        .await;
1082    }
1083
1084    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
1085    if resource.contains(":lambda:") && resource.contains(":function:") {
1086        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1087    }
1088
1089    // SDK integration patterns: arn:aws:states:::<service>:<action>
1090    if resource.starts_with("arn:aws:states:::lambda:invoke") {
1091        let function_name = input["FunctionName"].as_str().unwrap_or("");
1092        let payload = if let Some(p) = input.get("Payload") {
1093            p.clone()
1094        } else {
1095            input.clone()
1096        };
1097        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1098    }
1099
1100    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1101        return invoke_sqs_send_message(input, delivery);
1102    }
1103
1104    if resource.starts_with("arn:aws:states:::sns:publish") {
1105        return invoke_sns_publish(input, delivery);
1106    }
1107
1108    if resource.starts_with("arn:aws:states:::events:putEvents") {
1109        return invoke_eventbridge_put_events(input, delivery);
1110    }
1111
1112    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1113        return invoke_dynamodb_get_item(input, dynamodb_state);
1114    }
1115
1116    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1117        return invoke_dynamodb_put_item(input, dynamodb_state);
1118    }
1119
1120    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1121        return invoke_dynamodb_delete_item(input, dynamodb_state);
1122    }
1123
1124    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1125        return invoke_dynamodb_update_item(input, dynamodb_state);
1126    }
1127
1128    Err((
1129        "States.TaskFailed".to_string(),
1130        format!("Unsupported resource: {resource}"),
1131    ))
1132}
1133
1134/// Send a message to an SQS queue via DeliveryBus.
1135fn invoke_sqs_send_message(
1136    input: &Value,
1137    delivery: &Option<Arc<DeliveryBus>>,
1138) -> Result<Value, (String, String)> {
1139    let delivery = delivery.as_ref().ok_or_else(|| {
1140        (
1141            "States.TaskFailed".to_string(),
1142            "No delivery bus configured for SQS".to_string(),
1143        )
1144    })?;
1145
1146    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1147        (
1148            "States.TaskFailed".to_string(),
1149            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1150        )
1151    })?;
1152
1153    let message_body = input["MessageBody"]
1154        .as_str()
1155        .map(|s| s.to_string())
1156        .unwrap_or_else(|| {
1157            // If MessageBody is not a string, serialize the value
1158            serde_json::to_string(&input["MessageBody"])
1159                .expect("serde_json::Value serialization is infallible")
1160        });
1161
1162    // Convert QueueUrl to ARN format for the delivery bus
1163    // QueueUrl format: http://.../<account>/<queue-name>
1164    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1165    let queue_arn = queue_url_to_arn(queue_url);
1166
1167    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1168
1169    Ok(json!({
1170        "MessageId": uuid::Uuid::new_v4().to_string(),
1171        "MD5OfMessageBody": md5_hex(&message_body),
1172    }))
1173}
1174
1175/// Publish a message to an SNS topic via DeliveryBus.
1176fn invoke_sns_publish(
1177    input: &Value,
1178    delivery: &Option<Arc<DeliveryBus>>,
1179) -> Result<Value, (String, String)> {
1180    let delivery = delivery.as_ref().ok_or_else(|| {
1181        (
1182            "States.TaskFailed".to_string(),
1183            "No delivery bus configured for SNS".to_string(),
1184        )
1185    })?;
1186
1187    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1188        (
1189            "States.TaskFailed".to_string(),
1190            "Missing TopicArn in SNS publish parameters".to_string(),
1191        )
1192    })?;
1193
1194    let message = input["Message"]
1195        .as_str()
1196        .map(|s| s.to_string())
1197        .unwrap_or_else(|| {
1198            serde_json::to_string(&input["Message"])
1199                .expect("serde_json::Value serialization is infallible")
1200        });
1201
1202    let subject = input["Subject"].as_str();
1203
1204    delivery.publish_to_sns(topic_arn, &message, subject);
1205
1206    Ok(json!({
1207        "MessageId": uuid::Uuid::new_v4().to_string(),
1208    }))
1209}
1210
1211/// Put events onto an EventBridge bus via DeliveryBus.
1212fn invoke_eventbridge_put_events(
1213    input: &Value,
1214    delivery: &Option<Arc<DeliveryBus>>,
1215) -> Result<Value, (String, String)> {
1216    let delivery = delivery.as_ref().ok_or_else(|| {
1217        (
1218            "States.TaskFailed".to_string(),
1219            "No delivery bus configured for EventBridge".to_string(),
1220        )
1221    })?;
1222
1223    let entries = input["Entries"]
1224        .as_array()
1225        .ok_or_else(|| {
1226            (
1227                "States.TaskFailed".to_string(),
1228                "Missing Entries in EventBridge putEvents parameters".to_string(),
1229            )
1230        })?
1231        .clone();
1232
1233    let mut event_ids = Vec::new();
1234    for entry in &entries {
1235        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1236        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1237        let detail = entry["Detail"]
1238            .as_str()
1239            .map(|s| s.to_string())
1240            .unwrap_or_else(|| {
1241                serde_json::to_string(&entry["Detail"])
1242                    .expect("serde_json::Value serialization is infallible")
1243            });
1244        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1245
1246        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1247        event_ids.push(uuid::Uuid::new_v4().to_string());
1248    }
1249
1250    Ok(json!({
1251        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1252        "FailedEntryCount": 0,
1253    }))
1254}
1255
1256/// Get an item from DynamoDB via direct state access.
1257fn invoke_dynamodb_get_item(
1258    input: &Value,
1259    dynamodb_state: &Option<SharedDynamoDbState>,
1260) -> Result<Value, (String, String)> {
1261    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1262        (
1263            "States.TaskFailed".to_string(),
1264            "No DynamoDB state configured".to_string(),
1265        )
1266    })?;
1267
1268    let table_name = input["TableName"].as_str().ok_or_else(|| {
1269        (
1270            "States.TaskFailed".to_string(),
1271            "Missing TableName in DynamoDB getItem parameters".to_string(),
1272        )
1273    })?;
1274
1275    let key = input
1276        .get("Key")
1277        .and_then(|k| k.as_object())
1278        .ok_or_else(|| {
1279            (
1280                "States.TaskFailed".to_string(),
1281                "Missing Key in DynamoDB getItem parameters".to_string(),
1282            )
1283        })?;
1284
1285    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1286
1287    let __mas = ddb.read();
1288    let state = __mas.default_ref();
1289    let table = state.tables.get(table_name).ok_or_else(|| {
1290        (
1291            "States.TaskFailed".to_string(),
1292            format!("Table '{table_name}' not found"),
1293        )
1294    })?;
1295
1296    let item = table
1297        .find_item_index(&key_map)
1298        .map(|idx| table.items[idx].clone());
1299
1300    match item {
1301        Some(item_map) => {
1302            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1303            Ok(json!({ "Item": item_value }))
1304        }
1305        None => Ok(json!({})),
1306    }
1307}
1308
1309/// Put an item into DynamoDB via direct state access.
1310fn invoke_dynamodb_put_item(
1311    input: &Value,
1312    dynamodb_state: &Option<SharedDynamoDbState>,
1313) -> Result<Value, (String, String)> {
1314    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1315        (
1316            "States.TaskFailed".to_string(),
1317            "No DynamoDB state configured".to_string(),
1318        )
1319    })?;
1320
1321    let table_name = input["TableName"].as_str().ok_or_else(|| {
1322        (
1323            "States.TaskFailed".to_string(),
1324            "Missing TableName in DynamoDB putItem parameters".to_string(),
1325        )
1326    })?;
1327
1328    let item = input
1329        .get("Item")
1330        .and_then(|i| i.as_object())
1331        .ok_or_else(|| {
1332            (
1333                "States.TaskFailed".to_string(),
1334                "Missing Item in DynamoDB putItem parameters".to_string(),
1335            )
1336        })?;
1337
1338    let item_map: HashMap<String, Value> =
1339        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1340
1341    let mut __mas = ddb.write();
1342    let state = __mas.default_mut();
1343    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1344        (
1345            "States.TaskFailed".to_string(),
1346            format!("Table '{table_name}' not found"),
1347        )
1348    })?;
1349
1350    // Replace existing item with same key, or insert new
1351    if let Some(idx) = table.find_item_index(&item_map) {
1352        table.items[idx] = item_map;
1353    } else {
1354        table.items.push(item_map);
1355    }
1356
1357    Ok(json!({}))
1358}
1359
1360/// Delete an item from DynamoDB via direct state access.
1361fn invoke_dynamodb_delete_item(
1362    input: &Value,
1363    dynamodb_state: &Option<SharedDynamoDbState>,
1364) -> Result<Value, (String, String)> {
1365    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1366        (
1367            "States.TaskFailed".to_string(),
1368            "No DynamoDB state configured".to_string(),
1369        )
1370    })?;
1371
1372    let table_name = input["TableName"].as_str().ok_or_else(|| {
1373        (
1374            "States.TaskFailed".to_string(),
1375            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1376        )
1377    })?;
1378
1379    let key = input
1380        .get("Key")
1381        .and_then(|k| k.as_object())
1382        .ok_or_else(|| {
1383            (
1384                "States.TaskFailed".to_string(),
1385                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1386            )
1387        })?;
1388
1389    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1390
1391    let mut __mas = ddb.write();
1392    let state = __mas.default_mut();
1393    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1394        (
1395            "States.TaskFailed".to_string(),
1396            format!("Table '{table_name}' not found"),
1397        )
1398    })?;
1399
1400    if let Some(idx) = table.find_item_index(&key_map) {
1401        table.items.remove(idx);
1402    }
1403
1404    Ok(json!({}))
1405}
1406
1407/// Update an item in DynamoDB via direct state access. Honors UpdateExpression
1408/// SET (with `=`, `+`, `-`, `if_not_exists`), REMOVE, ADD (numeric), and
1409/// DELETE (set elements). Creates the item from `Key` plus the expression
1410/// when no matching item exists, mirroring DynamoDB upsert semantics.
1411fn invoke_dynamodb_update_item(
1412    input: &Value,
1413    dynamodb_state: &Option<SharedDynamoDbState>,
1414) -> Result<Value, (String, String)> {
1415    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1416        (
1417            "States.TaskFailed".to_string(),
1418            "No DynamoDB state configured".to_string(),
1419        )
1420    })?;
1421
1422    let table_name = input["TableName"].as_str().ok_or_else(|| {
1423        (
1424            "States.TaskFailed".to_string(),
1425            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1426        )
1427    })?;
1428
1429    let key = input
1430        .get("Key")
1431        .and_then(|k| k.as_object())
1432        .ok_or_else(|| {
1433            (
1434                "States.TaskFailed".to_string(),
1435                "Missing Key in DynamoDB updateItem parameters".to_string(),
1436            )
1437        })?;
1438
1439    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1440
1441    let mut __mas = ddb.write();
1442    let state = __mas.default_mut();
1443    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1444        (
1445            "States.TaskFailed".to_string(),
1446            format!("Table '{table_name}' not found"),
1447        )
1448    })?;
1449
1450    // Parse UpdateExpression to apply SET operations
1451    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1452        let attr_values = input
1453            .get("ExpressionAttributeValues")
1454            .and_then(|v| v.as_object())
1455            .cloned()
1456            .unwrap_or_default();
1457        let attr_names = input
1458            .get("ExpressionAttributeNames")
1459            .and_then(|v| v.as_object())
1460            .cloned()
1461            .unwrap_or_default();
1462
1463        if let Some(idx) = table.find_item_index(&key_map) {
1464            apply_update_expression(
1465                &mut table.items[idx],
1466                update_expr,
1467                &attr_values,
1468                &attr_names,
1469            );
1470        } else {
1471            // Create new item with key + update expression values
1472            let mut new_item = key_map;
1473            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1474            table.items.push(new_item);
1475        }
1476    }
1477
1478    Ok(json!({}))
1479}
1480
1481/// Apply a simple SET UpdateExpression to an item.
1482fn apply_update_expression(
1483    item: &mut HashMap<String, Value>,
1484    expr: &str,
1485    attr_values: &serde_json::Map<String, Value>,
1486    attr_names: &serde_json::Map<String, Value>,
1487) {
1488    // DynamoDB UpdateExpression has up to four clauses: SET, REMOVE, ADD, DELETE.
1489    // The clauses are separated by whitespace; we tokenize by walking the string
1490    // and switching mode whenever we hit a keyword, then split the body of each
1491    // clause on commas.
1492    let clauses = split_update_clauses(expr);
1493    for (clause, body) in clauses {
1494        match clause {
1495            UpdateClause::Set => apply_set(item, &body, attr_values, attr_names),
1496            UpdateClause::Remove => apply_remove(item, &body, attr_names),
1497            UpdateClause::Add => apply_add(item, &body, attr_values, attr_names),
1498            UpdateClause::Delete => apply_delete(item, &body, attr_values, attr_names),
1499        }
1500    }
1501}
1502
1503#[derive(Clone, Copy)]
1504enum UpdateClause {
1505    Set,
1506    Remove,
1507    Add,
1508    Delete,
1509}
1510
1511fn split_update_clauses(expr: &str) -> Vec<(UpdateClause, String)> {
1512    let mut out = Vec::new();
1513    let mut current: Option<UpdateClause> = None;
1514    let mut buf = String::new();
1515    for token in expr.split_whitespace() {
1516        let upper = token.to_ascii_uppercase();
1517        let next_clause = match upper.as_str() {
1518            "SET" => Some(UpdateClause::Set),
1519            "REMOVE" => Some(UpdateClause::Remove),
1520            "ADD" => Some(UpdateClause::Add),
1521            "DELETE" => Some(UpdateClause::Delete),
1522            _ => None,
1523        };
1524        if let Some(nc) = next_clause {
1525            if let Some(prev) = current.take() {
1526                out.push((prev, buf.trim().to_string()));
1527                buf.clear();
1528            }
1529            current = Some(nc);
1530        } else if current.is_some() {
1531            if !buf.is_empty() {
1532                buf.push(' ');
1533            }
1534            buf.push_str(token);
1535        }
1536    }
1537    if let Some(c) = current {
1538        out.push((c, buf.trim().to_string()));
1539    }
1540    out
1541}
1542
1543fn resolve_attr_name(token: &str, attr_names: &serde_json::Map<String, Value>) -> String {
1544    if token.starts_with('#') {
1545        attr_names
1546            .get(token)
1547            .and_then(|v| v.as_str())
1548            .unwrap_or(token)
1549            .to_string()
1550    } else {
1551        token.to_string()
1552    }
1553}
1554
1555fn apply_set(
1556    item: &mut HashMap<String, Value>,
1557    body: &str,
1558    attr_values: &serde_json::Map<String, Value>,
1559    attr_names: &serde_json::Map<String, Value>,
1560) {
1561    for assignment in split_top_commas(body) {
1562        let Some((lhs, rhs)) = assignment.split_once('=') else {
1563            continue;
1564        };
1565        let attr_name = resolve_attr_name(lhs.trim(), attr_names);
1566        let value = evaluate_set_rhs(rhs.trim(), &attr_name, item, attr_values, attr_names);
1567        if let Some(v) = value {
1568            item.insert(attr_name, v);
1569        }
1570    }
1571}
1572
1573fn evaluate_set_rhs(
1574    rhs: &str,
1575    attr_name: &str,
1576    item: &HashMap<String, Value>,
1577    attr_values: &serde_json::Map<String, Value>,
1578    attr_names: &serde_json::Map<String, Value>,
1579) -> Option<Value> {
1580    // if_not_exists(path, :val)
1581    if let Some(args) = rhs
1582        .strip_prefix("if_not_exists(")
1583        .and_then(|s| s.strip_suffix(')'))
1584    {
1585        let parts: Vec<&str> = args.splitn(2, ',').collect();
1586        if parts.len() == 2 {
1587            let path = resolve_attr_name(parts[0].trim(), attr_names);
1588            if item.contains_key(&path) {
1589                return item.get(&path).cloned();
1590            }
1591            return resolve_value(parts[1].trim(), attr_values);
1592        }
1593        return None;
1594    }
1595    // path + :inc / path - :dec — DynamoDB stores numbers as {"N":"<str>"}.
1596    for op in ['+', '-'] {
1597        if let Some((left, right)) = split_top_op(rhs, op) {
1598            let left = left.trim();
1599            let right = right.trim();
1600            let left_val = if left.starts_with(':') {
1601                resolve_value(left, attr_values)
1602            } else {
1603                let name = resolve_attr_name(left, attr_names);
1604                item.get(&name).cloned()
1605            };
1606            let right_val = if right.starts_with(':') {
1607                resolve_value(right, attr_values)
1608            } else {
1609                let name = resolve_attr_name(right, attr_names);
1610                item.get(&name).cloned()
1611            };
1612            return arithmetic(left_val.as_ref(), op, right_val.as_ref());
1613        }
1614    }
1615    // bare value or attribute reference
1616    if rhs.starts_with(':') {
1617        return resolve_value(rhs, attr_values);
1618    }
1619    if rhs.starts_with('#') {
1620        let _ = attr_name;
1621        let name = resolve_attr_name(rhs, attr_names);
1622        return item.get(&name).cloned();
1623    }
1624    None
1625}
1626
1627fn arithmetic(left: Option<&Value>, op: char, right: Option<&Value>) -> Option<Value> {
1628    let lf = number_from_dynamo(left?)?;
1629    let rf = number_from_dynamo(right?)?;
1630    let out = match op {
1631        '+' => lf + rf,
1632        '-' => lf - rf,
1633        _ => return None,
1634    };
1635    Some(json!({ "N": format_number(out) }))
1636}
1637
1638fn number_from_dynamo(v: &Value) -> Option<f64> {
1639    v.get("N")?.as_str()?.parse().ok()
1640}
1641
1642fn format_number(n: f64) -> String {
1643    // i64::MAX is 2^63-1 which is not exactly representable in f64; `i64::MAX as f64`
1644    // rounds up to 2^63, and casting 2^63 back to i64 saturates. Use an exclusive upper
1645    // bound so we never hand `n as i64` a value it can't faithfully represent.
1646    if n.fract() == 0.0 && n.is_finite() && n >= i64::MIN as f64 && n < i64::MAX as f64 {
1647        format!("{}", n as i64)
1648    } else {
1649        format!("{n}")
1650    }
1651}
1652
1653fn resolve_value(token: &str, attr_values: &serde_json::Map<String, Value>) -> Option<Value> {
1654    attr_values.get(token).cloned()
1655}
1656
1657fn apply_remove(
1658    item: &mut HashMap<String, Value>,
1659    body: &str,
1660    attr_names: &serde_json::Map<String, Value>,
1661) {
1662    for path in split_top_commas(body) {
1663        let name = resolve_attr_name(path.trim(), attr_names);
1664        item.remove(&name);
1665    }
1666}
1667
1668fn apply_add(
1669    item: &mut HashMap<String, Value>,
1670    body: &str,
1671    attr_values: &serde_json::Map<String, Value>,
1672    attr_names: &serde_json::Map<String, Value>,
1673) {
1674    // ADD #path :inc — numeric increment, with the value initialized to :inc when
1675    // the attribute is absent. Set union (NS/SS/BS) is not implemented; ADD on a
1676    // non-numeric attribute is a no-op.
1677    for clause in split_top_commas(body) {
1678        let mut parts = clause.split_whitespace();
1679        let Some(path) = parts.next() else { continue };
1680        let Some(value_ref) = parts.next() else {
1681            continue;
1682        };
1683        let attr_name = resolve_attr_name(path, attr_names);
1684        let Some(delta) = resolve_value(value_ref, attr_values) else {
1685            continue;
1686        };
1687        let current = item.get(&attr_name).cloned();
1688        let next = match (current.as_ref(), &delta) {
1689            (None, _) => delta.clone(),
1690            (Some(cur), _) => arithmetic(Some(cur), '+', Some(&delta)).unwrap_or(delta.clone()),
1691        };
1692        item.insert(attr_name, next);
1693    }
1694}
1695
1696fn apply_delete(
1697    item: &mut HashMap<String, Value>,
1698    body: &str,
1699    attr_values: &serde_json::Map<String, Value>,
1700    attr_names: &serde_json::Map<String, Value>,
1701) {
1702    // DELETE #path :elements — remove each element of the set value from the
1703    // attribute's set. Drops the attribute when the resulting set is empty.
1704    for clause in split_top_commas(body) {
1705        let mut parts = clause.split_whitespace();
1706        let Some(path) = parts.next() else { continue };
1707        let Some(value_ref) = parts.next() else {
1708            continue;
1709        };
1710        let attr_name = resolve_attr_name(path, attr_names);
1711        let Some(elements) = resolve_value(value_ref, attr_values) else {
1712            continue;
1713        };
1714        let Some(current) = item.get_mut(&attr_name) else {
1715            continue;
1716        };
1717        for set_kind in ["SS", "NS", "BS"] {
1718            if let (Some(cur_arr), Some(rem_arr)) = (
1719                current.get_mut(set_kind).and_then(|v| v.as_array_mut()),
1720                elements.get(set_kind).and_then(|v| v.as_array()),
1721            ) {
1722                let to_remove: std::collections::HashSet<String> = rem_arr
1723                    .iter()
1724                    .filter_map(|v| v.as_str().map(String::from))
1725                    .collect();
1726                cur_arr.retain(|v| !v.as_str().is_some_and(|s| to_remove.contains(s)));
1727                if cur_arr.is_empty() {
1728                    item.remove(&attr_name);
1729                }
1730                break;
1731            }
1732        }
1733    }
1734}
1735
1736fn split_top_commas(s: &str) -> Vec<String> {
1737    // Splits on `,` while respecting paren depth (so commas inside
1738    // `if_not_exists(a, :b)` don't split the assignment).
1739    let mut out = Vec::new();
1740    let mut depth = 0i32;
1741    let mut buf = String::new();
1742    for c in s.chars() {
1743        match c {
1744            '(' => {
1745                depth += 1;
1746                buf.push(c);
1747            }
1748            ')' => {
1749                depth -= 1;
1750                buf.push(c);
1751            }
1752            ',' if depth == 0 => {
1753                out.push(std::mem::take(&mut buf).trim().to_string());
1754            }
1755            _ => buf.push(c),
1756        }
1757    }
1758    if !buf.trim().is_empty() {
1759        out.push(buf.trim().to_string());
1760    }
1761    out
1762}
1763
1764fn split_top_op(s: &str, op: char) -> Option<(&str, &str)> {
1765    let mut depth = 0i32;
1766    for (i, c) in s.char_indices() {
1767        match c {
1768            '(' => depth += 1,
1769            ')' => depth -= 1,
1770            c if c == op && depth == 0 && i > 0 => {
1771                return Some((&s[..i], &s[i + c.len_utf8()..]));
1772            }
1773            _ => {}
1774        }
1775    }
1776    None
1777}
1778
1779/// Convert an SQS queue URL to an ARN.
1780/// QueueUrl format: http://localhost:4566/123456789012/my-queue
1781fn queue_url_to_arn(url: &str) -> String {
1782    let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1783    if parts.len() >= 2 {
1784        let queue_name = parts[0];
1785        let account_id = parts[1];
1786        Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1787    } else {
1788        url.to_string()
1789    }
1790}
1791
1792/// Compute MD5 hex digest for SQS message response format.
1793fn md5_hex(data: &str) -> String {
1794    use md5::Digest;
1795    let result = md5::Md5::digest(data.as_bytes());
1796    format!("{result:032x}")
1797}
1798
1799/// Invoke a Lambda function directly via DeliveryBus.
1800async fn invoke_lambda_direct(
1801    function_arn: &str,
1802    input: &Value,
1803    delivery: &Option<Arc<DeliveryBus>>,
1804    timeout_seconds: Option<u64>,
1805) -> Result<Value, (String, String)> {
1806    let delivery = delivery.as_ref().ok_or_else(|| {
1807        (
1808            "States.TaskFailed".to_string(),
1809            "No delivery bus configured for Lambda invocation".to_string(),
1810        )
1811    })?;
1812
1813    let payload =
1814        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1815
1816    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1817
1818    let result = if let Some(timeout) = timeout_seconds {
1819        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1820            Ok(r) => r,
1821            Err(_) => {
1822                return Err((
1823                    "States.Timeout".to_string(),
1824                    format!("Task timed out after {timeout} seconds"),
1825                ));
1826            }
1827        }
1828    } else {
1829        invoke_future.await
1830    };
1831
1832    match result {
1833        Some(Ok(bytes)) => {
1834            let response_str = String::from_utf8_lossy(&bytes);
1835            let value: Value =
1836                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1837            Ok(value)
1838        }
1839        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1840        None => {
1841            // No runtime available — return empty result
1842            Ok(json!({}))
1843        }
1844    }
1845}
1846
1847/// Invoke an activity worker. Inserts a `PENDING` token into shared state
1848/// so a worker can claim it via `GetActivityTask`, then polls until the
1849/// worker calls `SendTaskSuccess` / `SendTaskFailure` or the heartbeat /
1850/// timeout windows expire.
1851async fn invoke_activity(
1852    activity_arn: &str,
1853    input: &Value,
1854    shared_state: &SharedStepFunctionsState,
1855    timeout_seconds: Option<u64>,
1856    heartbeat_seconds: Option<u64>,
1857) -> Result<Value, (String, String)> {
1858    use crate::state::TaskTokenState;
1859
1860    // Activity must exist (look up across accounts via ARN segment).
1861    let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1862    {
1863        let accounts = shared_state.read();
1864        let exists = accounts
1865            .get(&activity_account)
1866            .map(|s| s.activities.contains_key(activity_arn))
1867            .unwrap_or(false);
1868        if !exists {
1869            return Err((
1870                "States.TaskFailed".to_string(),
1871                format!("Activity does not exist: {activity_arn}"),
1872            ));
1873        }
1874    }
1875
1876    let token = format!(
1877        "FCToken-{}-{}",
1878        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1879        uuid::Uuid::new_v4().simple(),
1880    );
1881    let now = chrono::Utc::now();
1882    let input_str =
1883        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1884    {
1885        let mut accounts = shared_state.write();
1886        let state = accounts.get_or_create(&activity_account);
1887        state.task_tokens.insert(
1888            token.clone(),
1889            TaskTokenState {
1890                activity_arn: activity_arn.to_string(),
1891                status: "PENDING".to_string(),
1892                output: None,
1893                error: None,
1894                cause: None,
1895                input: Some(input_str),
1896                created_at: now,
1897                last_heartbeat_at: None,
1898                heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
1899                timeout_seconds: timeout_seconds.map(|s| s as i64),
1900            },
1901        );
1902    }
1903
1904    // Poll for completion. Default poll cadence 200ms; 1 hour absolute
1905    // ceiling so a stuck activity can't block the interpreter forever
1906    // when no TimeoutSeconds is set on the Task state.
1907    let absolute_deadline =
1908        std::time::Instant::now() + std::time::Duration::from_secs(timeout_seconds.unwrap_or(3600));
1909    loop {
1910        let now_ts = chrono::Utc::now();
1911        let snapshot = {
1912            let accounts = shared_state.read();
1913            accounts
1914                .get(&activity_account)
1915                .and_then(|s| s.task_tokens.get(&token).cloned())
1916        };
1917        let Some(entry) = snapshot else {
1918            return Err((
1919                "States.TaskFailed".to_string(),
1920                "Activity task token disappeared".to_string(),
1921            ));
1922        };
1923        match entry.status.as_str() {
1924            "SUCCEEDED" => {
1925                cleanup_token(shared_state, &activity_account, &token);
1926                let output = entry.output.unwrap_or_else(|| "{}".to_string());
1927                let value: Value = serde_json::from_str(&output).unwrap_or(Value::String(output));
1928                return Ok(value);
1929            }
1930            "FAILED" => {
1931                cleanup_token(shared_state, &activity_account, &token);
1932                return Err((
1933                    entry
1934                        .error
1935                        .unwrap_or_else(|| "States.TaskFailed".to_string()),
1936                    entry.cause.unwrap_or_default(),
1937                ));
1938            }
1939            _ => {}
1940        }
1941        // Heartbeat timeout: only enforced once a worker has picked up the
1942        // task (status == IN_PROGRESS) and a heartbeat window is set.
1943        if entry.status == "IN_PROGRESS" {
1944            if let Some(hb) = entry.heartbeat_seconds {
1945                let last = entry.last_heartbeat_at.unwrap_or(entry.created_at);
1946                if (now_ts - last).num_seconds() > hb {
1947                    cleanup_token(shared_state, &activity_account, &token);
1948                    return Err((
1949                        "States.HeartbeatTimeout".to_string(),
1950                        format!("Activity worker missed heartbeat ({hb}s window)"),
1951                    ));
1952                }
1953            }
1954        }
1955        if std::time::Instant::now() >= absolute_deadline {
1956            cleanup_token(shared_state, &activity_account, &token);
1957            let secs = timeout_seconds.unwrap_or(3600);
1958            return Err((
1959                "States.Timeout".to_string(),
1960                format!("Activity timed out after {secs} seconds"),
1961            ));
1962        }
1963        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1964    }
1965}
1966
1967fn cleanup_token(shared_state: &SharedStepFunctionsState, account_id: &str, token: &str) {
1968    let mut accounts = shared_state.write();
1969    if let Some(state) = accounts.get_mut(account_id) {
1970        state.task_tokens.remove(token);
1971    }
1972}
1973
1974/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1975fn apply_parameters(template: &Value, input: &Value) -> Value {
1976    match template {
1977        Value::Object(map) => {
1978            let mut result = serde_json::Map::new();
1979            for (key, value) in map {
1980                if let Some(stripped) = key.strip_suffix(".$") {
1981                    if let Some(path) = value.as_str() {
1982                        result.insert(
1983                            stripped.to_string(),
1984                            crate::io_processing::resolve_path(input, path),
1985                        );
1986                    }
1987                } else {
1988                    result.insert(key.clone(), apply_parameters(value, input));
1989                }
1990            }
1991            Value::Object(result)
1992        }
1993        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1994        other => other.clone(),
1995    }
1996}
1997
1998enum NextState {
1999    Name(String),
2000    End,
2001    Error(String),
2002}
2003
2004fn next_state(state_def: &Value) -> NextState {
2005    if state_def["End"].as_bool() == Some(true) {
2006        return NextState::End;
2007    }
2008    match state_def["Next"].as_str() {
2009        Some(next) => NextState::Name(next.to_string()),
2010        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
2011    }
2012}
2013
2014/// Find the first `Catch` clause on `state_def` that matches `error` and
2015/// apply its `ResultPath` to produce the state to transition to and the
2016/// new effective input. Returns None when no catcher applies, in which
2017/// case the error should propagate up.
2018fn apply_state_catcher(
2019    state_def: &Value,
2020    effective_input: &Value,
2021    error: &str,
2022    cause: &str,
2023) -> Option<(String, Value)> {
2024    let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
2025    let (next, result_path) = find_catcher(&catchers, error)?;
2026    let error_output = json!({
2027        "Error": error,
2028        "Cause": cause,
2029    });
2030    let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
2031    Some((next, new_input))
2032}
2033
2034/// Extract account ID from an execution ARN (`arn:aws:states:region:account_id:...`).
2035fn account_id_from_arn(arn: &str) -> &str {
2036    arn.split(':').nth(4).unwrap_or("000000000000")
2037}
2038
2039fn add_event(
2040    state: &SharedStepFunctionsState,
2041    execution_arn: &str,
2042    event_type: &str,
2043    previous_event_id: i64,
2044    details: Value,
2045) -> i64 {
2046    let account_id = account_id_from_arn(execution_arn).to_string();
2047    let mut accounts = state.write();
2048    let s = accounts.get_or_create(&account_id);
2049    if let Some(exec) = s.executions.get_mut(execution_arn) {
2050        let id = exec.history_events.len() as i64 + 1;
2051        exec.history_events.push(HistoryEvent {
2052            id,
2053            event_type: event_type.to_string(),
2054            timestamp: Utc::now(),
2055            previous_event_id,
2056            details,
2057        });
2058        id
2059    } else {
2060        0
2061    }
2062}
2063
2064fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
2065    let account_id = account_id_from_arn(execution_arn).to_string();
2066    // Check terminal status before recording events to avoid inconsistent history
2067    {
2068        let accounts = state.read();
2069        if let Some(s) = accounts.get(&account_id) {
2070            if let Some(exec) = s.executions.get(execution_arn) {
2071                if exec.status != ExecutionStatus::Running {
2072                    return;
2073                }
2074            }
2075        }
2076    }
2077
2078    let output_str =
2079        serde_json::to_string(output).expect("serde_json::Value serialization is infallible");
2080
2081    add_event(
2082        state,
2083        execution_arn,
2084        "ExecutionSucceeded",
2085        0,
2086        json!({ "output": output_str }),
2087    );
2088
2089    let mut accounts = state.write();
2090    let s = accounts.get_or_create(&account_id);
2091    if let Some(exec) = s.executions.get_mut(execution_arn) {
2092        exec.status = ExecutionStatus::Succeeded;
2093        exec.output = Some(output_str);
2094        exec.stop_date = Some(Utc::now());
2095    }
2096}
2097
2098fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
2099    let account_id = account_id_from_arn(execution_arn).to_string();
2100    // Check terminal status before recording events to avoid inconsistent history
2101    {
2102        let accounts = state.read();
2103        if let Some(s) = accounts.get(&account_id) {
2104            if let Some(exec) = s.executions.get(execution_arn) {
2105                if exec.status != ExecutionStatus::Running {
2106                    return;
2107                }
2108            }
2109        }
2110    }
2111
2112    add_event(
2113        state,
2114        execution_arn,
2115        "ExecutionFailed",
2116        0,
2117        json!({ "error": error, "cause": cause }),
2118    );
2119
2120    let mut accounts = state.write();
2121    let s = accounts.get_or_create(&account_id);
2122    if let Some(exec) = s.executions.get_mut(execution_arn) {
2123        exec.status = ExecutionStatus::Failed;
2124        exec.error = Some(error.to_string());
2125        exec.cause = Some(cause.to_string());
2126        exec.stop_date = Some(Utc::now());
2127    }
2128}
2129
2130#[cfg(test)]
2131mod tests {
2132    use super::*;
2133    use crate::state::Execution;
2134    use parking_lot::RwLock;
2135    use std::sync::Arc;
2136
2137    fn make_state() -> SharedStepFunctionsState {
2138        Arc::new(RwLock::new(
2139            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2140        ))
2141    }
2142
2143    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
2144        let mut accounts = state.write();
2145        let s = accounts.get_or_create("123456789012");
2146        s.executions.insert(
2147            arn.to_string(),
2148            Execution {
2149                execution_arn: arn.to_string(),
2150                state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
2151                    .to_string(),
2152                state_machine_name: "test".to_string(),
2153                name: "exec-1".to_string(),
2154                status: ExecutionStatus::Running,
2155                input,
2156                output: None,
2157                start_date: Utc::now(),
2158                stop_date: None,
2159                error: None,
2160                cause: None,
2161                history_events: vec![],
2162            },
2163        );
2164    }
2165
2166    #[tokio::test]
2167    async fn test_simple_pass_state() {
2168        let state = make_state();
2169        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2170        create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
2171
2172        let definition = json!({
2173            "StartAt": "PassState",
2174            "States": {
2175                "PassState": {
2176                    "Type": "Pass",
2177                    "Result": {"processed": true},
2178                    "End": true
2179                }
2180            }
2181        })
2182        .to_string();
2183
2184        execute_state_machine(
2185            state.clone(),
2186            arn.to_string(),
2187            definition,
2188            Some(r#"{"hello":"world"}"#.to_string()),
2189            None,
2190            None,
2191        )
2192        .await;
2193
2194        let __a = state.read();
2195        let s = __a.default_ref();
2196        let exec = s.executions.get(arn).unwrap();
2197        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2198        assert!(exec.output.is_some());
2199        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2200        assert_eq!(output, json!({"processed": true}));
2201    }
2202
2203    #[tokio::test]
2204    async fn test_pass_chain() {
2205        let state = make_state();
2206        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2207        create_execution(&state, arn, Some(r#"{}"#.to_string()));
2208
2209        let definition = json!({
2210            "StartAt": "First",
2211            "States": {
2212                "First": {
2213                    "Type": "Pass",
2214                    "Result": "step1",
2215                    "ResultPath": "$.first",
2216                    "Next": "Second"
2217                },
2218                "Second": {
2219                    "Type": "Pass",
2220                    "Result": "step2",
2221                    "ResultPath": "$.second",
2222                    "End": true
2223                }
2224            }
2225        })
2226        .to_string();
2227
2228        execute_state_machine(
2229            state.clone(),
2230            arn.to_string(),
2231            definition,
2232            Some("{}".to_string()),
2233            None,
2234            None,
2235        )
2236        .await;
2237
2238        let __a = state.read();
2239        let s = __a.default_ref();
2240        let exec = s.executions.get(arn).unwrap();
2241        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2242        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2243        assert_eq!(output["first"], json!("step1"));
2244        assert_eq!(output["second"], json!("step2"));
2245    }
2246
2247    #[tokio::test]
2248    async fn test_succeed_state() {
2249        let state = make_state();
2250        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2251        create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
2252
2253        let definition = json!({
2254            "StartAt": "Done",
2255            "States": {
2256                "Done": {
2257                    "Type": "Succeed"
2258                }
2259            }
2260        })
2261        .to_string();
2262
2263        execute_state_machine(
2264            state.clone(),
2265            arn.to_string(),
2266            definition,
2267            Some(r#"{"data": "value"}"#.to_string()),
2268            None,
2269            None,
2270        )
2271        .await;
2272
2273        let __a = state.read();
2274        let s = __a.default_ref();
2275        let exec = s.executions.get(arn).unwrap();
2276        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2277    }
2278
2279    #[tokio::test]
2280    async fn test_fail_state() {
2281        let state = make_state();
2282        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2283        create_execution(&state, arn, None);
2284
2285        let definition = json!({
2286            "StartAt": "FailState",
2287            "States": {
2288                "FailState": {
2289                    "Type": "Fail",
2290                    "Error": "CustomError",
2291                    "Cause": "Something went wrong"
2292                }
2293            }
2294        })
2295        .to_string();
2296
2297        execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
2298
2299        let __a = state.read();
2300        let s = __a.default_ref();
2301        let exec = s.executions.get(arn).unwrap();
2302        assert_eq!(exec.status, ExecutionStatus::Failed);
2303        assert_eq!(exec.error.as_deref(), Some("CustomError"));
2304        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
2305    }
2306
2307    #[tokio::test]
2308    async fn test_history_events_recorded() {
2309        let state = make_state();
2310        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2311        create_execution(&state, arn, Some("{}".to_string()));
2312
2313        let definition = json!({
2314            "StartAt": "PassState",
2315            "States": {
2316                "PassState": {
2317                    "Type": "Pass",
2318                    "End": true
2319                }
2320            }
2321        })
2322        .to_string();
2323
2324        execute_state_machine(
2325            state.clone(),
2326            arn.to_string(),
2327            definition,
2328            Some("{}".to_string()),
2329            None,
2330            None,
2331        )
2332        .await;
2333
2334        let __a = state.read();
2335        let s = __a.default_ref();
2336        let exec = s.executions.get(arn).unwrap();
2337        let event_types: Vec<&str> = exec
2338            .history_events
2339            .iter()
2340            .map(|e| e.event_type.as_str())
2341            .collect();
2342        assert_eq!(
2343            event_types,
2344            vec![
2345                "ExecutionStarted",
2346                "PassStateEntered",
2347                "PassStateExited",
2348                "ExecutionSucceeded"
2349            ]
2350        );
2351    }
2352
2353    fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
2354        create_execution(state, arn, input.map(|s| s.to_string()));
2355        let fut = execute_state_machine(
2356            state.clone(),
2357            arn.to_string(),
2358            def.to_string(),
2359            input.map(|s| s.to_string()),
2360            None,
2361            None,
2362        );
2363        let rt = tokio::runtime::Builder::new_current_thread()
2364            .enable_time()
2365            .build()
2366            .unwrap();
2367        rt.block_on(fut);
2368    }
2369
2370    fn read_exec<R>(
2371        state: &SharedStepFunctionsState,
2372        arn: &str,
2373        f: impl FnOnce(&Execution) -> R,
2374    ) -> R {
2375        let __a = state.read();
2376        let s = __a.default_ref();
2377        f(s.executions.get(arn).expect("execution missing"))
2378    }
2379
2380    fn arn_for(name: &str) -> String {
2381        format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
2382    }
2383
2384    // ── Pass state: InputPath / OutputPath ───────────────────────────
2385
2386    #[test]
2387    fn pass_state_input_output_path_select_fields() {
2388        let state = make_state();
2389        let arn = arn_for("pass-paths");
2390        let def = json!({
2391            "StartAt": "P",
2392            "States": {
2393                "P": {
2394                    "Type": "Pass",
2395                    "InputPath": "$.inner",
2396                    "OutputPath": "$.kept",
2397                    "End": true
2398                }
2399            }
2400        });
2401        drive(
2402            &state,
2403            &arn,
2404            def,
2405            Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
2406        );
2407
2408        read_exec(&state, &arn, |exec| {
2409            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2410            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2411            assert_eq!(output, json!("yes"));
2412        });
2413    }
2414
2415    // ── Succeed / Fail variants ──────────────────────────────────────
2416
2417    #[test]
2418    fn succeed_state_honors_input_path_null() {
2419        let state = make_state();
2420        let arn = arn_for("succeed-null");
2421        let def = json!({
2422            "StartAt": "S",
2423            "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2424        });
2425        drive(&state, &arn, def, Some(r#"{"a":1}"#));
2426
2427        read_exec(&state, &arn, |exec| {
2428            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2429            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2430            assert_eq!(output, json!({}));
2431        });
2432    }
2433
2434    #[test]
2435    fn fail_state_defaults_when_fields_missing() {
2436        let state = make_state();
2437        let arn = arn_for("fail-default");
2438        let def = json!({
2439            "StartAt": "F",
2440            "States": { "F": { "Type": "Fail" } }
2441        });
2442        drive(&state, &arn, def, None);
2443
2444        read_exec(&state, &arn, |exec| {
2445            assert_eq!(exec.status, ExecutionStatus::Failed);
2446            assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2447            assert_eq!(exec.cause.as_deref(), Some(""));
2448        });
2449    }
2450
2451    // ── Choice ───────────────────────────────────────────────────────
2452
2453    fn choice_def() -> Value {
2454        json!({
2455            "StartAt": "C",
2456            "States": {
2457                "C": {
2458                    "Type": "Choice",
2459                    "Choices": [
2460                        {
2461                            "Variable": "$.n",
2462                            "NumericGreaterThan": 10,
2463                            "Next": "Big"
2464                        }
2465                    ],
2466                    "Default": "Small"
2467                },
2468                "Big":   { "Type": "Pass", "Result": "big",   "End": true },
2469                "Small": { "Type": "Pass", "Result": "small", "End": true }
2470            }
2471        })
2472    }
2473
2474    #[test]
2475    fn choice_routes_to_matching_branch() {
2476        let state = make_state();
2477        let arn = arn_for("choice-big");
2478        drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2479
2480        read_exec(&state, &arn, |exec| {
2481            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2482            assert_eq!(
2483                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2484                json!("big")
2485            );
2486        });
2487    }
2488
2489    #[test]
2490    fn choice_falls_through_to_default() {
2491        let state = make_state();
2492        let arn = arn_for("choice-default");
2493        drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2494
2495        read_exec(&state, &arn, |exec| {
2496            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2497            assert_eq!(
2498                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2499                json!("small")
2500            );
2501        });
2502    }
2503
2504    #[test]
2505    fn choice_no_match_and_no_default_fails() {
2506        let state = make_state();
2507        let arn = arn_for("choice-nomatch");
2508        let def = json!({
2509            "StartAt": "C",
2510            "States": {
2511                "C": {
2512                    "Type": "Choice",
2513                    "Choices": [
2514                        { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2515                    ]
2516                },
2517                "End1": { "Type": "Pass", "End": true }
2518            }
2519        });
2520        drive(&state, &arn, def, Some(r#"{"n":99}"#));
2521
2522        read_exec(&state, &arn, |exec| {
2523            assert_eq!(exec.status, ExecutionStatus::Failed);
2524            assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2525        });
2526    }
2527
2528    // ── Wait ─────────────────────────────────────────────────────────
2529
2530    #[test]
2531    fn wait_seconds_then_advances() {
2532        let state = make_state();
2533        let arn = arn_for("wait-secs");
2534        let def = json!({
2535            "StartAt": "W",
2536            "States": {
2537                "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
2538                "Done": { "Type": "Succeed" }
2539            }
2540        });
2541        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2542
2543        read_exec(&state, &arn, |exec| {
2544            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2545        });
2546    }
2547
2548    #[test]
2549    fn wait_timestamp_in_past_is_noop() {
2550        let state = make_state();
2551        let arn = arn_for("wait-past");
2552        let def = json!({
2553            "StartAt": "W",
2554            "States": {
2555                "W": {
2556                    "Type": "Wait",
2557                    "Timestamp": "2000-01-01T00:00:00Z",
2558                    "End": true
2559                }
2560            }
2561        });
2562        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2563
2564        read_exec(&state, &arn, |exec| {
2565            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2566        });
2567    }
2568
2569    #[test]
2570    fn wait_without_any_duration_falls_through() {
2571        let state = make_state();
2572        let arn = arn_for("wait-none");
2573        let def = json!({
2574            "StartAt": "W",
2575            "States": { "W": { "Type": "Wait", "End": true } }
2576        });
2577        drive(&state, &arn, def, None);
2578
2579        read_exec(&state, &arn, |exec| {
2580            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2581        });
2582    }
2583
2584    // ── Parallel ─────────────────────────────────────────────────────
2585
2586    #[test]
2587    fn parallel_runs_two_pass_branches_and_collects_results() {
2588        let state = make_state();
2589        let arn = arn_for("parallel-ok");
2590        let def = json!({
2591            "StartAt": "P",
2592            "States": {
2593                "P": {
2594                    "Type": "Parallel",
2595                    "End": true,
2596                    "Branches": [
2597                        {
2598                            "StartAt": "A",
2599                            "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
2600                        },
2601                        {
2602                            "StartAt": "B",
2603                            "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
2604                        }
2605                    ]
2606                }
2607            }
2608        });
2609        drive(&state, &arn, def, Some(r#"{}"#));
2610
2611        read_exec(&state, &arn, |exec| {
2612            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2613            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2614            assert_eq!(output, json!(["a", "b"]));
2615        });
2616    }
2617
2618    #[test]
2619    fn parallel_empty_branches_fails() {
2620        let state = make_state();
2621        let arn = arn_for("parallel-empty");
2622        let def = json!({
2623            "StartAt": "P",
2624            "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
2625        });
2626        drive(&state, &arn, def, None);
2627
2628        read_exec(&state, &arn, |exec| {
2629            assert_eq!(exec.status, ExecutionStatus::Failed);
2630            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2631        });
2632    }
2633
2634    // ── Map ──────────────────────────────────────────────────────────
2635
2636    #[test]
2637    fn map_iterates_pass_state_over_items_path() {
2638        let state = make_state();
2639        let arn = arn_for("map-ok");
2640        let def = json!({
2641            "StartAt": "M",
2642            "States": {
2643                "M": {
2644                    "Type": "Map",
2645                    "ItemsPath": "$.items",
2646                    "Iterator": {
2647                        "StartAt": "Item",
2648                        "States": { "Item": { "Type": "Pass", "End": true } }
2649                    },
2650                    "End": true
2651                }
2652            }
2653        });
2654        drive(&state, &arn, def, Some(r#"{"items":[1,2,3]}"#));
2655
2656        read_exec(&state, &arn, |exec| {
2657            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2658            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2659            assert_eq!(output, json!([1, 2, 3]));
2660        });
2661    }
2662
2663    // ── Task: unsupported resources / delivery == None ───────────────
2664
2665    #[test]
2666    fn task_unsupported_resource_propagates_failure() {
2667        let state = make_state();
2668        let arn = arn_for("task-unsupported");
2669        let def = json!({
2670            "StartAt": "T",
2671            "States": {
2672                "T": {
2673                    "Type": "Task",
2674                    "Resource": "arn:aws:states:::nothing:here",
2675                    "End": true
2676                }
2677            }
2678        });
2679        drive(&state, &arn, def, None);
2680
2681        read_exec(&state, &arn, |exec| {
2682            assert_eq!(exec.status, ExecutionStatus::Failed);
2683            assert_eq!(exec.error.as_deref(), Some("States.TaskFailed"));
2684            assert!(exec.cause.as_deref().unwrap().contains("Unsupported"));
2685        });
2686    }
2687
2688    #[test]
2689    fn task_sqs_send_without_delivery_fails() {
2690        let state = make_state();
2691        let arn = arn_for("task-sqs-nodelivery");
2692        let def = json!({
2693            "StartAt": "T",
2694            "States": {
2695                "T": {
2696                    "Type": "Task",
2697                    "Resource": "arn:aws:states:::sqs:sendMessage",
2698                    "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
2699                    "End": true
2700                }
2701            }
2702        });
2703        drive(&state, &arn, def, Some("{}"));
2704
2705        read_exec(&state, &arn, |exec| {
2706            assert_eq!(exec.status, ExecutionStatus::Failed);
2707            assert!(exec.cause.as_deref().unwrap().contains("delivery bus"));
2708        });
2709    }
2710
2711    // ── Task: Catch clause ───────────────────────────────────────────
2712
2713    #[test]
2714    fn task_catch_routes_error_into_handler() {
2715        let state = make_state();
2716        let arn = arn_for("task-catch");
2717        let def = json!({
2718            "StartAt": "T",
2719            "States": {
2720                "T": {
2721                    "Type": "Task",
2722                    "Resource": "arn:aws:states:::nothing:here",
2723                    "Catch": [
2724                        { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
2725                    ],
2726                    "End": true
2727                },
2728                "Handler": { "Type": "Pass", "End": true }
2729            }
2730        });
2731        drive(&state, &arn, def, Some(r#"{"orig":"v"}"#));
2732
2733        read_exec(&state, &arn, |exec| {
2734            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2735            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2736            // Handler is Pass with no Result — effective input flows through.
2737            assert_eq!(output["orig"], json!("v"));
2738            assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
2739        });
2740    }
2741
2742    // ── Top-level errors: definition / start-at / missing states ─────
2743
2744    #[test]
2745    fn invalid_definition_json_fails_execution() {
2746        let state = make_state();
2747        let arn = arn_for("bad-json");
2748        create_execution(&state, &arn, None);
2749        let rt = tokio::runtime::Builder::new_current_thread()
2750            .enable_time()
2751            .build()
2752            .unwrap();
2753        rt.block_on(execute_state_machine(
2754            state.clone(),
2755            arn.clone(),
2756            "not json{".to_string(),
2757            None,
2758            None,
2759            None,
2760        ));
2761
2762        read_exec(&state, &arn, |exec| {
2763            assert_eq!(exec.status, ExecutionStatus::Failed);
2764            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2765            assert!(exec.cause.as_deref().unwrap().contains("Failed to parse"));
2766        });
2767    }
2768
2769    #[test]
2770    fn missing_start_at_fails_execution() {
2771        let state = make_state();
2772        let arn = arn_for("no-startat");
2773        let def = json!({ "States": { "X": { "Type": "Succeed" } } });
2774        drive(&state, &arn, def, None);
2775
2776        read_exec(&state, &arn, |exec| {
2777            assert_eq!(exec.status, ExecutionStatus::Failed);
2778            assert!(exec.cause.as_deref().unwrap().contains("StartAt"));
2779        });
2780    }
2781
2782    #[test]
2783    fn next_state_not_found_fails_execution() {
2784        let state = make_state();
2785        let arn = arn_for("dangling-next");
2786        let def = json!({
2787            "StartAt": "A",
2788            "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
2789        });
2790        drive(&state, &arn, def, None);
2791
2792        read_exec(&state, &arn, |exec| {
2793            assert_eq!(exec.status, ExecutionStatus::Failed);
2794            assert!(exec.cause.as_deref().unwrap().contains("not found"));
2795        });
2796    }
2797
2798    #[test]
2799    fn unsupported_state_type_fails_execution() {
2800        let state = make_state();
2801        let arn = arn_for("bad-type");
2802        let def = json!({
2803            "StartAt": "X",
2804            "States": { "X": { "Type": "WatChoo", "End": true } }
2805        });
2806        drive(&state, &arn, def, None);
2807
2808        read_exec(&state, &arn, |exec| {
2809            assert_eq!(exec.status, ExecutionStatus::Failed);
2810            assert!(exec
2811                .cause
2812                .as_deref()
2813                .unwrap()
2814                .contains("Unsupported state type"));
2815        });
2816    }
2817
2818    // ── Pure helpers ─────────────────────────────────────────────────
2819
2820    #[test]
2821    fn apply_parameters_substitutes_json_path_refs() {
2822        let template = json!({
2823            "literal": "constant",
2824            "ref.$": "$.user.id",
2825            "nested": { "inner.$": "$.user.name" },
2826            "list": [ { "x.$": "$.user.id" } ]
2827        });
2828        let input = json!({ "user": { "id": 42, "name": "zoe" } });
2829        let out = apply_parameters(&template, &input);
2830        assert_eq!(out["literal"], json!("constant"));
2831        assert_eq!(out["ref"], json!(42));
2832        assert_eq!(out["nested"]["inner"], json!("zoe"));
2833        assert_eq!(out["list"][0]["x"], json!(42));
2834    }
2835
2836    #[test]
2837    fn next_state_returns_end_name_or_error() {
2838        match next_state(&json!({ "End": true })) {
2839            NextState::End => {}
2840            _ => panic!("expected End"),
2841        }
2842        match next_state(&json!({ "Next": "A" })) {
2843            NextState::Name(n) => assert_eq!(n, "A"),
2844            _ => panic!("expected Name"),
2845        }
2846        match next_state(&json!({})) {
2847            NextState::Error(_) => {}
2848            _ => panic!("expected Error"),
2849        }
2850    }
2851
2852    #[test]
2853    fn apply_state_catcher_matches_wildcard_and_stashes_error() {
2854        let state_def = json!({
2855            "Catch": [
2856                { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
2857            ]
2858        });
2859        let input = json!({ "a": 1 });
2860        let (next, new_input) =
2861            apply_state_catcher(&state_def, &input, "Boom", "it exploded").unwrap();
2862        assert_eq!(next, "H");
2863        assert_eq!(new_input["a"], json!(1));
2864        assert_eq!(new_input["caught"]["Error"], json!("Boom"));
2865        assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
2866    }
2867
2868    #[test]
2869    fn apply_state_catcher_returns_none_without_match() {
2870        let state_def = json!({
2871            "Catch": [
2872                { "ErrorEquals": ["Specific.Error"], "Next": "H" }
2873            ]
2874        });
2875        let input = json!({});
2876        assert!(apply_state_catcher(&state_def, &input, "Other", "why").is_none());
2877    }
2878
2879    #[test]
2880    fn queue_url_to_arn_parses_account_and_name() {
2881        assert_eq!(
2882            queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue"),
2883            "arn:aws:sqs:us-east-1:123456789012:my-queue"
2884        );
2885    }
2886
2887    #[test]
2888    fn queue_url_to_arn_falls_back_for_unparseable_input() {
2889        assert_eq!(queue_url_to_arn("bad"), "bad");
2890    }
2891
2892    #[test]
2893    fn md5_hex_is_deterministic_and_32_chars() {
2894        let a = md5_hex("hello");
2895        let b = md5_hex("hello");
2896        assert_eq!(a, b);
2897        assert_eq!(a.len(), 32);
2898        assert_ne!(a, md5_hex("world"));
2899    }
2900
2901    #[test]
2902    fn apply_update_expression_sets_direct_and_aliased_attrs() {
2903        let mut item: HashMap<String, Value> = HashMap::new();
2904        item.insert("id".to_string(), json!({"S": "1"}));
2905
2906        let mut attr_values = serde_json::Map::new();
2907        attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
2908        attr_values.insert(":c".to_string(), json!({"N": "5"}));
2909
2910        let mut attr_names = serde_json::Map::new();
2911        attr_names.insert("#name".to_string(), json!("name"));
2912
2913        apply_update_expression(
2914            &mut item,
2915            "SET #name = :n, count = :c",
2916            &attr_values,
2917            &attr_names,
2918        );
2919
2920        assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
2921        assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
2922        assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
2923    }
2924
2925    #[test]
2926    fn apply_update_expression_accepts_lowercase_set_keyword() {
2927        let mut item: HashMap<String, Value> = HashMap::new();
2928        let mut attr_values = serde_json::Map::new();
2929        attr_values.insert(":v".to_string(), json!({"S": "x"}));
2930        apply_update_expression(
2931            &mut item,
2932            "set field = :v",
2933            &attr_values,
2934            &serde_json::Map::new(),
2935        );
2936        assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
2937    }
2938
2939    #[test]
2940    fn apply_update_expression_set_arithmetic_increments_counter() {
2941        let mut item: HashMap<String, Value> = HashMap::new();
2942        item.insert("count".to_string(), json!({"N": "10"}));
2943        let mut attr_values = serde_json::Map::new();
2944        attr_values.insert(":inc".to_string(), json!({"N": "3"}));
2945        apply_update_expression(
2946            &mut item,
2947            "SET count = count + :inc",
2948            &attr_values,
2949            &serde_json::Map::new(),
2950        );
2951        assert_eq!(item.get("count").unwrap(), &json!({"N": "13"}));
2952    }
2953
2954    #[test]
2955    fn apply_update_expression_set_decrement() {
2956        let mut item: HashMap<String, Value> = HashMap::new();
2957        item.insert("count".to_string(), json!({"N": "10"}));
2958        let mut attr_values = serde_json::Map::new();
2959        attr_values.insert(":d".to_string(), json!({"N": "4"}));
2960        apply_update_expression(
2961            &mut item,
2962            "SET count = count - :d",
2963            &attr_values,
2964            &serde_json::Map::new(),
2965        );
2966        assert_eq!(item.get("count").unwrap(), &json!({"N": "6"}));
2967    }
2968
2969    #[test]
2970    fn apply_update_expression_remove_drops_attributes() {
2971        let mut item: HashMap<String, Value> = HashMap::new();
2972        item.insert("a".to_string(), json!({"S": "x"}));
2973        item.insert("b".to_string(), json!({"S": "y"}));
2974        item.insert("c".to_string(), json!({"S": "z"}));
2975        apply_update_expression(
2976            &mut item,
2977            "REMOVE a, c",
2978            &serde_json::Map::new(),
2979            &serde_json::Map::new(),
2980        );
2981        assert!(!item.contains_key("a"));
2982        assert!(item.contains_key("b"));
2983        assert!(!item.contains_key("c"));
2984    }
2985
2986    #[test]
2987    fn apply_update_expression_add_increments_existing_or_initializes() {
2988        // Existing attribute -> sum
2989        let mut item: HashMap<String, Value> = HashMap::new();
2990        item.insert("count".to_string(), json!({"N": "5"}));
2991        let mut attr_values = serde_json::Map::new();
2992        attr_values.insert(":inc".to_string(), json!({"N": "2"}));
2993        apply_update_expression(
2994            &mut item,
2995            "ADD count :inc",
2996            &attr_values,
2997            &serde_json::Map::new(),
2998        );
2999        assert_eq!(item.get("count").unwrap(), &json!({"N": "7"}));
3000
3001        // Absent attribute -> initialized to value
3002        let mut item2: HashMap<String, Value> = HashMap::new();
3003        apply_update_expression(
3004            &mut item2,
3005            "ADD count :inc",
3006            &attr_values,
3007            &serde_json::Map::new(),
3008        );
3009        assert_eq!(item2.get("count").unwrap(), &json!({"N": "2"}));
3010    }
3011
3012    #[test]
3013    fn apply_update_expression_delete_removes_set_elements() {
3014        let mut item: HashMap<String, Value> = HashMap::new();
3015        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c"]}));
3016        let mut attr_values = serde_json::Map::new();
3017        attr_values.insert(":rm".to_string(), json!({"SS": ["b"]}));
3018        apply_update_expression(
3019            &mut item,
3020            "DELETE tags :rm",
3021            &attr_values,
3022            &serde_json::Map::new(),
3023        );
3024        assert_eq!(item.get("tags").unwrap(), &json!({"SS": ["a", "c"]}));
3025    }
3026
3027    #[test]
3028    fn apply_update_expression_if_not_exists_initializes_only_when_absent() {
3029        // Absent -> initialize.
3030        let mut item: HashMap<String, Value> = HashMap::new();
3031        let mut attr_values = serde_json::Map::new();
3032        attr_values.insert(":zero".to_string(), json!({"N": "0"}));
3033        apply_update_expression(
3034            &mut item,
3035            "SET count = if_not_exists(count, :zero)",
3036            &attr_values,
3037            &serde_json::Map::new(),
3038        );
3039        assert_eq!(item.get("count").unwrap(), &json!({"N": "0"}));
3040
3041        // Present -> preserve existing.
3042        let mut item2: HashMap<String, Value> = HashMap::new();
3043        item2.insert("count".to_string(), json!({"N": "42"}));
3044        apply_update_expression(
3045            &mut item2,
3046            "SET count = if_not_exists(count, :zero)",
3047            &attr_values,
3048            &serde_json::Map::new(),
3049        );
3050        assert_eq!(item2.get("count").unwrap(), &json!({"N": "42"}));
3051    }
3052
3053    #[test]
3054    fn apply_update_expression_combines_clauses() {
3055        let mut item: HashMap<String, Value> = HashMap::new();
3056        item.insert("a".to_string(), json!({"S": "old"}));
3057        item.insert("b".to_string(), json!({"N": "1"}));
3058        item.insert("c".to_string(), json!({"S": "drop"}));
3059        let mut attr_values = serde_json::Map::new();
3060        attr_values.insert(":new".to_string(), json!({"S": "new"}));
3061        attr_values.insert(":one".to_string(), json!({"N": "1"}));
3062        apply_update_expression(
3063            &mut item,
3064            "SET a = :new ADD b :one REMOVE c",
3065            &attr_values,
3066            &serde_json::Map::new(),
3067        );
3068        assert_eq!(item.get("a").unwrap(), &json!({"S": "new"}));
3069        assert_eq!(item.get("b").unwrap(), &json!({"N": "2"}));
3070        assert!(!item.contains_key("c"));
3071    }
3072
3073    // ── DynamoDB invoke: error paths without delivery bus ────────────
3074
3075    #[test]
3076    fn task_dynamodb_get_item_without_state_fails() {
3077        let state = make_state();
3078        let arn = arn_for("ddb-get-nostate");
3079        let def = json!({
3080            "StartAt": "T",
3081            "States": {
3082                "T": {
3083                    "Type": "Task",
3084                    "Resource": "arn:aws:states:::dynamodb:getItem",
3085                    "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
3086                    "End": true
3087                }
3088            }
3089        });
3090        drive(&state, &arn, def, Some("{}"));
3091        read_exec(&state, &arn, |exec| {
3092            assert_eq!(exec.status, ExecutionStatus::Failed);
3093            assert!(exec.cause.as_deref().unwrap().contains("DynamoDB"));
3094        });
3095    }
3096
3097    // ── Terminal guards on succeed/fail helpers ──────────────────────
3098
3099    #[test]
3100    fn succeed_execution_is_noop_when_already_terminal() {
3101        let state = make_state();
3102        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
3103        create_execution(&state, arn, None);
3104        {
3105            let mut __a = state.write();
3106            let s = __a.default_mut();
3107            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Failed;
3108        }
3109        succeed_execution(&state, arn, &json!({"x":1}));
3110        let __a = state.read();
3111        let s = __a.default_ref();
3112        let exec = s.executions.get(arn).unwrap();
3113        assert_eq!(exec.status, ExecutionStatus::Failed);
3114        assert!(exec.output.is_none());
3115    }
3116
3117    #[test]
3118    fn fail_execution_is_noop_when_already_terminal() {
3119        let state = make_state();
3120        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
3121        create_execution(&state, arn, None);
3122        {
3123            let mut __a = state.write();
3124            let s = __a.default_mut();
3125            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
3126        }
3127        fail_execution(&state, arn, "Oops", "nope");
3128        let __a = state.read();
3129        let s = __a.default_ref();
3130        let exec = s.executions.get(arn).unwrap();
3131        assert_eq!(exec.status, ExecutionStatus::Succeeded);
3132        assert!(exec.error.is_none());
3133    }
3134
3135    // ── Pass state with ResultPath ──
3136
3137    #[test]
3138    fn pass_state_result_path_merges_into_input() {
3139        let state = make_state();
3140        let arn = arn_for("result-path");
3141        let def = json!({
3142            "StartAt": "P",
3143            "States": {
3144                "P": {"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true}
3145            }
3146        });
3147        drive(&state, &arn, def, Some(r#"{"a":1}"#));
3148        let output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
3149        let v: Value = serde_json::from_str(&output).unwrap();
3150        assert_eq!(v["a"], 1);
3151        assert_eq!(v["data"]["x"], 2);
3152    }
3153
3154    // ── Choice with many operators ──
3155
3156    #[test]
3157    fn choice_string_greater_than_equals() {
3158        let state = make_state();
3159        let arn = arn_for("choice-sgte");
3160        let def = json!({
3161            "StartAt": "C",
3162            "States": {
3163                "C": {
3164                    "Type": "Choice",
3165                    "Choices": [
3166                        {"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"}
3167                    ],
3168                    "Default": "Fail"
3169                },
3170                "End": {"Type": "Pass", "End": true},
3171                "Fail": {"Type": "Fail"}
3172            }
3173        });
3174        drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));
3175        let status = read_exec(&state, &arn, |e| e.status);
3176        assert_eq!(status, ExecutionStatus::Succeeded);
3177    }
3178
3179    #[test]
3180    fn choice_is_present_and_is_null() {
3181        let state = make_state();
3182        let arn = arn_for("choice-ispres");
3183        let def = json!({
3184            "StartAt": "C",
3185            "States": {
3186                "C": {
3187                    "Type": "Choice",
3188                    "Choices": [
3189                        {"Variable": "$.foo", "IsPresent": true, "Next": "End"}
3190                    ],
3191                    "Default": "Fail"
3192                },
3193                "End": {"Type": "Pass", "End": true},
3194                "Fail": {"Type": "Fail"}
3195            }
3196        });
3197        drive(&state, &arn, def, Some(r#"{"foo":null}"#));
3198        assert_eq!(
3199            read_exec(&state, &arn, |e| e.status),
3200            ExecutionStatus::Succeeded
3201        );
3202    }
3203
3204    #[test]
3205    fn choice_or_short_circuits() {
3206        let state = make_state();
3207        let arn = arn_for("choice-or");
3208        let def = json!({
3209            "StartAt": "C",
3210            "States": {
3211                "C": {
3212                    "Type": "Choice",
3213                    "Choices": [{
3214                        "Or": [
3215                            {"Variable": "$.x", "NumericEquals": 99},
3216                            {"Variable": "$.y", "StringEquals": "b"}
3217                        ],
3218                        "Next": "End"
3219                    }],
3220                    "Default": "Fail"
3221                },
3222                "End": {"Type": "Pass", "End": true},
3223                "Fail": {"Type": "Fail"}
3224            }
3225        });
3226        drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));
3227        assert_eq!(
3228            read_exec(&state, &arn, |e| e.status),
3229            ExecutionStatus::Succeeded
3230        );
3231    }
3232
3233    #[test]
3234    fn choice_not_negates() {
3235        let state = make_state();
3236        let arn = arn_for("choice-not");
3237        let def = json!({
3238            "StartAt": "C",
3239            "States": {
3240                "C": {
3241                    "Type": "Choice",
3242                    "Choices": [{
3243                        "Not": {"Variable": "$.x", "NumericEquals": 99},
3244                        "Next": "End"
3245                    }],
3246                    "Default": "Fail"
3247                },
3248                "End": {"Type": "Pass", "End": true},
3249                "Fail": {"Type": "Fail"}
3250            }
3251        });
3252        drive(&state, &arn, def, Some(r#"{"x":1}"#));
3253        assert_eq!(
3254            read_exec(&state, &arn, |e| e.status),
3255            ExecutionStatus::Succeeded
3256        );
3257    }
3258
3259    #[test]
3260    fn choice_boolean_equals() {
3261        let state = make_state();
3262        let arn = arn_for("choice-bool");
3263        let def = json!({
3264            "StartAt": "C",
3265            "States": {
3266                "C": {
3267                    "Type": "Choice",
3268                    "Choices": [
3269                        {"Variable": "$.ok", "BooleanEquals": true, "Next": "End"}
3270                    ],
3271                    "Default": "Fail"
3272                },
3273                "End": {"Type": "Pass", "End": true},
3274                "Fail": {"Type": "Fail"}
3275            }
3276        });
3277        drive(&state, &arn, def, Some(r#"{"ok":true}"#));
3278        assert_eq!(
3279            read_exec(&state, &arn, |e| e.status),
3280            ExecutionStatus::Succeeded
3281        );
3282    }
3283
3284    // ── Wait with SecondsPath ──
3285
3286    #[test]
3287    fn wait_seconds_path_uses_input_value() {
3288        let state = make_state();
3289        let arn = arn_for("wait-sp");
3290        let def = json!({
3291            "StartAt": "W",
3292            "States": {
3293                "W": {"Type": "Wait", "SecondsPath": "$.wait", "End": true}
3294            }
3295        });
3296        drive(&state, &arn, def, Some(r#"{"wait":0}"#));
3297        assert_eq!(
3298            read_exec(&state, &arn, |e| e.status),
3299            ExecutionStatus::Succeeded
3300        );
3301    }
3302
3303    // ── Map state with empty input array ──
3304
3305    #[test]
3306    fn map_state_empty_array_succeeds() {
3307        let state = make_state();
3308        let arn = arn_for("map-empty");
3309        let def = json!({
3310            "StartAt": "M",
3311            "States": {
3312                "M": {
3313                    "Type": "Map",
3314                    "ItemsPath": "$.items",
3315                    "ItemProcessor": {
3316                        "StartAt": "P",
3317                        "States": {
3318                            "P": {"Type": "Pass", "End": true}
3319                        }
3320                    },
3321                    "End": true
3322                }
3323            }
3324        });
3325        drive(&state, &arn, def, Some(r#"{"items":[]}"#));
3326        assert_eq!(
3327            read_exec(&state, &arn, |e| e.status),
3328            ExecutionStatus::Succeeded
3329        );
3330    }
3331
3332    // ── Fail state with Error + Cause ──
3333
3334    #[test]
3335    fn fail_state_with_explicit_error_and_cause() {
3336        let state = make_state();
3337        let arn = arn_for("fail-fields");
3338        create_execution(&state, &arn, None);
3339        let def = json!({
3340            "StartAt": "F",
3341            "States": {
3342                "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
3343            }
3344        });
3345        drive(&state, &arn, def, None);
3346        let status = read_exec(&state, &arn, |e| e.status);
3347        assert_eq!(status, ExecutionStatus::Failed);
3348        let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
3349        assert_eq!(err, "MyError");
3350    }
3351}