Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26    "CreateStateMachine",
27    "DescribeStateMachine",
28    "ListStateMachines",
29    "DeleteStateMachine",
30    "UpdateStateMachine",
31    "TagResource",
32    "UntagResource",
33    "ListTagsForResource",
34    "StartExecution",
35    "StopExecution",
36    "DescribeExecution",
37    "ListExecutions",
38    "GetExecutionHistory",
39    "DescribeStateMachineForExecution",
40    "CreateActivity",
41    "DeleteActivity",
42    "DescribeActivity",
43    "ListActivities",
44    "GetActivityTask",
45    "SendTaskFailure",
46    "SendTaskHeartbeat",
47    "SendTaskSuccess",
48    "PublishStateMachineVersion",
49    "DeleteStateMachineVersion",
50    "ListStateMachineVersions",
51    "CreateStateMachineAlias",
52    "DeleteStateMachineAlias",
53    "DescribeStateMachineAlias",
54    "ListStateMachineAliases",
55    "UpdateStateMachineAlias",
56    "DescribeMapRun",
57    "ListMapRuns",
58    "UpdateMapRun",
59    "RedriveExecution",
60    "StartSyncExecution",
61    "TestState",
62    "ValidateStateMachineDefinition",
63];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73    state: SharedStepFunctionsState,
74    delivery: Option<Arc<DeliveryBus>>,
75    dynamodb_state: Option<SharedDynamoDbState>,
76    registry: Option<SharedServiceRegistry>,
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81impl StepFunctionsService {
82    pub fn new(state: SharedStepFunctionsState) -> Self {
83        Self {
84            state,
85            delivery: None,
86            dynamodb_state: None,
87            registry: None,
88            snapshot_store: None,
89            snapshot_lock: Arc::new(AsyncMutex::new(())),
90        }
91    }
92
93    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
94        self.delivery = Some(delivery);
95        self
96    }
97
98    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
99        self.dynamodb_state = Some(dynamodb_state);
100        self
101    }
102
103    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
104    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
105    /// has been registered; until then the interpreter falls back to its
106    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
107    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
108        self.registry = Some(registry);
109        self
110    }
111
112    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
113        self.snapshot_store = Some(store);
114        self
115    }
116
117    async fn save_snapshot(&self) {
118        let Some(store) = self.snapshot_store.clone() else {
119            return;
120        };
121        let _guard = self.snapshot_lock.lock().await;
122        let snapshot = StepFunctionsSnapshot {
123            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
124            state: None,
125            accounts: Some(self.state.read().clone()),
126        };
127        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
128            let bytes = serde_json::to_vec(&snapshot)
129                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
130            store.save(&bytes)
131        })
132        .await;
133        match join {
134            Ok(Ok(())) => {}
135            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
136            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
137        }
138    }
139}
140
141fn is_mutating_action(action: &str) -> bool {
142    matches!(
143        action,
144        "CreateStateMachine"
145            | "DeleteStateMachine"
146            | "UpdateStateMachine"
147            | "TagResource"
148            | "UntagResource"
149            | "StartExecution"
150            | "StopExecution"
151            | "CreateActivity"
152            | "DeleteActivity"
153            | "GetActivityTask"
154            | "SendTaskFailure"
155            | "SendTaskHeartbeat"
156            | "SendTaskSuccess"
157            | "PublishStateMachineVersion"
158            | "DeleteStateMachineVersion"
159            | "CreateStateMachineAlias"
160            | "DeleteStateMachineAlias"
161            | "UpdateStateMachineAlias"
162            | "UpdateMapRun"
163            | "RedriveExecution"
164            | "StartSyncExecution"
165    )
166}
167
168#[async_trait]
169impl AwsService for StepFunctionsService {
170    fn service_name(&self) -> &str {
171        "states"
172    }
173
174    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175        let mutates = is_mutating_action(req.action.as_str());
176        let result = match req.action.as_str() {
177            "CreateStateMachine" => self.create_state_machine(&req),
178            "DescribeStateMachine" => self.describe_state_machine(&req),
179            "ListStateMachines" => self.list_state_machines(&req),
180            "DeleteStateMachine" => self.delete_state_machine(&req),
181            "UpdateStateMachine" => self.update_state_machine(&req),
182            "TagResource" => self.tag_resource(&req),
183            "UntagResource" => self.untag_resource(&req),
184            "ListTagsForResource" => self.list_tags_for_resource(&req),
185            "StartExecution" => self.start_execution(&req),
186            "StopExecution" => self.stop_execution(&req),
187            "DescribeExecution" => self.describe_execution(&req),
188            "ListExecutions" => self.list_executions(&req),
189            "GetExecutionHistory" => self.get_execution_history(&req),
190            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
191            "CreateActivity" => self.create_activity(&req),
192            "DeleteActivity" => self.delete_activity(&req),
193            "DescribeActivity" => self.describe_activity(&req),
194            "ListActivities" => self.list_activities(&req),
195            "GetActivityTask" => self.get_activity_task(&req).await,
196            "SendTaskFailure" => self.send_task_failure(&req),
197            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
198            "SendTaskSuccess" => self.send_task_success(&req),
199            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
200            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
201            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
202            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
203            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
204            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
205            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
206            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
207            "DescribeMapRun" => self.describe_map_run(&req),
208            "ListMapRuns" => self.list_map_runs(&req),
209            "UpdateMapRun" => self.update_map_run(&req),
210            "RedriveExecution" => self.redrive_execution(&req),
211            "StartSyncExecution" => self.start_sync_execution(&req).await,
212            "TestState" => self.test_state(&req),
213            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
214            _ => Err(AwsServiceError::action_not_implemented(
215                "states",
216                &req.action,
217            )),
218        };
219        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220            self.save_snapshot().await;
221        }
222        result
223    }
224
225    fn supported_actions(&self) -> &[&str] {
226        SUPPORTED
227    }
228}
229
230impl StepFunctionsService {
231    // ─── State Machine CRUD ─────────────────────────────────────────────
232
233    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
234        let body = req.json_body();
235
236        validate_required("name", &body["name"])?;
237        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
238        validate_name(name)?;
239
240        validate_required("definition", &body["definition"])?;
241        let definition = body["definition"]
242            .as_str()
243            .ok_or_else(|| missing("definition"))?;
244        validate_definition(definition)?;
245
246        validate_required("roleArn", &body["roleArn"])?;
247        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
248        validate_arn(role_arn)?;
249
250        let machine_type = if let Some(t) = body["type"].as_str() {
251            StateMachineType::parse(t).ok_or_else(|| {
252                AwsServiceError::aws_error(
253                    StatusCode::BAD_REQUEST,
254                    "ValidationException",
255                    format!(
256                        "Value '{t}' at 'type' failed to satisfy constraint: \
257                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
258                    ),
259                )
260            })?
261        } else {
262            StateMachineType::Standard
263        };
264
265        let mut accounts = self.state.write();
266        let state = accounts.get_or_create(&req.account_id);
267        let arn = state.state_machine_arn(name);
268
269        // Check if name already exists
270        if state.state_machines.values().any(|sm| sm.name == name) {
271            return Err(AwsServiceError::aws_error(
272                StatusCode::CONFLICT,
273                "StateMachineAlreadyExists",
274                format!("State Machine Already Exists: '{arn}'"),
275            ));
276        }
277
278        let now = Utc::now();
279        let revision_id = uuid::Uuid::new_v4().to_string();
280
281        let mut tags = BTreeMap::new();
282        if !body["tags"].is_null() {
283            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
284                |f| {
285                    AwsServiceError::aws_error(
286                        StatusCode::BAD_REQUEST,
287                        "ValidationException",
288                        format!("{f} must be a list"),
289                    )
290                },
291            )?;
292        }
293
294        let sm = StateMachine {
295            name: name.to_string(),
296            arn: arn.clone(),
297            definition: definition.to_string(),
298            role_arn: role_arn.to_string(),
299            machine_type,
300            status: StateMachineStatus::Active,
301            creation_date: now,
302            update_date: now,
303            tags,
304            revision_id: revision_id.clone(),
305            logging_configuration: body.get("loggingConfiguration").cloned(),
306            tracing_configuration: body.get("tracingConfiguration").cloned(),
307            description: body["description"].as_str().unwrap_or("").to_string(),
308        };
309
310        state.state_machines.insert(arn.clone(), sm);
311
312        Ok(AwsResponse::ok_json(json!({
313            "stateMachineArn": arn,
314            "creationDate": now.timestamp() as f64,
315            "stateMachineVersionArn": arn,
316        })))
317    }
318
319    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        validate_required("stateMachineArn", &body["stateMachineArn"])?;
322        let arn = body["stateMachineArn"]
323            .as_str()
324            .ok_or_else(|| missing("stateMachineArn"))?;
325        validate_arn(arn)?;
326
327        let accounts = self.state.read();
328        let empty = StepFunctionsState::new(&req.account_id, &req.region);
329        let state = accounts.get(&req.account_id).unwrap_or(&empty);
330        let sm = state
331            .state_machines
332            .get(arn)
333            .ok_or_else(|| state_machine_not_found(arn))?;
334
335        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
336    }
337
338    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339        let body = req.json_body();
340        let raw_max_results = body["maxResults"].as_i64();
341        if let Some(mr) = raw_max_results {
342            validate_max_results(mr)?;
343        }
344        // Smithy default: maxResults=0 means "use the service default" (100).
345        let max_results = match raw_max_results.unwrap_or(0) {
346            0 => 100,
347            n => n as usize,
348        };
349        let next_token = body["nextToken"].as_str();
350        if let Some(t) = next_token {
351            validate_page_token(t)?;
352        }
353
354        let accounts = self.state.read();
355        let empty = StepFunctionsState::new(&req.account_id, &req.region);
356        let state = accounts.get(&req.account_id).unwrap_or(&empty);
357        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
358        machines.sort_by(|a, b| a.name.cmp(&b.name));
359
360        let items: Vec<Value> = machines
361            .iter()
362            .map(|sm| {
363                json!({
364                    "name": sm.name,
365                    "stateMachineArn": sm.arn,
366                    "type": sm.machine_type.as_str(),
367                    "creationDate": sm.creation_date.timestamp() as f64,
368                })
369            })
370            .collect();
371
372        let (page, token) = paginate(&items, next_token, max_results);
373
374        let mut resp = json!({ "stateMachines": page });
375        if let Some(t) = token {
376            resp["nextToken"] = json!(t);
377        }
378        Ok(AwsResponse::ok_json(resp))
379    }
380
381    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
382        let body = req.json_body();
383        validate_required("stateMachineArn", &body["stateMachineArn"])?;
384        let arn = body["stateMachineArn"]
385            .as_str()
386            .ok_or_else(|| missing("stateMachineArn"))?;
387        validate_arn(arn)?;
388
389        let mut accounts = self.state.write();
390        let state = accounts.get_or_create(&req.account_id);
391        // AWS returns success even if it doesn't exist
392        state.state_machines.remove(arn);
393
394        Ok(AwsResponse::ok_json(json!({})))
395    }
396
397    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
398        let body = req.json_body();
399        validate_required("stateMachineArn", &body["stateMachineArn"])?;
400        let arn = body["stateMachineArn"]
401            .as_str()
402            .ok_or_else(|| missing("stateMachineArn"))?;
403        validate_arn(arn)?;
404
405        let mut accounts = self.state.write();
406        let state = accounts.get_or_create(&req.account_id);
407        let sm = state
408            .state_machines
409            .get_mut(arn)
410            .ok_or_else(|| state_machine_not_found(arn))?;
411
412        if let Some(definition) = body["definition"].as_str() {
413            validate_definition(definition)?;
414            sm.definition = definition.to_string();
415        }
416
417        if let Some(role_arn) = body["roleArn"].as_str() {
418            validate_arn(role_arn)?;
419            sm.role_arn = role_arn.to_string();
420        }
421
422        if let Some(logging) = body.get("loggingConfiguration") {
423            sm.logging_configuration = Some(logging.clone());
424        }
425
426        if let Some(tracing) = body.get("tracingConfiguration") {
427            sm.tracing_configuration = Some(tracing.clone());
428        }
429
430        if let Some(description) = body["description"].as_str() {
431            sm.description = description.to_string();
432        }
433
434        let now = Utc::now();
435        sm.update_date = now;
436        sm.revision_id = uuid::Uuid::new_v4().to_string();
437
438        let revision_id = sm.revision_id.clone();
439        let sm_arn = sm.arn.clone();
440
441        Ok(AwsResponse::ok_json(json!({
442            "updateDate": now.timestamp() as f64,
443            "revisionId": revision_id,
444            "stateMachineVersionArn": sm_arn,
445        })))
446    }
447
448    // ─── Execution Lifecycle ──────────────────────────────────────────
449
450    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
451        let body = req.json_body();
452        validate_required("stateMachineArn", &body["stateMachineArn"])?;
453        let sm_arn = body["stateMachineArn"]
454            .as_str()
455            .ok_or_else(|| missing("stateMachineArn"))?;
456        validate_arn(sm_arn)?;
457
458        let input = body["input"].as_str().map(|s| s.to_string());
459
460        // Validate input is valid JSON if provided
461        if let Some(ref input_str) = input {
462            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
463                AwsServiceError::aws_error(
464                    StatusCode::BAD_REQUEST,
465                    "InvalidExecutionInput",
466                    "Invalid execution input: must be valid JSON".to_string(),
467                )
468            })?;
469        }
470
471        let execution_name = body["name"]
472            .as_str()
473            .map(|s| s.to_string())
474            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
475
476        if let Some(name) = body["name"].as_str() {
477            validate_name(name)?;
478        }
479
480        let mut accounts = self.state.write();
481        let state = accounts.get_or_create(&req.account_id);
482        let sm = state
483            .state_machines
484            .get(sm_arn)
485            .ok_or_else(|| state_machine_not_found(sm_arn))?;
486
487        let sm_name = sm.name.clone();
488        let definition = sm.definition.clone();
489        let exec_arn = state.execution_arn(&sm_name, &execution_name);
490
491        // Check for duplicate execution name
492        if state.executions.contains_key(&exec_arn) {
493            return Err(AwsServiceError::aws_error(
494                StatusCode::CONFLICT,
495                "ExecutionAlreadyExists",
496                format!("Execution Already Exists: '{exec_arn}'"),
497            ));
498        }
499
500        let now = Utc::now();
501        let execution = Execution {
502            execution_arn: exec_arn.clone(),
503            state_machine_arn: sm_arn.to_string(),
504            state_machine_name: sm_name,
505            name: execution_name,
506            status: ExecutionStatus::Running,
507            input: input.clone(),
508            output: None,
509            start_date: now,
510            stop_date: None,
511            error: None,
512            cause: None,
513            history_events: vec![],
514            parent_execution_arn: None,
515            is_sync: false,
516            billed_duration_ms: None,
517            billed_memory_mb: None,
518        };
519
520        state.executions.insert(exec_arn.clone(), execution);
521        let logging_config = sm.logging_configuration.clone();
522        drop(accounts);
523
524        // Spawn async execution
525        let shared_state = self.state.clone();
526        let exec_arn_clone = exec_arn.clone();
527        let input_clone = input;
528        let delivery = self.delivery.clone();
529        let dynamodb_state = self.dynamodb_state.clone();
530        let registry = self.registry.clone();
531        tokio::spawn(async move {
532            interpreter::execute_state_machine(
533                shared_state,
534                exec_arn_clone,
535                definition,
536                input_clone,
537                delivery,
538                dynamodb_state,
539                registry,
540                logging_config,
541            )
542            .await;
543        });
544
545        Ok(AwsResponse::ok_json(json!({
546            "executionArn": exec_arn,
547            "startDate": now.timestamp() as f64,
548        })))
549    }
550
551    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
552        let body = req.json_body();
553        validate_required("executionArn", &body["executionArn"])?;
554        let exec_arn = body["executionArn"]
555            .as_str()
556            .ok_or_else(|| missing("executionArn"))?;
557
558        let error = body["error"].as_str().map(|s| s.to_string());
559        let cause = body["cause"].as_str().map(|s| s.to_string());
560
561        let mut accounts = self.state.write();
562        let state = accounts.get_or_create(&req.account_id);
563        let exec = state
564            .executions
565            .get_mut(exec_arn)
566            .ok_or_else(|| execution_not_found(exec_arn))?;
567
568        if exec.status != ExecutionStatus::Running {
569            return Err(AwsServiceError::aws_error(
570                StatusCode::BAD_REQUEST,
571                "ExecutionNotRunning",
572                format!("Execution is not running: '{exec_arn}'"),
573            ));
574        }
575
576        let now = Utc::now();
577        exec.status = ExecutionStatus::Aborted;
578        exec.stop_date = Some(now);
579        exec.error = error;
580        exec.cause = cause;
581
582        Ok(AwsResponse::ok_json(json!({
583            "stopDate": now.timestamp() as f64,
584        })))
585    }
586
587    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
588        let body = req.json_body();
589        validate_required("executionArn", &body["executionArn"])?;
590        let exec_arn = body["executionArn"]
591            .as_str()
592            .ok_or_else(|| missing("executionArn"))?;
593
594        let accounts = self.state.read();
595        let empty = StepFunctionsState::new(&req.account_id, &req.region);
596        let state = accounts.get(&req.account_id).unwrap_or(&empty);
597        let exec = state
598            .executions
599            .get(exec_arn)
600            .ok_or_else(|| execution_not_found(exec_arn))?;
601
602        Ok(AwsResponse::ok_json(execution_to_json(exec)))
603    }
604
605    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
606        let body = req.json_body();
607        validate_required("stateMachineArn", &body["stateMachineArn"])?;
608        let sm_arn = body["stateMachineArn"]
609            .as_str()
610            .ok_or_else(|| missing("stateMachineArn"))?;
611        validate_arn(sm_arn)?;
612
613        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
614        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
615        let next_token = body["nextToken"].as_str();
616        let status_filter = body["statusFilter"].as_str();
617
618        let accounts = self.state.read();
619        let empty = StepFunctionsState::new(&req.account_id, &req.region);
620        let state = accounts.get(&req.account_id).unwrap_or(&empty);
621
622        // Verify state machine exists
623        if !state.state_machines.contains_key(sm_arn) {
624            return Err(state_machine_not_found(sm_arn));
625        }
626
627        let mut executions: Vec<&Execution> = state
628            .executions
629            .values()
630            .filter(|e| e.state_machine_arn == sm_arn)
631            .filter(|e| {
632                status_filter
633                    .map(|sf| e.status.as_str() == sf)
634                    .unwrap_or(true)
635            })
636            .collect();
637
638        // Sort by start date descending (most recent first)
639        executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
640
641        let items: Vec<Value> = executions
642            .iter()
643            .map(|e| {
644                let mut item = json!({
645                    "executionArn": e.execution_arn,
646                    "stateMachineArn": e.state_machine_arn,
647                    "name": e.name,
648                    "status": e.status.as_str(),
649                    "startDate": e.start_date.timestamp() as f64,
650                });
651                if let Some(stop) = e.stop_date {
652                    item["stopDate"] = json!(stop.timestamp() as f64);
653                }
654                item
655            })
656            .collect();
657
658        let (page, token) = paginate(&items, next_token, max_results);
659
660        let mut resp = json!({ "executions": page });
661        if let Some(t) = token {
662            resp["nextToken"] = json!(t);
663        }
664        Ok(AwsResponse::ok_json(resp))
665    }
666
667    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
668        let body = req.json_body();
669        validate_required("executionArn", &body["executionArn"])?;
670        let exec_arn = body["executionArn"]
671            .as_str()
672            .ok_or_else(|| missing("executionArn"))?;
673        validate_arn_length("executionArn", exec_arn, 256)?;
674
675        if let Some(mr) = body["maxResults"].as_i64() {
676            validate_max_results(mr)?;
677        }
678        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
679        let next_token = body["nextToken"].as_str();
680        if let Some(t) = next_token {
681            validate_page_token(t)?;
682        }
683        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
684
685        let accounts = self.state.read();
686        let empty = StepFunctionsState::new(&req.account_id, &req.region);
687        let state = accounts.get(&req.account_id).unwrap_or(&empty);
688        let exec = state
689            .executions
690            .get(exec_arn)
691            .ok_or_else(|| execution_not_found(exec_arn))?;
692
693        let mut events: Vec<Value> = exec
694            .history_events
695            .iter()
696            .map(|e| {
697                json!({
698                    "id": e.id,
699                    "type": e.event_type,
700                    "timestamp": e.timestamp.timestamp() as f64,
701                    "previousEventId": e.previous_event_id,
702                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
703                })
704            })
705            .collect();
706
707        if reverse_order {
708            events.reverse();
709        }
710
711        let (page, token) = paginate(&events, next_token, max_results);
712
713        let mut resp = json!({ "events": page });
714        if let Some(t) = token {
715            resp["nextToken"] = json!(t);
716        }
717        Ok(AwsResponse::ok_json(resp))
718    }
719
720    fn describe_state_machine_for_execution(
721        &self,
722        req: &AwsRequest,
723    ) -> Result<AwsResponse, AwsServiceError> {
724        let body = req.json_body();
725        validate_required("executionArn", &body["executionArn"])?;
726        let exec_arn = body["executionArn"]
727            .as_str()
728            .ok_or_else(|| missing("executionArn"))?;
729
730        let accounts = self.state.read();
731        let empty = StepFunctionsState::new(&req.account_id, &req.region);
732        let state = accounts.get(&req.account_id).unwrap_or(&empty);
733        let exec = state
734            .executions
735            .get(exec_arn)
736            .ok_or_else(|| execution_not_found(exec_arn))?;
737
738        let sm = state
739            .state_machines
740            .get(&exec.state_machine_arn)
741            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
742
743        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
744    }
745
746    // ─── Tagging ────────────────────────────────────────────────────────
747
748    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
749        let body = req.json_body();
750        validate_required("resourceArn", &body["resourceArn"])?;
751        let arn = body["resourceArn"]
752            .as_str()
753            .ok_or_else(|| missing("resourceArn"))?;
754        validate_arn(arn)?;
755        validate_required("tags", &body["tags"])?;
756
757        let mut accounts = self.state.write();
758        let state = accounts.get_or_create(&req.account_id);
759        let sm = state
760            .state_machines
761            .get_mut(arn)
762            .ok_or_else(|| resource_not_found(arn))?;
763
764        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
765            |f| {
766                AwsServiceError::aws_error(
767                    StatusCode::BAD_REQUEST,
768                    "ValidationException",
769                    format!("{f} must be a list"),
770                )
771            },
772        )?;
773
774        Ok(AwsResponse::ok_json(json!({})))
775    }
776
777    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
778        let body = req.json_body();
779        validate_required("resourceArn", &body["resourceArn"])?;
780        let arn = body["resourceArn"]
781            .as_str()
782            .ok_or_else(|| missing("resourceArn"))?;
783        validate_arn(arn)?;
784        validate_required("tagKeys", &body["tagKeys"])?;
785
786        let mut accounts = self.state.write();
787        let state = accounts.get_or_create(&req.account_id);
788        let sm = state
789            .state_machines
790            .get_mut(arn)
791            .ok_or_else(|| resource_not_found(arn))?;
792
793        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
794            AwsServiceError::aws_error(
795                StatusCode::BAD_REQUEST,
796                "ValidationException",
797                format!("{f} must be a list"),
798            )
799        })?;
800
801        Ok(AwsResponse::ok_json(json!({})))
802    }
803
804    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
805        let body = req.json_body();
806        validate_required("resourceArn", &body["resourceArn"])?;
807        let arn = body["resourceArn"]
808            .as_str()
809            .ok_or_else(|| missing("resourceArn"))?;
810        validate_arn(arn)?;
811
812        let accounts = self.state.read();
813        let empty = StepFunctionsState::new(&req.account_id, &req.region);
814        let state = accounts.get(&req.account_id).unwrap_or(&empty);
815        let sm = state
816            .state_machines
817            .get(arn)
818            .ok_or_else(|| resource_not_found(arn))?;
819
820        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
821
822        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
823    }
824
825    // ─── Activities ─────────────────────────────────────────────────────
826
827    fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
828        let body = req.json_body();
829        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
830        validate_name(name)?;
831        let mut accounts = self.state.write();
832        let state = accounts.get_or_create(&req.account_id);
833        let arn = format!(
834            "arn:aws:states:{}:{}:activity:{}",
835            state.region, state.account_id, name
836        );
837        if state.activities.contains_key(&arn) {
838            return Err(AwsServiceError::aws_error(
839                StatusCode::BAD_REQUEST,
840                "ActivityAlreadyExists",
841                format!("Activity already exists: {arn}"),
842            ));
843        }
844        let activity = crate::state::Activity {
845            name: name.to_string(),
846            arn: arn.clone(),
847            creation_date: chrono::Utc::now(),
848            tags: BTreeMap::new(),
849        };
850        state.activities.insert(arn.clone(), activity.clone());
851        Ok(AwsResponse::ok_json(json!({
852            "activityArn": arn,
853            "creationDate": activity.creation_date.timestamp(),
854        })))
855    }
856
857    fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
858        let body = req.json_body();
859        let arn = body["activityArn"]
860            .as_str()
861            .ok_or_else(|| missing("activityArn"))?
862            .to_string();
863        validate_arn_length("activityArn", &arn, 256)?;
864        let mut accounts = self.state.write();
865        let state = accounts.get_or_create(&req.account_id);
866        state.activities.remove(&arn);
867        Ok(AwsResponse::ok_json(json!({})))
868    }
869
870    fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
871        let body = req.json_body();
872        let arn = body["activityArn"]
873            .as_str()
874            .ok_or_else(|| missing("activityArn"))?
875            .to_string();
876        let accounts = self.state.read();
877        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
878        let state = accounts.get(&req.account_id).unwrap_or(&empty);
879        let a = state.activities.get(&arn).ok_or_else(|| {
880            AwsServiceError::aws_error(
881                StatusCode::BAD_REQUEST,
882                "ActivityDoesNotExist",
883                format!("Activity does not exist: {arn}"),
884            )
885        })?;
886        Ok(AwsResponse::ok_json(json!({
887            "activityArn": a.arn,
888            "name": a.name,
889            "creationDate": a.creation_date.timestamp(),
890        })))
891    }
892
893    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
894        let body = req.json_body();
895        let raw_max_results = body["maxResults"].as_i64();
896        if let Some(mr) = raw_max_results {
897            validate_max_results(mr)?;
898        }
899        let next_token = body["nextToken"].as_str();
900        if let Some(t) = next_token {
901            validate_page_token(t)?;
902        }
903        // Smithy default: maxResults=0 means "use the service default"
904        // (100 for Step Functions), which we honour by treating both
905        // `None` and `0` as 100.
906        let max_results = match raw_max_results.unwrap_or(0) {
907            0 => 100,
908            n => n as usize,
909        };
910        let accounts = self.state.read();
911        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
912        let state = accounts.get(&req.account_id).unwrap_or(&empty);
913        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
914        activities.sort_by(|a, b| a.name.cmp(&b.name));
915        let items: Vec<Value> = activities
916            .iter()
917            .map(|a| {
918                json!({
919                    "activityArn": a.arn,
920                    "name": a.name,
921                    "creationDate": a.creation_date.timestamp(),
922                })
923            })
924            .collect();
925        let (page, token) = paginate(&items, next_token, max_results);
926        let mut resp = json!({ "activities": page });
927        if let Some(t) = token {
928            resp["nextToken"] = json!(t);
929        }
930        Ok(AwsResponse::ok_json(resp))
931    }
932
933    async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
934        let body = req.json_body();
935        let arn = body["activityArn"]
936            .as_str()
937            .ok_or_else(|| missing("activityArn"))?
938            .to_string();
939        // Activity must exist before we'll accept long-poll calls.
940        {
941            let accounts = self.state.read();
942            let state = accounts
943                .get(&req.account_id)
944                .ok_or_else(|| activity_not_found(&arn))?;
945            if !state.activities.contains_key(&arn) {
946                return Err(activity_not_found(&arn));
947            }
948        }
949
950        // AWS GetActivityTask blocks up to 60s. fakecloud defaults to 5s
951        // so test suites don't stall when no worker is feeding the queue.
952        let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
953            .ok()
954            .and_then(|s| s.parse().ok())
955            .unwrap_or(5);
956        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
957
958        loop {
959            // Try to dequeue oldest PENDING token for this activity.
960            {
961                let mut accounts = self.state.write();
962                let state = accounts.get_or_create(&req.account_id);
963                let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
964                    .task_tokens
965                    .iter()
966                    .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
967                    .map(|(k, t)| (k.clone(), t.created_at))
968                    .collect();
969                candidates.sort_by_key(|c| c.1);
970                if let Some((token, _)) = candidates.into_iter().next() {
971                    let now = chrono::Utc::now();
972                    let entry = state.task_tokens.get_mut(&token).expect("just looked up");
973                    entry.status = "IN_PROGRESS".to_string();
974                    entry.last_heartbeat_at = Some(now);
975                    let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
976                    return Ok(AwsResponse::ok_json(json!({
977                        "taskToken": token,
978                        "input": input,
979                    })));
980                }
981            }
982            if std::time::Instant::now() >= deadline {
983                // No task available in window — return empty token (matches
984                // AWS behavior).
985                return Ok(AwsResponse::ok_json(json!({
986                    "taskToken": "",
987                    "input": "",
988                })));
989            }
990            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
991        }
992    }
993
994    fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
995        self.update_task_token(req, "SUCCEEDED")
996    }
997
998    fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999        self.update_task_token(req, "FAILED")
1000    }
1001
1002    fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1003        // Heartbeats only refresh `last_heartbeat_at`; they don't change
1004        // the task's lifecycle status. The interpreter's heartbeat-timeout
1005        // check reads `last_heartbeat_at` to decide whether to fail the
1006        // task with `States.HeartbeatTimeout`.
1007        let body = req.json_body();
1008        let token = body["taskToken"]
1009            .as_str()
1010            .ok_or_else(|| missing("taskToken"))?
1011            .to_string();
1012        let mut accounts = self.state.write();
1013        let state = accounts.get_or_create(&req.account_id);
1014        let entry = state
1015            .task_tokens
1016            .get_mut(&token)
1017            .ok_or_else(|| task_does_not_exist(&token))?;
1018        entry.last_heartbeat_at = Some(chrono::Utc::now());
1019        Ok(AwsResponse::ok_json(json!({})))
1020    }
1021
1022    fn update_task_token(
1023        &self,
1024        req: &AwsRequest,
1025        new_status: &str,
1026    ) -> Result<AwsResponse, AwsServiceError> {
1027        let body = req.json_body();
1028        let token = body["taskToken"]
1029            .as_str()
1030            .ok_or_else(|| missing("taskToken"))?
1031            .to_string();
1032        let mut accounts = self.state.write();
1033        let state = accounts.get_or_create(&req.account_id);
1034        let entry = state
1035            .task_tokens
1036            .get_mut(&token)
1037            .ok_or_else(|| task_does_not_exist(&token))?;
1038        entry.status = new_status.to_string();
1039        if new_status == "SUCCEEDED" {
1040            entry.output = body["output"].as_str().map(String::from);
1041        } else if new_status == "FAILED" {
1042            entry.error = body["error"].as_str().map(String::from);
1043            entry.cause = body["cause"].as_str().map(String::from);
1044        }
1045        Ok(AwsResponse::ok_json(json!({})))
1046    }
1047
1048    // ─── State machine versions / aliases ───────────────────────────────
1049
1050    fn publish_state_machine_version(
1051        &self,
1052        req: &AwsRequest,
1053    ) -> Result<AwsResponse, AwsServiceError> {
1054        let body = req.json_body();
1055        let arn = body["stateMachineArn"]
1056            .as_str()
1057            .ok_or_else(|| missing("stateMachineArn"))?
1058            .to_string();
1059        let description = body["description"].as_str().unwrap_or("").to_string();
1060        let mut accounts = self.state.write();
1061        let state = accounts.get_or_create(&req.account_id);
1062        if !state.state_machines.contains_key(&arn) {
1063            return Err(state_machine_not_found(&arn));
1064        }
1065        let version = state
1066            .state_machine_versions
1067            .values()
1068            .filter(|v| v.state_machine_arn == arn)
1069            .map(|v| v.version)
1070            .max()
1071            .unwrap_or(0)
1072            + 1;
1073        let version_arn = format!("{arn}:{version}");
1074        let v = crate::state::StateMachineVersion {
1075            state_machine_arn: arn,
1076            version,
1077            revision_id: format!("rev-{version}"),
1078            description,
1079            creation_date: chrono::Utc::now(),
1080        };
1081        state
1082            .state_machine_versions
1083            .insert(version_arn.clone(), v.clone());
1084        Ok(AwsResponse::ok_json(json!({
1085            "stateMachineVersionArn": version_arn,
1086            "creationDate": v.creation_date.timestamp(),
1087        })))
1088    }
1089
1090    fn delete_state_machine_version(
1091        &self,
1092        req: &AwsRequest,
1093    ) -> Result<AwsResponse, AwsServiceError> {
1094        let body = req.json_body();
1095        let arn = body["stateMachineVersionArn"]
1096            .as_str()
1097            .ok_or_else(|| missing("stateMachineVersionArn"))?
1098            .to_string();
1099        validate_arn_length("stateMachineVersionArn", &arn, 2000)?;
1100        let mut accounts = self.state.write();
1101        let state = accounts.get_or_create(&req.account_id);
1102        state.state_machine_versions.remove(&arn);
1103        Ok(AwsResponse::ok_json(json!({})))
1104    }
1105
1106    fn list_state_machine_versions(
1107        &self,
1108        req: &AwsRequest,
1109    ) -> Result<AwsResponse, AwsServiceError> {
1110        let body = req.json_body();
1111        let arn = body["stateMachineArn"]
1112            .as_str()
1113            .ok_or_else(|| missing("stateMachineArn"))?
1114            .to_string();
1115        validate_arn_length("stateMachineArn", &arn, 256)?;
1116        let raw_max_results = body["maxResults"].as_i64();
1117        if let Some(mr) = raw_max_results {
1118            validate_max_results(mr)?;
1119        }
1120        let next_token = body["nextToken"].as_str();
1121        if let Some(t) = next_token {
1122            validate_page_token(t)?;
1123        }
1124        let max_results = match raw_max_results.unwrap_or(0) {
1125            0 => 100,
1126            n => n as usize,
1127        };
1128        let accounts = self.state.read();
1129        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1130        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1131        let mut versions: Vec<&crate::state::StateMachineVersion> = state
1132            .state_machine_versions
1133            .values()
1134            .filter(|v| v.state_machine_arn == arn)
1135            .collect();
1136        versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1137        let items: Vec<Value> = versions
1138            .iter()
1139            .map(|v| {
1140                json!({
1141                    "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1142                    "creationDate": v.creation_date.timestamp(),
1143                })
1144            })
1145            .collect();
1146        let (page, token) = paginate(&items, next_token, max_results);
1147        let mut resp = json!({ "stateMachineVersions": page });
1148        if let Some(t) = token {
1149            resp["nextToken"] = json!(t);
1150        }
1151        Ok(AwsResponse::ok_json(resp))
1152    }
1153
1154    fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1155        let body = req.json_body();
1156        let name = body["name"]
1157            .as_str()
1158            .ok_or_else(|| missing("name"))?
1159            .to_string();
1160        validate_name(&name)?;
1161        let routing_cfg = body["routingConfiguration"]
1162            .as_array()
1163            .ok_or_else(|| missing("routingConfiguration"))?;
1164        let routes = parse_routing_configuration(routing_cfg)?;
1165        let parent_arn = routes[0]
1166            .state_machine_version_arn
1167            .rsplit_once(':')
1168            .map(|(parent, _)| parent.to_string())
1169            .unwrap_or_default();
1170        let alias_arn = format!("{parent_arn}:{name}");
1171        let now = chrono::Utc::now();
1172        let alias = crate::state::StateMachineAlias {
1173            name,
1174            arn: alias_arn.clone(),
1175            description: body["description"].as_str().unwrap_or("").to_string(),
1176            routing_configuration: routes,
1177            creation_date: now,
1178            update_date: now,
1179        };
1180        let mut accounts = self.state.write();
1181        let state = accounts.get_or_create(&req.account_id);
1182        state.state_machine_aliases.insert(alias_arn.clone(), alias);
1183        Ok(AwsResponse::ok_json(json!({
1184            "stateMachineAliasArn": alias_arn,
1185            "creationDate": now.timestamp(),
1186        })))
1187    }
1188
1189    fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1190        let body = req.json_body();
1191        let arn = body["stateMachineAliasArn"]
1192            .as_str()
1193            .ok_or_else(|| missing("stateMachineAliasArn"))?
1194            .to_string();
1195        validate_arn_length("stateMachineAliasArn", &arn, 256)?;
1196        let mut accounts = self.state.write();
1197        let state = accounts.get_or_create(&req.account_id);
1198        state.state_machine_aliases.remove(&arn);
1199        Ok(AwsResponse::ok_json(json!({})))
1200    }
1201
1202    fn describe_state_machine_alias(
1203        &self,
1204        req: &AwsRequest,
1205    ) -> Result<AwsResponse, AwsServiceError> {
1206        let body = req.json_body();
1207        let arn = body["stateMachineAliasArn"]
1208            .as_str()
1209            .ok_or_else(|| missing("stateMachineAliasArn"))?
1210            .to_string();
1211        let accounts = self.state.read();
1212        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1213        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1214        let alias = state
1215            .state_machine_aliases
1216            .get(&arn)
1217            .ok_or_else(|| resource_not_found(&arn))?;
1218        Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1219    }
1220
1221    fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1222        let body = req.json_body();
1223        let parent = body["stateMachineArn"]
1224            .as_str()
1225            .ok_or_else(|| missing("stateMachineArn"))?
1226            .to_string();
1227        validate_arn_length("stateMachineArn", &parent, 256)?;
1228        let raw_max_results = body["maxResults"].as_i64();
1229        if let Some(mr) = raw_max_results {
1230            validate_max_results(mr)?;
1231        }
1232        let next_token = body["nextToken"].as_str();
1233        if let Some(t) = next_token {
1234            validate_page_token(t)?;
1235        }
1236        let max_results = match raw_max_results.unwrap_or(0) {
1237            0 => 100,
1238            n => n as usize,
1239        };
1240        let accounts = self.state.read();
1241        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1242        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1243        // Anchor the prefix on the alias separator so a state machine
1244        // named `foo` doesn't pull in aliases for `foobar`.
1245        let parent_prefix = format!("{parent}:");
1246        let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1247            .state_machine_aliases
1248            .values()
1249            .filter(|a| a.arn.starts_with(&parent_prefix))
1250            .collect();
1251        aliases.sort_by(|a, b| a.name.cmp(&b.name));
1252        let items: Vec<Value> = aliases
1253            .iter()
1254            .map(|a| {
1255                json!({
1256                    "stateMachineAliasArn": a.arn,
1257                    "creationDate": a.creation_date.timestamp(),
1258                })
1259            })
1260            .collect();
1261        let (page, token) = paginate(&items, next_token, max_results);
1262        let mut resp = json!({ "stateMachineAliases": page });
1263        if let Some(t) = token {
1264            resp["nextToken"] = json!(t);
1265        }
1266        Ok(AwsResponse::ok_json(resp))
1267    }
1268
1269    fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1270        let body = req.json_body();
1271        let arn = body["stateMachineAliasArn"]
1272            .as_str()
1273            .ok_or_else(|| missing("stateMachineAliasArn"))?
1274            .to_string();
1275        let mut accounts = self.state.write();
1276        let state = accounts.get_or_create(&req.account_id);
1277        let alias = state
1278            .state_machine_aliases
1279            .get_mut(&arn)
1280            .ok_or_else(|| resource_not_found(&arn))?;
1281        if let Some(d) = body["description"].as_str() {
1282            alias.description = d.to_string();
1283        }
1284        if let Some(routes) = body["routingConfiguration"].as_array() {
1285            alias.routing_configuration = parse_routing_configuration(routes)?;
1286        }
1287        alias.update_date = chrono::Utc::now();
1288        Ok(AwsResponse::ok_json(json!({
1289            "updateDate": alias.update_date.timestamp(),
1290        })))
1291    }
1292
1293    // ─── Map runs ───────────────────────────────────────────────────────
1294
1295    fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1296        let body = req.json_body();
1297        let arn = body["mapRunArn"]
1298            .as_str()
1299            .ok_or_else(|| missing("mapRunArn"))?
1300            .to_string();
1301        let accounts = self.state.read();
1302        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1303        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1304        let mr = state
1305            .map_runs
1306            .get(&arn)
1307            .ok_or_else(|| resource_not_found(&arn))?;
1308        Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1309    }
1310
1311    fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1312        let body = req.json_body();
1313        // `executionArn` is required + has @length 1..=256.
1314        let exec_arn = body["executionArn"]
1315            .as_str()
1316            .ok_or_else(|| missing("executionArn"))?
1317            .to_string();
1318        validate_arn_length("executionArn", &exec_arn, 256)?;
1319        let raw_max_results = body["maxResults"].as_i64();
1320        if let Some(mr) = raw_max_results {
1321            validate_max_results(mr)?;
1322        }
1323        let next_token = body["nextToken"].as_str();
1324        if let Some(t) = next_token {
1325            validate_page_token(t)?;
1326        }
1327        let max_results = match raw_max_results.unwrap_or(0) {
1328            0 => 100,
1329            n => n as usize,
1330        };
1331        let accounts = self.state.read();
1332        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1333        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1334        let mut runs: Vec<&crate::state::MapRun> = state
1335            .map_runs
1336            .values()
1337            .filter(|r| r.execution_arn == exec_arn)
1338            .collect();
1339        // Stable order so pagination is deterministic.
1340        runs.sort_by_key(|r| r.start_date);
1341        let items: Vec<Value> = runs
1342            .iter()
1343            .map(|r| {
1344                json!({
1345                    "mapRunArn": r.map_run_arn,
1346                    "executionArn": r.execution_arn,
1347                    "stateMachineArn": "",
1348                    "startDate": r.start_date.timestamp(),
1349                    "stopDate": r.stop_date.map(|d| d.timestamp()),
1350                })
1351            })
1352            .collect();
1353        let (page, token) = paginate(&items, next_token, max_results);
1354        let mut resp = json!({ "mapRuns": page });
1355        if let Some(t) = token {
1356            resp["nextToken"] = json!(t);
1357        }
1358        Ok(AwsResponse::ok_json(resp))
1359    }
1360
1361    fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1362        let body = req.json_body();
1363        let arn = body["mapRunArn"]
1364            .as_str()
1365            .ok_or_else(|| missing("mapRunArn"))?
1366            .to_string();
1367        let mut accounts = self.state.write();
1368        let state = accounts.get_or_create(&req.account_id);
1369        let mr = state
1370            .map_runs
1371            .get_mut(&arn)
1372            .ok_or_else(|| resource_not_found(&arn))?;
1373        if let Some(c) = body["maxConcurrency"].as_i64() {
1374            mr.max_concurrency = c as i32;
1375        }
1376        if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1377            mr.tolerated_failure_percentage = p;
1378        }
1379        if let Some(c) = body["toleratedFailureCount"].as_i64() {
1380            mr.tolerated_failure_count = c;
1381        }
1382        Ok(AwsResponse::ok_json(json!({})))
1383    }
1384
1385    // ─── Execution lifecycle extras ─────────────────────────────────────
1386
1387    fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1388        let body = req.json_body();
1389        let arn = body["executionArn"]
1390            .as_str()
1391            .ok_or_else(|| missing("executionArn"))?
1392            .to_string();
1393        let mut accounts = self.state.write();
1394        let state = accounts.get_or_create(&req.account_id);
1395        let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1396            AwsServiceError::aws_error(
1397                StatusCode::BAD_REQUEST,
1398                "ExecutionDoesNotExist",
1399                format!("Execution does not exist: {arn}"),
1400            )
1401        })?;
1402        exec.status = crate::state::ExecutionStatus::Running;
1403        exec.stop_date = None;
1404        Ok(AwsResponse::ok_json(json!({
1405            "redriveDate": chrono::Utc::now().timestamp(),
1406        })))
1407    }
1408
1409    async fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1410        let body = req.json_body();
1411        let sm_arn = body["stateMachineArn"]
1412            .as_str()
1413            .ok_or_else(|| missing("stateMachineArn"))?
1414            .to_string();
1415        let input = body["input"].as_str().unwrap_or("{}").to_string();
1416        if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1417            return Err(AwsServiceError::aws_error(
1418                StatusCode::BAD_REQUEST,
1419                "InvalidExecutionInput",
1420                "Execution input is not valid JSON.",
1421            ));
1422        }
1423        let (exec_arn, definition, logging_config) = {
1424            let mut accounts = self.state.write();
1425            let state = accounts.get_or_create(&req.account_id);
1426            let sm = state
1427                .state_machines
1428                .get(&sm_arn)
1429                .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1430            if sm.machine_type != crate::state::StateMachineType::Express {
1431                return Err(AwsServiceError::aws_error(
1432                    StatusCode::BAD_REQUEST,
1433                    "StateMachineTypeNotSupported",
1434                    "StartSyncExecution is only supported for EXPRESS state machines.",
1435                ));
1436            }
1437            let now = chrono::Utc::now();
1438            let exec_name = format!("sync-{}", now.timestamp_millis());
1439            let exec_arn = format!(
1440                "arn:aws:states:{}:{}:express:{}:{}",
1441                state.region, state.account_id, sm.name, exec_name
1442            );
1443            let execution = Execution {
1444                execution_arn: exec_arn.clone(),
1445                state_machine_arn: sm_arn.clone(),
1446                state_machine_name: sm.name.clone(),
1447                name: exec_name.clone(),
1448                status: ExecutionStatus::Running,
1449                input: Some(input.clone()),
1450                output: None,
1451                start_date: now,
1452                stop_date: None,
1453                error: None,
1454                cause: None,
1455                history_events: vec![],
1456                parent_execution_arn: None,
1457                is_sync: true,
1458                billed_duration_ms: None,
1459                billed_memory_mb: None,
1460            };
1461            state.executions.insert(exec_arn.clone(), execution);
1462            (
1463                exec_arn,
1464                sm.definition.clone(),
1465                sm.logging_configuration.clone(),
1466            )
1467        };
1468
1469        interpreter::execute_state_machine(
1470            self.state.clone(),
1471            exec_arn.clone(),
1472            definition,
1473            Some(input),
1474            self.delivery.clone(),
1475            self.dynamodb_state.clone(),
1476            self.registry.clone(),
1477            logging_config,
1478        )
1479        .await;
1480
1481        // Persist billing details on the stored execution so introspection
1482        // endpoints can replay the same numbers later.
1483        {
1484            let mut accounts = self.state.write();
1485            if let Some(state) = accounts.get_mut(&req.account_id) {
1486                if let Some(exec) = state.executions.get_mut(&exec_arn) {
1487                    let duration_ms = exec
1488                        .stop_date
1489                        .map_or(0, |stop| (stop - exec.start_date).num_milliseconds())
1490                        .max(0);
1491                    exec.billed_duration_ms = Some(duration_ms);
1492                    exec.billed_memory_mb = Some(64);
1493                }
1494            }
1495        }
1496
1497        let accounts = self.state.read();
1498        let state = accounts.get(&req.account_id).unwrap();
1499        let exec = state
1500            .executions
1501            .get(&exec_arn)
1502            .ok_or_else(|| execution_not_found(&exec_arn))?;
1503
1504        let mut resp = json!({
1505            "executionArn": exec.execution_arn,
1506            "stateMachineArn": exec.state_machine_arn,
1507            "name": exec.name,
1508            "startDate": exec.start_date.timestamp(),
1509            "stopDate": exec.stop_date.map(|d| d.timestamp()),
1510            "status": exec.status.as_str(),
1511            "input": exec.input.as_deref().unwrap_or("{}"),
1512        });
1513
1514        if let Some(ref output) = exec.output {
1515            resp["output"] = json!(output);
1516        }
1517        if let Some(ref error) = exec.error {
1518            resp["error"] = json!(error);
1519        }
1520        if let Some(ref cause) = exec.cause {
1521            resp["cause"] = json!(cause);
1522        }
1523
1524        let duration_ms = exec
1525            .stop_date
1526            .map_or(0, |stop| (stop - exec.start_date).num_milliseconds());
1527        resp["billingDetails"] = json!({
1528            "billedMemoryUsedInMB": 64,
1529            "billedDurationInMilliseconds": duration_ms.max(0),
1530        });
1531
1532        Ok(AwsResponse::ok_json(resp))
1533    }
1534
1535    fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1536        let body = req.json_body();
1537        let definition = body["definition"]
1538            .as_str()
1539            .ok_or_else(|| missing("definition"))?;
1540        validate_definition(definition)?;
1541        let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1542        let input = body["input"].as_str().unwrap_or("{}").to_string();
1543        // Echo input back as output. Real Step Functions actually
1544        // simulates the state; our emulator reports SUCCEEDED so callers
1545        // can wire the integration test scaffolding.
1546        Ok(AwsResponse::ok_json(json!({
1547            "output": input,
1548            "status": "SUCCEEDED",
1549            "nextState": "End",
1550        })))
1551    }
1552
1553    fn validate_state_machine_definition(
1554        &self,
1555        req: &AwsRequest,
1556    ) -> Result<AwsResponse, AwsServiceError> {
1557        let body = req.json_body();
1558        let definition = body["definition"]
1559            .as_str()
1560            .ok_or_else(|| missing("definition"))?;
1561        if definition.is_empty() {
1562            return Err(AwsServiceError::aws_error(
1563                StatusCode::BAD_REQUEST,
1564                "ValidationException",
1565                "definition must be 1..=1048576 characters",
1566            ));
1567        }
1568        if let Some(mr) = body["maxResults"].as_i64() {
1569            if !(0..=100).contains(&mr) {
1570                return Err(AwsServiceError::aws_error(
1571                    StatusCode::BAD_REQUEST,
1572                    "ValidationException",
1573                    format!("maxResults '{mr}' is outside 0..=100"),
1574                ));
1575            }
1576        }
1577        if let Some(sev) = body["severity"].as_str() {
1578            if !matches!(sev, "ERROR" | "WARNING") {
1579                return Err(AwsServiceError::aws_error(
1580                    StatusCode::BAD_REQUEST,
1581                    "ValidationException",
1582                    format!("severity '{sev}' must be ERROR or WARNING"),
1583                ));
1584            }
1585        }
1586        if let Some(ty) = body["type"].as_str() {
1587            if !matches!(ty, "STANDARD" | "EXPRESS") {
1588                return Err(AwsServiceError::aws_error(
1589                    StatusCode::BAD_REQUEST,
1590                    "ValidationException",
1591                    format!("type '{ty}' must be STANDARD or EXPRESS"),
1592                ));
1593            }
1594        }
1595        match validate_definition(definition) {
1596            Ok(()) => Ok(AwsResponse::ok_json(json!({
1597                "result": "OK",
1598                "diagnostics": [],
1599            }))),
1600            Err(e) => Ok(AwsResponse::ok_json(json!({
1601                "result": "FAIL",
1602                "diagnostics": [{
1603                    "severity": "ERROR",
1604                    "code": "INVALID_DEFINITION",
1605                    "message": e.to_string(),
1606                }],
1607            }))),
1608        }
1609    }
1610}
1611
1612fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1613    json!({
1614        "stateMachineAliasArn": alias.arn,
1615        "name": alias.name,
1616        "description": alias.description,
1617        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1618            "stateMachineVersionArn": r.state_machine_version_arn,
1619            "weight": r.weight,
1620        })).collect::<Vec<_>>(),
1621        "creationDate": alias.creation_date.timestamp(),
1622        "updateDate": alias.update_date.timestamp(),
1623    })
1624}
1625
1626fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1627    json!({
1628        "mapRunArn": mr.map_run_arn,
1629        "executionArn": mr.execution_arn,
1630        "maxConcurrency": mr.max_concurrency,
1631        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1632        "toleratedFailureCount": mr.tolerated_failure_count,
1633        "status": mr.status,
1634        "startDate": mr.start_date.timestamp(),
1635        "stopDate": mr.stop_date.map(|d| d.timestamp()),
1636    })
1637}
1638
1639// ─── Helpers ────────────────────────────────────────────────────────────
1640
1641fn state_machine_to_json(sm: &StateMachine) -> Value {
1642    let mut resp = json!({
1643        "name": sm.name,
1644        "stateMachineArn": sm.arn,
1645        "definition": sm.definition,
1646        "roleArn": sm.role_arn,
1647        "type": sm.machine_type.as_str(),
1648        "status": sm.status.as_str(),
1649        "creationDate": sm.creation_date.timestamp() as f64,
1650        "updateDate": sm.update_date.timestamp() as f64,
1651        "revisionId": sm.revision_id,
1652        "label": sm.name,
1653    });
1654
1655    if !sm.description.is_empty() {
1656        resp["description"] = json!(sm.description);
1657    }
1658
1659    if let Some(ref logging) = sm.logging_configuration {
1660        resp["loggingConfiguration"] = logging.clone();
1661    } else {
1662        resp["loggingConfiguration"] = json!({
1663            "level": "OFF",
1664            "includeExecutionData": false,
1665            "destinations": [],
1666        });
1667    }
1668
1669    if let Some(ref tracing) = sm.tracing_configuration {
1670        resp["tracingConfiguration"] = tracing.clone();
1671    } else {
1672        resp["tracingConfiguration"] = json!({
1673            "enabled": false,
1674        });
1675    }
1676
1677    resp
1678}
1679
1680fn missing(name: &str) -> AwsServiceError {
1681    AwsServiceError::aws_error(
1682        StatusCode::BAD_REQUEST,
1683        "ValidationException",
1684        format!("The request must contain the parameter {name}."),
1685    )
1686}
1687
1688fn state_machine_not_found(arn: &str) -> AwsServiceError {
1689    AwsServiceError::aws_error(
1690        StatusCode::BAD_REQUEST,
1691        "StateMachineDoesNotExist",
1692        format!("State Machine Does Not Exist: '{arn}'"),
1693    )
1694}
1695
1696fn activity_not_found(arn: &str) -> AwsServiceError {
1697    AwsServiceError::aws_error(
1698        StatusCode::BAD_REQUEST,
1699        "ActivityDoesNotExist",
1700        format!("Activity does not exist: {arn}"),
1701    )
1702}
1703
1704fn task_does_not_exist(token: &str) -> AwsServiceError {
1705    AwsServiceError::aws_error(
1706        StatusCode::BAD_REQUEST,
1707        "TaskDoesNotExist",
1708        format!("Task does not exist: {token}"),
1709    )
1710}
1711
1712fn resource_not_found(arn: &str) -> AwsServiceError {
1713    AwsServiceError::aws_error(
1714        StatusCode::BAD_REQUEST,
1715        "ResourceNotFound",
1716        format!("Resource not found: '{arn}'"),
1717    )
1718}
1719
1720/// Parse + validate an alias `routingConfiguration` array.
1721///
1722/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
1723/// route must include `stateMachineVersionArn`.
1724fn parse_routing_configuration(
1725    routes: &[serde_json::Value],
1726) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1727    if routes.is_empty() || routes.len() > 2 {
1728        return Err(AwsServiceError::aws_error(
1729            StatusCode::BAD_REQUEST,
1730            "ValidationException",
1731            "routingConfiguration must contain 1 or 2 routes.",
1732        ));
1733    }
1734    let parsed: Vec<crate::state::AliasRoute> = routes
1735        .iter()
1736        .map(|r| {
1737            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1738                AwsServiceError::aws_error(
1739                    StatusCode::BAD_REQUEST,
1740                    "ValidationException",
1741                    "routingConfiguration entries must contain stateMachineVersionArn.",
1742                )
1743            })?;
1744            let weight = r["weight"].as_i64().ok_or_else(|| {
1745                AwsServiceError::aws_error(
1746                    StatusCode::BAD_REQUEST,
1747                    "ValidationException",
1748                    "routingConfiguration entries must contain a numeric weight.",
1749                )
1750            })?;
1751            if !(0..=100).contains(&weight) {
1752                return Err(AwsServiceError::aws_error(
1753                    StatusCode::BAD_REQUEST,
1754                    "ValidationException",
1755                    format!("Invalid routing weight {weight}; must be 0-100."),
1756                ));
1757            }
1758            Ok(crate::state::AliasRoute {
1759                state_machine_version_arn: arn.to_string(),
1760                weight: weight as i32,
1761            })
1762        })
1763        .collect::<Result<_, _>>()?;
1764    let total: i32 = parsed.iter().map(|r| r.weight).sum();
1765    if total != 100 {
1766        return Err(AwsServiceError::aws_error(
1767            StatusCode::BAD_REQUEST,
1768            "ValidationException",
1769            format!("routingConfiguration weights must sum to 100, got {total}."),
1770        ));
1771    }
1772    Ok(parsed)
1773}
1774
1775fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1776    if name.is_empty() || name.len() > 80 {
1777        return Err(AwsServiceError::aws_error(
1778            StatusCode::BAD_REQUEST,
1779            "InvalidName",
1780            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1781        ));
1782    }
1783    // Only allow alphanumeric, hyphens, and underscores
1784    if !name
1785        .chars()
1786        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1787    {
1788        return Err(AwsServiceError::aws_error(
1789            StatusCode::BAD_REQUEST,
1790            "InvalidName",
1791            format!(
1792                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1793            ),
1794        ));
1795    }
1796    Ok(())
1797}
1798
1799fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1800    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1801        AwsServiceError::aws_error(
1802            StatusCode::BAD_REQUEST,
1803            "InvalidDefinition",
1804            format!("Invalid State Machine Definition: '{e}'"),
1805        )
1806    })?;
1807
1808    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1809        return Err(AwsServiceError::aws_error(
1810            StatusCode::BAD_REQUEST,
1811            "InvalidDefinition",
1812            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1813                .to_string(),
1814        ));
1815    }
1816
1817    let states_obj = parsed
1818        .get("States")
1819        .and_then(|v| v.as_object())
1820        .ok_or_else(|| {
1821            AwsServiceError::aws_error(
1822                StatusCode::BAD_REQUEST,
1823                "InvalidDefinition",
1824                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1825                    .to_string(),
1826            )
1827        })?;
1828
1829    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1830        AwsServiceError::aws_error(
1831            StatusCode::BAD_REQUEST,
1832            "InvalidDefinition",
1833            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1834                .to_string(),
1835        )
1836    })?;
1837    if !states_obj.contains_key(start_at) {
1838        return Err(AwsServiceError::aws_error(
1839            StatusCode::BAD_REQUEST,
1840            "InvalidDefinition",
1841            format!(
1842                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1843                 (StartAt '{start_at}' does not reference a valid state)"
1844            ),
1845        ));
1846    }
1847
1848    Ok(())
1849}
1850
1851fn execution_not_found(arn: &str) -> AwsServiceError {
1852    AwsServiceError::aws_error(
1853        StatusCode::BAD_REQUEST,
1854        "ExecutionDoesNotExist",
1855        format!("Execution Does Not Exist: '{arn}'"),
1856    )
1857}
1858
1859fn execution_to_json(exec: &Execution) -> Value {
1860    let mut resp = json!({
1861        "executionArn": exec.execution_arn,
1862        "stateMachineArn": exec.state_machine_arn,
1863        "name": exec.name,
1864        "status": exec.status.as_str(),
1865        "startDate": exec.start_date.timestamp() as f64,
1866    });
1867
1868    if let Some(ref input) = exec.input {
1869        resp["input"] = json!(input);
1870    }
1871    if let Some(ref output) = exec.output {
1872        resp["output"] = json!(output);
1873    }
1874    if let Some(stop) = exec.stop_date {
1875        resp["stopDate"] = json!(stop.timestamp() as f64);
1876    }
1877    if let Some(ref error) = exec.error {
1878        resp["error"] = json!(error);
1879    }
1880    if let Some(ref cause) = exec.cause {
1881        resp["cause"] = json!(cause);
1882    }
1883
1884    resp
1885}
1886
1887/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
1888fn camel_to_details_key(event_type: &str) -> String {
1889    let mut chars = event_type.chars();
1890    match chars.next() {
1891        None => String::new(),
1892        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1893    }
1894}
1895
1896fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1897    if !arn.starts_with("arn:") {
1898        return Err(AwsServiceError::aws_error(
1899            StatusCode::BAD_REQUEST,
1900            "InvalidArn",
1901            format!("Invalid Arn: '{arn}'"),
1902        ));
1903    }
1904    Ok(())
1905}
1906
1907/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
1908/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
1909/// every ARN-bearing Step Functions operation declares.
1910fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
1911    if value.is_empty() || value.len() > max {
1912        return Err(AwsServiceError::aws_error(
1913            StatusCode::BAD_REQUEST,
1914            "InvalidArn",
1915            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
1916        ));
1917    }
1918    Ok(())
1919}
1920
1921/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
1922/// error code declared on every paginated list op is `InvalidToken`, so
1923/// route both empty and oversize tokens through it.
1924fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
1925    if value.is_empty() || value.len() > 1024 {
1926        return Err(AwsServiceError::aws_error(
1927            StatusCode::BAD_REQUEST,
1928            "InvalidToken",
1929            "nextToken must be 1..=1024 characters",
1930        ));
1931    }
1932    Ok(())
1933}
1934
1935/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
1936/// probes flip below-min / above-max; since the only error declared on
1937/// `ListActivities` is `InvalidToken`, we map page-size violations to
1938/// the same code (matches how AWS surfaces malformed pagination input
1939/// for ops that don't model a separate `ValidationException`).
1940fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
1941    if !(0..=1000).contains(&value) {
1942        return Err(AwsServiceError::aws_error(
1943            StatusCode::BAD_REQUEST,
1944            "InvalidToken",
1945            format!("maxResults '{value}' is outside 0..=1000"),
1946        ));
1947    }
1948    Ok(())
1949}
1950
1951/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1952///
1953/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1954/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1955/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1956///
1957/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1958/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1959pub fn start_execution_from_delivery(
1960    state: &SharedStepFunctionsState,
1961    delivery: &Option<Arc<DeliveryBus>>,
1962    dynamodb_state: &Option<SharedDynamoDbState>,
1963    registry: &Option<SharedServiceRegistry>,
1964    state_machine_arn: &str,
1965    input: &str,
1966) {
1967    // Validate input is valid JSON
1968    if serde_json::from_str::<serde_json::Value>(input).is_err() {
1969        tracing::warn!(
1970            state_machine_arn,
1971            "Step Functions delivery: invalid JSON input, skipping execution"
1972        );
1973        return;
1974    }
1975
1976    let execution_name = uuid::Uuid::new_v4().to_string();
1977
1978    // Extract account_id from the state machine ARN
1979    let account_id = state_machine_arn
1980        .split(':')
1981        .nth(4)
1982        .unwrap_or("000000000000")
1983        .to_string();
1984
1985    let mut accounts = state.write();
1986    let st = accounts.get_or_create(&account_id);
1987    let sm = match st.state_machines.get(state_machine_arn) {
1988        Some(sm) => sm,
1989        None => {
1990            tracing::warn!(
1991                state_machine_arn,
1992                "Step Functions delivery: state machine not found"
1993            );
1994            return;
1995        }
1996    };
1997
1998    let sm_name = sm.name.clone();
1999    let definition = sm.definition.clone();
2000    let exec_arn = st.execution_arn(&sm_name, &execution_name);
2001
2002    let now = Utc::now();
2003    let execution = Execution {
2004        execution_arn: exec_arn.clone(),
2005        state_machine_arn: state_machine_arn.to_string(),
2006        state_machine_name: sm_name,
2007        name: execution_name,
2008        status: ExecutionStatus::Running,
2009        input: Some(input.to_string()),
2010        output: None,
2011        start_date: now,
2012        stop_date: None,
2013        error: None,
2014        cause: None,
2015        history_events: vec![],
2016        parent_execution_arn: None,
2017        is_sync: false,
2018        billed_duration_ms: None,
2019        billed_memory_mb: None,
2020    };
2021
2022    st.executions.insert(exec_arn.clone(), execution);
2023    let logging_config = sm.logging_configuration.clone();
2024    drop(accounts);
2025
2026    let shared_state = state.clone();
2027    let delivery = delivery.clone();
2028    let dynamodb_state = dynamodb_state.clone();
2029    let registry = registry.clone();
2030    let input = Some(input.to_string());
2031    tokio::spawn(async move {
2032        interpreter::execute_state_machine(
2033            shared_state,
2034            exec_arn,
2035            definition,
2036            input,
2037            delivery,
2038            dynamodb_state,
2039            registry,
2040            logging_config,
2041        )
2042        .await;
2043    });
2044}
2045
2046#[cfg(test)]
2047mod tests {
2048    use super::*;
2049    use http::{HeaderMap, Method};
2050    use parking_lot::RwLock;
2051    use serde_json::Value;
2052    use std::collections::HashMap;
2053    use std::sync::Arc;
2054
2055    fn make_state() -> SharedStepFunctionsState {
2056        Arc::new(RwLock::new(
2057            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2058        ))
2059    }
2060
2061    fn make_request(action: &str, body: &str) -> AwsRequest {
2062        AwsRequest {
2063            service: "states".to_string(),
2064            action: action.to_string(),
2065            region: "us-east-1".to_string(),
2066            account_id: "123456789012".to_string(),
2067            request_id: "test-id".to_string(),
2068            headers: HeaderMap::new(),
2069            query_params: HashMap::new(),
2070            body: body.as_bytes().to_vec().into(),
2071            body_stream: parking_lot::Mutex::new(None),
2072            path_segments: vec![],
2073            raw_path: "/".to_string(),
2074            raw_query: String::new(),
2075            method: Method::POST,
2076            is_query_protocol: false,
2077            access_key_id: None,
2078            principal: None,
2079        }
2080    }
2081
2082    fn body_json(resp: &AwsResponse) -> Value {
2083        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
2084    }
2085
2086    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
2087        match result {
2088            Err(e) => e,
2089            Ok(_) => panic!("expected error, got Ok"),
2090        }
2091    }
2092
2093    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
2094
2095    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
2096        let body = json!({
2097            "name": name,
2098            "definition": VALID_DEF,
2099            "roleArn": "arn:aws:iam::123456789012:role/test",
2100        });
2101        let req = make_request("CreateStateMachine", &body.to_string());
2102        let resp = svc.create_state_machine(&req).unwrap();
2103        let b = body_json(&resp);
2104        b["stateMachineArn"].as_str().unwrap().to_string()
2105    }
2106
2107    // ── CreateStateMachine ──
2108
2109    #[test]
2110    fn create_state_machine_basic() {
2111        let svc = StepFunctionsService::new(make_state());
2112        let arn = create_sm(&svc, "test-sm");
2113        assert!(arn.contains("test-sm"));
2114    }
2115
2116    #[test]
2117    fn create_state_machine_with_express_type() {
2118        let svc = StepFunctionsService::new(make_state());
2119        let body = json!({
2120            "name": "express-sm",
2121            "definition": VALID_DEF,
2122            "roleArn": "arn:aws:iam::123456789012:role/r",
2123            "type": "EXPRESS",
2124        });
2125        let req = make_request("CreateStateMachine", &body.to_string());
2126        let resp = svc.create_state_machine(&req).unwrap();
2127        let b = body_json(&resp);
2128        assert!(b["stateMachineArn"].as_str().is_some());
2129    }
2130
2131    #[test]
2132    fn create_state_machine_duplicate_fails() {
2133        let svc = StepFunctionsService::new(make_state());
2134        create_sm(&svc, "dup-sm");
2135        let body = json!({
2136            "name": "dup-sm",
2137            "definition": VALID_DEF,
2138            "roleArn": "arn:aws:iam::123456789012:role/r",
2139        });
2140        let req = make_request("CreateStateMachine", &body.to_string());
2141        let err = expect_err(svc.create_state_machine(&req));
2142        assert!(err.to_string().contains("StateMachineAlreadyExists"));
2143    }
2144
2145    #[test]
2146    fn create_state_machine_missing_name() {
2147        let svc = StepFunctionsService::new(make_state());
2148        let body = json!({
2149            "definition": VALID_DEF,
2150            "roleArn": "arn:aws:iam::123456789012:role/r",
2151        });
2152        let req = make_request("CreateStateMachine", &body.to_string());
2153        assert!(svc.create_state_machine(&req).is_err());
2154    }
2155
2156    #[test]
2157    fn create_state_machine_invalid_definition() {
2158        let svc = StepFunctionsService::new(make_state());
2159        let body = json!({
2160            "name": "bad-def",
2161            "definition": "not json",
2162            "roleArn": "arn:aws:iam::123456789012:role/r",
2163        });
2164        let req = make_request("CreateStateMachine", &body.to_string());
2165        let err = expect_err(svc.create_state_machine(&req));
2166        assert!(err.to_string().contains("InvalidDefinition"));
2167    }
2168
2169    #[test]
2170    fn create_state_machine_definition_missing_start_at() {
2171        let svc = StepFunctionsService::new(make_state());
2172        let body = json!({
2173            "name": "no-start",
2174            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
2175            "roleArn": "arn:aws:iam::123456789012:role/r",
2176        });
2177        let req = make_request("CreateStateMachine", &body.to_string());
2178        let err = expect_err(svc.create_state_machine(&req));
2179        assert!(err.to_string().contains("InvalidDefinition"));
2180    }
2181
2182    #[test]
2183    fn create_state_machine_definition_missing_states() {
2184        let svc = StepFunctionsService::new(make_state());
2185        let body = json!({
2186            "name": "no-states",
2187            "definition": r#"{"StartAt":"S"}"#,
2188            "roleArn": "arn:aws:iam::123456789012:role/r",
2189        });
2190        let req = make_request("CreateStateMachine", &body.to_string());
2191        let err = expect_err(svc.create_state_machine(&req));
2192        assert!(err.to_string().contains("InvalidDefinition"));
2193    }
2194
2195    #[test]
2196    fn create_state_machine_definition_start_at_not_in_states() {
2197        let svc = StepFunctionsService::new(make_state());
2198        let body = json!({
2199            "name": "bad-start",
2200            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
2201            "roleArn": "arn:aws:iam::123456789012:role/r",
2202        });
2203        let req = make_request("CreateStateMachine", &body.to_string());
2204        let err = expect_err(svc.create_state_machine(&req));
2205        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
2206    }
2207
2208    #[test]
2209    fn create_state_machine_invalid_type() {
2210        let svc = StepFunctionsService::new(make_state());
2211        let body = json!({
2212            "name": "bad-type",
2213            "definition": VALID_DEF,
2214            "roleArn": "arn:aws:iam::123456789012:role/r",
2215            "type": "INVALID",
2216        });
2217        let req = make_request("CreateStateMachine", &body.to_string());
2218        assert!(svc.create_state_machine(&req).is_err());
2219    }
2220
2221    #[test]
2222    fn create_state_machine_invalid_arn() {
2223        let svc = StepFunctionsService::new(make_state());
2224        let body = json!({
2225            "name": "bad-arn",
2226            "definition": VALID_DEF,
2227            "roleArn": "not-an-arn",
2228        });
2229        let req = make_request("CreateStateMachine", &body.to_string());
2230        let err = expect_err(svc.create_state_machine(&req));
2231        assert!(err.to_string().contains("InvalidArn"));
2232    }
2233
2234    #[test]
2235    fn create_state_machine_invalid_name() {
2236        let svc = StepFunctionsService::new(make_state());
2237        let body = json!({
2238            "name": "has spaces!",
2239            "definition": VALID_DEF,
2240            "roleArn": "arn:aws:iam::123456789012:role/r",
2241        });
2242        let req = make_request("CreateStateMachine", &body.to_string());
2243        let err = expect_err(svc.create_state_machine(&req));
2244        assert!(err.to_string().contains("InvalidName"));
2245    }
2246
2247    #[test]
2248    fn create_state_machine_name_too_long() {
2249        let svc = StepFunctionsService::new(make_state());
2250        let long_name = "a".repeat(81);
2251        let body = json!({
2252            "name": long_name,
2253            "definition": VALID_DEF,
2254            "roleArn": "arn:aws:iam::123456789012:role/r",
2255        });
2256        let req = make_request("CreateStateMachine", &body.to_string());
2257        let err = expect_err(svc.create_state_machine(&req));
2258        assert!(err.to_string().contains("InvalidName"));
2259    }
2260
2261    // ── DescribeStateMachine ──
2262
2263    #[test]
2264    fn describe_state_machine_found() {
2265        let svc = StepFunctionsService::new(make_state());
2266        let arn = create_sm(&svc, "desc-sm");
2267
2268        let req = make_request(
2269            "DescribeStateMachine",
2270            &json!({"stateMachineArn": arn}).to_string(),
2271        );
2272        let resp = svc.describe_state_machine(&req).unwrap();
2273        let b = body_json(&resp);
2274        assert_eq!(b["name"], "desc-sm");
2275        assert_eq!(b["status"], "ACTIVE");
2276        assert!(b["definition"].as_str().is_some());
2277    }
2278
2279    #[test]
2280    fn describe_state_machine_not_found() {
2281        let svc = StepFunctionsService::new(make_state());
2282        let req = make_request(
2283            "DescribeStateMachine",
2284            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2285                .to_string(),
2286        );
2287        let err = expect_err(svc.describe_state_machine(&req));
2288        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2289    }
2290
2291    // ── ListStateMachines ──
2292
2293    #[test]
2294    fn list_state_machines_empty() {
2295        let svc = StepFunctionsService::new(make_state());
2296        let req = make_request("ListStateMachines", "{}");
2297        let resp = svc.list_state_machines(&req).unwrap();
2298        let b = body_json(&resp);
2299        assert!(b["stateMachines"].as_array().unwrap().is_empty());
2300    }
2301
2302    #[test]
2303    fn list_state_machines_returns_created() {
2304        let svc = StepFunctionsService::new(make_state());
2305        create_sm(&svc, "sm-1");
2306        create_sm(&svc, "sm-2");
2307
2308        let req = make_request("ListStateMachines", "{}");
2309        let resp = svc.list_state_machines(&req).unwrap();
2310        let b = body_json(&resp);
2311        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
2312    }
2313
2314    // ── DeleteStateMachine ──
2315
2316    #[test]
2317    fn delete_state_machine() {
2318        let svc = StepFunctionsService::new(make_state());
2319        let arn = create_sm(&svc, "del-sm");
2320
2321        let req = make_request(
2322            "DeleteStateMachine",
2323            &json!({"stateMachineArn": arn}).to_string(),
2324        );
2325        svc.delete_state_machine(&req).unwrap();
2326
2327        // Describe should fail
2328        let req = make_request(
2329            "DescribeStateMachine",
2330            &json!({"stateMachineArn": arn}).to_string(),
2331        );
2332        assert!(svc.describe_state_machine(&req).is_err());
2333    }
2334
2335    #[test]
2336    fn delete_state_machine_nonexistent_succeeds() {
2337        let svc = StepFunctionsService::new(make_state());
2338        let req = make_request(
2339            "DeleteStateMachine",
2340            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2341                .to_string(),
2342        );
2343        // AWS returns success even for nonexistent
2344        svc.delete_state_machine(&req).unwrap();
2345    }
2346
2347    // ── UpdateStateMachine ──
2348
2349    #[test]
2350    fn update_state_machine() {
2351        let svc = StepFunctionsService::new(make_state());
2352        let arn = create_sm(&svc, "upd-sm");
2353
2354        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
2355        let body = json!({
2356            "stateMachineArn": arn,
2357            "definition": new_def,
2358            "description": "updated",
2359        });
2360        let req = make_request("UpdateStateMachine", &body.to_string());
2361        let resp = svc.update_state_machine(&req).unwrap();
2362        let b = body_json(&resp);
2363        assert!(b["updateDate"].as_f64().is_some());
2364
2365        // Verify
2366        let req = make_request(
2367            "DescribeStateMachine",
2368            &json!({"stateMachineArn": arn}).to_string(),
2369        );
2370        let resp = svc.describe_state_machine(&req).unwrap();
2371        let b = body_json(&resp);
2372        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2373        assert_eq!(b["description"], "updated");
2374    }
2375
2376    #[test]
2377    fn update_state_machine_not_found() {
2378        let svc = StepFunctionsService::new(make_state());
2379        let body = json!({
2380            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2381            "definition": VALID_DEF,
2382        });
2383        let req = make_request("UpdateStateMachine", &body.to_string());
2384        let err = expect_err(svc.update_state_machine(&req));
2385        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2386    }
2387
2388    // ── StartExecution ──
2389
2390    #[tokio::test]
2391    async fn start_execution_basic() {
2392        let svc = StepFunctionsService::new(make_state());
2393        let arn = create_sm(&svc, "exec-sm");
2394
2395        let body = json!({
2396            "stateMachineArn": arn,
2397            "input": r#"{"key":"value"}"#,
2398        });
2399        let req = make_request("StartExecution", &body.to_string());
2400        let resp = svc.start_execution(&req).unwrap();
2401        let b = body_json(&resp);
2402        assert!(b["executionArn"].as_str().is_some());
2403        assert!(b["startDate"].as_f64().is_some());
2404    }
2405
2406    #[tokio::test]
2407    async fn start_execution_with_name() {
2408        let svc = StepFunctionsService::new(make_state());
2409        let arn = create_sm(&svc, "named-exec");
2410
2411        let body = json!({
2412            "stateMachineArn": arn,
2413            "name": "my-execution",
2414        });
2415        let req = make_request("StartExecution", &body.to_string());
2416        let resp = svc.start_execution(&req).unwrap();
2417        let b = body_json(&resp);
2418        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2419    }
2420
2421    #[tokio::test]
2422    async fn start_execution_sm_not_found() {
2423        let svc = StepFunctionsService::new(make_state());
2424        let body = json!({
2425            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2426        });
2427        let req = make_request("StartExecution", &body.to_string());
2428        let err = expect_err(svc.start_execution(&req));
2429        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2430    }
2431
2432    #[tokio::test]
2433    async fn start_execution_invalid_input() {
2434        let svc = StepFunctionsService::new(make_state());
2435        let arn = create_sm(&svc, "bad-input");
2436
2437        let body = json!({
2438            "stateMachineArn": arn,
2439            "input": "not json",
2440        });
2441        let req = make_request("StartExecution", &body.to_string());
2442        let err = expect_err(svc.start_execution(&req));
2443        assert!(err.to_string().contains("InvalidExecutionInput"));
2444    }
2445
2446    #[tokio::test]
2447    async fn start_execution_duplicate_name() {
2448        let svc = StepFunctionsService::new(make_state());
2449        let arn = create_sm(&svc, "dup-exec");
2450
2451        let body = json!({
2452            "stateMachineArn": arn,
2453            "name": "same-name",
2454        });
2455        let req = make_request("StartExecution", &body.to_string());
2456        svc.start_execution(&req).unwrap();
2457
2458        let req = make_request("StartExecution", &body.to_string());
2459        let err = expect_err(svc.start_execution(&req));
2460        assert!(err.to_string().contains("ExecutionAlreadyExists"));
2461    }
2462
2463    // ── DescribeExecution ──
2464
2465    #[tokio::test]
2466    async fn describe_execution_found() {
2467        let svc = StepFunctionsService::new(make_state());
2468        let sm_arn = create_sm(&svc, "desc-exec");
2469
2470        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2471        let req = make_request("StartExecution", &body.to_string());
2472        let resp = svc.start_execution(&req).unwrap();
2473        let exec_arn = body_json(&resp)["executionArn"]
2474            .as_str()
2475            .unwrap()
2476            .to_string();
2477
2478        let req = make_request(
2479            "DescribeExecution",
2480            &json!({"executionArn": exec_arn}).to_string(),
2481        );
2482        let resp = svc.describe_execution(&req).unwrap();
2483        let b = body_json(&resp);
2484        assert_eq!(b["name"], "e1");
2485        assert_eq!(b["status"], "RUNNING");
2486    }
2487
2488    #[tokio::test]
2489    async fn describe_execution_not_found() {
2490        let svc = StepFunctionsService::new(make_state());
2491        let req = make_request(
2492            "DescribeExecution",
2493            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2494                .to_string(),
2495        );
2496        let err = expect_err(svc.describe_execution(&req));
2497        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2498    }
2499
2500    // ── StopExecution ──
2501
2502    #[tokio::test]
2503    async fn stop_execution() {
2504        let svc = StepFunctionsService::new(make_state());
2505        let sm_arn = create_sm(&svc, "stop-sm");
2506
2507        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2508        let req = make_request("StartExecution", &body.to_string());
2509        let resp = svc.start_execution(&req).unwrap();
2510        let exec_arn = body_json(&resp)["executionArn"]
2511            .as_str()
2512            .unwrap()
2513            .to_string();
2514
2515        let body = json!({
2516            "executionArn": exec_arn,
2517            "error": "UserAborted",
2518            "cause": "test stop",
2519        });
2520        let req = make_request("StopExecution", &body.to_string());
2521        let resp = svc.stop_execution(&req).unwrap();
2522        let b = body_json(&resp);
2523        assert!(b["stopDate"].as_f64().is_some());
2524
2525        // Verify aborted
2526        let req = make_request(
2527            "DescribeExecution",
2528            &json!({"executionArn": exec_arn}).to_string(),
2529        );
2530        let resp = svc.describe_execution(&req).unwrap();
2531        let b = body_json(&resp);
2532        assert_eq!(b["status"], "ABORTED");
2533        assert_eq!(b["error"], "UserAborted");
2534    }
2535
2536    #[tokio::test]
2537    async fn stop_execution_not_found() {
2538        let svc = StepFunctionsService::new(make_state());
2539        let req = make_request(
2540            "StopExecution",
2541            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2542                .to_string(),
2543        );
2544        let err = expect_err(svc.stop_execution(&req));
2545        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2546    }
2547
2548    // ── ListExecutions ──
2549
2550    #[tokio::test]
2551    async fn list_executions() {
2552        let svc = StepFunctionsService::new(make_state());
2553        let sm_arn = create_sm(&svc, "list-exec");
2554
2555        for i in 0..3 {
2556            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2557            let req = make_request("StartExecution", &body.to_string());
2558            svc.start_execution(&req).unwrap();
2559        }
2560
2561        let req = make_request(
2562            "ListExecutions",
2563            &json!({"stateMachineArn": sm_arn}).to_string(),
2564        );
2565        let resp = svc.list_executions(&req).unwrap();
2566        let b = body_json(&resp);
2567        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2568    }
2569
2570    #[tokio::test]
2571    async fn list_executions_sm_not_found() {
2572        let svc = StepFunctionsService::new(make_state());
2573        let req = make_request(
2574            "ListExecutions",
2575            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2576                .to_string(),
2577        );
2578        let err = expect_err(svc.list_executions(&req));
2579        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2580    }
2581
2582    // ── GetExecutionHistory ──
2583
2584    #[tokio::test]
2585    async fn get_execution_history_not_found() {
2586        let svc = StepFunctionsService::new(make_state());
2587        let req = make_request(
2588            "GetExecutionHistory",
2589            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2590                .to_string(),
2591        );
2592        let err = expect_err(svc.get_execution_history(&req));
2593        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2594    }
2595
2596    // ── DescribeStateMachineForExecution ──
2597
2598    #[tokio::test]
2599    async fn describe_sm_for_execution() {
2600        let svc = StepFunctionsService::new(make_state());
2601        let sm_arn = create_sm(&svc, "sm-for-exec");
2602
2603        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2604        let req = make_request("StartExecution", &body.to_string());
2605        let resp = svc.start_execution(&req).unwrap();
2606        let exec_arn = body_json(&resp)["executionArn"]
2607            .as_str()
2608            .unwrap()
2609            .to_string();
2610
2611        let req = make_request(
2612            "DescribeStateMachineForExecution",
2613            &json!({"executionArn": exec_arn}).to_string(),
2614        );
2615        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2616        let b = body_json(&resp);
2617        assert_eq!(b["name"], "sm-for-exec");
2618    }
2619
2620    // ── Tags ──
2621
2622    #[test]
2623    fn tag_untag_list_tags() {
2624        let svc = StepFunctionsService::new(make_state());
2625        let arn = create_sm(&svc, "tagged-sm");
2626
2627        // Tag
2628        let body = json!({
2629            "resourceArn": arn,
2630            "tags": [{"key": "env", "value": "prod"}],
2631        });
2632        let req = make_request("TagResource", &body.to_string());
2633        svc.tag_resource(&req).unwrap();
2634
2635        // List
2636        let req = make_request(
2637            "ListTagsForResource",
2638            &json!({"resourceArn": arn}).to_string(),
2639        );
2640        let resp = svc.list_tags_for_resource(&req).unwrap();
2641        let b = body_json(&resp);
2642        let tags = b["tags"].as_array().unwrap();
2643        assert_eq!(tags.len(), 1);
2644        assert_eq!(tags[0]["key"], "env");
2645
2646        // Untag
2647        let body = json!({
2648            "resourceArn": arn,
2649            "tagKeys": ["env"],
2650        });
2651        let req = make_request("UntagResource", &body.to_string());
2652        svc.untag_resource(&req).unwrap();
2653
2654        // Verify empty
2655        let req = make_request(
2656            "ListTagsForResource",
2657            &json!({"resourceArn": arn}).to_string(),
2658        );
2659        let resp = svc.list_tags_for_resource(&req).unwrap();
2660        let b = body_json(&resp);
2661        assert!(b["tags"].as_array().unwrap().is_empty());
2662    }
2663
2664    #[test]
2665    fn tag_resource_not_found() {
2666        let svc = StepFunctionsService::new(make_state());
2667        let body = json!({
2668            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2669            "tags": [{"key": "k", "value": "v"}],
2670        });
2671        let req = make_request("TagResource", &body.to_string());
2672        let err = expect_err(svc.tag_resource(&req));
2673        assert!(err.to_string().contains("ResourceNotFound"));
2674    }
2675
2676    // ── Helper function tests ──
2677
2678    #[test]
2679    fn test_validate_name() {
2680        assert!(validate_name("valid-name").is_ok());
2681        assert!(validate_name("under_score").is_ok());
2682        assert!(validate_name("").is_err());
2683        assert!(validate_name("has spaces").is_err());
2684        assert!(validate_name(&"a".repeat(81)).is_err());
2685    }
2686
2687    #[test]
2688    fn test_validate_definition() {
2689        assert!(validate_definition(VALID_DEF).is_ok());
2690        assert!(validate_definition("not json").is_err());
2691        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
2692        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
2693    }
2694
2695    #[test]
2696    fn test_validate_arn() {
2697        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2698        assert!(validate_arn("not-an-arn").is_err());
2699    }
2700
2701    #[test]
2702    fn test_camel_to_details_key() {
2703        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2704        assert_eq!(camel_to_details_key(""), "");
2705    }
2706
2707    #[test]
2708    fn test_is_mutating_action() {
2709        assert!(is_mutating_action("CreateStateMachine"));
2710        assert!(is_mutating_action("StartExecution"));
2711        assert!(!is_mutating_action("DescribeStateMachine"));
2712        assert!(!is_mutating_action("ListStateMachines"));
2713    }
2714
2715    // ── StartSyncExecution ──
2716
2717    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
2718        let body = json!({
2719            "name": name,
2720            "definition": VALID_DEF,
2721            "roleArn": "arn:aws:iam::123456789012:role/test",
2722            "type": "EXPRESS",
2723        });
2724        let req = make_request("CreateStateMachine", &body.to_string());
2725        let resp = svc.create_state_machine(&req).unwrap();
2726        let b = body_json(&resp);
2727        b["stateMachineArn"].as_str().unwrap().to_string()
2728    }
2729
2730    #[tokio::test]
2731    async fn start_sync_execution_basic() {
2732        let svc = StepFunctionsService::new(make_state());
2733        let arn = create_express_sm(&svc, "sync-sm");
2734
2735        let body = json!({
2736            "stateMachineArn": arn,
2737            "input": r#"{"key":"value"}"#,
2738        });
2739        let req = make_request("StartSyncExecution", &body.to_string());
2740        let resp = svc.start_sync_execution(&req).await.unwrap();
2741        let b = body_json(&resp);
2742        assert!(b["executionArn"]
2743            .as_str()
2744            .unwrap()
2745            .contains("express:sync-sm"));
2746        assert_eq!(b["stateMachineArn"], arn);
2747        assert_eq!(b["status"], "SUCCEEDED");
2748        assert!(b["startDate"].as_i64().is_some());
2749        assert!(b["stopDate"].as_i64().is_some());
2750        assert!(b["output"].as_str().is_some());
2751        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
2752            .as_i64()
2753            .is_some());
2754    }
2755
2756    #[tokio::test]
2757    async fn start_sync_execution_not_express() {
2758        let svc = StepFunctionsService::new(make_state());
2759        let arn = create_sm(&svc, "std-sm");
2760
2761        let body = json!({"stateMachineArn": arn});
2762        let req = make_request("StartSyncExecution", &body.to_string());
2763        let err = expect_err(svc.start_sync_execution(&req).await);
2764        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
2765    }
2766
2767    #[tokio::test]
2768    async fn start_sync_execution_sm_not_found() {
2769        let svc = StepFunctionsService::new(make_state());
2770        let body = json!({
2771            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2772        });
2773        let req = make_request("StartSyncExecution", &body.to_string());
2774        let err = expect_err(svc.start_sync_execution(&req).await);
2775        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2776    }
2777
2778    #[tokio::test]
2779    async fn start_sync_execution_records_introspection_fields() {
2780        let svc = StepFunctionsService::new(make_state());
2781        let arn = create_express_sm(&svc, "sync-introspect");
2782
2783        let body = json!({"stateMachineArn": arn, "input": "{}"});
2784        let req = make_request("StartSyncExecution", &body.to_string());
2785        let resp = svc.start_sync_execution(&req).await.unwrap();
2786        let b = body_json(&resp);
2787        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
2788
2789        let accounts = svc.state.read();
2790        let state = accounts.get("123456789012").unwrap();
2791        let stored = state
2792            .executions
2793            .get(&exec_arn)
2794            .expect("sync execution should be persisted for introspection");
2795        assert!(stored.is_sync, "sync executions must be marked is_sync");
2796        assert_eq!(stored.billed_memory_mb, Some(64));
2797        assert!(
2798            stored.billed_duration_ms.is_some(),
2799            "billed_duration_ms must be populated after sync run"
2800        );
2801        assert!(
2802            stored.parent_execution_arn.is_none(),
2803            "top-level sync execution has no parent"
2804        );
2805    }
2806
2807    #[tokio::test]
2808    async fn start_sync_execution_invalid_input() {
2809        let svc = StepFunctionsService::new(make_state());
2810        let arn = create_express_sm(&svc, "bad-input-sync");
2811
2812        let body = json!({
2813            "stateMachineArn": arn,
2814            "input": "not json",
2815        });
2816        let req = make_request("StartSyncExecution", &body.to_string());
2817        let err = expect_err(svc.start_sync_execution(&req).await);
2818        assert!(err.to_string().contains("InvalidExecutionInput"));
2819    }
2820}