Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::service::SharedServiceRegistry;
16use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
17
18/// Execute a state machine definition with the given input.
19/// Updates the execution record in shared state as it progresses.
20#[allow(clippy::too_many_arguments)]
21pub async fn execute_state_machine(
22    state: SharedStepFunctionsState,
23    execution_arn: String,
24    definition: String,
25    input: Option<String>,
26    delivery: Option<Arc<DeliveryBus>>,
27    dynamodb_state: Option<SharedDynamoDbState>,
28    registry: Option<SharedServiceRegistry>,
29    logging_configuration: Option<Value>,
30) {
31    let def: Value = match serde_json::from_str(&definition) {
32        Ok(v) => v,
33        Err(e) => {
34            fail_execution(
35                &state,
36                &execution_arn,
37                "States.Runtime",
38                &format!("Failed to parse definition: {e}"),
39            );
40            return;
41        }
42    };
43
44    let raw_input: Value = input
45        .as_deref()
46        .and_then(|s| serde_json::from_str(s).ok())
47        .unwrap_or(json!({}));
48
49    // Record ExecutionStarted event
50    add_event(
51        &state,
52        &execution_arn,
53        "ExecutionStarted",
54        0,
55        json!({
56            "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
57            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
58        }),
59    );
60
61    // Run the state machine inside an inner tokio::spawn so that any panic
62    // bubbles up as a JoinError instead of tearing down the caller. Without
63    // this the panic propagates through the outer spawn in `start_execution`
64    // which leaves the execution stuck in Running and leaks the panic to
65    // tokio's default hook.
66    let def_owned = def;
67    let state_clone = state.clone();
68    let execution_arn_clone = execution_arn.clone();
69    let delivery_clone = delivery.clone();
70    let dynamodb_state_clone = dynamodb_state.clone();
71    let registry_clone = registry.clone();
72    let handle = tokio::spawn(async move {
73        run_states(
74            &def_owned,
75            raw_input,
76            &delivery_clone,
77            &dynamodb_state_clone,
78            &registry_clone,
79            &state_clone,
80            &execution_arn_clone,
81        )
82        .await
83    });
84
85    match handle.await {
86        Ok(Ok(output)) => {
87            succeed_execution(&state, &execution_arn, &output);
88        }
89        Ok(Err((error, cause))) => {
90            fail_execution(&state, &execution_arn, &error, &cause);
91        }
92        Err(join_err) => {
93            let msg = if join_err.is_panic() {
94                let payload = join_err.into_panic();
95                if let Some(s) = payload.downcast_ref::<String>() {
96                    s.clone()
97                } else if let Some(s) = payload.downcast_ref::<&'static str>() {
98                    (*s).to_string()
99                } else {
100                    "execution task panicked".to_string()
101                }
102            } else {
103                format!("execution task cancelled: {join_err}")
104            };
105            tracing::error!(
106                execution_arn = %execution_arn,
107                panic = %msg,
108                "Step Functions execution panicked"
109            );
110            fail_execution(&state, &execution_arn, "States.Runtime", &msg);
111        }
112    }
113
114    // Deliver execution history to CloudWatch Logs when logging is configured.
115    deliver_execution_logs(
116        &state,
117        &execution_arn,
118        delivery.as_ref(),
119        logging_configuration.as_ref(),
120    );
121}
122
123type StatesResult<'a> = std::pin::Pin<
124    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
125>;
126
127/// Result of executing a single state in the state machine.
128pub(crate) enum Advance {
129    /// Continue to the given state with the given input.
130    Next(String, Value),
131    /// Terminate the state machine with the given output.
132    End(Value),
133    /// Fail the state machine with the given error and cause.
134    Fail(String, String),
135}
136
137async fn run_wait_state(
138    name: &str,
139    state_def: &Value,
140    input: Value,
141    shared_state: &SharedStepFunctionsState,
142    execution_arn: &str,
143) -> Advance {
144    let entered_event_id = add_event(
145        shared_state,
146        execution_arn,
147        "WaitStateEntered",
148        0,
149        json!({
150            "name": name,
151            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
152        }),
153    );
154
155    execute_wait_state(state_def, &input).await;
156
157    add_event(
158        shared_state,
159        execution_arn,
160        "WaitStateExited",
161        entered_event_id,
162        json!({
163            "name": name,
164            "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
165        }),
166    );
167
168    advance_from_next(state_def, input)
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn run_task_state(
173    name: &str,
174    state_def: &Value,
175    input: Value,
176    delivery: &Option<Arc<DeliveryBus>>,
177    dynamodb_state: &Option<SharedDynamoDbState>,
178    registry: &Option<SharedServiceRegistry>,
179    shared_state: &SharedStepFunctionsState,
180    execution_arn: &str,
181) -> Advance {
182    let entered_event_id = add_event(
183        shared_state,
184        execution_arn,
185        "TaskStateEntered",
186        0,
187        json!({
188            "name": name,
189            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
190        }),
191    );
192
193    let result = execute_task_state(
194        name,
195        state_def,
196        &input,
197        delivery,
198        dynamodb_state,
199        registry,
200        shared_state,
201        execution_arn,
202        entered_event_id,
203    )
204    .await;
205
206    match result {
207        Ok(output) => {
208            add_event(
209                shared_state,
210                execution_arn,
211                "TaskStateExited",
212                entered_event_id,
213                json!({
214                    "name": name,
215                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
216                }),
217            );
218            advance_from_next(state_def, output)
219        }
220        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
221    }
222}
223
224#[allow(clippy::too_many_arguments)]
225async fn run_parallel_state(
226    name: &str,
227    state_def: &Value,
228    input: Value,
229    delivery: &Option<Arc<DeliveryBus>>,
230    dynamodb_state: &Option<SharedDynamoDbState>,
231    registry: &Option<SharedServiceRegistry>,
232    shared_state: &SharedStepFunctionsState,
233    execution_arn: &str,
234) -> Advance {
235    let entered_event_id = add_event(
236        shared_state,
237        execution_arn,
238        "ParallelStateEntered",
239        0,
240        json!({
241            "name": name,
242            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
243        }),
244    );
245
246    let result = execute_parallel_state(
247        state_def,
248        &input,
249        delivery,
250        dynamodb_state,
251        registry,
252        shared_state,
253        execution_arn,
254    )
255    .await;
256
257    match result {
258        Ok(output) => {
259            add_event(
260                shared_state,
261                execution_arn,
262                "ParallelStateExited",
263                entered_event_id,
264                json!({
265                    "name": name,
266                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
267                }),
268            );
269            advance_from_next(state_def, output)
270        }
271        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
272    }
273}
274
275#[allow(clippy::too_many_arguments)]
276async fn run_map_state(
277    name: &str,
278    state_def: &Value,
279    input: Value,
280    delivery: &Option<Arc<DeliveryBus>>,
281    dynamodb_state: &Option<SharedDynamoDbState>,
282    registry: &Option<SharedServiceRegistry>,
283    shared_state: &SharedStepFunctionsState,
284    execution_arn: &str,
285) -> Advance {
286    let entered_event_id = add_event(
287        shared_state,
288        execution_arn,
289        "MapStateEntered",
290        0,
291        json!({
292            "name": name,
293            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
294        }),
295    );
296
297    let result = execute_map_state(
298        state_def,
299        &input,
300        delivery,
301        dynamodb_state,
302        registry,
303        shared_state,
304        execution_arn,
305    )
306    .await;
307
308    match result {
309        Ok(output) => {
310            add_event(
311                shared_state,
312                execution_arn,
313                "MapStateExited",
314                entered_event_id,
315                json!({
316                    "name": name,
317                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
318                }),
319            );
320            advance_from_next(state_def, output)
321        }
322        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
323    }
324}
325
326/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
327async fn execute_wait_state(state_def: &Value, input: &Value) {
328    if let Some(seconds) = state_def["Seconds"].as_u64() {
329        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
330        return;
331    }
332
333    if let Some(path) = state_def["SecondsPath"].as_str() {
334        let val = crate::io_processing::resolve_path(input, path);
335        if let Some(seconds) = val.as_u64() {
336            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
337        }
338        return;
339    }
340
341    if let Some(ts_str) = state_def["Timestamp"].as_str() {
342        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
343            let now = Utc::now();
344            let target_utc = target.with_timezone(&chrono::Utc);
345            if target_utc > now {
346                let duration = (target_utc - now).to_std().unwrap_or_default();
347                tokio::time::sleep(duration).await;
348            }
349        }
350        return;
351    }
352
353    if let Some(path) = state_def["TimestampPath"].as_str() {
354        let val = crate::io_processing::resolve_path(input, path);
355        if let Some(ts_str) = val.as_str() {
356            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
357                let now = Utc::now();
358                let target_utc = target.with_timezone(&chrono::Utc);
359                if target_utc > now {
360                    let duration = (target_utc - now).to_std().unwrap_or_default();
361                    tokio::time::sleep(duration).await;
362                }
363            }
364        }
365        return;
366    }
367
368    warn!(
369        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
370    );
371}
372
373/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
374/// apply I/O processing, handle Retry.
375#[allow(clippy::too_many_arguments)]
376async fn execute_task_state(
377    name: &str,
378    state_def: &Value,
379    input: &Value,
380    delivery: &Option<Arc<DeliveryBus>>,
381    dynamodb_state: &Option<SharedDynamoDbState>,
382    registry: &Option<SharedServiceRegistry>,
383    shared_state: &SharedStepFunctionsState,
384    execution_arn: &str,
385    entered_event_id: i64,
386) -> Result<Value, (String, String)> {
387    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
388
389    let input_path = state_def["InputPath"].as_str();
390    let result_path = state_def["ResultPath"].as_str();
391    let output_path = state_def["OutputPath"].as_str();
392
393    let effective_input = if input_path == Some("null") {
394        json!({})
395    } else {
396        apply_input_path(input, input_path)
397    };
398
399    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
400    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
401    let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
402    let mut attempt = 0u32;
403
404    let is_wait_for_task_token = resource.contains(".waitForTaskToken");
405    let task_token = if is_wait_for_task_token {
406        let token = format!(
407            "FCToken-{}-{}",
408            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
409            uuid::Uuid::new_v4().simple(),
410        );
411        let account_id = account_id_from_arn(execution_arn);
412        let context = json!({
413            "Task": { "Token": token.clone() },
414            "Execution": { "Id": execution_arn },
415            "State": { "Name": name },
416        });
417        {
418            let mut accounts = shared_state.write();
419            let state = accounts.get_or_create(account_id);
420            state.task_tokens.insert(
421                token.clone(),
422                crate::state::TaskTokenState {
423                    activity_arn: String::new(),
424                    status: "PENDING".to_string(),
425                    output: None,
426                    error: None,
427                    cause: None,
428                    input: None,
429                    created_at: chrono::Utc::now(),
430                    last_heartbeat_at: None,
431                    heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
432                    timeout_seconds: timeout_seconds.map(|s| s as i64),
433                },
434            );
435        }
436        Some((token, context))
437    } else {
438        None
439    };
440
441    let task_input = if let Some(params) = state_def.get("Parameters") {
442        if let Some((_, ctx)) = &task_token {
443            apply_parameters(params, &effective_input, Some(ctx))
444        } else {
445            apply_parameters(params, &effective_input, None)
446        }
447    } else {
448        effective_input
449    };
450
451    loop {
452        add_event(
453            shared_state,
454            execution_arn,
455            "TaskScheduled",
456            entered_event_id,
457            json!({
458                "resource": resource,
459                "region": "us-east-1",
460                "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
461            }),
462        );
463
464        add_event(
465            shared_state,
466            execution_arn,
467            "TaskStarted",
468            entered_event_id,
469            json!({ "resource": resource }),
470        );
471
472        let invoke_result = invoke_resource(
473            &resource,
474            &task_input,
475            delivery,
476            dynamodb_state,
477            registry,
478            execution_arn,
479            timeout_seconds,
480            heartbeat_seconds,
481            shared_state,
482        )
483        .await;
484
485        match invoke_result {
486            Ok(result) => {
487                if let Some((token, _)) = &task_token {
488                    let account_id = account_id_from_arn(execution_arn);
489                    match poll_task_token(
490                        shared_state,
491                        account_id,
492                        token,
493                        timeout_seconds,
494                        heartbeat_seconds,
495                    )
496                    .await
497                    {
498                        Ok(output) => {
499                            add_event(
500                                shared_state,
501                                execution_arn,
502                                "TaskSucceeded",
503                                entered_event_id,
504                                json!({
505                                    "resource": resource,
506                                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
507                                }),
508                            );
509
510                            let selected = if let Some(selector) = state_def.get("ResultSelector") {
511                                apply_parameters(selector, &output, None)
512                            } else {
513                                output
514                            };
515
516                            let after_result = if result_path == Some("null") {
517                                input.clone()
518                            } else {
519                                apply_result_path(input, &selected, result_path)
520                            };
521
522                            let output = if output_path == Some("null") {
523                                json!({})
524                            } else {
525                                apply_output_path(&after_result, output_path)
526                            };
527
528                            return Ok(output);
529                        }
530                        Err((error, cause)) => {
531                            add_event(
532                                shared_state,
533                                execution_arn,
534                                "TaskFailed",
535                                entered_event_id,
536                                json!({ "error": error, "cause": cause }),
537                            );
538
539                            if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
540                                attempt += 1;
541                                let actual_delay = delay_ms.min(5000);
542                                tokio::time::sleep(tokio::time::Duration::from_millis(
543                                    actual_delay,
544                                ))
545                                .await;
546                                continue;
547                            }
548
549                            return Err((error, cause));
550                        }
551                    }
552                }
553
554                add_event(
555                    shared_state,
556                    execution_arn,
557                    "TaskSucceeded",
558                    entered_event_id,
559                    json!({
560                        "resource": resource,
561                        "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
562                    }),
563                );
564
565                let selected = if let Some(selector) = state_def.get("ResultSelector") {
566                    apply_parameters(selector, &result, None)
567                } else {
568                    result
569                };
570
571                let after_result = if result_path == Some("null") {
572                    input.clone()
573                } else {
574                    apply_result_path(input, &selected, result_path)
575                };
576
577                let output = if output_path == Some("null") {
578                    json!({})
579                } else {
580                    apply_output_path(&after_result, output_path)
581                };
582
583                return Ok(output);
584            }
585            Err((error, cause)) => {
586                add_event(
587                    shared_state,
588                    execution_arn,
589                    "TaskFailed",
590                    entered_event_id,
591                    json!({ "error": error, "cause": cause }),
592                );
593
594                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
595                    attempt += 1;
596                    let actual_delay = delay_ms.min(5000);
597                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
598                    continue;
599                }
600
601                return Err((error, cause));
602            }
603        }
604    }
605}
606
607/// Execute a Parallel state: run all branches concurrently, collect results into an array.
608async fn execute_parallel_state(
609    state_def: &Value,
610    input: &Value,
611    delivery: &Option<Arc<DeliveryBus>>,
612    dynamodb_state: &Option<SharedDynamoDbState>,
613    registry: &Option<SharedServiceRegistry>,
614    shared_state: &SharedStepFunctionsState,
615    execution_arn: &str,
616) -> Result<Value, (String, String)> {
617    let input_path = state_def["InputPath"].as_str();
618    let result_path = state_def["ResultPath"].as_str();
619    let output_path = state_def["OutputPath"].as_str();
620
621    let effective_input = if input_path == Some("null") {
622        json!({})
623    } else {
624        apply_input_path(input, input_path)
625    };
626
627    let branches = state_def["Branches"]
628        .as_array()
629        .cloned()
630        .unwrap_or_default();
631
632    if branches.is_empty() {
633        return Err((
634            "States.Runtime".to_string(),
635            "Parallel state has no Branches".to_string(),
636        ));
637    }
638
639    // Spawn all branches concurrently
640    let mut handles = Vec::new();
641    for branch_def in &branches {
642        let branch = branch_def.clone();
643        let branch_input = effective_input.clone();
644        let delivery = delivery.clone();
645        let ddb = dynamodb_state.clone();
646        let reg = registry.clone();
647        let state = shared_state.clone();
648        let arn = execution_arn.to_string();
649
650        handles.push(tokio::spawn(async move {
651            run_states(&branch, branch_input, &delivery, &ddb, &reg, &state, &arn).await
652        }));
653    }
654
655    // Collect results in order
656    let mut results = Vec::with_capacity(handles.len());
657    for handle in handles {
658        let result = handle.await.map_err(|e| {
659            (
660                "States.Runtime".to_string(),
661                format!("Branch execution panicked: {e}"),
662            )
663        })??;
664        results.push(result);
665    }
666
667    let branch_output = Value::Array(results);
668
669    // Apply ResultSelector if present
670    let selected = if let Some(selector) = state_def.get("ResultSelector") {
671        apply_parameters(selector, &branch_output, None)
672    } else {
673        branch_output
674    };
675
676    // Apply ResultPath
677    let after_result = if result_path == Some("null") {
678        input.clone()
679    } else {
680        apply_result_path(input, &selected, result_path)
681    };
682
683    // Apply OutputPath
684    let output = if output_path == Some("null") {
685        json!({})
686    } else {
687        apply_output_path(&after_result, output_path)
688    };
689
690    Ok(output)
691}
692
693/// Execute a Map state: iterate over an array and run a sub-state machine per item.
694/// Supports distributed mode: ItemReader (S3), ItemBatcher, ResultWriter (S3),
695/// ToleratedFailurePercentage, and MaxConcurrencyPath.
696#[allow(clippy::too_many_arguments)]
697async fn execute_map_state(
698    state_def: &Value,
699    input: &Value,
700    delivery: &Option<Arc<DeliveryBus>>,
701    dynamodb_state: &Option<SharedDynamoDbState>,
702    registry: &Option<SharedServiceRegistry>,
703    shared_state: &SharedStepFunctionsState,
704    execution_arn: &str,
705) -> Result<Value, (String, String)> {
706    let input_path = state_def["InputPath"].as_str();
707    let result_path = state_def["ResultPath"].as_str();
708    let output_path = state_def["OutputPath"].as_str();
709
710    let effective_input = if input_path == Some("null") {
711        json!({})
712    } else {
713        apply_input_path(input, input_path)
714    };
715
716    // Resolve MaxConcurrencyPath if present
717    let max_concurrency = if let Some(path) = state_def["MaxConcurrencyPath"].as_str() {
718        crate::io_processing::resolve_path(&effective_input, path)
719            .as_u64()
720            .unwrap_or(0)
721    } else {
722        state_def["MaxConcurrency"].as_u64().unwrap_or(0)
723    };
724    let effective_concurrency = if max_concurrency == 0 {
725        40
726    } else {
727        max_concurrency as usize
728    };
729
730    // Read items from ItemReader (S3) or ItemsPath
731    let items = if let Some(item_reader) = state_def.get("ItemReader") {
732        read_items_from_s3(item_reader, registry, execution_arn).await?
733    } else {
734        let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
735        let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
736        items_value.as_array().cloned().unwrap_or_default()
737    };
738
739    // Apply ItemBatcher if present
740    let batch_config = state_def.get("ItemBatcher").cloned();
741    let batched_items = if let Some(ref batcher) = batch_config {
742        apply_item_batcher(&items, batcher, &effective_input)
743    } else {
744        items
745    };
746
747    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
748    let iterator_def = state_def
749        .get("ItemProcessor")
750        .or_else(|| state_def.get("Iterator"))
751        .cloned()
752        .ok_or_else(|| {
753            (
754                "States.Runtime".to_string(),
755                "Map state has no ItemProcessor or Iterator".to_string(),
756            )
757        })?;
758
759    let tolerated_failure_percentage = state_def["ToleratedFailurePercentage"]
760        .as_f64()
761        .unwrap_or(0.0);
762    let total_items = batched_items.len() as f64;
763    let mut failure_count = 0usize;
764
765    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
766
767    // Process all items
768    let mut handles = Vec::new();
769    for (index, batch_item) in batched_items.into_iter().enumerate() {
770        let iter_def = iterator_def.clone();
771        let delivery = delivery.clone();
772        let ddb = dynamodb_state.clone();
773        let reg = registry.clone();
774        let state = shared_state.clone();
775        let arn = execution_arn.to_string();
776        let sem = semaphore.clone();
777
778        // Apply ItemSelector if present
779        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
780            let mut ctx = serde_json::Map::new();
781            ctx.insert("value".to_string(), batch_item.clone());
782            ctx.insert("index".to_string(), json!(index));
783            apply_parameters(selector, &Value::Object(ctx), None)
784        } else {
785            batch_item
786        };
787
788        add_event(
789            shared_state,
790            execution_arn,
791            "MapIterationStarted",
792            0,
793            json!({ "index": index }),
794        );
795
796        handles.push(tokio::spawn(async move {
797            let _permit = sem
798                .acquire()
799                .await
800                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
801            let result =
802                run_states(&iter_def, item_input, &delivery, &ddb, &reg, &state, &arn).await;
803            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
804        }));
805    }
806
807    // Collect results in order
808    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
809    for handle in handles {
810        let (index, result) = handle.await.map_err(|e| {
811            (
812                "States.Runtime".to_string(),
813                format!("Map iteration panicked: {e}"),
814            )
815        })??;
816
817        match result {
818            Ok(output) => {
819                add_event(
820                    shared_state,
821                    execution_arn,
822                    "MapIterationSucceeded",
823                    0,
824                    json!({ "index": index }),
825                );
826                results.push((index, output));
827            }
828            Err((error, cause)) => {
829                add_event(
830                    shared_state,
831                    execution_arn,
832                    "MapIterationFailed",
833                    0,
834                    json!({ "index": index, "error": error }),
835                );
836                failure_count += 1;
837                let failure_percentage = (failure_count as f64 / total_items) * 100.0;
838                if failure_percentage > tolerated_failure_percentage {
839                    return Err((error, cause));
840                }
841            }
842        }
843    }
844
845    // Sort by index to maintain order
846    results.sort_by_key(|(i, _)| *i);
847    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
848
849    // Write results to S3 if ResultWriter is configured
850    if let Some(result_writer) = state_def.get("ResultWriter") {
851        write_map_results_to_s3(result_writer, registry, execution_arn, &map_output).await?;
852    }
853
854    // Apply ResultSelector if present
855    let selected = if let Some(selector) = state_def.get("ResultSelector") {
856        apply_parameters(selector, &map_output, None)
857    } else {
858        map_output
859    };
860
861    // Apply ResultPath
862    let after_result = if result_path == Some("null") {
863        input.clone()
864    } else {
865        apply_result_path(input, &selected, result_path)
866    };
867
868    // Apply OutputPath
869    let output = if output_path == Some("null") {
870        json!({})
871    } else {
872        apply_output_path(&after_result, output_path)
873    };
874
875    Ok(output)
876}
877
878/// Read items from S3 for distributed Map mode ItemReader.
879async fn read_items_from_s3(
880    item_reader: &Value,
881    registry: &Option<SharedServiceRegistry>,
882    execution_arn: &str,
883) -> Result<Vec<Value>, (String, String)> {
884    let resource = item_reader["Resource"]
885        .as_str()
886        .unwrap_or("arn:aws:states:::s3:getObject");
887    if !resource.contains("s3:getObject") {
888        return Err((
889            "States.Runtime".to_string(),
890            format!("ItemReader unsupported resource: {resource}"),
891        ));
892    }
893
894    let params = item_reader
895        .get("Parameters")
896        .cloned()
897        .unwrap_or_else(|| json!({}));
898    let bucket = params["Bucket"].as_str().ok_or_else(|| {
899        (
900            "States.Runtime".to_string(),
901            "ItemReader missing Bucket".to_string(),
902        )
903    })?;
904    let key = params["Key"].as_str().ok_or_else(|| {
905        (
906            "States.Runtime".to_string(),
907            "ItemReader missing Key".to_string(),
908        )
909    })?;
910
911    let registry_arc = resolve_registry(registry)?;
912    let account_id = account_from_execution_arn(execution_arn);
913
914    let body = call_sdk_action_raw_bytes(
915        &registry_arc,
916        "s3",
917        "GetObject",
918        &json!({ "Bucket": bucket, "Key": key }),
919        &account_id,
920    )
921    .await?;
922
923    // Parse JSON array from the S3 object body
924    let parsed: Value = serde_json::from_slice(&body).map_err(|e| {
925        (
926            "States.Runtime".to_string(),
927            format!("ItemReader failed to parse S3 object as JSON: {e}"),
928        )
929    })?;
930
931    parsed.as_array().cloned().ok_or_else(|| {
932        (
933            "States.Runtime".to_string(),
934            "ItemReader S3 object is not a JSON array".to_string(),
935        )
936    })
937}
938
939/// Write Map results to S3 for distributed Map mode ResultWriter.
940async fn write_map_results_to_s3(
941    result_writer: &Value,
942    registry: &Option<SharedServiceRegistry>,
943    execution_arn: &str,
944    results: &Value,
945) -> Result<(), (String, String)> {
946    let resource = result_writer["Resource"]
947        .as_str()
948        .unwrap_or("arn:aws:states:::s3:putObject");
949    if !resource.contains("s3:putObject") {
950        return Err((
951            "States.Runtime".to_string(),
952            format!("ResultWriter unsupported resource: {resource}"),
953        ));
954    }
955
956    let params = result_writer
957        .get("Parameters")
958        .cloned()
959        .unwrap_or_else(|| json!({}));
960    let bucket = params["Bucket"].as_str().ok_or_else(|| {
961        (
962            "States.Runtime".to_string(),
963            "ResultWriter missing Bucket".to_string(),
964        )
965    })?;
966    let prefix = params["Prefix"].as_str().unwrap_or("map-results/");
967
968    let registry_arc = resolve_registry(registry)?;
969    let account_id = account_from_execution_arn(execution_arn);
970
971    use bytes::Bytes;
972    let body = Bytes::from(
973        serde_json::to_vec(results).expect("serde_json::Value serialization is infallible"),
974    );
975
976    // Build a raw S3 PutObject request with the JSON body
977    use fakecloud_core::service::AwsRequest;
978    use http::{HeaderMap, Method};
979    let service = registry_arc.get("s3").ok_or_else(|| {
980        (
981            "States.TaskFailed".to_string(),
982            "S3 service not available for ResultWriter".to_string(),
983        )
984    })?;
985
986    let req = AwsRequest {
987        service: "s3".to_string(),
988        action: "PutObject".to_string(),
989        region: "us-east-1".to_string(),
990        account_id: account_id.to_string(),
991        request_id: uuid::Uuid::new_v4().to_string(),
992        headers: HeaderMap::new(),
993        query_params: std::collections::HashMap::new(),
994        body,
995        body_stream: parking_lot::Mutex::new(None),
996        path_segments: vec![bucket.to_string(), format!("{prefix}result.json")],
997        raw_path: format!("/{bucket}/{prefix}result.json"),
998        raw_query: String::new(),
999        method: Method::PUT,
1000        is_query_protocol: false,
1001        access_key_id: None,
1002        principal: None,
1003    };
1004
1005    service.handle(req).await.map_err(|err| {
1006        let code = err.code().to_string();
1007        let msg = err.message();
1008        (
1009            format!("S3.{code}"),
1010            format!("ResultWriter PutObject failed: {msg}"),
1011        )
1012    })?;
1013
1014    Ok(())
1015}
1016
1017/// Apply ItemBatcher configuration to group items into batches.
1018fn apply_item_batcher(items: &[Value], batcher: &Value, _effective_input: &Value) -> Vec<Value> {
1019    let max_per_batch = batcher["MaxItemsPerBatch"].as_u64().unwrap_or(u64::MAX) as usize;
1020    let max_bytes = batcher["MaxInputBytesPerBatch"].as_u64().unwrap_or(0) as usize;
1021    let batch_input = batcher.get("BatchInput").cloned();
1022
1023    let mut batches: Vec<Vec<Value>> = Vec::new();
1024    let mut current_batch: Vec<Value> = Vec::new();
1025    let mut current_bytes = 0usize;
1026
1027    for item in items.iter().cloned() {
1028        let item_bytes = serde_json::to_vec(&item).unwrap_or_default().len();
1029        if !current_batch.is_empty()
1030            && (current_batch.len() >= max_per_batch
1031                || (max_bytes > 0 && current_bytes + item_bytes > max_bytes))
1032        {
1033            batches.push(current_batch);
1034            current_batch = Vec::new();
1035            current_bytes = 0;
1036        }
1037        current_bytes += item_bytes;
1038        current_batch.push(item);
1039    }
1040    if !current_batch.is_empty() {
1041        batches.push(current_batch);
1042    }
1043
1044    batches
1045        .into_iter()
1046        .enumerate()
1047        .map(|(index, batch)| {
1048            let mut map = serde_json::Map::new();
1049            map.insert("index".to_string(), json!(index));
1050            map.insert("items".to_string(), Value::Array(batch));
1051            if let Some(Value::Object(ref obj)) = batch_input {
1052                for (k, v) in obj {
1053                    map.insert(k.clone(), v.clone());
1054                }
1055            }
1056            Value::Object(map)
1057        })
1058        .collect()
1059}
1060
1061/// Invoke a resource (Lambda function or SDK integration).
1062#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064    resource: &str,
1065    input: &Value,
1066    delivery: &Option<Arc<DeliveryBus>>,
1067    dynamodb_state: &Option<SharedDynamoDbState>,
1068    registry: &Option<SharedServiceRegistry>,
1069    execution_arn: &str,
1070    timeout_seconds: Option<u64>,
1071    heartbeat_seconds: Option<u64>,
1072    shared_state: &SharedStepFunctionsState,
1073) -> Result<Value, (String, String)> {
1074    // Direct activity ARN: arn:aws:states:<region>:<account>:activity:<name>
1075    if resource.contains(":states:") && resource.contains(":activity:") {
1076        return invoke_activity(
1077            resource,
1078            input,
1079            shared_state,
1080            timeout_seconds,
1081            heartbeat_seconds,
1082        )
1083        .await;
1084    }
1085
1086    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
1087    if resource.contains(":lambda:") && resource.contains(":function:") {
1088        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1089    }
1090
1091    // SDK integration patterns: arn:aws:states:::<service>:<action>
1092    if resource.starts_with("arn:aws:states:::lambda:invoke") {
1093        let function_name = input["FunctionName"].as_str().unwrap_or("");
1094        let payload = if let Some(p) = input.get("Payload") {
1095            p.clone()
1096        } else {
1097            input.clone()
1098        };
1099        // The optimized `lambda:invoke` integration returns the AWS Invoke API
1100        // response envelope, not the bare function output — `ResultSelector`
1101        // expressions like `{"value.$": "$.Payload"}` depend on it. Wrap only
1102        // the successful result; error results (function errors and transport
1103        // failures) pass through unchanged, since a failed lambda:invoke task
1104        // has no result envelope on AWS. The direct-ARN path above
1105        // intentionally stays unwrapped — there real AWS returns the bare
1106        // payload. `SdkHttpMetadata`/`SdkResponseMetadata` are omitted; real
1107        // templates select `Payload`/`StatusCode`.
1108        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds)
1109            .await
1110            .map(|payload| {
1111                json!({
1112                    "ExecutedVersion": "$LATEST",
1113                    "Payload": payload,
1114                    "StatusCode": 200,
1115                })
1116            });
1117    }
1118
1119    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1120        return invoke_sqs_send_message(input, delivery);
1121    }
1122
1123    if resource.starts_with("arn:aws:states:::sns:publish") {
1124        return invoke_sns_publish(input, delivery);
1125    }
1126
1127    if resource.starts_with("arn:aws:states:::events:putEvents") {
1128        return invoke_eventbridge_put_events(input, delivery);
1129    }
1130
1131    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1132        return invoke_dynamodb_get_item(input, dynamodb_state);
1133    }
1134
1135    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1136        return invoke_dynamodb_put_item(input, dynamodb_state);
1137    }
1138
1139    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1140        return invoke_dynamodb_delete_item(input, dynamodb_state);
1141    }
1142
1143    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1144        return invoke_dynamodb_update_item(input, dynamodb_state);
1145    }
1146
1147    // Nested Step Functions execution:
1148    //   arn:aws:states:::states:startExecution[.sync|.waitForTaskToken]
1149    // Routes through the same registry path as the generic aws-sdk
1150    // integration so the inner execution sees a real `StartExecution`
1151    // call. `.sync` polls `DescribeExecution` until the inner execution
1152    // reaches a terminal state, mirroring AWS' behavior of bubbling the
1153    // inner Output back to the parent.
1154    if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1155        if tail.starts_with("states:startExecution") {
1156            let account_id = account_from_execution_arn(execution_arn);
1157            let result =
1158                invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1159                    .await;
1160            // Patch the inner execution's parent_execution_arn so the
1161            // `/execution-tree` introspection can walk back-references.
1162            // For `.sync`, the result is the DescribeExecution shape
1163            // (has `executionArn`); for fire-and-forget StartExecution the
1164            // initial StartExecution response also carries `executionArn`.
1165            if let Ok(ref value) = result {
1166                if let Some(inner_arn) = value
1167                    .get("executionArn")
1168                    .or_else(|| value.get("ExecutionArn"))
1169                    .and_then(Value::as_str)
1170                {
1171                    let mut accounts = shared_state.write();
1172                    if let Some(state) = accounts.get_mut(&account_id) {
1173                        if let Some(exec) = state.executions.get_mut(inner_arn) {
1174                            exec.parent_execution_arn = Some(execution_arn.to_string());
1175                        }
1176                    }
1177                }
1178            }
1179            return result;
1180        }
1181    }
1182
1183    // Generic AWS SDK integration: arn:aws:states:::aws-sdk:<service>:<action>[.<wait>]
1184    // Routes the call to the registered service via the central
1185    // ServiceRegistry, passing the Task's `Parameters` block as the
1186    // request body. Mirrors the AWS SDK service integration pattern in
1187    // real Step Functions.
1188    if let Some(rest) = resource.strip_prefix("arn:aws:states:::aws-sdk:") {
1189        let account_id = account_from_execution_arn(execution_arn);
1190        return invoke_aws_sdk_integration(rest, input, registry, &account_id, timeout_seconds)
1191            .await;
1192    }
1193
1194    // Optimized service integrations expose `.sync` variants for ECS,
1195    // Athena, and Glue. Route them through the same waiter machinery as
1196    // `aws-sdk:` so callers can write the AWS-blessed ARN forms.
1197    if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1198        if tail.contains(".sync") {
1199            let account_id = account_from_execution_arn(execution_arn);
1200            return invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1201                .await;
1202        }
1203    }
1204
1205    Err((
1206        "States.TaskFailed".to_string(),
1207        format!("Unsupported resource: {resource}"),
1208    ))
1209}
1210
/// Turn a camelCase SDK integration action (`getItem`) into the PascalCase
/// form (`GetItem`) that the service handlers expect as their action name.
///
/// Only the first character is touched, and only if it is ASCII; the rest of
/// the string is copied verbatim. An empty input yields an empty string.
fn camel_to_pascal(action: &str) -> String {
    let mut pascal = String::with_capacity(action.len());
    let mut rest = action.chars();
    if let Some(initial) = rest.next() {
        pascal.push(initial.to_ascii_uppercase());
        pascal.push_str(rest.as_str());
    }
    pascal
}
1222
/// Resolve a Step Functions SDK integration service id to the name the
/// service is registered under in fakecloud.
///
/// Only the ids that differ are listed (`sfn` -> `states`,
/// `cloudwatchlogs` -> `logs`); every other id is returned unchanged.
fn map_sdk_service_id(service_id: &str) -> &str {
    const ALIASES: [(&str, &str); 2] = [("sfn", "states"), ("cloudwatchlogs", "logs")];

    ALIASES
        .iter()
        .find(|(sdk_id, _)| *sdk_id == service_id)
        .map_or(service_id, |(_, registered)| *registered)
}
1235
/// Pull the account id out of a Step Functions execution ARN
/// (`arn:aws:states:<region>:<account>:execution:...`), i.e. the fifth
/// colon-separated field.
///
/// When that field is missing or empty, the fixture account id
/// `123456789012` is returned instead.
fn account_from_execution_arn(execution_arn: &str) -> String {
    const FALLBACK_ACCOUNT_ID: &str = "123456789012";

    match execution_arn.split(':').nth(4) {
        Some(account) if !account.is_empty() => account.to_owned(),
        _ => FALLBACK_ACCOUNT_ID.to_owned(),
    }
}
1247
1248/// Dispatch a Step Functions `aws-sdk:<service>:<action>` Task to the
1249/// registered service via the central [`fakecloud_core::registry::ServiceRegistry`].
1250/// `tail` is the trailing portion of the resource ARN after the
1251/// `aws-sdk:` prefix (e.g. `dynamodb:getItem` or `sqs:sendMessage.waitForTaskToken`).
1252async fn invoke_aws_sdk_integration(
1253    tail: &str,
1254    input: &Value,
1255    registry: &Option<SharedServiceRegistry>,
1256    account_id: &str,
1257    timeout_seconds: Option<u64>,
1258) -> Result<Value, (String, String)> {
1259    let registry_arc = resolve_registry(registry)?;
1260
1261    // Split `<service>:<action>[.<modifier>]`. The `.waitForTaskToken`
1262    // modifier is accepted but ignored — the integration runs synchronously
1263    // regardless. The `.sync` modifier triggers a polling loop after the
1264    // initial call to wait for the downstream operation to reach a terminal
1265    // state.
1266    let mut parts = tail.splitn(2, ':');
1267    let service_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1268        (
1269            "States.TaskFailed".to_string(),
1270            format!("Invalid aws-sdk Resource ARN: missing service in '{tail}'"),
1271        )
1272    })?;
1273    let action_with_mod = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1274        (
1275            "States.TaskFailed".to_string(),
1276            format!("Invalid aws-sdk Resource ARN: missing action in '{tail}'"),
1277        )
1278    })?;
1279    let action_camel = action_with_mod
1280        .split('.')
1281        .next()
1282        .filter(|s| !s.is_empty())
1283        .ok_or_else(|| {
1284            (
1285                "States.TaskFailed".to_string(),
1286                format!("Invalid aws-sdk Resource ARN: empty action in '{tail}'"),
1287            )
1288        })?;
1289    let modifiers: Vec<&str> = action_with_mod.split('.').skip(1).collect();
1290    let is_sync = modifiers.iter().any(|m| *m == "sync" || *m == "sync:2");
1291
1292    let action_pascal = camel_to_pascal(action_camel);
1293    let service_name = map_sdk_service_id(service_id).to_string();
1294
1295    // ECS's wire format is camelCase, but Step Functions's optimized
1296    // integration takes PascalCase parameter names in the state machine
1297    // definition. Translate top-level keys for the ECS service so AWS
1298    // Step Functions definitions Just Work against the fake.
1299    let translated_input = match service_name.as_str() {
1300        "ecs" => translate_ecs_keys_to_camel(input),
1301        _ => input.clone(),
1302    };
1303
1304    let initial = call_sdk_action(
1305        &registry_arc,
1306        &service_name,
1307        &action_pascal,
1308        &translated_input,
1309        account_id,
1310    )
1311    .await?;
1312
1313    if !is_sync {
1314        return Ok(initial);
1315    }
1316
1317    // `.sync` pattern: dispatch to the per-service waiter that polls until
1318    // the downstream operation reaches a terminal state. Returns the full
1319    // describe-shape result, or surfaces a terminal failure as
1320    // `States.TaskFailed`.
1321    sync_wait(
1322        &registry_arc,
1323        &service_name,
1324        &action_pascal,
1325        &initial,
1326        &translated_input,
1327        account_id,
1328        timeout_seconds,
1329    )
1330    .await
1331}
1332
1333/// Translate the top-level PascalCase keys that AWS Step Functions
1334/// state machines use for `ecs:runTask` Parameters into the camelCase
1335/// shape that the AWS ECS API (and our handler) expects. Unknown keys
1336/// pass through unchanged so callers can still send raw camelCase. The
1337/// translation is shallow on purpose — nested overrides keep their
1338/// existing camelCase shape on real AWS too.
1339fn translate_ecs_keys_to_camel(input: &Value) -> Value {
1340    let Some(obj) = input.as_object() else {
1341        return input.clone();
1342    };
1343    let mut out = serde_json::Map::with_capacity(obj.len());
1344    for (k, v) in obj.iter() {
1345        let camel = match k.as_str() {
1346            "Cluster" => "cluster",
1347            "TaskDefinition" => "taskDefinition",
1348            "LaunchType" => "launchType",
1349            "Group" => "group",
1350            "Overrides" => "overrides",
1351            "PlatformVersion" => "platformVersion",
1352            "NetworkConfiguration" => "networkConfiguration",
1353            "Tags" => "tags",
1354            "EnableExecuteCommand" => "enableExecuteCommand",
1355            "PropagateTags" => "propagateTags",
1356            "ReferenceId" => "referenceId",
1357            "StartedBy" => "startedBy",
1358            "Count" => "count",
1359            "CapacityProviderStrategy" => "capacityProviderStrategy",
1360            "PlacementConstraints" => "placementConstraints",
1361            "PlacementStrategy" => "placementStrategy",
1362            other => other,
1363        };
1364        out.insert(camel.to_string(), v.clone());
1365    }
1366    Value::Object(out)
1367}
1368
1369fn resolve_registry(
1370    registry: &Option<SharedServiceRegistry>,
1371) -> Result<Arc<fakecloud_core::registry::ServiceRegistry>, (String, String)> {
1372    let registry_handle = registry.as_ref().ok_or_else(|| {
1373        (
1374            "States.TaskFailed".to_string(),
1375            "No service registry configured for aws-sdk integration".to_string(),
1376        )
1377    })?;
1378    registry_handle.get().cloned().ok_or_else(|| {
1379        (
1380            "States.TaskFailed".to_string(),
1381            "Service registry not yet initialised; aws-sdk integration unavailable".to_string(),
1382        )
1383    })
1384}
1385
1386/// Call a single AWS SDK action against the registered service handler.
1387async fn call_sdk_action(
1388    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1389    service_name: &str,
1390    action_pascal: &str,
1391    input: &Value,
1392    account_id: &str,
1393) -> Result<Value, (String, String)> {
1394    use bytes::Bytes;
1395    use fakecloud_core::service::AwsRequest;
1396    use http::{HeaderMap, Method};
1397
1398    let service = registry.get(service_name).ok_or_else(|| {
1399        (
1400            "States.TaskFailed".to_string(),
1401            format!("Unknown aws-sdk service '{service_name}'"),
1402        )
1403    })?;
1404
1405    let body_bytes = Bytes::from(
1406        serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1407    );
1408
1409    let req = AwsRequest {
1410        service: service_name.to_string(),
1411        action: action_pascal.to_string(),
1412        region: "us-east-1".to_string(),
1413        account_id: account_id.to_string(),
1414        request_id: uuid::Uuid::new_v4().to_string(),
1415        headers: HeaderMap::new(),
1416        query_params: std::collections::HashMap::new(),
1417        body: body_bytes,
1418        body_stream: parking_lot::Mutex::new(None),
1419        path_segments: vec![],
1420        raw_path: "/".to_string(),
1421        raw_query: String::new(),
1422        method: Method::POST,
1423        is_query_protocol: false,
1424        access_key_id: None,
1425        principal: None,
1426    };
1427
1428    let response = service.handle(req).await.map_err(|err| {
1429        let code = err.code().to_string();
1430        let msg = err.message();
1431        let prefix_service = match service_name {
1432            "dynamodb" => "DynamoDb".to_string(),
1433            "states" => "Sfn".to_string(),
1434            other => camel_to_pascal(other),
1435        };
1436        (
1437            format!("{prefix_service}.{code}"),
1438            format!("{action_pascal} failed: {msg}"),
1439        )
1440    })?;
1441
1442    let response_bytes = match response.body {
1443        fakecloud_core::service::ResponseBody::Bytes(b) => b,
1444        fakecloud_core::service::ResponseBody::File { .. } => {
1445            return Err((
1446                "States.TaskFailed".to_string(),
1447                "aws-sdk integration: file-backed response not supported".to_string(),
1448            ));
1449        }
1450    };
1451
1452    if response_bytes.is_empty() {
1453        return Ok(json!({}));
1454    }
1455    serde_json::from_slice(&response_bytes).map_err(|e| {
1456        (
1457            "States.TaskFailed".to_string(),
1458            format!("aws-sdk integration: failed to parse response JSON: {e}"),
1459        )
1460    })
1461}
1462
1463/// Call an SDK action and return raw bytes without JSON parsing.
1464/// Used by distributed Map mode to read/write S3 objects.
1465async fn call_sdk_action_raw_bytes(
1466    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1467    service_name: &str,
1468    action_pascal: &str,
1469    input: &Value,
1470    account_id: &str,
1471) -> Result<bytes::Bytes, (String, String)> {
1472    use bytes::Bytes;
1473    use fakecloud_core::service::AwsRequest;
1474    use http::{HeaderMap, Method};
1475
1476    let service = registry.get(service_name).ok_or_else(|| {
1477        (
1478            "States.TaskFailed".to_string(),
1479            format!("Unknown aws-sdk service '{service_name}'"),
1480        )
1481    })?;
1482
1483    let body_bytes = Bytes::from(
1484        serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1485    );
1486
1487    let req = AwsRequest {
1488        service: service_name.to_string(),
1489        action: action_pascal.to_string(),
1490        region: "us-east-1".to_string(),
1491        account_id: account_id.to_string(),
1492        request_id: uuid::Uuid::new_v4().to_string(),
1493        headers: HeaderMap::new(),
1494        query_params: std::collections::HashMap::new(),
1495        body: body_bytes,
1496        body_stream: parking_lot::Mutex::new(None),
1497        path_segments: vec![],
1498        raw_path: "/".to_string(),
1499        raw_query: String::new(),
1500        method: Method::POST,
1501        is_query_protocol: false,
1502        access_key_id: None,
1503        principal: None,
1504    };
1505
1506    let response = service.handle(req).await.map_err(|err| {
1507        let code = err.code().to_string();
1508        let msg = err.message();
1509        let prefix_service = match service_name {
1510            "dynamodb" => "DynamoDb".to_string(),
1511            "states" => "Sfn".to_string(),
1512            other => camel_to_pascal(other),
1513        };
1514        (
1515            format!("{prefix_service}.{code}"),
1516            format!("{action_pascal} failed: {msg}"),
1517        )
1518    })?;
1519
1520    match response.body {
1521        fakecloud_core::service::ResponseBody::Bytes(b) => Ok(b),
1522        fakecloud_core::service::ResponseBody::File { .. } => Err((
1523            "States.TaskFailed".to_string(),
1524            "aws-sdk integration: file-backed response not supported".to_string(),
1525        )),
1526    }
1527}
1528
1529/// Cap on `.sync` polling so a stuck downstream task can't hang an
1530/// execution forever. Mirrors the `TimeoutSeconds` knob on the Task state
1531/// when one is set; otherwise defaults to 5 minutes which is enough for
1532/// the in-process services that ship today (Athena returns synchronously,
1533/// ECS tasks finish within seconds even when docker-less).
1534const SYNC_DEFAULT_TIMEOUT_SECS: u64 = 300;
1535const SYNC_POLL_INTERVAL_MS: u64 = 200;
1536
1537/// Dispatch `.sync` waiters by service+action. Each waiter polls the
1538/// matching describe-style API until the downstream operation reaches a
1539/// terminal state, then returns the full describe response.
1540async fn sync_wait(
1541    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1542    service_name: &str,
1543    action_pascal: &str,
1544    initial: &Value,
1545    input: &Value,
1546    account_id: &str,
1547    timeout_seconds: Option<u64>,
1548) -> Result<Value, (String, String)> {
1549    match (service_name, action_pascal) {
1550        ("ecs", "RunTask") => {
1551            sync_wait_ecs_run_task(registry, initial, input, account_id, timeout_seconds).await
1552        }
1553        ("athena", "StartQueryExecution") => {
1554            sync_wait_athena_query(registry, initial, account_id, timeout_seconds).await
1555        }
1556        ("states", "StartExecution") => {
1557            sync_wait_states_start_execution(registry, initial, account_id, timeout_seconds).await
1558        }
1559        ("glue", "StartJobRun") => {
1560            // Poll the real Glue `GetJobRun` (the job run was created by the
1561            // StartJobRun integration that ran just before this waiter), so the
1562            // `.sync` result is the actual run — full JobRun shape, and a
1563            // FAILED/STOPPED/TIMEOUT state surfaces as a task failure rather
1564            // than a hardcoded SUCCEEDED.
1565            let job_run_id = initial
1566                .get("JobRunId")
1567                .and_then(Value::as_str)
1568                .unwrap_or_default()
1569                .to_string();
1570            let job_name = input
1571                .get("JobName")
1572                .and_then(Value::as_str)
1573                .unwrap_or_default()
1574                .to_string();
1575            let deadline = sync_deadline(timeout_seconds);
1576            loop {
1577                let described = call_sdk_action(
1578                    registry,
1579                    "glue",
1580                    "GetJobRun",
1581                    &json!({ "JobName": job_name, "RunId": job_run_id }),
1582                    account_id,
1583                )
1584                .await?;
1585                let state = described
1586                    .get("JobRun")
1587                    .and_then(|r| r.get("JobRunState"))
1588                    .and_then(Value::as_str)
1589                    .unwrap_or("");
1590                match state {
1591                    "SUCCEEDED" => return Ok(described),
1592                    "FAILED" | "STOPPED" | "TIMEOUT" | "ERROR" => {
1593                        return Err((
1594                            "States.TaskFailed".to_string(),
1595                            format!("Glue job run {job_run_id} ended in state {state}"),
1596                        ));
1597                    }
1598                    _ => {}
1599                }
1600                if std::time::Instant::now() >= deadline {
1601                    return Err((
1602                        "States.Timeout".to_string(),
1603                        format!(
1604                            "glue:startJobRun.sync timed out after {}s for run {job_run_id}",
1605                            sync_timeout_secs(timeout_seconds)
1606                        ),
1607                    ));
1608                }
1609                tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1610            }
1611        }
1612        _ => Err((
1613            "States.TaskFailed".to_string(),
1614            format!(
1615                "`.sync` is not supported for {service_name}:{action_pascal} yet — \
1616                 supported: ecs:RunTask, athena:StartQueryExecution, glue:StartJobRun, states:StartExecution"
1617            ),
1618        )),
1619    }
1620}
1621
1622async fn sync_wait_ecs_run_task(
1623    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1624    initial: &Value,
1625    input: &Value,
1626    account_id: &str,
1627    timeout_seconds: Option<u64>,
1628) -> Result<Value, (String, String)> {
1629    let tasks = initial
1630        .get("tasks")
1631        .and_then(Value::as_array)
1632        .ok_or_else(|| {
1633            (
1634                "States.TaskFailed".to_string(),
1635                "ecs:RunTask.sync: response missing 'tasks' array".to_string(),
1636            )
1637        })?;
1638    if tasks.is_empty() {
1639        return Err((
1640            "States.TaskFailed".to_string(),
1641            "ecs:RunTask.sync: no tasks were started".to_string(),
1642        ));
1643    }
1644    let task_arns: Vec<String> = tasks
1645        .iter()
1646        .filter_map(|t| t.get("taskArn").and_then(Value::as_str).map(String::from))
1647        .collect();
1648    let cluster = input
1649        .get("cluster")
1650        .or_else(|| input.get("Cluster"))
1651        .and_then(Value::as_str)
1652        .map(String::from);
1653
1654    let deadline = sync_deadline(timeout_seconds);
1655    loop {
1656        let mut describe_input = json!({ "tasks": task_arns });
1657        if let Some(c) = &cluster {
1658            describe_input["cluster"] = json!(c);
1659        }
1660        let described = call_sdk_action(
1661            registry,
1662            "ecs",
1663            "DescribeTasks",
1664            &describe_input,
1665            account_id,
1666        )
1667        .await?;
1668        let described_tasks = described
1669            .get("tasks")
1670            .and_then(Value::as_array)
1671            .cloned()
1672            .unwrap_or_default();
1673        let all_stopped = !described_tasks.is_empty()
1674            && described_tasks
1675                .iter()
1676                .all(|t| t.get("lastStatus").and_then(Value::as_str) == Some("STOPPED"));
1677        if all_stopped {
1678            // Surface non-zero container exit codes or stop codes that map
1679            // to AWS-style failures. Per AWS docs, ECS RunTask.sync raises
1680            // States.TaskFailed when any container exits non-zero or the
1681            // task stops with a failure code.
1682            let any_failed = described_tasks.iter().any(|t| {
1683                let stop_code = t.get("stopCode").and_then(Value::as_str);
1684                let bad_stop = matches!(
1685                    stop_code,
1686                    Some(
1687                        "TaskFailedToStart"
1688                            | "EssentialContainerExited"
1689                            | "ServiceSchedulerInitiated"
1690                    )
1691                );
1692                let bad_exit = t
1693                    .get("containers")
1694                    .and_then(Value::as_array)
1695                    .map(|cs| {
1696                        cs.iter().any(|c| {
1697                            c.get("exitCode")
1698                                .and_then(Value::as_i64)
1699                                .map(|n| n != 0)
1700                                .unwrap_or(false)
1701                        })
1702                    })
1703                    .unwrap_or(false);
1704                bad_stop || bad_exit
1705            });
1706            if any_failed {
1707                let cause = described_tasks
1708                    .iter()
1709                    .find_map(|t| {
1710                        t.get("stoppedReason")
1711                            .and_then(Value::as_str)
1712                            .map(String::from)
1713                    })
1714                    .unwrap_or_else(|| "ECS task failed".to_string());
1715                return Err(("States.TaskFailed".to_string(), cause));
1716            }
1717            return Ok(described);
1718        }
1719        if std::time::Instant::now() >= deadline {
1720            return Err((
1721                "States.Timeout".to_string(),
1722                format!(
1723                    "ecs:RunTask.sync timed out after {}s waiting for {} task(s) to STOP",
1724                    sync_timeout_secs(timeout_seconds),
1725                    task_arns.len()
1726                ),
1727            ));
1728        }
1729        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1730    }
1731}
1732
1733async fn sync_wait_athena_query(
1734    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1735    initial: &Value,
1736    account_id: &str,
1737    timeout_seconds: Option<u64>,
1738) -> Result<Value, (String, String)> {
1739    let qid = initial
1740        .get("QueryExecutionId")
1741        .and_then(Value::as_str)
1742        .ok_or_else(|| {
1743            (
1744                "States.TaskFailed".to_string(),
1745                "athena:StartQueryExecution.sync: response missing QueryExecutionId".to_string(),
1746            )
1747        })?
1748        .to_string();
1749
1750    let deadline = sync_deadline(timeout_seconds);
1751    loop {
1752        let described = call_sdk_action(
1753            registry,
1754            "athena",
1755            "GetQueryExecution",
1756            &json!({ "QueryExecutionId": qid }),
1757            account_id,
1758        )
1759        .await?;
1760        let state = described
1761            .get("QueryExecution")
1762            .and_then(|qe| qe.get("Status"))
1763            .and_then(|s| s.get("State"))
1764            .and_then(Value::as_str)
1765            .unwrap_or("");
1766        match state {
1767            "SUCCEEDED" => return Ok(described),
1768            "FAILED" | "CANCELLED" => {
1769                let cause = described
1770                    .get("QueryExecution")
1771                    .and_then(|qe| qe.get("Status"))
1772                    .and_then(|s| s.get("StateChangeReason"))
1773                    .and_then(Value::as_str)
1774                    .unwrap_or("Athena query reached terminal failure state")
1775                    .to_string();
1776                return Err(("States.TaskFailed".to_string(), cause));
1777            }
1778            _ => {}
1779        }
1780        if std::time::Instant::now() >= deadline {
1781            return Err((
1782                "States.Timeout".to_string(),
1783                format!(
1784                    "athena:StartQueryExecution.sync timed out after {}s for query {qid}",
1785                    sync_timeout_secs(timeout_seconds)
1786                ),
1787            ));
1788        }
1789        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1790    }
1791}
1792
1793/// Wait for a nested Step Functions execution to reach a terminal
1794/// state, then return the bubbled `Output` like AWS does for
1795/// `arn:aws:states:::states:startExecution.sync`. The initial
1796/// `StartExecution` response carries `executionArn`; we poll
1797/// `DescribeExecution` until status leaves `RUNNING`.
1798async fn sync_wait_states_start_execution(
1799    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1800    initial: &Value,
1801    account_id: &str,
1802    timeout_seconds: Option<u64>,
1803) -> Result<Value, (String, String)> {
1804    let exec_arn = initial
1805        .get("executionArn")
1806        .or_else(|| initial.get("ExecutionArn"))
1807        .and_then(Value::as_str)
1808        .ok_or_else(|| {
1809            (
1810                "States.TaskFailed".to_string(),
1811                "states:startExecution.sync: response missing executionArn".to_string(),
1812            )
1813        })?
1814        .to_string();
1815
1816    let deadline = sync_deadline(timeout_seconds);
1817    loop {
1818        let described = call_sdk_action(
1819            registry,
1820            "states",
1821            "DescribeExecution",
1822            &json!({ "executionArn": exec_arn }),
1823            account_id,
1824        )
1825        .await?;
1826        let status = described
1827            .get("status")
1828            .or_else(|| described.get("Status"))
1829            .and_then(Value::as_str)
1830            .unwrap_or("");
1831        match status {
1832            "SUCCEEDED" => return Ok(described),
1833            "FAILED" | "TIMED_OUT" | "ABORTED" => {
1834                let cause = described
1835                    .get("cause")
1836                    .or_else(|| described.get("Cause"))
1837                    .and_then(Value::as_str)
1838                    .unwrap_or("Nested execution reached terminal failure state")
1839                    .to_string();
1840                return Err(("States.TaskFailed".to_string(), cause));
1841            }
1842            _ => {}
1843        }
1844        if std::time::Instant::now() >= deadline {
1845            return Err((
1846                "States.Timeout".to_string(),
1847                format!(
1848                    "states:startExecution.sync timed out after {}s for {exec_arn}",
1849                    sync_timeout_secs(timeout_seconds)
1850                ),
1851            ));
1852        }
1853        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1854    }
1855}
1856
/// Effective `.sync` polling budget in seconds: the caller-supplied
/// `timeout_seconds`, or `SYNC_DEFAULT_TIMEOUT_SECS` when none was given.
fn sync_timeout_secs(timeout_seconds: Option<u64>) -> u64 {
    timeout_seconds.unwrap_or(SYNC_DEFAULT_TIMEOUT_SECS)
}
1860
1861fn sync_deadline(timeout_seconds: Option<u64>) -> std::time::Instant {
1862    std::time::Instant::now() + std::time::Duration::from_secs(sync_timeout_secs(timeout_seconds))
1863}
1864
/// Kind of clause in an update operation.
///
/// NOTE(review): the variant names match the `SET` / `REMOVE` / `ADD` /
/// `DELETE` clauses of a DynamoDB `UpdateExpression`, and this file imports
/// DynamoDB state, but the code that consumes this enum is outside this
/// excerpt — confirm at the use sites.
#[derive(Clone, Copy)]
pub(crate) enum UpdateClause {
    /// Presumably the `SET` clause — TODO confirm.
    Set,
    /// Presumably the `REMOVE` clause — TODO confirm.
    Remove,
    /// Presumably the `ADD` clause — TODO confirm.
    Add,
    /// Presumably the `DELETE` clause — TODO confirm.
    Delete,
}
1872
1873/// Invoke a Lambda function directly via DeliveryBus.
1874async fn invoke_lambda_direct(
1875    function_arn: &str,
1876    input: &Value,
1877    delivery: &Option<Arc<DeliveryBus>>,
1878    timeout_seconds: Option<u64>,
1879) -> Result<Value, (String, String)> {
1880    let delivery = delivery.as_ref().ok_or_else(|| {
1881        (
1882            "States.TaskFailed".to_string(),
1883            "No delivery bus configured for Lambda invocation".to_string(),
1884        )
1885    })?;
1886
1887    let payload =
1888        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1889
1890    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1891
1892    let result = if let Some(timeout) = timeout_seconds {
1893        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1894            Ok(r) => r,
1895            Err(_) => {
1896                return Err((
1897                    "States.Timeout".to_string(),
1898                    format!("Task timed out after {timeout} seconds"),
1899                ));
1900            }
1901        }
1902    } else {
1903        invoke_future.await
1904    };
1905
1906    match result {
1907        Some(Ok(bytes)) => {
1908            let response_str = String::from_utf8_lossy(&bytes);
1909            let value: Value =
1910                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1911
1912            // The Lambda runtime returns HTTP 200 even for unhandled function
1913            // errors, encoding the failure in the payload as
1914            // `{"errorMessage", "errorType", "stackTrace"}`. The delivery trait
1915            // only conveys bytes, so the payload is the only available signal —
1916            // the same heuristic the Lambda service uses for async destination
1917            // routing. Mirror real AWS: a function error fails the task with the
1918            // function's `errorType` as the Error and the serialized payload as
1919            // the Cause, which is exactly what makes Retry/Catch on custom
1920            // exception names work.
1921            //
1922            // Require BOTH `errorType` and `errorMessage` (not either) to avoid
1923            // misreading a legitimate success payload that merely happens to
1924            // contain one of those field names as a function error.
1925            if let Some(obj) = value.as_object() {
1926                if obj.contains_key("errorType") && obj.contains_key("errorMessage") {
1927                    let error_type = obj
1928                        .get("errorType")
1929                        .and_then(Value::as_str)
1930                        .unwrap_or("Exception")
1931                        .to_string();
1932                    let cause = serde_json::to_string(&value)
1933                        .expect("serde_json::Value serialization is infallible");
1934                    return Err((error_type, cause));
1935                }
1936            }
1937
1938            Ok(value)
1939        }
1940        Some(Err(e)) => {
1941            // Failures of the Lambda Invoke API call itself surface on real AWS
1942            // with `Lambda.`-prefixed error names (`Lambda.ServiceException`,
1943            // `Lambda.SdkClientException`, `Lambda.Unknown`, ...) — never
1944            // `States.TaskFailed` — and those are the names AWS documents for
1945            // Retry rules. The delivery error is an unstructured string with
1946            // the outcome flattened, so the honest default is `Lambda.Unknown`
1947            // ("outcome unknown"). The one case we can cheaply distinguish is a
1948            // missing function, which AWS reports as
1949            // `Lambda.ResourceNotFoundException`.
1950            if e.starts_with("Function not found") {
1951                Err(("Lambda.ResourceNotFoundException".to_string(), e))
1952            } else {
1953                Err(("Lambda.Unknown".to_string(), e))
1954            }
1955        }
1956        None => {
1957            // No runtime available — return empty result
1958            Ok(json!({}))
1959        }
1960    }
1961}
1962
1963/// Invoke an activity worker. Inserts a `PENDING` token into shared state
1964/// so a worker can claim it via `GetActivityTask`, then polls until the
1965/// worker calls `SendTaskSuccess` / `SendTaskFailure` or the heartbeat /
1966/// timeout windows expire.
1967async fn invoke_activity(
1968    activity_arn: &str,
1969    input: &Value,
1970    shared_state: &SharedStepFunctionsState,
1971    timeout_seconds: Option<u64>,
1972    heartbeat_seconds: Option<u64>,
1973) -> Result<Value, (String, String)> {
1974    use crate::state::TaskTokenState;
1975
1976    // Activity must exist (look up across accounts via ARN segment).
1977    let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1978    {
1979        let accounts = shared_state.read();
1980        let exists = accounts
1981            .get(&activity_account)
1982            .map(|s| s.activities.contains_key(activity_arn))
1983            .unwrap_or(false);
1984        if !exists {
1985            return Err((
1986                "States.TaskFailed".to_string(),
1987                format!("Activity does not exist: {activity_arn}"),
1988            ));
1989        }
1990    }
1991
1992    let token = format!(
1993        "FCToken-{}-{}",
1994        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1995        uuid::Uuid::new_v4().simple(),
1996    );
1997    let now = chrono::Utc::now();
1998    let input_str =
1999        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
2000    {
2001        let mut accounts = shared_state.write();
2002        let state = accounts.get_or_create(&activity_account);
2003        state.task_tokens.insert(
2004            token.clone(),
2005            TaskTokenState {
2006                activity_arn: activity_arn.to_string(),
2007                status: "PENDING".to_string(),
2008                output: None,
2009                error: None,
2010                cause: None,
2011                input: Some(input_str),
2012                created_at: now,
2013                last_heartbeat_at: None,
2014                heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
2015                timeout_seconds: timeout_seconds.map(|s| s as i64),
2016            },
2017        );
2018    }
2019
2020    poll_task_token(
2021        shared_state,
2022        &activity_account,
2023        &token,
2024        timeout_seconds,
2025        heartbeat_seconds,
2026    )
2027    .await
2028}
2029
/// Where execution goes after a state has been processed.
///
/// NOTE(review): the code that produces and matches on this enum is outside
/// this excerpt; the per-variant notes below are inferred from the names and
/// payload types only — confirm at the use sites.
pub(crate) enum NextState {
    /// Carries a `String`; presumably the name of the state to run next.
    Name(String),
    /// No payload; presumably signals that the state machine has finished.
    End,
    /// Carries a `String`; presumably a message describing why no valid
    /// transition could be determined.
    Error(String),
}
2035
2036#[path = "interpreter_helpers.rs"]
2037mod interpreter_helpers;
2038pub(crate) use interpreter_helpers::*;
2039
2040#[cfg(test)]
2041#[path = "interpreter_tests.rs"]
2042mod tests;