Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::service::SharedServiceRegistry;
16use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
17
18/// Execute a state machine definition with the given input.
19/// Updates the execution record in shared state as it progresses.
20#[allow(clippy::too_many_arguments)]
21pub async fn execute_state_machine(
22    state: SharedStepFunctionsState,
23    execution_arn: String,
24    definition: String,
25    input: Option<String>,
26    delivery: Option<Arc<DeliveryBus>>,
27    dynamodb_state: Option<SharedDynamoDbState>,
28    registry: Option<SharedServiceRegistry>,
29    logging_configuration: Option<Value>,
30) {
31    let def: Value = match serde_json::from_str(&definition) {
32        Ok(v) => v,
33        Err(e) => {
34            fail_execution(
35                &state,
36                &execution_arn,
37                "States.Runtime",
38                &format!("Failed to parse definition: {e}"),
39            );
40            return;
41        }
42    };
43
44    let raw_input: Value = input
45        .as_deref()
46        .and_then(|s| serde_json::from_str(s).ok())
47        .unwrap_or(json!({}));
48
49    // Record ExecutionStarted event
50    add_event(
51        &state,
52        &execution_arn,
53        "ExecutionStarted",
54        0,
55        json!({
56            "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
57            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
58        }),
59    );
60
61    // Run the state machine inside an inner tokio::spawn so that any panic
62    // bubbles up as a JoinError instead of tearing down the caller. Without
63    // this the panic propagates through the outer spawn in `start_execution`
64    // which leaves the execution stuck in Running and leaks the panic to
65    // tokio's default hook.
66    let def_owned = def;
67    let state_clone = state.clone();
68    let execution_arn_clone = execution_arn.clone();
69    let delivery_clone = delivery.clone();
70    let dynamodb_state_clone = dynamodb_state.clone();
71    let registry_clone = registry.clone();
72    let handle = tokio::spawn(async move {
73        run_states(
74            &def_owned,
75            raw_input,
76            &delivery_clone,
77            &dynamodb_state_clone,
78            &registry_clone,
79            &state_clone,
80            &execution_arn_clone,
81        )
82        .await
83    });
84
85    match handle.await {
86        Ok(Ok(output)) => {
87            succeed_execution(&state, &execution_arn, &output);
88        }
89        Ok(Err((error, cause))) => {
90            fail_execution(&state, &execution_arn, &error, &cause);
91        }
92        Err(join_err) => {
93            let msg = if join_err.is_panic() {
94                let payload = join_err.into_panic();
95                if let Some(s) = payload.downcast_ref::<String>() {
96                    s.clone()
97                } else if let Some(s) = payload.downcast_ref::<&'static str>() {
98                    (*s).to_string()
99                } else {
100                    "execution task panicked".to_string()
101                }
102            } else {
103                format!("execution task cancelled: {join_err}")
104            };
105            tracing::error!(
106                execution_arn = %execution_arn,
107                panic = %msg,
108                "Step Functions execution panicked"
109            );
110            fail_execution(&state, &execution_arn, "States.Runtime", &msg);
111        }
112    }
113
114    // Deliver execution history to CloudWatch Logs when logging is configured.
115    deliver_execution_logs(
116        &state,
117        &execution_arn,
118        delivery.as_ref(),
119        logging_configuration.as_ref(),
120    );
121}
122
123type StatesResult<'a> = std::pin::Pin<
124    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
125>;
126
127/// Result of executing a single state in the state machine.
128pub(crate) enum Advance {
129    /// Continue to the given state with the given input.
130    Next(String, Value),
131    /// Terminate the state machine with the given output.
132    End(Value),
133    /// Fail the state machine with the given error and cause.
134    Fail(String, String),
135}
136
137async fn run_wait_state(
138    name: &str,
139    state_def: &Value,
140    input: Value,
141    shared_state: &SharedStepFunctionsState,
142    execution_arn: &str,
143) -> Advance {
144    let entered_event_id = add_event(
145        shared_state,
146        execution_arn,
147        "WaitStateEntered",
148        0,
149        json!({
150            "name": name,
151            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
152        }),
153    );
154
155    execute_wait_state(state_def, &input).await;
156
157    add_event(
158        shared_state,
159        execution_arn,
160        "WaitStateExited",
161        entered_event_id,
162        json!({
163            "name": name,
164            "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
165        }),
166    );
167
168    advance_from_next(state_def, input)
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn run_task_state(
173    name: &str,
174    state_def: &Value,
175    input: Value,
176    delivery: &Option<Arc<DeliveryBus>>,
177    dynamodb_state: &Option<SharedDynamoDbState>,
178    registry: &Option<SharedServiceRegistry>,
179    shared_state: &SharedStepFunctionsState,
180    execution_arn: &str,
181) -> Advance {
182    let entered_event_id = add_event(
183        shared_state,
184        execution_arn,
185        "TaskStateEntered",
186        0,
187        json!({
188            "name": name,
189            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
190        }),
191    );
192
193    let result = execute_task_state(
194        name,
195        state_def,
196        &input,
197        delivery,
198        dynamodb_state,
199        registry,
200        shared_state,
201        execution_arn,
202        entered_event_id,
203    )
204    .await;
205
206    match result {
207        Ok(output) => {
208            add_event(
209                shared_state,
210                execution_arn,
211                "TaskStateExited",
212                entered_event_id,
213                json!({
214                    "name": name,
215                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
216                }),
217            );
218            advance_from_next(state_def, output)
219        }
220        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
221    }
222}
223
224#[allow(clippy::too_many_arguments)]
225async fn run_parallel_state(
226    name: &str,
227    state_def: &Value,
228    input: Value,
229    delivery: &Option<Arc<DeliveryBus>>,
230    dynamodb_state: &Option<SharedDynamoDbState>,
231    registry: &Option<SharedServiceRegistry>,
232    shared_state: &SharedStepFunctionsState,
233    execution_arn: &str,
234) -> Advance {
235    let entered_event_id = add_event(
236        shared_state,
237        execution_arn,
238        "ParallelStateEntered",
239        0,
240        json!({
241            "name": name,
242            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
243        }),
244    );
245
246    let result = execute_parallel_state(
247        state_def,
248        &input,
249        delivery,
250        dynamodb_state,
251        registry,
252        shared_state,
253        execution_arn,
254    )
255    .await;
256
257    match result {
258        Ok(output) => {
259            add_event(
260                shared_state,
261                execution_arn,
262                "ParallelStateExited",
263                entered_event_id,
264                json!({
265                    "name": name,
266                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
267                }),
268            );
269            advance_from_next(state_def, output)
270        }
271        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
272    }
273}
274
275#[allow(clippy::too_many_arguments)]
276async fn run_map_state(
277    name: &str,
278    state_def: &Value,
279    input: Value,
280    delivery: &Option<Arc<DeliveryBus>>,
281    dynamodb_state: &Option<SharedDynamoDbState>,
282    registry: &Option<SharedServiceRegistry>,
283    shared_state: &SharedStepFunctionsState,
284    execution_arn: &str,
285) -> Advance {
286    let entered_event_id = add_event(
287        shared_state,
288        execution_arn,
289        "MapStateEntered",
290        0,
291        json!({
292            "name": name,
293            "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
294        }),
295    );
296
297    let result = execute_map_state(
298        state_def,
299        &input,
300        delivery,
301        dynamodb_state,
302        registry,
303        shared_state,
304        execution_arn,
305    )
306    .await;
307
308    match result {
309        Ok(output) => {
310            add_event(
311                shared_state,
312                execution_arn,
313                "MapStateExited",
314                entered_event_id,
315                json!({
316                    "name": name,
317                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
318                }),
319            );
320            advance_from_next(state_def, output)
321        }
322        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
323    }
324}
325
326/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
327async fn execute_wait_state(state_def: &Value, input: &Value) {
328    if let Some(seconds) = state_def["Seconds"].as_u64() {
329        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
330        return;
331    }
332
333    if let Some(path) = state_def["SecondsPath"].as_str() {
334        let val = crate::io_processing::resolve_path(input, path);
335        if let Some(seconds) = val.as_u64() {
336            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
337        }
338        return;
339    }
340
341    if let Some(ts_str) = state_def["Timestamp"].as_str() {
342        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
343            let now = Utc::now();
344            let target_utc = target.with_timezone(&chrono::Utc);
345            if target_utc > now {
346                let duration = (target_utc - now).to_std().unwrap_or_default();
347                tokio::time::sleep(duration).await;
348            }
349        }
350        return;
351    }
352
353    if let Some(path) = state_def["TimestampPath"].as_str() {
354        let val = crate::io_processing::resolve_path(input, path);
355        if let Some(ts_str) = val.as_str() {
356            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
357                let now = Utc::now();
358                let target_utc = target.with_timezone(&chrono::Utc);
359                if target_utc > now {
360                    let duration = (target_utc - now).to_std().unwrap_or_default();
361                    tokio::time::sleep(duration).await;
362                }
363            }
364        }
365        return;
366    }
367
368    warn!(
369        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
370    );
371}
372
373/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
374/// apply I/O processing, handle Retry.
375#[allow(clippy::too_many_arguments)]
376async fn execute_task_state(
377    name: &str,
378    state_def: &Value,
379    input: &Value,
380    delivery: &Option<Arc<DeliveryBus>>,
381    dynamodb_state: &Option<SharedDynamoDbState>,
382    registry: &Option<SharedServiceRegistry>,
383    shared_state: &SharedStepFunctionsState,
384    execution_arn: &str,
385    entered_event_id: i64,
386) -> Result<Value, (String, String)> {
387    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
388
389    let input_path = state_def["InputPath"].as_str();
390    let result_path = state_def["ResultPath"].as_str();
391    let output_path = state_def["OutputPath"].as_str();
392
393    let effective_input = if input_path == Some("null") {
394        json!({})
395    } else {
396        apply_input_path(input, input_path)
397    };
398
399    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
400    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
401    let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
402    let mut attempt = 0u32;
403
404    let is_wait_for_task_token = resource.contains(".waitForTaskToken");
405    let task_token = if is_wait_for_task_token {
406        let token = format!(
407            "FCToken-{}-{}",
408            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
409            uuid::Uuid::new_v4().simple(),
410        );
411        let account_id = account_id_from_arn(execution_arn);
412        let context = json!({
413            "Task": { "Token": token.clone() },
414            "Execution": { "Id": execution_arn },
415            "State": { "Name": name },
416        });
417        {
418            let mut accounts = shared_state.write();
419            let state = accounts.get_or_create(account_id);
420            state.task_tokens.insert(
421                token.clone(),
422                crate::state::TaskTokenState {
423                    activity_arn: String::new(),
424                    status: "PENDING".to_string(),
425                    output: None,
426                    error: None,
427                    cause: None,
428                    input: None,
429                    created_at: chrono::Utc::now(),
430                    last_heartbeat_at: None,
431                    heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
432                    timeout_seconds: timeout_seconds.map(|s| s as i64),
433                },
434            );
435        }
436        Some((token, context))
437    } else {
438        None
439    };
440
441    let task_input = if let Some(params) = state_def.get("Parameters") {
442        if let Some((_, ctx)) = &task_token {
443            apply_parameters(params, &effective_input, Some(ctx))
444        } else {
445            apply_parameters(params, &effective_input, None)
446        }
447    } else {
448        effective_input
449    };
450
451    loop {
452        add_event(
453            shared_state,
454            execution_arn,
455            "TaskScheduled",
456            entered_event_id,
457            json!({
458                "resource": resource,
459                "region": "us-east-1",
460                "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
461            }),
462        );
463
464        add_event(
465            shared_state,
466            execution_arn,
467            "TaskStarted",
468            entered_event_id,
469            json!({ "resource": resource }),
470        );
471
472        let invoke_result = invoke_resource(
473            &resource,
474            &task_input,
475            delivery,
476            dynamodb_state,
477            registry,
478            execution_arn,
479            timeout_seconds,
480            heartbeat_seconds,
481            shared_state,
482        )
483        .await;
484
485        match invoke_result {
486            Ok(result) => {
487                if let Some((token, _)) = &task_token {
488                    let account_id = account_id_from_arn(execution_arn);
489                    match poll_task_token(
490                        shared_state,
491                        account_id,
492                        token,
493                        timeout_seconds,
494                        heartbeat_seconds,
495                    )
496                    .await
497                    {
498                        Ok(output) => {
499                            add_event(
500                                shared_state,
501                                execution_arn,
502                                "TaskSucceeded",
503                                entered_event_id,
504                                json!({
505                                    "resource": resource,
506                                    "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
507                                }),
508                            );
509
510                            let selected = if let Some(selector) = state_def.get("ResultSelector") {
511                                apply_parameters(selector, &output, None)
512                            } else {
513                                output
514                            };
515
516                            let after_result = if result_path == Some("null") {
517                                input.clone()
518                            } else {
519                                apply_result_path(input, &selected, result_path)
520                            };
521
522                            let output = if output_path == Some("null") {
523                                json!({})
524                            } else {
525                                apply_output_path(&after_result, output_path)
526                            };
527
528                            return Ok(output);
529                        }
530                        Err((error, cause)) => {
531                            add_event(
532                                shared_state,
533                                execution_arn,
534                                "TaskFailed",
535                                entered_event_id,
536                                json!({ "error": error, "cause": cause }),
537                            );
538
539                            if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
540                                attempt += 1;
541                                let actual_delay = delay_ms.min(5000);
542                                tokio::time::sleep(tokio::time::Duration::from_millis(
543                                    actual_delay,
544                                ))
545                                .await;
546                                continue;
547                            }
548
549                            return Err((error, cause));
550                        }
551                    }
552                }
553
554                add_event(
555                    shared_state,
556                    execution_arn,
557                    "TaskSucceeded",
558                    entered_event_id,
559                    json!({
560                        "resource": resource,
561                        "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
562                    }),
563                );
564
565                let selected = if let Some(selector) = state_def.get("ResultSelector") {
566                    apply_parameters(selector, &result, None)
567                } else {
568                    result
569                };
570
571                let after_result = if result_path == Some("null") {
572                    input.clone()
573                } else {
574                    apply_result_path(input, &selected, result_path)
575                };
576
577                let output = if output_path == Some("null") {
578                    json!({})
579                } else {
580                    apply_output_path(&after_result, output_path)
581                };
582
583                return Ok(output);
584            }
585            Err((error, cause)) => {
586                add_event(
587                    shared_state,
588                    execution_arn,
589                    "TaskFailed",
590                    entered_event_id,
591                    json!({ "error": error, "cause": cause }),
592                );
593
594                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
595                    attempt += 1;
596                    let actual_delay = delay_ms.min(5000);
597                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
598                    continue;
599                }
600
601                return Err((error, cause));
602            }
603        }
604    }
605}
606
607/// Execute a Parallel state: run all branches concurrently, collect results into an array.
608async fn execute_parallel_state(
609    state_def: &Value,
610    input: &Value,
611    delivery: &Option<Arc<DeliveryBus>>,
612    dynamodb_state: &Option<SharedDynamoDbState>,
613    registry: &Option<SharedServiceRegistry>,
614    shared_state: &SharedStepFunctionsState,
615    execution_arn: &str,
616) -> Result<Value, (String, String)> {
617    let input_path = state_def["InputPath"].as_str();
618    let result_path = state_def["ResultPath"].as_str();
619    let output_path = state_def["OutputPath"].as_str();
620
621    let effective_input = if input_path == Some("null") {
622        json!({})
623    } else {
624        apply_input_path(input, input_path)
625    };
626
627    let branches = state_def["Branches"]
628        .as_array()
629        .cloned()
630        .unwrap_or_default();
631
632    if branches.is_empty() {
633        return Err((
634            "States.Runtime".to_string(),
635            "Parallel state has no Branches".to_string(),
636        ));
637    }
638
639    // Spawn all branches concurrently
640    let mut handles = Vec::new();
641    for branch_def in &branches {
642        let branch = branch_def.clone();
643        let branch_input = effective_input.clone();
644        let delivery = delivery.clone();
645        let ddb = dynamodb_state.clone();
646        let reg = registry.clone();
647        let state = shared_state.clone();
648        let arn = execution_arn.to_string();
649
650        handles.push(tokio::spawn(async move {
651            run_states(&branch, branch_input, &delivery, &ddb, &reg, &state, &arn).await
652        }));
653    }
654
655    // Collect results in order
656    let mut results = Vec::with_capacity(handles.len());
657    for handle in handles {
658        let result = handle.await.map_err(|e| {
659            (
660                "States.Runtime".to_string(),
661                format!("Branch execution panicked: {e}"),
662            )
663        })??;
664        results.push(result);
665    }
666
667    let branch_output = Value::Array(results);
668
669    // Apply ResultSelector if present
670    let selected = if let Some(selector) = state_def.get("ResultSelector") {
671        apply_parameters(selector, &branch_output, None)
672    } else {
673        branch_output
674    };
675
676    // Apply ResultPath
677    let after_result = if result_path == Some("null") {
678        input.clone()
679    } else {
680        apply_result_path(input, &selected, result_path)
681    };
682
683    // Apply OutputPath
684    let output = if output_path == Some("null") {
685        json!({})
686    } else {
687        apply_output_path(&after_result, output_path)
688    };
689
690    Ok(output)
691}
692
693/// Execute a Map state: iterate over an array and run a sub-state machine per item.
694/// Supports distributed mode: ItemReader (S3), ItemBatcher, ResultWriter (S3),
695/// ToleratedFailurePercentage, and MaxConcurrencyPath.
696#[allow(clippy::too_many_arguments)]
697async fn execute_map_state(
698    state_def: &Value,
699    input: &Value,
700    delivery: &Option<Arc<DeliveryBus>>,
701    dynamodb_state: &Option<SharedDynamoDbState>,
702    registry: &Option<SharedServiceRegistry>,
703    shared_state: &SharedStepFunctionsState,
704    execution_arn: &str,
705) -> Result<Value, (String, String)> {
706    let input_path = state_def["InputPath"].as_str();
707    let result_path = state_def["ResultPath"].as_str();
708    let output_path = state_def["OutputPath"].as_str();
709
710    let effective_input = if input_path == Some("null") {
711        json!({})
712    } else {
713        apply_input_path(input, input_path)
714    };
715
716    // Resolve MaxConcurrencyPath if present
717    let max_concurrency = if let Some(path) = state_def["MaxConcurrencyPath"].as_str() {
718        crate::io_processing::resolve_path(&effective_input, path)
719            .as_u64()
720            .unwrap_or(0)
721    } else {
722        state_def["MaxConcurrency"].as_u64().unwrap_or(0)
723    };
724    let effective_concurrency = if max_concurrency == 0 {
725        40
726    } else {
727        max_concurrency as usize
728    };
729
730    // Read items from ItemReader (S3) or ItemsPath
731    let items = if let Some(item_reader) = state_def.get("ItemReader") {
732        read_items_from_s3(item_reader, registry, execution_arn).await?
733    } else {
734        let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
735        let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
736        items_value.as_array().cloned().unwrap_or_default()
737    };
738
739    // Apply ItemBatcher if present
740    let batch_config = state_def.get("ItemBatcher").cloned();
741    let batched_items = if let Some(ref batcher) = batch_config {
742        apply_item_batcher(&items, batcher, &effective_input)
743    } else {
744        items
745    };
746
747    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
748    let iterator_def = state_def
749        .get("ItemProcessor")
750        .or_else(|| state_def.get("Iterator"))
751        .cloned()
752        .ok_or_else(|| {
753            (
754                "States.Runtime".to_string(),
755                "Map state has no ItemProcessor or Iterator".to_string(),
756            )
757        })?;
758
759    let tolerated_failure_percentage = state_def["ToleratedFailurePercentage"]
760        .as_f64()
761        .unwrap_or(0.0);
762    let total_items = batched_items.len() as f64;
763    let mut failure_count = 0usize;
764
765    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
766
767    // Process all items
768    let mut handles = Vec::new();
769    for (index, batch_item) in batched_items.into_iter().enumerate() {
770        let iter_def = iterator_def.clone();
771        let delivery = delivery.clone();
772        let ddb = dynamodb_state.clone();
773        let reg = registry.clone();
774        let state = shared_state.clone();
775        let arn = execution_arn.to_string();
776        let sem = semaphore.clone();
777
778        // Apply ItemSelector if present
779        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
780            let mut ctx = serde_json::Map::new();
781            ctx.insert("value".to_string(), batch_item.clone());
782            ctx.insert("index".to_string(), json!(index));
783            apply_parameters(selector, &Value::Object(ctx), None)
784        } else {
785            batch_item
786        };
787
788        add_event(
789            shared_state,
790            execution_arn,
791            "MapIterationStarted",
792            0,
793            json!({ "index": index }),
794        );
795
796        handles.push(tokio::spawn(async move {
797            let _permit = sem
798                .acquire()
799                .await
800                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
801            let result =
802                run_states(&iter_def, item_input, &delivery, &ddb, &reg, &state, &arn).await;
803            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
804        }));
805    }
806
807    // Collect results in order
808    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
809    for handle in handles {
810        let (index, result) = handle.await.map_err(|e| {
811            (
812                "States.Runtime".to_string(),
813                format!("Map iteration panicked: {e}"),
814            )
815        })??;
816
817        match result {
818            Ok(output) => {
819                add_event(
820                    shared_state,
821                    execution_arn,
822                    "MapIterationSucceeded",
823                    0,
824                    json!({ "index": index }),
825                );
826                results.push((index, output));
827            }
828            Err((error, cause)) => {
829                add_event(
830                    shared_state,
831                    execution_arn,
832                    "MapIterationFailed",
833                    0,
834                    json!({ "index": index, "error": error }),
835                );
836                failure_count += 1;
837                let failure_percentage = (failure_count as f64 / total_items) * 100.0;
838                if failure_percentage > tolerated_failure_percentage {
839                    return Err((error, cause));
840                }
841            }
842        }
843    }
844
845    // Sort by index to maintain order
846    results.sort_by_key(|(i, _)| *i);
847    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
848
849    // Write results to S3 if ResultWriter is configured
850    if let Some(result_writer) = state_def.get("ResultWriter") {
851        write_map_results_to_s3(result_writer, registry, execution_arn, &map_output).await?;
852    }
853
854    // Apply ResultSelector if present
855    let selected = if let Some(selector) = state_def.get("ResultSelector") {
856        apply_parameters(selector, &map_output, None)
857    } else {
858        map_output
859    };
860
861    // Apply ResultPath
862    let after_result = if result_path == Some("null") {
863        input.clone()
864    } else {
865        apply_result_path(input, &selected, result_path)
866    };
867
868    // Apply OutputPath
869    let output = if output_path == Some("null") {
870        json!({})
871    } else {
872        apply_output_path(&after_result, output_path)
873    };
874
875    Ok(output)
876}
877
878/// Read items from S3 for distributed Map mode ItemReader.
879async fn read_items_from_s3(
880    item_reader: &Value,
881    registry: &Option<SharedServiceRegistry>,
882    execution_arn: &str,
883) -> Result<Vec<Value>, (String, String)> {
884    let resource = item_reader["Resource"]
885        .as_str()
886        .unwrap_or("arn:aws:states:::s3:getObject");
887    if !resource.contains("s3:getObject") {
888        return Err((
889            "States.Runtime".to_string(),
890            format!("ItemReader unsupported resource: {resource}"),
891        ));
892    }
893
894    let params = item_reader
895        .get("Parameters")
896        .cloned()
897        .unwrap_or_else(|| json!({}));
898    let bucket = params["Bucket"].as_str().ok_or_else(|| {
899        (
900            "States.Runtime".to_string(),
901            "ItemReader missing Bucket".to_string(),
902        )
903    })?;
904    let key = params["Key"].as_str().ok_or_else(|| {
905        (
906            "States.Runtime".to_string(),
907            "ItemReader missing Key".to_string(),
908        )
909    })?;
910
911    let registry_arc = resolve_registry(registry)?;
912    let account_id = account_from_execution_arn(execution_arn);
913
914    let body = call_sdk_action_raw_bytes(
915        &registry_arc,
916        "s3",
917        "GetObject",
918        &json!({ "Bucket": bucket, "Key": key }),
919        &account_id,
920    )
921    .await?;
922
923    // Parse JSON array from the S3 object body
924    let parsed: Value = serde_json::from_slice(&body).map_err(|e| {
925        (
926            "States.Runtime".to_string(),
927            format!("ItemReader failed to parse S3 object as JSON: {e}"),
928        )
929    })?;
930
931    parsed.as_array().cloned().ok_or_else(|| {
932        (
933            "States.Runtime".to_string(),
934            "ItemReader S3 object is not a JSON array".to_string(),
935        )
936    })
937}
938
939/// Write Map results to S3 for distributed Map mode ResultWriter.
940async fn write_map_results_to_s3(
941    result_writer: &Value,
942    registry: &Option<SharedServiceRegistry>,
943    execution_arn: &str,
944    results: &Value,
945) -> Result<(), (String, String)> {
946    let resource = result_writer["Resource"]
947        .as_str()
948        .unwrap_or("arn:aws:states:::s3:putObject");
949    if !resource.contains("s3:putObject") {
950        return Err((
951            "States.Runtime".to_string(),
952            format!("ResultWriter unsupported resource: {resource}"),
953        ));
954    }
955
956    let params = result_writer
957        .get("Parameters")
958        .cloned()
959        .unwrap_or_else(|| json!({}));
960    let bucket = params["Bucket"].as_str().ok_or_else(|| {
961        (
962            "States.Runtime".to_string(),
963            "ResultWriter missing Bucket".to_string(),
964        )
965    })?;
966    let prefix = params["Prefix"].as_str().unwrap_or("map-results/");
967
968    let registry_arc = resolve_registry(registry)?;
969    let account_id = account_from_execution_arn(execution_arn);
970
971    use bytes::Bytes;
972    let body = Bytes::from(
973        serde_json::to_vec(results).expect("serde_json::Value serialization is infallible"),
974    );
975
976    // Build a raw S3 PutObject request with the JSON body
977    use fakecloud_core::service::AwsRequest;
978    use http::{HeaderMap, Method};
979    let service = registry_arc.get("s3").ok_or_else(|| {
980        (
981            "States.TaskFailed".to_string(),
982            "S3 service not available for ResultWriter".to_string(),
983        )
984    })?;
985
986    let req = AwsRequest {
987        service: "s3".to_string(),
988        action: "PutObject".to_string(),
989        region: "us-east-1".to_string(),
990        account_id: account_id.to_string(),
991        request_id: uuid::Uuid::new_v4().to_string(),
992        headers: HeaderMap::new(),
993        query_params: std::collections::HashMap::new(),
994        body,
995        body_stream: parking_lot::Mutex::new(None),
996        path_segments: vec![bucket.to_string(), format!("{prefix}result.json")],
997        raw_path: format!("/{bucket}/{prefix}result.json"),
998        raw_query: String::new(),
999        method: Method::PUT,
1000        is_query_protocol: false,
1001        access_key_id: None,
1002        principal: None,
1003    };
1004
1005    service.handle(req).await.map_err(|err| {
1006        let code = err.code().to_string();
1007        let msg = err.message();
1008        (
1009            format!("S3.{code}"),
1010            format!("ResultWriter PutObject failed: {msg}"),
1011        )
1012    })?;
1013
1014    Ok(())
1015}
1016
1017/// Apply ItemBatcher configuration to group items into batches.
1018fn apply_item_batcher(items: &[Value], batcher: &Value, _effective_input: &Value) -> Vec<Value> {
1019    let max_per_batch = batcher["MaxItemsPerBatch"].as_u64().unwrap_or(u64::MAX) as usize;
1020    let max_bytes = batcher["MaxInputBytesPerBatch"].as_u64().unwrap_or(0) as usize;
1021    let batch_input = batcher.get("BatchInput").cloned();
1022
1023    let mut batches: Vec<Vec<Value>> = Vec::new();
1024    let mut current_batch: Vec<Value> = Vec::new();
1025    let mut current_bytes = 0usize;
1026
1027    for item in items.iter().cloned() {
1028        let item_bytes = serde_json::to_vec(&item).unwrap_or_default().len();
1029        if !current_batch.is_empty()
1030            && (current_batch.len() >= max_per_batch
1031                || (max_bytes > 0 && current_bytes + item_bytes > max_bytes))
1032        {
1033            batches.push(current_batch);
1034            current_batch = Vec::new();
1035            current_bytes = 0;
1036        }
1037        current_bytes += item_bytes;
1038        current_batch.push(item);
1039    }
1040    if !current_batch.is_empty() {
1041        batches.push(current_batch);
1042    }
1043
1044    batches
1045        .into_iter()
1046        .enumerate()
1047        .map(|(index, batch)| {
1048            let mut map = serde_json::Map::new();
1049            map.insert("index".to_string(), json!(index));
1050            map.insert("items".to_string(), Value::Array(batch));
1051            if let Some(Value::Object(ref obj)) = batch_input {
1052                for (k, v) in obj {
1053                    map.insert(k.clone(), v.clone());
1054                }
1055            }
1056            Value::Object(map)
1057        })
1058        .collect()
1059}
1060
/// Invoke a Task state's `Resource`: an activity, a Lambda function, an
/// optimized service integration, or a generic `aws-sdk:` integration.
///
/// The ARN is tested against the patterns below in this order, and the first
/// match returns. The order matters: `states:startExecution` and `aws-sdk:`
/// must be checked before the final generic `.sync` fallback, which would
/// otherwise also accept them.
///
/// 1. activity ARNs (`...:states:...:activity:...`) -> `invoke_activity`
/// 2. Lambda function ARNs (`...:lambda:...:function:...`)
/// 3. `arn:aws:states:::lambda:invoke*` -> Lambda named by `input.FunctionName`,
///    with `input.Payload` (or the whole `input` when absent) as payload
/// 4. `sqs:sendMessage`, `sns:publish`, `events:putEvents` (via `delivery`)
/// 5. `dynamodb:getItem` / `putItem` / `deleteItem` / `updateItem`
///    (via `dynamodb_state`)
/// 6. `arn:aws:states:::states:startExecution*` -> nested execution through
///    the service registry
/// 7. `arn:aws:states:::aws-sdk:<service>:<action>[.<modifier>]`
/// 8. any other `arn:aws:states:::<tail>` whose tail contains `.sync`
///
/// `timeout_seconds` is forwarded to the activity, Lambda and registry-based
/// paths; `heartbeat_seconds` only to activities.
///
/// # Errors
/// Whatever the selected integration returns, or `States.TaskFailed` with
/// `Unsupported resource: <resource>` when no pattern matches.
#[allow(clippy::too_many_arguments)]
async fn invoke_resource(
    resource: &str,
    input: &Value,
    delivery: &Option<Arc<DeliveryBus>>,
    dynamodb_state: &Option<SharedDynamoDbState>,
    registry: &Option<SharedServiceRegistry>,
    execution_arn: &str,
    timeout_seconds: Option<u64>,
    heartbeat_seconds: Option<u64>,
    shared_state: &SharedStepFunctionsState,
) -> Result<Value, (String, String)> {
    // Direct activity ARN: arn:aws:states:<region>:<account>:activity:<name>
    if resource.contains(":states:") && resource.contains(":activity:") {
        return invoke_activity(
            resource,
            input,
            shared_state,
            timeout_seconds,
            heartbeat_seconds,
        )
        .await;
    }

    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
    // (`arn:aws:states:::lambda:invoke` has no `:function:`, so it falls
    // through to the optimized-integration branch below.)
    if resource.contains(":lambda:") && resource.contains(":function:") {
        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
    }

    // SDK integration patterns: arn:aws:states:::<service>:<action>
    if resource.starts_with("arn:aws:states:::lambda:invoke") {
        // A missing FunctionName is passed on as "" rather than rejected here.
        let function_name = input["FunctionName"].as_str().unwrap_or("");
        let payload = if let Some(p) = input.get("Payload") {
            p.clone()
        } else {
            input.clone()
        };
        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
    }

    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
        return invoke_sqs_send_message(input, delivery);
    }

    if resource.starts_with("arn:aws:states:::sns:publish") {
        return invoke_sns_publish(input, delivery);
    }

    if resource.starts_with("arn:aws:states:::events:putEvents") {
        return invoke_eventbridge_put_events(input, delivery);
    }

    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
        return invoke_dynamodb_get_item(input, dynamodb_state);
    }

    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
        return invoke_dynamodb_put_item(input, dynamodb_state);
    }

    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
        return invoke_dynamodb_delete_item(input, dynamodb_state);
    }

    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
        return invoke_dynamodb_update_item(input, dynamodb_state);
    }

    // Nested Step Functions execution:
    //   arn:aws:states:::states:startExecution[.sync|.waitForTaskToken]
    // Routes through the same registry path as the generic aws-sdk
    // integration so the inner execution sees a real `StartExecution`
    // call. `.sync` polls `DescribeExecution` until the inner execution
    // reaches a terminal state, mirroring AWS' behavior of bubbling the
    // inner Output back to the parent.
    if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
        if tail.starts_with("states:startExecution") {
            let account_id = account_from_execution_arn(execution_arn);
            let result =
                invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
                    .await;
            // Patch the inner execution's parent_execution_arn so the
            // `/execution-tree` introspection can walk back-references.
            // For `.sync`, the result is the DescribeExecution shape
            // (has `executionArn`); for fire-and-forget StartExecution the
            // initial StartExecution response also carries `executionArn`.
            // The link is only written once the call above returns (for
            // `.sync`, presumably after the inner execution finished), and
            // only when the inner execution lives in the parent's account.
            if let Ok(ref value) = result {
                if let Some(inner_arn) = value
                    .get("executionArn")
                    .or_else(|| value.get("ExecutionArn"))
                    .and_then(Value::as_str)
                {
                    // Write guard is confined to this block; no `.await` while held.
                    let mut accounts = shared_state.write();
                    if let Some(state) = accounts.get_mut(&account_id) {
                        if let Some(exec) = state.executions.get_mut(inner_arn) {
                            exec.parent_execution_arn = Some(execution_arn.to_string());
                        }
                    }
                }
            }
            return result;
        }
    }

    // Generic AWS SDK integration: arn:aws:states:::aws-sdk:<service>:<action>[.<wait>]
    // Routes the call to the registered service via the central
    // ServiceRegistry, passing the Task's `Parameters` block as the
    // request body. Mirrors the AWS SDK service integration pattern in
    // real Step Functions.
    if let Some(rest) = resource.strip_prefix("arn:aws:states:::aws-sdk:") {
        let account_id = account_from_execution_arn(execution_arn);
        return invoke_aws_sdk_integration(rest, input, registry, &account_id, timeout_seconds)
            .await;
    }

    // Optimized service integrations expose `.sync` variants for ECS,
    // Athena, and Glue. Route them through the same waiter machinery as
    // `aws-sdk:` so callers can write the AWS-blessed ARN forms.
    // Non-`.sync` optimized forms (other than those handled above) are not
    // routed here and end up as "Unsupported resource".
    if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
        if tail.contains(".sync") {
            let account_id = account_from_execution_arn(execution_arn);
            return invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
                .await;
        }
    }

    Err((
        "States.TaskFailed".to_string(),
        format!("Unsupported resource: {resource}"),
    ))
}
1193
/// Upper-case the first character of an SDK integration action, turning the
/// camelCase form used in Step Functions Task ARNs (`getItem`) into the
/// PascalCase `X-Amz-Target`-style name the service handlers expect
/// (`GetItem`).
///
/// Only an ASCII first character is changed; the rest of the string, and
/// an empty input, are returned unchanged.
fn camel_to_pascal(action: &str) -> String {
    let mut pascal = String::with_capacity(action.len());
    let mut rest = action.chars();
    if let Some(head) = rest.next() {
        pascal.push(head.to_ascii_uppercase());
        pascal.push_str(rest.as_str());
    }
    pascal
}
1205
/// Map a Step Functions SDK integration service id to the corresponding
/// fakecloud `service_name()`.
///
/// `aws-sdk:` Task ARNs name services by their AWS SDK service id, while
/// fakecloud registers services under the SigV4 signing name. The two
/// usually match and unknown ids pass through unchanged. The arms below
/// cover the ids that differ:
///
/// | SDK id                    | SigV4 name    |
/// |---------------------------|---------------|
/// | `sfn`                     | `states`      |
/// | `cloudwatchlogs`          | `logs`        |
/// | `eventbridge`             | `events`      |
/// | `cloudwatch`              | `monitoring`  |
/// | `cognitoidentityprovider` | `cognito-idp` |
fn map_sdk_service_id(service_id: &str) -> &str {
    match service_id {
        "sfn" => "states",
        "cloudwatchlogs" => "logs",
        "eventbridge" => "events",
        "cloudwatch" => "monitoring",
        "cognitoidentityprovider" => "cognito-idp",
        // Default: pass through unchanged.
        other => other,
    }
}
1218
/// Return the account id field (the fifth `:`-separated component) of a
/// Step Functions execution ARN such as
/// `arn:aws:states:<region>:<account>:execution:...`.
///
/// A malformed ARN, or one whose account field is empty, yields the
/// AWS-conventional fixture account `123456789012`.
fn account_from_execution_arn(execution_arn: &str) -> String {
    const FALLBACK_ACCOUNT_ID: &str = "123456789012";
    match execution_arn.split(':').nth(4) {
        Some(account) if !account.is_empty() => account.to_owned(),
        _ => FALLBACK_ACCOUNT_ID.to_owned(),
    }
}
1230
1231/// Dispatch a Step Functions `aws-sdk:<service>:<action>` Task to the
1232/// registered service via the central [`fakecloud_core::registry::ServiceRegistry`].
1233/// `tail` is the trailing portion of the resource ARN after the
1234/// `aws-sdk:` prefix (e.g. `dynamodb:getItem` or `sqs:sendMessage.waitForTaskToken`).
1235async fn invoke_aws_sdk_integration(
1236    tail: &str,
1237    input: &Value,
1238    registry: &Option<SharedServiceRegistry>,
1239    account_id: &str,
1240    timeout_seconds: Option<u64>,
1241) -> Result<Value, (String, String)> {
1242    let registry_arc = resolve_registry(registry)?;
1243
1244    // Split `<service>:<action>[.<modifier>]`. The `.waitForTaskToken`
1245    // modifier is accepted but ignored — the integration runs synchronously
1246    // regardless. The `.sync` modifier triggers a polling loop after the
1247    // initial call to wait for the downstream operation to reach a terminal
1248    // state.
1249    let mut parts = tail.splitn(2, ':');
1250    let service_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1251        (
1252            "States.TaskFailed".to_string(),
1253            format!("Invalid aws-sdk Resource ARN: missing service in '{tail}'"),
1254        )
1255    })?;
1256    let action_with_mod = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1257        (
1258            "States.TaskFailed".to_string(),
1259            format!("Invalid aws-sdk Resource ARN: missing action in '{tail}'"),
1260        )
1261    })?;
1262    let action_camel = action_with_mod
1263        .split('.')
1264        .next()
1265        .filter(|s| !s.is_empty())
1266        .ok_or_else(|| {
1267            (
1268                "States.TaskFailed".to_string(),
1269                format!("Invalid aws-sdk Resource ARN: empty action in '{tail}'"),
1270            )
1271        })?;
1272    let modifiers: Vec<&str> = action_with_mod.split('.').skip(1).collect();
1273    let is_sync = modifiers.iter().any(|m| *m == "sync" || *m == "sync:2");
1274
1275    let action_pascal = camel_to_pascal(action_camel);
1276    let service_name = map_sdk_service_id(service_id).to_string();
1277
1278    // ECS's wire format is camelCase, but Step Functions's optimized
1279    // integration takes PascalCase parameter names in the state machine
1280    // definition. Translate top-level keys for the ECS service so AWS
1281    // Step Functions definitions Just Work against the fake.
1282    let translated_input = match service_name.as_str() {
1283        "ecs" => translate_ecs_keys_to_camel(input),
1284        _ => input.clone(),
1285    };
1286
1287    let initial = call_sdk_action(
1288        &registry_arc,
1289        &service_name,
1290        &action_pascal,
1291        &translated_input,
1292        account_id,
1293    )
1294    .await?;
1295
1296    if !is_sync {
1297        return Ok(initial);
1298    }
1299
1300    // `.sync` pattern: dispatch to the per-service waiter that polls until
1301    // the downstream operation reaches a terminal state. Returns the full
1302    // describe-shape result, or surfaces a terminal failure as
1303    // `States.TaskFailed`.
1304    sync_wait(
1305        &registry_arc,
1306        &service_name,
1307        &action_pascal,
1308        &initial,
1309        &translated_input,
1310        account_id,
1311        timeout_seconds,
1312    )
1313    .await
1314}
1315
1316/// Translate the top-level PascalCase keys that AWS Step Functions
1317/// state machines use for `ecs:runTask` Parameters into the camelCase
1318/// shape that the AWS ECS API (and our handler) expects. Unknown keys
1319/// pass through unchanged so callers can still send raw camelCase. The
1320/// translation is shallow on purpose — nested overrides keep their
1321/// existing camelCase shape on real AWS too.
1322fn translate_ecs_keys_to_camel(input: &Value) -> Value {
1323    let Some(obj) = input.as_object() else {
1324        return input.clone();
1325    };
1326    let mut out = serde_json::Map::with_capacity(obj.len());
1327    for (k, v) in obj.iter() {
1328        let camel = match k.as_str() {
1329            "Cluster" => "cluster",
1330            "TaskDefinition" => "taskDefinition",
1331            "LaunchType" => "launchType",
1332            "Group" => "group",
1333            "Overrides" => "overrides",
1334            "PlatformVersion" => "platformVersion",
1335            "NetworkConfiguration" => "networkConfiguration",
1336            "Tags" => "tags",
1337            "EnableExecuteCommand" => "enableExecuteCommand",
1338            "PropagateTags" => "propagateTags",
1339            "ReferenceId" => "referenceId",
1340            "StartedBy" => "startedBy",
1341            "Count" => "count",
1342            "CapacityProviderStrategy" => "capacityProviderStrategy",
1343            "PlacementConstraints" => "placementConstraints",
1344            "PlacementStrategy" => "placementStrategy",
1345            other => other,
1346        };
1347        out.insert(camel.to_string(), v.clone());
1348    }
1349    Value::Object(out)
1350}
1351
1352fn resolve_registry(
1353    registry: &Option<SharedServiceRegistry>,
1354) -> Result<Arc<fakecloud_core::registry::ServiceRegistry>, (String, String)> {
1355    let registry_handle = registry.as_ref().ok_or_else(|| {
1356        (
1357            "States.TaskFailed".to_string(),
1358            "No service registry configured for aws-sdk integration".to_string(),
1359        )
1360    })?;
1361    registry_handle.get().cloned().ok_or_else(|| {
1362        (
1363            "States.TaskFailed".to_string(),
1364            "Service registry not yet initialised; aws-sdk integration unavailable".to_string(),
1365        )
1366    })
1367}
1368
1369/// Call a single AWS SDK action against the registered service handler.
1370async fn call_sdk_action(
1371    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1372    service_name: &str,
1373    action_pascal: &str,
1374    input: &Value,
1375    account_id: &str,
1376) -> Result<Value, (String, String)> {
1377    use bytes::Bytes;
1378    use fakecloud_core::service::AwsRequest;
1379    use http::{HeaderMap, Method};
1380
1381    let service = registry.get(service_name).ok_or_else(|| {
1382        (
1383            "States.TaskFailed".to_string(),
1384            format!("Unknown aws-sdk service '{service_name}'"),
1385        )
1386    })?;
1387
1388    let body_bytes = Bytes::from(
1389        serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1390    );
1391
1392    let req = AwsRequest {
1393        service: service_name.to_string(),
1394        action: action_pascal.to_string(),
1395        region: "us-east-1".to_string(),
1396        account_id: account_id.to_string(),
1397        request_id: uuid::Uuid::new_v4().to_string(),
1398        headers: HeaderMap::new(),
1399        query_params: std::collections::HashMap::new(),
1400        body: body_bytes,
1401        body_stream: parking_lot::Mutex::new(None),
1402        path_segments: vec![],
1403        raw_path: "/".to_string(),
1404        raw_query: String::new(),
1405        method: Method::POST,
1406        is_query_protocol: false,
1407        access_key_id: None,
1408        principal: None,
1409    };
1410
1411    let response = service.handle(req).await.map_err(|err| {
1412        let code = err.code().to_string();
1413        let msg = err.message();
1414        let prefix_service = match service_name {
1415            "dynamodb" => "DynamoDb".to_string(),
1416            "states" => "Sfn".to_string(),
1417            other => camel_to_pascal(other),
1418        };
1419        (
1420            format!("{prefix_service}.{code}"),
1421            format!("{action_pascal} failed: {msg}"),
1422        )
1423    })?;
1424
1425    let response_bytes = match response.body {
1426        fakecloud_core::service::ResponseBody::Bytes(b) => b,
1427        fakecloud_core::service::ResponseBody::File { .. } => {
1428            return Err((
1429                "States.TaskFailed".to_string(),
1430                "aws-sdk integration: file-backed response not supported".to_string(),
1431            ));
1432        }
1433    };
1434
1435    if response_bytes.is_empty() {
1436        return Ok(json!({}));
1437    }
1438    serde_json::from_slice(&response_bytes).map_err(|e| {
1439        (
1440            "States.TaskFailed".to_string(),
1441            format!("aws-sdk integration: failed to parse response JSON: {e}"),
1442        )
1443    })
1444}
1445
1446/// Call an SDK action and return raw bytes without JSON parsing.
1447/// Used by distributed Map mode to read/write S3 objects.
1448async fn call_sdk_action_raw_bytes(
1449    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1450    service_name: &str,
1451    action_pascal: &str,
1452    input: &Value,
1453    account_id: &str,
1454) -> Result<bytes::Bytes, (String, String)> {
1455    use bytes::Bytes;
1456    use fakecloud_core::service::AwsRequest;
1457    use http::{HeaderMap, Method};
1458
1459    let service = registry.get(service_name).ok_or_else(|| {
1460        (
1461            "States.TaskFailed".to_string(),
1462            format!("Unknown aws-sdk service '{service_name}'"),
1463        )
1464    })?;
1465
1466    let body_bytes = Bytes::from(
1467        serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1468    );
1469
1470    let req = AwsRequest {
1471        service: service_name.to_string(),
1472        action: action_pascal.to_string(),
1473        region: "us-east-1".to_string(),
1474        account_id: account_id.to_string(),
1475        request_id: uuid::Uuid::new_v4().to_string(),
1476        headers: HeaderMap::new(),
1477        query_params: std::collections::HashMap::new(),
1478        body: body_bytes,
1479        body_stream: parking_lot::Mutex::new(None),
1480        path_segments: vec![],
1481        raw_path: "/".to_string(),
1482        raw_query: String::new(),
1483        method: Method::POST,
1484        is_query_protocol: false,
1485        access_key_id: None,
1486        principal: None,
1487    };
1488
1489    let response = service.handle(req).await.map_err(|err| {
1490        let code = err.code().to_string();
1491        let msg = err.message();
1492        let prefix_service = match service_name {
1493            "dynamodb" => "DynamoDb".to_string(),
1494            "states" => "Sfn".to_string(),
1495            other => camel_to_pascal(other),
1496        };
1497        (
1498            format!("{prefix_service}.{code}"),
1499            format!("{action_pascal} failed: {msg}"),
1500        )
1501    })?;
1502
1503    match response.body {
1504        fakecloud_core::service::ResponseBody::Bytes(b) => Ok(b),
1505        fakecloud_core::service::ResponseBody::File { .. } => Err((
1506            "States.TaskFailed".to_string(),
1507            "aws-sdk integration: file-backed response not supported".to_string(),
1508        )),
1509    }
1510}
1511
/// Upper bound, in seconds, on `.sync` polling when the Task state sets no
/// `TimeoutSeconds`, so a stuck downstream task can't hang an execution
/// forever. A set `TimeoutSeconds` takes its place. Five minutes is enough
/// for the in-process services that ship today (Athena returns
/// synchronously, ECS tasks finish within seconds even when docker-less).
const SYNC_DEFAULT_TIMEOUT_SECS: u64 = 5 * 60;
/// Delay, in milliseconds, between successive describe-style polls in the
/// `.sync` waiters.
const SYNC_POLL_INTERVAL_MS: u64 = 200;
1519
1520/// Dispatch `.sync` waiters by service+action. Each waiter polls the
1521/// matching describe-style API until the downstream operation reaches a
1522/// terminal state, then returns the full describe response.
1523async fn sync_wait(
1524    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1525    service_name: &str,
1526    action_pascal: &str,
1527    initial: &Value,
1528    input: &Value,
1529    account_id: &str,
1530    timeout_seconds: Option<u64>,
1531) -> Result<Value, (String, String)> {
1532    match (service_name, action_pascal) {
1533        ("ecs", "RunTask") => {
1534            sync_wait_ecs_run_task(registry, initial, input, account_id, timeout_seconds).await
1535        }
1536        ("athena", "StartQueryExecution") => {
1537            sync_wait_athena_query(registry, initial, account_id, timeout_seconds).await
1538        }
1539        ("states", "StartExecution") => {
1540            sync_wait_states_start_execution(registry, initial, account_id, timeout_seconds).await
1541        }
1542        ("glue", "StartJobRun") => {
1543            // Glue has no real job runner in fakecloud; treat the run as
1544            // immediately SUCCEEDED so `.sync` callers see a terminal
1545            // result rather than spinning forever. Real AWS would poll
1546            // `GetJobRun` until JobRunState in {SUCCEEDED,FAILED,STOPPED,
1547            // TIMEOUT}, so we synthesize the SUCCEEDED shape.
1548            let job_run_id = initial
1549                .get("JobRunId")
1550                .and_then(Value::as_str)
1551                .unwrap_or("synthetic")
1552                .to_string();
1553            let job_name = input
1554                .get("JobName")
1555                .and_then(Value::as_str)
1556                .unwrap_or("")
1557                .to_string();
1558            Ok(json!({
1559                "JobRun": {
1560                    "Id": job_run_id,
1561                    "JobName": job_name,
1562                    "JobRunState": "SUCCEEDED",
1563                }
1564            }))
1565        }
1566        _ => Err((
1567            "States.TaskFailed".to_string(),
1568            format!(
1569                "`.sync` is not supported for {service_name}:{action_pascal} yet — \
1570                 supported: ecs:RunTask, athena:StartQueryExecution, glue:StartJobRun, states:StartExecution"
1571            ),
1572        )),
1573    }
1574}
1575
1576async fn sync_wait_ecs_run_task(
1577    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1578    initial: &Value,
1579    input: &Value,
1580    account_id: &str,
1581    timeout_seconds: Option<u64>,
1582) -> Result<Value, (String, String)> {
1583    let tasks = initial
1584        .get("tasks")
1585        .and_then(Value::as_array)
1586        .ok_or_else(|| {
1587            (
1588                "States.TaskFailed".to_string(),
1589                "ecs:RunTask.sync: response missing 'tasks' array".to_string(),
1590            )
1591        })?;
1592    if tasks.is_empty() {
1593        return Err((
1594            "States.TaskFailed".to_string(),
1595            "ecs:RunTask.sync: no tasks were started".to_string(),
1596        ));
1597    }
1598    let task_arns: Vec<String> = tasks
1599        .iter()
1600        .filter_map(|t| t.get("taskArn").and_then(Value::as_str).map(String::from))
1601        .collect();
1602    let cluster = input
1603        .get("cluster")
1604        .or_else(|| input.get("Cluster"))
1605        .and_then(Value::as_str)
1606        .map(String::from);
1607
1608    let deadline = sync_deadline(timeout_seconds);
1609    loop {
1610        let mut describe_input = json!({ "tasks": task_arns });
1611        if let Some(c) = &cluster {
1612            describe_input["cluster"] = json!(c);
1613        }
1614        let described = call_sdk_action(
1615            registry,
1616            "ecs",
1617            "DescribeTasks",
1618            &describe_input,
1619            account_id,
1620        )
1621        .await?;
1622        let described_tasks = described
1623            .get("tasks")
1624            .and_then(Value::as_array)
1625            .cloned()
1626            .unwrap_or_default();
1627        let all_stopped = !described_tasks.is_empty()
1628            && described_tasks
1629                .iter()
1630                .all(|t| t.get("lastStatus").and_then(Value::as_str) == Some("STOPPED"));
1631        if all_stopped {
1632            // Surface non-zero container exit codes or stop codes that map
1633            // to AWS-style failures. Per AWS docs, ECS RunTask.sync raises
1634            // States.TaskFailed when any container exits non-zero or the
1635            // task stops with a failure code.
1636            let any_failed = described_tasks.iter().any(|t| {
1637                let stop_code = t.get("stopCode").and_then(Value::as_str);
1638                let bad_stop = matches!(
1639                    stop_code,
1640                    Some(
1641                        "TaskFailedToStart"
1642                            | "EssentialContainerExited"
1643                            | "ServiceSchedulerInitiated"
1644                    )
1645                );
1646                let bad_exit = t
1647                    .get("containers")
1648                    .and_then(Value::as_array)
1649                    .map(|cs| {
1650                        cs.iter().any(|c| {
1651                            c.get("exitCode")
1652                                .and_then(Value::as_i64)
1653                                .map(|n| n != 0)
1654                                .unwrap_or(false)
1655                        })
1656                    })
1657                    .unwrap_or(false);
1658                bad_stop || bad_exit
1659            });
1660            if any_failed {
1661                let cause = described_tasks
1662                    .iter()
1663                    .find_map(|t| {
1664                        t.get("stoppedReason")
1665                            .and_then(Value::as_str)
1666                            .map(String::from)
1667                    })
1668                    .unwrap_or_else(|| "ECS task failed".to_string());
1669                return Err(("States.TaskFailed".to_string(), cause));
1670            }
1671            return Ok(described);
1672        }
1673        if std::time::Instant::now() >= deadline {
1674            return Err((
1675                "States.Timeout".to_string(),
1676                format!(
1677                    "ecs:RunTask.sync timed out after {}s waiting for {} task(s) to STOP",
1678                    sync_timeout_secs(timeout_seconds),
1679                    task_arns.len()
1680                ),
1681            ));
1682        }
1683        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1684    }
1685}
1686
1687async fn sync_wait_athena_query(
1688    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1689    initial: &Value,
1690    account_id: &str,
1691    timeout_seconds: Option<u64>,
1692) -> Result<Value, (String, String)> {
1693    let qid = initial
1694        .get("QueryExecutionId")
1695        .and_then(Value::as_str)
1696        .ok_or_else(|| {
1697            (
1698                "States.TaskFailed".to_string(),
1699                "athena:StartQueryExecution.sync: response missing QueryExecutionId".to_string(),
1700            )
1701        })?
1702        .to_string();
1703
1704    let deadline = sync_deadline(timeout_seconds);
1705    loop {
1706        let described = call_sdk_action(
1707            registry,
1708            "athena",
1709            "GetQueryExecution",
1710            &json!({ "QueryExecutionId": qid }),
1711            account_id,
1712        )
1713        .await?;
1714        let state = described
1715            .get("QueryExecution")
1716            .and_then(|qe| qe.get("Status"))
1717            .and_then(|s| s.get("State"))
1718            .and_then(Value::as_str)
1719            .unwrap_or("");
1720        match state {
1721            "SUCCEEDED" => return Ok(described),
1722            "FAILED" | "CANCELLED" => {
1723                let cause = described
1724                    .get("QueryExecution")
1725                    .and_then(|qe| qe.get("Status"))
1726                    .and_then(|s| s.get("StateChangeReason"))
1727                    .and_then(Value::as_str)
1728                    .unwrap_or("Athena query reached terminal failure state")
1729                    .to_string();
1730                return Err(("States.TaskFailed".to_string(), cause));
1731            }
1732            _ => {}
1733        }
1734        if std::time::Instant::now() >= deadline {
1735            return Err((
1736                "States.Timeout".to_string(),
1737                format!(
1738                    "athena:StartQueryExecution.sync timed out after {}s for query {qid}",
1739                    sync_timeout_secs(timeout_seconds)
1740                ),
1741            ));
1742        }
1743        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1744    }
1745}
1746
1747/// Wait for a nested Step Functions execution to reach a terminal
1748/// state, then return the bubbled `Output` like AWS does for
1749/// `arn:aws:states:::states:startExecution.sync`. The initial
1750/// `StartExecution` response carries `executionArn`; we poll
1751/// `DescribeExecution` until status leaves `RUNNING`.
1752async fn sync_wait_states_start_execution(
1753    registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1754    initial: &Value,
1755    account_id: &str,
1756    timeout_seconds: Option<u64>,
1757) -> Result<Value, (String, String)> {
1758    let exec_arn = initial
1759        .get("executionArn")
1760        .or_else(|| initial.get("ExecutionArn"))
1761        .and_then(Value::as_str)
1762        .ok_or_else(|| {
1763            (
1764                "States.TaskFailed".to_string(),
1765                "states:startExecution.sync: response missing executionArn".to_string(),
1766            )
1767        })?
1768        .to_string();
1769
1770    let deadline = sync_deadline(timeout_seconds);
1771    loop {
1772        let described = call_sdk_action(
1773            registry,
1774            "states",
1775            "DescribeExecution",
1776            &json!({ "executionArn": exec_arn }),
1777            account_id,
1778        )
1779        .await?;
1780        let status = described
1781            .get("status")
1782            .or_else(|| described.get("Status"))
1783            .and_then(Value::as_str)
1784            .unwrap_or("");
1785        match status {
1786            "SUCCEEDED" => return Ok(described),
1787            "FAILED" | "TIMED_OUT" | "ABORTED" => {
1788                let cause = described
1789                    .get("cause")
1790                    .or_else(|| described.get("Cause"))
1791                    .and_then(Value::as_str)
1792                    .unwrap_or("Nested execution reached terminal failure state")
1793                    .to_string();
1794                return Err(("States.TaskFailed".to_string(), cause));
1795            }
1796            _ => {}
1797        }
1798        if std::time::Instant::now() >= deadline {
1799            return Err((
1800                "States.Timeout".to_string(),
1801                format!(
1802                    "states:startExecution.sync timed out after {}s for {exec_arn}",
1803                    sync_timeout_secs(timeout_seconds)
1804                ),
1805            ));
1806        }
1807        tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1808    }
1809}
1810
1811fn sync_timeout_secs(timeout_seconds: Option<u64>) -> u64 {
1812    timeout_seconds.unwrap_or(SYNC_DEFAULT_TIMEOUT_SECS)
1813}
1814
1815fn sync_deadline(timeout_seconds: Option<u64>) -> std::time::Instant {
1816    std::time::Instant::now() + std::time::Duration::from_secs(sync_timeout_secs(timeout_seconds))
1817}
1818
/// Which clause of an update expression an action belongs to.
///
/// NOTE(review): the variants presumably mirror the DynamoDB `UpdateExpression`
/// keywords `SET`, `REMOVE`, `ADD` and `DELETE`. Confirm against the parser
/// that produces this enum.
#[derive(Copy, Clone)]
pub(crate) enum UpdateClause {
    /// `SET` clause.
    Set,
    /// `REMOVE` clause.
    Remove,
    /// `ADD` clause.
    Add,
    /// `DELETE` clause.
    Delete,
}
1826
1827/// Invoke a Lambda function directly via DeliveryBus.
1828async fn invoke_lambda_direct(
1829    function_arn: &str,
1830    input: &Value,
1831    delivery: &Option<Arc<DeliveryBus>>,
1832    timeout_seconds: Option<u64>,
1833) -> Result<Value, (String, String)> {
1834    let delivery = delivery.as_ref().ok_or_else(|| {
1835        (
1836            "States.TaskFailed".to_string(),
1837            "No delivery bus configured for Lambda invocation".to_string(),
1838        )
1839    })?;
1840
1841    let payload =
1842        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1843
1844    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1845
1846    let result = if let Some(timeout) = timeout_seconds {
1847        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1848            Ok(r) => r,
1849            Err(_) => {
1850                return Err((
1851                    "States.Timeout".to_string(),
1852                    format!("Task timed out after {timeout} seconds"),
1853                ));
1854            }
1855        }
1856    } else {
1857        invoke_future.await
1858    };
1859
1860    match result {
1861        Some(Ok(bytes)) => {
1862            let response_str = String::from_utf8_lossy(&bytes);
1863            let value: Value =
1864                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1865            Ok(value)
1866        }
1867        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1868        None => {
1869            // No runtime available — return empty result
1870            Ok(json!({}))
1871        }
1872    }
1873}
1874
1875/// Invoke an activity worker. Inserts a `PENDING` token into shared state
1876/// so a worker can claim it via `GetActivityTask`, then polls until the
1877/// worker calls `SendTaskSuccess` / `SendTaskFailure` or the heartbeat /
1878/// timeout windows expire.
1879async fn invoke_activity(
1880    activity_arn: &str,
1881    input: &Value,
1882    shared_state: &SharedStepFunctionsState,
1883    timeout_seconds: Option<u64>,
1884    heartbeat_seconds: Option<u64>,
1885) -> Result<Value, (String, String)> {
1886    use crate::state::TaskTokenState;
1887
1888    // Activity must exist (look up across accounts via ARN segment).
1889    let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1890    {
1891        let accounts = shared_state.read();
1892        let exists = accounts
1893            .get(&activity_account)
1894            .map(|s| s.activities.contains_key(activity_arn))
1895            .unwrap_or(false);
1896        if !exists {
1897            return Err((
1898                "States.TaskFailed".to_string(),
1899                format!("Activity does not exist: {activity_arn}"),
1900            ));
1901        }
1902    }
1903
1904    let token = format!(
1905        "FCToken-{}-{}",
1906        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1907        uuid::Uuid::new_v4().simple(),
1908    );
1909    let now = chrono::Utc::now();
1910    let input_str =
1911        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1912    {
1913        let mut accounts = shared_state.write();
1914        let state = accounts.get_or_create(&activity_account);
1915        state.task_tokens.insert(
1916            token.clone(),
1917            TaskTokenState {
1918                activity_arn: activity_arn.to_string(),
1919                status: "PENDING".to_string(),
1920                output: None,
1921                error: None,
1922                cause: None,
1923                input: Some(input_str),
1924                created_at: now,
1925                last_heartbeat_at: None,
1926                heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
1927                timeout_seconds: timeout_seconds.map(|s| s as i64),
1928            },
1929        );
1930    }
1931
1932    poll_task_token(
1933        shared_state,
1934        &activity_account,
1935        &token,
1936        timeout_seconds,
1937        heartbeat_seconds,
1938    )
1939    .await
1940}
1941
/// Where execution goes after a state has been processed.
///
/// NOTE(review): the variant meanings below are inferred from their names.
/// Confirm them against the code that constructs and consumes this enum.
pub(crate) enum NextState {
    /// Continue with the state of this name (presumably the state's `Next`).
    Name(String),
    /// No further state: the current execution path ends here.
    End,
    /// The transition could not be resolved. The string presumably carries a
    /// human-readable description of why.
    Error(String),
}
1947
1948#[path = "interpreter_helpers.rs"]
1949mod interpreter_helpers;
1950pub(crate) use interpreter_helpers::*;
1951
1952#[cfg(test)]
1953#[path = "interpreter_tests.rs"]
1954mod tests;