Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const SUPPORTED: &[&str] = &[
25    "CreateStateMachine",
26    "DescribeStateMachine",
27    "ListStateMachines",
28    "DeleteStateMachine",
29    "UpdateStateMachine",
30    "TagResource",
31    "UntagResource",
32    "ListTagsForResource",
33    "StartExecution",
34    "StopExecution",
35    "DescribeExecution",
36    "ListExecutions",
37    "GetExecutionHistory",
38    "DescribeStateMachineForExecution",
39    "CreateActivity",
40    "DeleteActivity",
41    "DescribeActivity",
42    "ListActivities",
43    "GetActivityTask",
44    "SendTaskFailure",
45    "SendTaskHeartbeat",
46    "SendTaskSuccess",
47    "PublishStateMachineVersion",
48    "DeleteStateMachineVersion",
49    "ListStateMachineVersions",
50    "CreateStateMachineAlias",
51    "DeleteStateMachineAlias",
52    "DescribeStateMachineAlias",
53    "ListStateMachineAliases",
54    "UpdateStateMachineAlias",
55    "DescribeMapRun",
56    "ListMapRuns",
57    "UpdateMapRun",
58    "RedriveExecution",
59    "StartSyncExecution",
60    "TestState",
61    "ValidateStateMachineDefinition",
62];
63
64pub struct StepFunctionsService {
65    state: SharedStepFunctionsState,
66    delivery: Option<Arc<DeliveryBus>>,
67    dynamodb_state: Option<SharedDynamoDbState>,
68    snapshot_store: Option<Arc<dyn SnapshotStore>>,
69    snapshot_lock: Arc<AsyncMutex<()>>,
70}
71
72impl StepFunctionsService {
73    pub fn new(state: SharedStepFunctionsState) -> Self {
74        Self {
75            state,
76            delivery: None,
77            dynamodb_state: None,
78            snapshot_store: None,
79            snapshot_lock: Arc::new(AsyncMutex::new(())),
80        }
81    }
82
83    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
84        self.delivery = Some(delivery);
85        self
86    }
87
88    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
89        self.dynamodb_state = Some(dynamodb_state);
90        self
91    }
92
93    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94        self.snapshot_store = Some(store);
95        self
96    }
97
98    async fn save_snapshot(&self) {
99        let Some(store) = self.snapshot_store.clone() else {
100            return;
101        };
102        let _guard = self.snapshot_lock.lock().await;
103        let snapshot = StepFunctionsSnapshot {
104            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
105            state: None,
106            accounts: Some(self.state.read().clone()),
107        };
108        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
109            let bytes = serde_json::to_vec(&snapshot)
110                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
111            store.save(&bytes)
112        })
113        .await;
114        match join {
115            Ok(Ok(())) => {}
116            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
117            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
118        }
119    }
120}
121
122fn is_mutating_action(action: &str) -> bool {
123    matches!(
124        action,
125        "CreateStateMachine"
126            | "DeleteStateMachine"
127            | "UpdateStateMachine"
128            | "TagResource"
129            | "UntagResource"
130            | "StartExecution"
131            | "StopExecution"
132            | "CreateActivity"
133            | "DeleteActivity"
134            | "GetActivityTask"
135            | "SendTaskFailure"
136            | "SendTaskHeartbeat"
137            | "SendTaskSuccess"
138            | "PublishStateMachineVersion"
139            | "DeleteStateMachineVersion"
140            | "CreateStateMachineAlias"
141            | "DeleteStateMachineAlias"
142            | "UpdateStateMachineAlias"
143            | "UpdateMapRun"
144            | "RedriveExecution"
145            | "StartSyncExecution"
146    )
147}
148
149#[async_trait]
150impl AwsService for StepFunctionsService {
151    fn service_name(&self) -> &str {
152        "states"
153    }
154
155    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
156        let mutates = is_mutating_action(req.action.as_str());
157        let result = match req.action.as_str() {
158            "CreateStateMachine" => self.create_state_machine(&req),
159            "DescribeStateMachine" => self.describe_state_machine(&req),
160            "ListStateMachines" => self.list_state_machines(&req),
161            "DeleteStateMachine" => self.delete_state_machine(&req),
162            "UpdateStateMachine" => self.update_state_machine(&req),
163            "TagResource" => self.tag_resource(&req),
164            "UntagResource" => self.untag_resource(&req),
165            "ListTagsForResource" => self.list_tags_for_resource(&req),
166            "StartExecution" => self.start_execution(&req),
167            "StopExecution" => self.stop_execution(&req),
168            "DescribeExecution" => self.describe_execution(&req),
169            "ListExecutions" => self.list_executions(&req),
170            "GetExecutionHistory" => self.get_execution_history(&req),
171            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
172            "CreateActivity" => self.create_activity(&req),
173            "DeleteActivity" => self.delete_activity(&req),
174            "DescribeActivity" => self.describe_activity(&req),
175            "ListActivities" => self.list_activities(&req),
176            "GetActivityTask" => self.get_activity_task(&req).await,
177            "SendTaskFailure" => self.send_task_failure(&req),
178            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
179            "SendTaskSuccess" => self.send_task_success(&req),
180            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
181            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
182            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
183            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
184            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
185            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
186            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
187            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
188            "DescribeMapRun" => self.describe_map_run(&req),
189            "ListMapRuns" => self.list_map_runs(&req),
190            "UpdateMapRun" => self.update_map_run(&req),
191            "RedriveExecution" => self.redrive_execution(&req),
192            "StartSyncExecution" => self.start_sync_execution(&req),
193            "TestState" => self.test_state(&req),
194            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
195            _ => Err(AwsServiceError::action_not_implemented(
196                "states",
197                &req.action,
198            )),
199        };
200        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
201            self.save_snapshot().await;
202        }
203        result
204    }
205
206    fn supported_actions(&self) -> &[&str] {
207        SUPPORTED
208    }
209}
210
211impl StepFunctionsService {
212    // ─── State Machine CRUD ─────────────────────────────────────────────
213
214    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
215        let body = req.json_body();
216
217        validate_required("name", &body["name"])?;
218        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
219        validate_name(name)?;
220
221        validate_required("definition", &body["definition"])?;
222        let definition = body["definition"]
223            .as_str()
224            .ok_or_else(|| missing("definition"))?;
225        validate_definition(definition)?;
226
227        validate_required("roleArn", &body["roleArn"])?;
228        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
229        validate_arn(role_arn)?;
230
231        let machine_type = if let Some(t) = body["type"].as_str() {
232            StateMachineType::parse(t).ok_or_else(|| {
233                AwsServiceError::aws_error(
234                    StatusCode::BAD_REQUEST,
235                    "ValidationException",
236                    format!(
237                        "Value '{t}' at 'type' failed to satisfy constraint: \
238                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
239                    ),
240                )
241            })?
242        } else {
243            StateMachineType::Standard
244        };
245
246        let mut accounts = self.state.write();
247        let state = accounts.get_or_create(&req.account_id);
248        let arn = state.state_machine_arn(name);
249
250        // Check if name already exists
251        if state.state_machines.values().any(|sm| sm.name == name) {
252            return Err(AwsServiceError::aws_error(
253                StatusCode::CONFLICT,
254                "StateMachineAlreadyExists",
255                format!("State Machine Already Exists: '{arn}'"),
256            ));
257        }
258
259        let now = Utc::now();
260        let revision_id = uuid::Uuid::new_v4().to_string();
261
262        let mut tags = HashMap::new();
263        if !body["tags"].is_null() {
264            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
265                |f| {
266                    AwsServiceError::aws_error(
267                        StatusCode::BAD_REQUEST,
268                        "ValidationException",
269                        format!("{f} must be a list"),
270                    )
271                },
272            )?;
273        }
274
275        let sm = StateMachine {
276            name: name.to_string(),
277            arn: arn.clone(),
278            definition: definition.to_string(),
279            role_arn: role_arn.to_string(),
280            machine_type,
281            status: StateMachineStatus::Active,
282            creation_date: now,
283            update_date: now,
284            tags,
285            revision_id: revision_id.clone(),
286            logging_configuration: body.get("loggingConfiguration").cloned(),
287            tracing_configuration: body.get("tracingConfiguration").cloned(),
288            description: body["description"].as_str().unwrap_or("").to_string(),
289        };
290
291        state.state_machines.insert(arn.clone(), sm);
292
293        Ok(AwsResponse::ok_json(json!({
294            "stateMachineArn": arn,
295            "creationDate": now.timestamp() as f64,
296            "stateMachineVersionArn": arn,
297        })))
298    }
299
300    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = req.json_body();
302        validate_required("stateMachineArn", &body["stateMachineArn"])?;
303        let arn = body["stateMachineArn"]
304            .as_str()
305            .ok_or_else(|| missing("stateMachineArn"))?;
306        validate_arn(arn)?;
307
308        let accounts = self.state.read();
309        let empty = StepFunctionsState::new(&req.account_id, &req.region);
310        let state = accounts.get(&req.account_id).unwrap_or(&empty);
311        let sm = state
312            .state_machines
313            .get(arn)
314            .ok_or_else(|| state_machine_not_found(arn))?;
315
316        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
317    }
318
319    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
322        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
323        let next_token = body["nextToken"].as_str();
324
325        let accounts = self.state.read();
326        let empty = StepFunctionsState::new(&req.account_id, &req.region);
327        let state = accounts.get(&req.account_id).unwrap_or(&empty);
328        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
329        machines.sort_by(|a, b| a.name.cmp(&b.name));
330
331        let items: Vec<Value> = machines
332            .iter()
333            .map(|sm| {
334                json!({
335                    "name": sm.name,
336                    "stateMachineArn": sm.arn,
337                    "type": sm.machine_type.as_str(),
338                    "creationDate": sm.creation_date.timestamp() as f64,
339                })
340            })
341            .collect();
342
343        let (page, token) = paginate(&items, next_token, max_results);
344
345        let mut resp = json!({ "stateMachines": page });
346        if let Some(t) = token {
347            resp["nextToken"] = json!(t);
348        }
349        Ok(AwsResponse::ok_json(resp))
350    }
351
352    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
353        let body = req.json_body();
354        validate_required("stateMachineArn", &body["stateMachineArn"])?;
355        let arn = body["stateMachineArn"]
356            .as_str()
357            .ok_or_else(|| missing("stateMachineArn"))?;
358        validate_arn(arn)?;
359
360        let mut accounts = self.state.write();
361        let state = accounts.get_or_create(&req.account_id);
362        // AWS returns success even if it doesn't exist
363        state.state_machines.remove(arn);
364
365        Ok(AwsResponse::ok_json(json!({})))
366    }
367
368    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
369        let body = req.json_body();
370        validate_required("stateMachineArn", &body["stateMachineArn"])?;
371        let arn = body["stateMachineArn"]
372            .as_str()
373            .ok_or_else(|| missing("stateMachineArn"))?;
374        validate_arn(arn)?;
375
376        let mut accounts = self.state.write();
377        let state = accounts.get_or_create(&req.account_id);
378        let sm = state
379            .state_machines
380            .get_mut(arn)
381            .ok_or_else(|| state_machine_not_found(arn))?;
382
383        if let Some(definition) = body["definition"].as_str() {
384            validate_definition(definition)?;
385            sm.definition = definition.to_string();
386        }
387
388        if let Some(role_arn) = body["roleArn"].as_str() {
389            validate_arn(role_arn)?;
390            sm.role_arn = role_arn.to_string();
391        }
392
393        if let Some(logging) = body.get("loggingConfiguration") {
394            sm.logging_configuration = Some(logging.clone());
395        }
396
397        if let Some(tracing) = body.get("tracingConfiguration") {
398            sm.tracing_configuration = Some(tracing.clone());
399        }
400
401        if let Some(description) = body["description"].as_str() {
402            sm.description = description.to_string();
403        }
404
405        let now = Utc::now();
406        sm.update_date = now;
407        sm.revision_id = uuid::Uuid::new_v4().to_string();
408
409        let revision_id = sm.revision_id.clone();
410        let sm_arn = sm.arn.clone();
411
412        Ok(AwsResponse::ok_json(json!({
413            "updateDate": now.timestamp() as f64,
414            "revisionId": revision_id,
415            "stateMachineVersionArn": sm_arn,
416        })))
417    }
418
419    // ─── Execution Lifecycle ──────────────────────────────────────────
420
421    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
422        let body = req.json_body();
423        validate_required("stateMachineArn", &body["stateMachineArn"])?;
424        let sm_arn = body["stateMachineArn"]
425            .as_str()
426            .ok_or_else(|| missing("stateMachineArn"))?;
427        validate_arn(sm_arn)?;
428
429        let input = body["input"].as_str().map(|s| s.to_string());
430
431        // Validate input is valid JSON if provided
432        if let Some(ref input_str) = input {
433            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
434                AwsServiceError::aws_error(
435                    StatusCode::BAD_REQUEST,
436                    "InvalidExecutionInput",
437                    "Invalid execution input: must be valid JSON".to_string(),
438                )
439            })?;
440        }
441
442        let execution_name = body["name"]
443            .as_str()
444            .map(|s| s.to_string())
445            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
446
447        if let Some(name) = body["name"].as_str() {
448            validate_name(name)?;
449        }
450
451        let mut accounts = self.state.write();
452        let state = accounts.get_or_create(&req.account_id);
453        let sm = state
454            .state_machines
455            .get(sm_arn)
456            .ok_or_else(|| state_machine_not_found(sm_arn))?;
457
458        let sm_name = sm.name.clone();
459        let definition = sm.definition.clone();
460        let exec_arn = state.execution_arn(&sm_name, &execution_name);
461
462        // Check for duplicate execution name
463        if state.executions.contains_key(&exec_arn) {
464            return Err(AwsServiceError::aws_error(
465                StatusCode::CONFLICT,
466                "ExecutionAlreadyExists",
467                format!("Execution Already Exists: '{exec_arn}'"),
468            ));
469        }
470
471        let now = Utc::now();
472        let execution = Execution {
473            execution_arn: exec_arn.clone(),
474            state_machine_arn: sm_arn.to_string(),
475            state_machine_name: sm_name,
476            name: execution_name,
477            status: ExecutionStatus::Running,
478            input: input.clone(),
479            output: None,
480            start_date: now,
481            stop_date: None,
482            error: None,
483            cause: None,
484            history_events: vec![],
485        };
486
487        state.executions.insert(exec_arn.clone(), execution);
488        drop(accounts);
489
490        // Spawn async execution
491        let shared_state = self.state.clone();
492        let exec_arn_clone = exec_arn.clone();
493        let input_clone = input;
494        let delivery = self.delivery.clone();
495        let dynamodb_state = self.dynamodb_state.clone();
496        tokio::spawn(async move {
497            interpreter::execute_state_machine(
498                shared_state,
499                exec_arn_clone,
500                definition,
501                input_clone,
502                delivery,
503                dynamodb_state,
504            )
505            .await;
506        });
507
508        Ok(AwsResponse::ok_json(json!({
509            "executionArn": exec_arn,
510            "startDate": now.timestamp() as f64,
511        })))
512    }
513
514    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
515        let body = req.json_body();
516        validate_required("executionArn", &body["executionArn"])?;
517        let exec_arn = body["executionArn"]
518            .as_str()
519            .ok_or_else(|| missing("executionArn"))?;
520
521        let error = body["error"].as_str().map(|s| s.to_string());
522        let cause = body["cause"].as_str().map(|s| s.to_string());
523
524        let mut accounts = self.state.write();
525        let state = accounts.get_or_create(&req.account_id);
526        let exec = state
527            .executions
528            .get_mut(exec_arn)
529            .ok_or_else(|| execution_not_found(exec_arn))?;
530
531        if exec.status != ExecutionStatus::Running {
532            return Err(AwsServiceError::aws_error(
533                StatusCode::BAD_REQUEST,
534                "ExecutionNotRunning",
535                format!("Execution is not running: '{exec_arn}'"),
536            ));
537        }
538
539        let now = Utc::now();
540        exec.status = ExecutionStatus::Aborted;
541        exec.stop_date = Some(now);
542        exec.error = error;
543        exec.cause = cause;
544
545        Ok(AwsResponse::ok_json(json!({
546            "stopDate": now.timestamp() as f64,
547        })))
548    }
549
550    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
551        let body = req.json_body();
552        validate_required("executionArn", &body["executionArn"])?;
553        let exec_arn = body["executionArn"]
554            .as_str()
555            .ok_or_else(|| missing("executionArn"))?;
556
557        let accounts = self.state.read();
558        let empty = StepFunctionsState::new(&req.account_id, &req.region);
559        let state = accounts.get(&req.account_id).unwrap_or(&empty);
560        let exec = state
561            .executions
562            .get(exec_arn)
563            .ok_or_else(|| execution_not_found(exec_arn))?;
564
565        Ok(AwsResponse::ok_json(execution_to_json(exec)))
566    }
567
568    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
569        let body = req.json_body();
570        validate_required("stateMachineArn", &body["stateMachineArn"])?;
571        let sm_arn = body["stateMachineArn"]
572            .as_str()
573            .ok_or_else(|| missing("stateMachineArn"))?;
574        validate_arn(sm_arn)?;
575
576        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
577        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
578        let next_token = body["nextToken"].as_str();
579        let status_filter = body["statusFilter"].as_str();
580
581        let accounts = self.state.read();
582        let empty = StepFunctionsState::new(&req.account_id, &req.region);
583        let state = accounts.get(&req.account_id).unwrap_or(&empty);
584
585        // Verify state machine exists
586        if !state.state_machines.contains_key(sm_arn) {
587            return Err(state_machine_not_found(sm_arn));
588        }
589
590        let mut executions: Vec<&Execution> = state
591            .executions
592            .values()
593            .filter(|e| e.state_machine_arn == sm_arn)
594            .filter(|e| {
595                status_filter
596                    .map(|sf| e.status.as_str() == sf)
597                    .unwrap_or(true)
598            })
599            .collect();
600
601        // Sort by start date descending (most recent first)
602        executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
603
604        let items: Vec<Value> = executions
605            .iter()
606            .map(|e| {
607                let mut item = json!({
608                    "executionArn": e.execution_arn,
609                    "stateMachineArn": e.state_machine_arn,
610                    "name": e.name,
611                    "status": e.status.as_str(),
612                    "startDate": e.start_date.timestamp() as f64,
613                });
614                if let Some(stop) = e.stop_date {
615                    item["stopDate"] = json!(stop.timestamp() as f64);
616                }
617                item
618            })
619            .collect();
620
621        let (page, token) = paginate(&items, next_token, max_results);
622
623        let mut resp = json!({ "executions": page });
624        if let Some(t) = token {
625            resp["nextToken"] = json!(t);
626        }
627        Ok(AwsResponse::ok_json(resp))
628    }
629
630    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
631        let body = req.json_body();
632        validate_required("executionArn", &body["executionArn"])?;
633        let exec_arn = body["executionArn"]
634            .as_str()
635            .ok_or_else(|| missing("executionArn"))?;
636
637        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
638        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
639        let next_token = body["nextToken"].as_str();
640        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
641
642        let accounts = self.state.read();
643        let empty = StepFunctionsState::new(&req.account_id, &req.region);
644        let state = accounts.get(&req.account_id).unwrap_or(&empty);
645        let exec = state
646            .executions
647            .get(exec_arn)
648            .ok_or_else(|| execution_not_found(exec_arn))?;
649
650        let mut events: Vec<Value> = exec
651            .history_events
652            .iter()
653            .map(|e| {
654                json!({
655                    "id": e.id,
656                    "type": e.event_type,
657                    "timestamp": e.timestamp.timestamp() as f64,
658                    "previousEventId": e.previous_event_id,
659                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
660                })
661            })
662            .collect();
663
664        if reverse_order {
665            events.reverse();
666        }
667
668        let (page, token) = paginate(&events, next_token, max_results);
669
670        let mut resp = json!({ "events": page });
671        if let Some(t) = token {
672            resp["nextToken"] = json!(t);
673        }
674        Ok(AwsResponse::ok_json(resp))
675    }
676
677    fn describe_state_machine_for_execution(
678        &self,
679        req: &AwsRequest,
680    ) -> Result<AwsResponse, AwsServiceError> {
681        let body = req.json_body();
682        validate_required("executionArn", &body["executionArn"])?;
683        let exec_arn = body["executionArn"]
684            .as_str()
685            .ok_or_else(|| missing("executionArn"))?;
686
687        let accounts = self.state.read();
688        let empty = StepFunctionsState::new(&req.account_id, &req.region);
689        let state = accounts.get(&req.account_id).unwrap_or(&empty);
690        let exec = state
691            .executions
692            .get(exec_arn)
693            .ok_or_else(|| execution_not_found(exec_arn))?;
694
695        let sm = state
696            .state_machines
697            .get(&exec.state_machine_arn)
698            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
699
700        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
701    }
702
703    // ─── Tagging ────────────────────────────────────────────────────────
704
705    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
706        let body = req.json_body();
707        validate_required("resourceArn", &body["resourceArn"])?;
708        let arn = body["resourceArn"]
709            .as_str()
710            .ok_or_else(|| missing("resourceArn"))?;
711        validate_arn(arn)?;
712        validate_required("tags", &body["tags"])?;
713
714        let mut accounts = self.state.write();
715        let state = accounts.get_or_create(&req.account_id);
716        let sm = state
717            .state_machines
718            .get_mut(arn)
719            .ok_or_else(|| resource_not_found(arn))?;
720
721        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
722            |f| {
723                AwsServiceError::aws_error(
724                    StatusCode::BAD_REQUEST,
725                    "ValidationException",
726                    format!("{f} must be a list"),
727                )
728            },
729        )?;
730
731        Ok(AwsResponse::ok_json(json!({})))
732    }
733
734    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735        let body = req.json_body();
736        validate_required("resourceArn", &body["resourceArn"])?;
737        let arn = body["resourceArn"]
738            .as_str()
739            .ok_or_else(|| missing("resourceArn"))?;
740        validate_arn(arn)?;
741        validate_required("tagKeys", &body["tagKeys"])?;
742
743        let mut accounts = self.state.write();
744        let state = accounts.get_or_create(&req.account_id);
745        let sm = state
746            .state_machines
747            .get_mut(arn)
748            .ok_or_else(|| resource_not_found(arn))?;
749
750        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
751            AwsServiceError::aws_error(
752                StatusCode::BAD_REQUEST,
753                "ValidationException",
754                format!("{f} must be a list"),
755            )
756        })?;
757
758        Ok(AwsResponse::ok_json(json!({})))
759    }
760
761    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762        let body = req.json_body();
763        validate_required("resourceArn", &body["resourceArn"])?;
764        let arn = body["resourceArn"]
765            .as_str()
766            .ok_or_else(|| missing("resourceArn"))?;
767        validate_arn(arn)?;
768
769        let accounts = self.state.read();
770        let empty = StepFunctionsState::new(&req.account_id, &req.region);
771        let state = accounts.get(&req.account_id).unwrap_or(&empty);
772        let sm = state
773            .state_machines
774            .get(arn)
775            .ok_or_else(|| resource_not_found(arn))?;
776
777        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
778
779        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
780    }
781
782    // ─── Activities ─────────────────────────────────────────────────────
783
784    fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785        let body = req.json_body();
786        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
787        validate_name(name)?;
788        let mut accounts = self.state.write();
789        let state = accounts.get_or_create(&req.account_id);
790        let arn = format!(
791            "arn:aws:states:{}:{}:activity:{}",
792            state.region, state.account_id, name
793        );
794        if state.activities.contains_key(&arn) {
795            return Err(AwsServiceError::aws_error(
796                StatusCode::BAD_REQUEST,
797                "ActivityAlreadyExists",
798                format!("Activity already exists: {arn}"),
799            ));
800        }
801        let activity = crate::state::Activity {
802            name: name.to_string(),
803            arn: arn.clone(),
804            creation_date: chrono::Utc::now(),
805            tags: HashMap::new(),
806        };
807        state.activities.insert(arn.clone(), activity.clone());
808        Ok(AwsResponse::ok_json(json!({
809            "activityArn": arn,
810            "creationDate": activity.creation_date.timestamp(),
811        })))
812    }
813
814    fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815        let body = req.json_body();
816        let arn = body["activityArn"]
817            .as_str()
818            .ok_or_else(|| missing("activityArn"))?
819            .to_string();
820        let mut accounts = self.state.write();
821        let state = accounts.get_or_create(&req.account_id);
822        state.activities.remove(&arn);
823        Ok(AwsResponse::ok_json(json!({})))
824    }
825
826    fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827        let body = req.json_body();
828        let arn = body["activityArn"]
829            .as_str()
830            .ok_or_else(|| missing("activityArn"))?
831            .to_string();
832        let accounts = self.state.read();
833        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
834        let state = accounts.get(&req.account_id).unwrap_or(&empty);
835        let a = state.activities.get(&arn).ok_or_else(|| {
836            AwsServiceError::aws_error(
837                StatusCode::BAD_REQUEST,
838                "ActivityDoesNotExist",
839                format!("Activity does not exist: {arn}"),
840            )
841        })?;
842        Ok(AwsResponse::ok_json(json!({
843            "activityArn": a.arn,
844            "name": a.name,
845            "creationDate": a.creation_date.timestamp(),
846        })))
847    }
848
849    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850        let accounts = self.state.read();
851        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
852        let state = accounts.get(&req.account_id).unwrap_or(&empty);
853        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
854        activities.sort_by(|a, b| a.name.cmp(&b.name));
855        let body = json!({
856            "activities": activities.iter().map(|a| json!({
857                "activityArn": a.arn,
858                "name": a.name,
859                "creationDate": a.creation_date.timestamp(),
860            })).collect::<Vec<_>>(),
861        });
862        Ok(AwsResponse::ok_json(body))
863    }
864
865    async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
866        let body = req.json_body();
867        let arn = body["activityArn"]
868            .as_str()
869            .ok_or_else(|| missing("activityArn"))?
870            .to_string();
871        // Activity must exist before we'll accept long-poll calls.
872        {
873            let accounts = self.state.read();
874            let state = accounts
875                .get(&req.account_id)
876                .ok_or_else(|| activity_not_found(&arn))?;
877            if !state.activities.contains_key(&arn) {
878                return Err(activity_not_found(&arn));
879            }
880        }
881
882        // AWS GetActivityTask blocks up to 60s. fakecloud defaults to 5s
883        // so test suites don't stall when no worker is feeding the queue.
884        let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
885            .ok()
886            .and_then(|s| s.parse().ok())
887            .unwrap_or(5);
888        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
889
890        loop {
891            // Try to dequeue oldest PENDING token for this activity.
892            {
893                let mut accounts = self.state.write();
894                let state = accounts.get_or_create(&req.account_id);
895                let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
896                    .task_tokens
897                    .iter()
898                    .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
899                    .map(|(k, t)| (k.clone(), t.created_at))
900                    .collect();
901                candidates.sort_by_key(|c| c.1);
902                if let Some((token, _)) = candidates.into_iter().next() {
903                    let now = chrono::Utc::now();
904                    let entry = state.task_tokens.get_mut(&token).expect("just looked up");
905                    entry.status = "IN_PROGRESS".to_string();
906                    entry.last_heartbeat_at = Some(now);
907                    let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
908                    return Ok(AwsResponse::ok_json(json!({
909                        "taskToken": token,
910                        "input": input,
911                    })));
912                }
913            }
914            if std::time::Instant::now() >= deadline {
915                // No task available in window — return empty token (matches
916                // AWS behavior).
917                return Ok(AwsResponse::ok_json(json!({
918                    "taskToken": "",
919                    "input": "",
920                })));
921            }
922            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
923        }
924    }
925
926    fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
927        self.update_task_token(req, "SUCCEEDED")
928    }
929
930    fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
931        self.update_task_token(req, "FAILED")
932    }
933
934    fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
935        // Heartbeats only refresh `last_heartbeat_at`; they don't change
936        // the task's lifecycle status. The interpreter's heartbeat-timeout
937        // check reads `last_heartbeat_at` to decide whether to fail the
938        // task with `States.HeartbeatTimeout`.
939        let body = req.json_body();
940        let token = body["taskToken"]
941            .as_str()
942            .ok_or_else(|| missing("taskToken"))?
943            .to_string();
944        let mut accounts = self.state.write();
945        let state = accounts.get_or_create(&req.account_id);
946        let entry = state
947            .task_tokens
948            .get_mut(&token)
949            .ok_or_else(|| task_does_not_exist(&token))?;
950        entry.last_heartbeat_at = Some(chrono::Utc::now());
951        Ok(AwsResponse::ok_json(json!({})))
952    }
953
954    fn update_task_token(
955        &self,
956        req: &AwsRequest,
957        new_status: &str,
958    ) -> Result<AwsResponse, AwsServiceError> {
959        let body = req.json_body();
960        let token = body["taskToken"]
961            .as_str()
962            .ok_or_else(|| missing("taskToken"))?
963            .to_string();
964        let mut accounts = self.state.write();
965        let state = accounts.get_or_create(&req.account_id);
966        let entry = state
967            .task_tokens
968            .get_mut(&token)
969            .ok_or_else(|| task_does_not_exist(&token))?;
970        entry.status = new_status.to_string();
971        if new_status == "SUCCEEDED" {
972            entry.output = body["output"].as_str().map(String::from);
973        } else if new_status == "FAILED" {
974            entry.error = body["error"].as_str().map(String::from);
975            entry.cause = body["cause"].as_str().map(String::from);
976        }
977        Ok(AwsResponse::ok_json(json!({})))
978    }
979
980    // ─── State machine versions / aliases ───────────────────────────────
981
982    fn publish_state_machine_version(
983        &self,
984        req: &AwsRequest,
985    ) -> Result<AwsResponse, AwsServiceError> {
986        let body = req.json_body();
987        let arn = body["stateMachineArn"]
988            .as_str()
989            .ok_or_else(|| missing("stateMachineArn"))?
990            .to_string();
991        let description = body["description"].as_str().unwrap_or("").to_string();
992        let mut accounts = self.state.write();
993        let state = accounts.get_or_create(&req.account_id);
994        if !state.state_machines.contains_key(&arn) {
995            return Err(state_machine_not_found(&arn));
996        }
997        let version = state
998            .state_machine_versions
999            .values()
1000            .filter(|v| v.state_machine_arn == arn)
1001            .map(|v| v.version)
1002            .max()
1003            .unwrap_or(0)
1004            + 1;
1005        let version_arn = format!("{arn}:{version}");
1006        let v = crate::state::StateMachineVersion {
1007            state_machine_arn: arn,
1008            version,
1009            revision_id: format!("rev-{version}"),
1010            description,
1011            creation_date: chrono::Utc::now(),
1012        };
1013        state
1014            .state_machine_versions
1015            .insert(version_arn.clone(), v.clone());
1016        Ok(AwsResponse::ok_json(json!({
1017            "stateMachineVersionArn": version_arn,
1018            "creationDate": v.creation_date.timestamp(),
1019        })))
1020    }
1021
1022    fn delete_state_machine_version(
1023        &self,
1024        req: &AwsRequest,
1025    ) -> Result<AwsResponse, AwsServiceError> {
1026        let body = req.json_body();
1027        let arn = body["stateMachineVersionArn"]
1028            .as_str()
1029            .ok_or_else(|| missing("stateMachineVersionArn"))?
1030            .to_string();
1031        let mut accounts = self.state.write();
1032        let state = accounts.get_or_create(&req.account_id);
1033        state.state_machine_versions.remove(&arn);
1034        Ok(AwsResponse::ok_json(json!({})))
1035    }
1036
1037    fn list_state_machine_versions(
1038        &self,
1039        req: &AwsRequest,
1040    ) -> Result<AwsResponse, AwsServiceError> {
1041        let body = req.json_body();
1042        let arn = body["stateMachineArn"]
1043            .as_str()
1044            .ok_or_else(|| missing("stateMachineArn"))?
1045            .to_string();
1046        let accounts = self.state.read();
1047        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1048        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1049        let mut versions: Vec<&crate::state::StateMachineVersion> = state
1050            .state_machine_versions
1051            .values()
1052            .filter(|v| v.state_machine_arn == arn)
1053            .collect();
1054        versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1055        let resp = json!({
1056            "stateMachineVersions": versions.iter().map(|v| json!({
1057                "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1058                "creationDate": v.creation_date.timestamp(),
1059            })).collect::<Vec<_>>(),
1060        });
1061        Ok(AwsResponse::ok_json(resp))
1062    }
1063
1064    fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1065        let body = req.json_body();
1066        let name = body["name"]
1067            .as_str()
1068            .ok_or_else(|| missing("name"))?
1069            .to_string();
1070        validate_name(&name)?;
1071        let routing_cfg = body["routingConfiguration"]
1072            .as_array()
1073            .ok_or_else(|| missing("routingConfiguration"))?;
1074        let routes = parse_routing_configuration(routing_cfg)?;
1075        let parent_arn = routes[0]
1076            .state_machine_version_arn
1077            .rsplit_once(':')
1078            .map(|(parent, _)| parent.to_string())
1079            .unwrap_or_default();
1080        let alias_arn = format!("{parent_arn}:{name}");
1081        let now = chrono::Utc::now();
1082        let alias = crate::state::StateMachineAlias {
1083            name,
1084            arn: alias_arn.clone(),
1085            description: body["description"].as_str().unwrap_or("").to_string(),
1086            routing_configuration: routes,
1087            creation_date: now,
1088            update_date: now,
1089        };
1090        let mut accounts = self.state.write();
1091        let state = accounts.get_or_create(&req.account_id);
1092        state.state_machine_aliases.insert(alias_arn.clone(), alias);
1093        Ok(AwsResponse::ok_json(json!({
1094            "stateMachineAliasArn": alias_arn,
1095            "creationDate": now.timestamp(),
1096        })))
1097    }
1098
1099    fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1100        let body = req.json_body();
1101        let arn = body["stateMachineAliasArn"]
1102            .as_str()
1103            .ok_or_else(|| missing("stateMachineAliasArn"))?
1104            .to_string();
1105        let mut accounts = self.state.write();
1106        let state = accounts.get_or_create(&req.account_id);
1107        state.state_machine_aliases.remove(&arn);
1108        Ok(AwsResponse::ok_json(json!({})))
1109    }
1110
1111    fn describe_state_machine_alias(
1112        &self,
1113        req: &AwsRequest,
1114    ) -> Result<AwsResponse, AwsServiceError> {
1115        let body = req.json_body();
1116        let arn = body["stateMachineAliasArn"]
1117            .as_str()
1118            .ok_or_else(|| missing("stateMachineAliasArn"))?
1119            .to_string();
1120        let accounts = self.state.read();
1121        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1122        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1123        let alias = state
1124            .state_machine_aliases
1125            .get(&arn)
1126            .ok_or_else(|| resource_not_found(&arn))?;
1127        Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1128    }
1129
1130    fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1131        let body = req.json_body();
1132        let parent = body["stateMachineArn"]
1133            .as_str()
1134            .ok_or_else(|| missing("stateMachineArn"))?
1135            .to_string();
1136        let accounts = self.state.read();
1137        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1138        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1139        // Anchor the prefix on the alias separator so a state machine
1140        // named `foo` doesn't pull in aliases for `foobar`.
1141        let parent_prefix = format!("{parent}:");
1142        let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1143            .state_machine_aliases
1144            .values()
1145            .filter(|a| a.arn.starts_with(&parent_prefix))
1146            .collect();
1147        aliases.sort_by(|a, b| a.name.cmp(&b.name));
1148        Ok(AwsResponse::ok_json(json!({
1149            "stateMachineAliases": aliases.iter().map(|a| json!({
1150                "stateMachineAliasArn": a.arn,
1151                "creationDate": a.creation_date.timestamp(),
1152            })).collect::<Vec<_>>(),
1153        })))
1154    }
1155
1156    fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1157        let body = req.json_body();
1158        let arn = body["stateMachineAliasArn"]
1159            .as_str()
1160            .ok_or_else(|| missing("stateMachineAliasArn"))?
1161            .to_string();
1162        let mut accounts = self.state.write();
1163        let state = accounts.get_or_create(&req.account_id);
1164        let alias = state
1165            .state_machine_aliases
1166            .get_mut(&arn)
1167            .ok_or_else(|| resource_not_found(&arn))?;
1168        if let Some(d) = body["description"].as_str() {
1169            alias.description = d.to_string();
1170        }
1171        if let Some(routes) = body["routingConfiguration"].as_array() {
1172            alias.routing_configuration = parse_routing_configuration(routes)?;
1173        }
1174        alias.update_date = chrono::Utc::now();
1175        Ok(AwsResponse::ok_json(json!({
1176            "updateDate": alias.update_date.timestamp(),
1177        })))
1178    }
1179
1180    // ─── Map runs ───────────────────────────────────────────────────────
1181
1182    fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1183        let body = req.json_body();
1184        let arn = body["mapRunArn"]
1185            .as_str()
1186            .ok_or_else(|| missing("mapRunArn"))?
1187            .to_string();
1188        let accounts = self.state.read();
1189        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1190        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1191        let mr = state
1192            .map_runs
1193            .get(&arn)
1194            .ok_or_else(|| resource_not_found(&arn))?;
1195        Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1196    }
1197
1198    fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1199        let body = req.json_body();
1200        let exec_arn = body["executionArn"].as_str().map(String::from);
1201        let accounts = self.state.read();
1202        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1203        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1204        let runs: Vec<&crate::state::MapRun> = state
1205            .map_runs
1206            .values()
1207            .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1208            .collect();
1209        Ok(AwsResponse::ok_json(json!({
1210            "mapRuns": runs.iter().map(|r| json!({
1211                "mapRunArn": r.map_run_arn,
1212                "executionArn": r.execution_arn,
1213                "stateMachineArn": "",
1214                "startDate": r.start_date.timestamp(),
1215                "stopDate": r.stop_date.map(|d| d.timestamp()),
1216            })).collect::<Vec<_>>(),
1217        })))
1218    }
1219
1220    fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1221        let body = req.json_body();
1222        let arn = body["mapRunArn"]
1223            .as_str()
1224            .ok_or_else(|| missing("mapRunArn"))?
1225            .to_string();
1226        let mut accounts = self.state.write();
1227        let state = accounts.get_or_create(&req.account_id);
1228        let mr = state
1229            .map_runs
1230            .get_mut(&arn)
1231            .ok_or_else(|| resource_not_found(&arn))?;
1232        if let Some(c) = body["maxConcurrency"].as_i64() {
1233            mr.max_concurrency = c as i32;
1234        }
1235        if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1236            mr.tolerated_failure_percentage = p;
1237        }
1238        if let Some(c) = body["toleratedFailureCount"].as_i64() {
1239            mr.tolerated_failure_count = c;
1240        }
1241        Ok(AwsResponse::ok_json(json!({})))
1242    }
1243
1244    // ─── Execution lifecycle extras ─────────────────────────────────────
1245
1246    fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1247        let body = req.json_body();
1248        let arn = body["executionArn"]
1249            .as_str()
1250            .ok_or_else(|| missing("executionArn"))?
1251            .to_string();
1252        let mut accounts = self.state.write();
1253        let state = accounts.get_or_create(&req.account_id);
1254        let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1255            AwsServiceError::aws_error(
1256                StatusCode::BAD_REQUEST,
1257                "ExecutionDoesNotExist",
1258                format!("Execution does not exist: {arn}"),
1259            )
1260        })?;
1261        exec.status = crate::state::ExecutionStatus::Running;
1262        exec.stop_date = None;
1263        Ok(AwsResponse::ok_json(json!({
1264            "redriveDate": chrono::Utc::now().timestamp(),
1265        })))
1266    }
1267
1268    fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1269        let body = req.json_body();
1270        let sm_arn = body["stateMachineArn"]
1271            .as_str()
1272            .ok_or_else(|| missing("stateMachineArn"))?
1273            .to_string();
1274        let input = body["input"].as_str().unwrap_or("{}").to_string();
1275        if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1276            return Err(AwsServiceError::aws_error(
1277                StatusCode::BAD_REQUEST,
1278                "InvalidExecutionInput",
1279                "Execution input is not valid JSON.",
1280            ));
1281        }
1282        let mut accounts = self.state.write();
1283        let state = accounts.get_or_create(&req.account_id);
1284        let sm = state
1285            .state_machines
1286            .get(&sm_arn)
1287            .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1288        if sm.machine_type != crate::state::StateMachineType::Express {
1289            return Err(AwsServiceError::aws_error(
1290                StatusCode::BAD_REQUEST,
1291                "StateMachineTypeNotSupported",
1292                "StartSyncExecution is only supported for EXPRESS state machines.",
1293            ));
1294        }
1295        let now = chrono::Utc::now();
1296        let exec_arn = format!(
1297            "arn:aws:states:{}:{}:express:{}:sync-{}",
1298            state.region,
1299            state.account_id,
1300            sm.name,
1301            now.timestamp_millis()
1302        );
1303        Ok(AwsResponse::ok_json(json!({
1304            "executionArn": exec_arn,
1305            "stateMachineArn": sm_arn,
1306            "name": "sync",
1307            "startDate": now.timestamp(),
1308            "stopDate": now.timestamp(),
1309            "status": "SUCCEEDED",
1310            "input": input,
1311            "output": "{}",
1312            "billingDetails": {
1313                "billedMemoryUsedInMB": 64,
1314                "billedDurationInMilliseconds": 1,
1315            },
1316        })))
1317    }
1318
1319    fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1320        let body = req.json_body();
1321        let definition = body["definition"]
1322            .as_str()
1323            .ok_or_else(|| missing("definition"))?;
1324        validate_definition(definition)?;
1325        let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1326        let input = body["input"].as_str().unwrap_or("{}").to_string();
1327        // Echo input back as output. Real Step Functions actually
1328        // simulates the state; our emulator reports SUCCEEDED so callers
1329        // can wire the integration test scaffolding.
1330        Ok(AwsResponse::ok_json(json!({
1331            "output": input,
1332            "status": "SUCCEEDED",
1333            "nextState": "End",
1334        })))
1335    }
1336
1337    fn validate_state_machine_definition(
1338        &self,
1339        req: &AwsRequest,
1340    ) -> Result<AwsResponse, AwsServiceError> {
1341        let body = req.json_body();
1342        let definition = body["definition"]
1343            .as_str()
1344            .ok_or_else(|| missing("definition"))?;
1345        match validate_definition(definition) {
1346            Ok(()) => Ok(AwsResponse::ok_json(json!({
1347                "result": "OK",
1348                "diagnostics": [],
1349            }))),
1350            Err(e) => Ok(AwsResponse::ok_json(json!({
1351                "result": "FAIL",
1352                "diagnostics": [{
1353                    "severity": "ERROR",
1354                    "code": "INVALID_DEFINITION",
1355                    "message": e.to_string(),
1356                }],
1357            }))),
1358        }
1359    }
1360}
1361
1362fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1363    json!({
1364        "stateMachineAliasArn": alias.arn,
1365        "name": alias.name,
1366        "description": alias.description,
1367        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1368            "stateMachineVersionArn": r.state_machine_version_arn,
1369            "weight": r.weight,
1370        })).collect::<Vec<_>>(),
1371        "creationDate": alias.creation_date.timestamp(),
1372        "updateDate": alias.update_date.timestamp(),
1373    })
1374}
1375
1376fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1377    json!({
1378        "mapRunArn": mr.map_run_arn,
1379        "executionArn": mr.execution_arn,
1380        "maxConcurrency": mr.max_concurrency,
1381        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1382        "toleratedFailureCount": mr.tolerated_failure_count,
1383        "status": mr.status,
1384        "startDate": mr.start_date.timestamp(),
1385        "stopDate": mr.stop_date.map(|d| d.timestamp()),
1386    })
1387}
1388
1389// ─── Helpers ────────────────────────────────────────────────────────────
1390
1391fn state_machine_to_json(sm: &StateMachine) -> Value {
1392    let mut resp = json!({
1393        "name": sm.name,
1394        "stateMachineArn": sm.arn,
1395        "definition": sm.definition,
1396        "roleArn": sm.role_arn,
1397        "type": sm.machine_type.as_str(),
1398        "status": sm.status.as_str(),
1399        "creationDate": sm.creation_date.timestamp() as f64,
1400        "updateDate": sm.update_date.timestamp() as f64,
1401        "revisionId": sm.revision_id,
1402        "label": sm.name,
1403    });
1404
1405    if !sm.description.is_empty() {
1406        resp["description"] = json!(sm.description);
1407    }
1408
1409    if let Some(ref logging) = sm.logging_configuration {
1410        resp["loggingConfiguration"] = logging.clone();
1411    } else {
1412        resp["loggingConfiguration"] = json!({
1413            "level": "OFF",
1414            "includeExecutionData": false,
1415            "destinations": [],
1416        });
1417    }
1418
1419    if let Some(ref tracing) = sm.tracing_configuration {
1420        resp["tracingConfiguration"] = tracing.clone();
1421    } else {
1422        resp["tracingConfiguration"] = json!({
1423            "enabled": false,
1424        });
1425    }
1426
1427    resp
1428}
1429
1430fn missing(name: &str) -> AwsServiceError {
1431    AwsServiceError::aws_error(
1432        StatusCode::BAD_REQUEST,
1433        "ValidationException",
1434        format!("The request must contain the parameter {name}."),
1435    )
1436}
1437
1438fn state_machine_not_found(arn: &str) -> AwsServiceError {
1439    AwsServiceError::aws_error(
1440        StatusCode::BAD_REQUEST,
1441        "StateMachineDoesNotExist",
1442        format!("State Machine Does Not Exist: '{arn}'"),
1443    )
1444}
1445
1446fn activity_not_found(arn: &str) -> AwsServiceError {
1447    AwsServiceError::aws_error(
1448        StatusCode::BAD_REQUEST,
1449        "ActivityDoesNotExist",
1450        format!("Activity does not exist: {arn}"),
1451    )
1452}
1453
1454fn task_does_not_exist(token: &str) -> AwsServiceError {
1455    AwsServiceError::aws_error(
1456        StatusCode::BAD_REQUEST,
1457        "TaskDoesNotExist",
1458        format!("Task does not exist: {token}"),
1459    )
1460}
1461
1462fn resource_not_found(arn: &str) -> AwsServiceError {
1463    AwsServiceError::aws_error(
1464        StatusCode::BAD_REQUEST,
1465        "ResourceNotFound",
1466        format!("Resource not found: '{arn}'"),
1467    )
1468}
1469
1470/// Parse + validate an alias `routingConfiguration` array.
1471///
1472/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
1473/// route must include `stateMachineVersionArn`.
1474fn parse_routing_configuration(
1475    routes: &[serde_json::Value],
1476) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1477    if routes.is_empty() || routes.len() > 2 {
1478        return Err(AwsServiceError::aws_error(
1479            StatusCode::BAD_REQUEST,
1480            "ValidationException",
1481            "routingConfiguration must contain 1 or 2 routes.",
1482        ));
1483    }
1484    let parsed: Vec<crate::state::AliasRoute> = routes
1485        .iter()
1486        .map(|r| {
1487            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1488                AwsServiceError::aws_error(
1489                    StatusCode::BAD_REQUEST,
1490                    "ValidationException",
1491                    "routingConfiguration entries must contain stateMachineVersionArn.",
1492                )
1493            })?;
1494            let weight = r["weight"].as_i64().ok_or_else(|| {
1495                AwsServiceError::aws_error(
1496                    StatusCode::BAD_REQUEST,
1497                    "ValidationException",
1498                    "routingConfiguration entries must contain a numeric weight.",
1499                )
1500            })?;
1501            if !(0..=100).contains(&weight) {
1502                return Err(AwsServiceError::aws_error(
1503                    StatusCode::BAD_REQUEST,
1504                    "ValidationException",
1505                    format!("Invalid routing weight {weight}; must be 0-100."),
1506                ));
1507            }
1508            Ok(crate::state::AliasRoute {
1509                state_machine_version_arn: arn.to_string(),
1510                weight: weight as i32,
1511            })
1512        })
1513        .collect::<Result<_, _>>()?;
1514    let total: i32 = parsed.iter().map(|r| r.weight).sum();
1515    if total != 100 {
1516        return Err(AwsServiceError::aws_error(
1517            StatusCode::BAD_REQUEST,
1518            "ValidationException",
1519            format!("routingConfiguration weights must sum to 100, got {total}."),
1520        ));
1521    }
1522    Ok(parsed)
1523}
1524
1525fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1526    if name.is_empty() || name.len() > 80 {
1527        return Err(AwsServiceError::aws_error(
1528            StatusCode::BAD_REQUEST,
1529            "InvalidName",
1530            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1531        ));
1532    }
1533    // Only allow alphanumeric, hyphens, and underscores
1534    if !name
1535        .chars()
1536        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1537    {
1538        return Err(AwsServiceError::aws_error(
1539            StatusCode::BAD_REQUEST,
1540            "InvalidName",
1541            format!(
1542                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1543            ),
1544        ));
1545    }
1546    Ok(())
1547}
1548
1549fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1550    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1551        AwsServiceError::aws_error(
1552            StatusCode::BAD_REQUEST,
1553            "InvalidDefinition",
1554            format!("Invalid State Machine Definition: '{e}'"),
1555        )
1556    })?;
1557
1558    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1559        return Err(AwsServiceError::aws_error(
1560            StatusCode::BAD_REQUEST,
1561            "InvalidDefinition",
1562            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1563                .to_string(),
1564        ));
1565    }
1566
1567    let states_obj = parsed
1568        .get("States")
1569        .and_then(|v| v.as_object())
1570        .ok_or_else(|| {
1571            AwsServiceError::aws_error(
1572                StatusCode::BAD_REQUEST,
1573                "InvalidDefinition",
1574                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1575                    .to_string(),
1576            )
1577        })?;
1578
1579    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1580        AwsServiceError::aws_error(
1581            StatusCode::BAD_REQUEST,
1582            "InvalidDefinition",
1583            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1584                .to_string(),
1585        )
1586    })?;
1587    if !states_obj.contains_key(start_at) {
1588        return Err(AwsServiceError::aws_error(
1589            StatusCode::BAD_REQUEST,
1590            "InvalidDefinition",
1591            format!(
1592                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1593                 (StartAt '{start_at}' does not reference a valid state)"
1594            ),
1595        ));
1596    }
1597
1598    Ok(())
1599}
1600
1601fn execution_not_found(arn: &str) -> AwsServiceError {
1602    AwsServiceError::aws_error(
1603        StatusCode::BAD_REQUEST,
1604        "ExecutionDoesNotExist",
1605        format!("Execution Does Not Exist: '{arn}'"),
1606    )
1607}
1608
1609fn execution_to_json(exec: &Execution) -> Value {
1610    let mut resp = json!({
1611        "executionArn": exec.execution_arn,
1612        "stateMachineArn": exec.state_machine_arn,
1613        "name": exec.name,
1614        "status": exec.status.as_str(),
1615        "startDate": exec.start_date.timestamp() as f64,
1616    });
1617
1618    if let Some(ref input) = exec.input {
1619        resp["input"] = json!(input);
1620    }
1621    if let Some(ref output) = exec.output {
1622        resp["output"] = json!(output);
1623    }
1624    if let Some(stop) = exec.stop_date {
1625        resp["stopDate"] = json!(stop.timestamp() as f64);
1626    }
1627    if let Some(ref error) = exec.error {
1628        resp["error"] = json!(error);
1629    }
1630    if let Some(ref cause) = exec.cause {
1631        resp["cause"] = json!(cause);
1632    }
1633
1634    resp
1635}
1636
1637/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
1638fn camel_to_details_key(event_type: &str) -> String {
1639    let mut chars = event_type.chars();
1640    match chars.next() {
1641        None => String::new(),
1642        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1643    }
1644}
1645
1646fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1647    if !arn.starts_with("arn:") {
1648        return Err(AwsServiceError::aws_error(
1649            StatusCode::BAD_REQUEST,
1650            "InvalidArn",
1651            format!("Invalid Arn: '{arn}'"),
1652        ));
1653    }
1654    Ok(())
1655}
1656
1657/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1658///
1659/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1660/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1661/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1662///
1663/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1664/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1665pub fn start_execution_from_delivery(
1666    state: &SharedStepFunctionsState,
1667    delivery: &Option<Arc<DeliveryBus>>,
1668    dynamodb_state: &Option<SharedDynamoDbState>,
1669    state_machine_arn: &str,
1670    input: &str,
1671) {
1672    // Validate input is valid JSON
1673    if serde_json::from_str::<serde_json::Value>(input).is_err() {
1674        tracing::warn!(
1675            state_machine_arn,
1676            "Step Functions delivery: invalid JSON input, skipping execution"
1677        );
1678        return;
1679    }
1680
1681    let execution_name = uuid::Uuid::new_v4().to_string();
1682
1683    // Extract account_id from the state machine ARN
1684    let account_id = state_machine_arn
1685        .split(':')
1686        .nth(4)
1687        .unwrap_or("000000000000")
1688        .to_string();
1689
1690    let mut accounts = state.write();
1691    let st = accounts.get_or_create(&account_id);
1692    let sm = match st.state_machines.get(state_machine_arn) {
1693        Some(sm) => sm,
1694        None => {
1695            tracing::warn!(
1696                state_machine_arn,
1697                "Step Functions delivery: state machine not found"
1698            );
1699            return;
1700        }
1701    };
1702
1703    let sm_name = sm.name.clone();
1704    let definition = sm.definition.clone();
1705    let exec_arn = st.execution_arn(&sm_name, &execution_name);
1706
1707    let now = Utc::now();
1708    let execution = Execution {
1709        execution_arn: exec_arn.clone(),
1710        state_machine_arn: state_machine_arn.to_string(),
1711        state_machine_name: sm_name,
1712        name: execution_name,
1713        status: ExecutionStatus::Running,
1714        input: Some(input.to_string()),
1715        output: None,
1716        start_date: now,
1717        stop_date: None,
1718        error: None,
1719        cause: None,
1720        history_events: vec![],
1721    };
1722
1723    st.executions.insert(exec_arn.clone(), execution);
1724    drop(accounts);
1725
1726    let shared_state = state.clone();
1727    let delivery = delivery.clone();
1728    let dynamodb_state = dynamodb_state.clone();
1729    let input = Some(input.to_string());
1730    tokio::spawn(async move {
1731        interpreter::execute_state_machine(
1732            shared_state,
1733            exec_arn,
1734            definition,
1735            input,
1736            delivery,
1737            dynamodb_state,
1738        )
1739        .await;
1740    });
1741}
1742
1743#[cfg(test)]
1744mod tests {
1745    use super::*;
1746    use http::{HeaderMap, Method};
1747    use parking_lot::RwLock;
1748    use serde_json::Value;
1749    use std::sync::Arc;
1750
1751    fn make_state() -> SharedStepFunctionsState {
1752        Arc::new(RwLock::new(
1753            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1754        ))
1755    }
1756
1757    fn make_request(action: &str, body: &str) -> AwsRequest {
1758        AwsRequest {
1759            service: "states".to_string(),
1760            action: action.to_string(),
1761            region: "us-east-1".to_string(),
1762            account_id: "123456789012".to_string(),
1763            request_id: "test-id".to_string(),
1764            headers: HeaderMap::new(),
1765            query_params: HashMap::new(),
1766            body: body.as_bytes().to_vec().into(),
1767            body_stream: parking_lot::Mutex::new(None),
1768            path_segments: vec![],
1769            raw_path: "/".to_string(),
1770            raw_query: String::new(),
1771            method: Method::POST,
1772            is_query_protocol: false,
1773            access_key_id: None,
1774            principal: None,
1775        }
1776    }
1777
1778    fn body_json(resp: &AwsResponse) -> Value {
1779        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1780    }
1781
1782    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1783        match result {
1784            Err(e) => e,
1785            Ok(_) => panic!("expected error, got Ok"),
1786        }
1787    }
1788
1789    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1790
1791    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1792        let body = json!({
1793            "name": name,
1794            "definition": VALID_DEF,
1795            "roleArn": "arn:aws:iam::123456789012:role/test",
1796        });
1797        let req = make_request("CreateStateMachine", &body.to_string());
1798        let resp = svc.create_state_machine(&req).unwrap();
1799        let b = body_json(&resp);
1800        b["stateMachineArn"].as_str().unwrap().to_string()
1801    }
1802
1803    // ── CreateStateMachine ──
1804
1805    #[test]
1806    fn create_state_machine_basic() {
1807        let svc = StepFunctionsService::new(make_state());
1808        let arn = create_sm(&svc, "test-sm");
1809        assert!(arn.contains("test-sm"));
1810    }
1811
1812    #[test]
1813    fn create_state_machine_with_express_type() {
1814        let svc = StepFunctionsService::new(make_state());
1815        let body = json!({
1816            "name": "express-sm",
1817            "definition": VALID_DEF,
1818            "roleArn": "arn:aws:iam::123456789012:role/r",
1819            "type": "EXPRESS",
1820        });
1821        let req = make_request("CreateStateMachine", &body.to_string());
1822        let resp = svc.create_state_machine(&req).unwrap();
1823        let b = body_json(&resp);
1824        assert!(b["stateMachineArn"].as_str().is_some());
1825    }
1826
1827    #[test]
1828    fn create_state_machine_duplicate_fails() {
1829        let svc = StepFunctionsService::new(make_state());
1830        create_sm(&svc, "dup-sm");
1831        let body = json!({
1832            "name": "dup-sm",
1833            "definition": VALID_DEF,
1834            "roleArn": "arn:aws:iam::123456789012:role/r",
1835        });
1836        let req = make_request("CreateStateMachine", &body.to_string());
1837        let err = expect_err(svc.create_state_machine(&req));
1838        assert!(err.to_string().contains("StateMachineAlreadyExists"));
1839    }
1840
1841    #[test]
1842    fn create_state_machine_missing_name() {
1843        let svc = StepFunctionsService::new(make_state());
1844        let body = json!({
1845            "definition": VALID_DEF,
1846            "roleArn": "arn:aws:iam::123456789012:role/r",
1847        });
1848        let req = make_request("CreateStateMachine", &body.to_string());
1849        assert!(svc.create_state_machine(&req).is_err());
1850    }
1851
1852    #[test]
1853    fn create_state_machine_invalid_definition() {
1854        let svc = StepFunctionsService::new(make_state());
1855        let body = json!({
1856            "name": "bad-def",
1857            "definition": "not json",
1858            "roleArn": "arn:aws:iam::123456789012:role/r",
1859        });
1860        let req = make_request("CreateStateMachine", &body.to_string());
1861        let err = expect_err(svc.create_state_machine(&req));
1862        assert!(err.to_string().contains("InvalidDefinition"));
1863    }
1864
1865    #[test]
1866    fn create_state_machine_definition_missing_start_at() {
1867        let svc = StepFunctionsService::new(make_state());
1868        let body = json!({
1869            "name": "no-start",
1870            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1871            "roleArn": "arn:aws:iam::123456789012:role/r",
1872        });
1873        let req = make_request("CreateStateMachine", &body.to_string());
1874        let err = expect_err(svc.create_state_machine(&req));
1875        assert!(err.to_string().contains("InvalidDefinition"));
1876    }
1877
1878    #[test]
1879    fn create_state_machine_definition_missing_states() {
1880        let svc = StepFunctionsService::new(make_state());
1881        let body = json!({
1882            "name": "no-states",
1883            "definition": r#"{"StartAt":"S"}"#,
1884            "roleArn": "arn:aws:iam::123456789012:role/r",
1885        });
1886        let req = make_request("CreateStateMachine", &body.to_string());
1887        let err = expect_err(svc.create_state_machine(&req));
1888        assert!(err.to_string().contains("InvalidDefinition"));
1889    }
1890
1891    #[test]
1892    fn create_state_machine_definition_start_at_not_in_states() {
1893        let svc = StepFunctionsService::new(make_state());
1894        let body = json!({
1895            "name": "bad-start",
1896            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1897            "roleArn": "arn:aws:iam::123456789012:role/r",
1898        });
1899        let req = make_request("CreateStateMachine", &body.to_string());
1900        let err = expect_err(svc.create_state_machine(&req));
1901        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1902    }
1903
1904    #[test]
1905    fn create_state_machine_invalid_type() {
1906        let svc = StepFunctionsService::new(make_state());
1907        let body = json!({
1908            "name": "bad-type",
1909            "definition": VALID_DEF,
1910            "roleArn": "arn:aws:iam::123456789012:role/r",
1911            "type": "INVALID",
1912        });
1913        let req = make_request("CreateStateMachine", &body.to_string());
1914        assert!(svc.create_state_machine(&req).is_err());
1915    }
1916
1917    #[test]
1918    fn create_state_machine_invalid_arn() {
1919        let svc = StepFunctionsService::new(make_state());
1920        let body = json!({
1921            "name": "bad-arn",
1922            "definition": VALID_DEF,
1923            "roleArn": "not-an-arn",
1924        });
1925        let req = make_request("CreateStateMachine", &body.to_string());
1926        let err = expect_err(svc.create_state_machine(&req));
1927        assert!(err.to_string().contains("InvalidArn"));
1928    }
1929
1930    #[test]
1931    fn create_state_machine_invalid_name() {
1932        let svc = StepFunctionsService::new(make_state());
1933        let body = json!({
1934            "name": "has spaces!",
1935            "definition": VALID_DEF,
1936            "roleArn": "arn:aws:iam::123456789012:role/r",
1937        });
1938        let req = make_request("CreateStateMachine", &body.to_string());
1939        let err = expect_err(svc.create_state_machine(&req));
1940        assert!(err.to_string().contains("InvalidName"));
1941    }
1942
1943    #[test]
1944    fn create_state_machine_name_too_long() {
1945        let svc = StepFunctionsService::new(make_state());
1946        let long_name = "a".repeat(81);
1947        let body = json!({
1948            "name": long_name,
1949            "definition": VALID_DEF,
1950            "roleArn": "arn:aws:iam::123456789012:role/r",
1951        });
1952        let req = make_request("CreateStateMachine", &body.to_string());
1953        let err = expect_err(svc.create_state_machine(&req));
1954        assert!(err.to_string().contains("InvalidName"));
1955    }
1956
1957    // ── DescribeStateMachine ──
1958
1959    #[test]
1960    fn describe_state_machine_found() {
1961        let svc = StepFunctionsService::new(make_state());
1962        let arn = create_sm(&svc, "desc-sm");
1963
1964        let req = make_request(
1965            "DescribeStateMachine",
1966            &json!({"stateMachineArn": arn}).to_string(),
1967        );
1968        let resp = svc.describe_state_machine(&req).unwrap();
1969        let b = body_json(&resp);
1970        assert_eq!(b["name"], "desc-sm");
1971        assert_eq!(b["status"], "ACTIVE");
1972        assert!(b["definition"].as_str().is_some());
1973    }
1974
1975    #[test]
1976    fn describe_state_machine_not_found() {
1977        let svc = StepFunctionsService::new(make_state());
1978        let req = make_request(
1979            "DescribeStateMachine",
1980            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1981                .to_string(),
1982        );
1983        let err = expect_err(svc.describe_state_machine(&req));
1984        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1985    }
1986
1987    // ── ListStateMachines ──
1988
1989    #[test]
1990    fn list_state_machines_empty() {
1991        let svc = StepFunctionsService::new(make_state());
1992        let req = make_request("ListStateMachines", "{}");
1993        let resp = svc.list_state_machines(&req).unwrap();
1994        let b = body_json(&resp);
1995        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1996    }
1997
1998    #[test]
1999    fn list_state_machines_returns_created() {
2000        let svc = StepFunctionsService::new(make_state());
2001        create_sm(&svc, "sm-1");
2002        create_sm(&svc, "sm-2");
2003
2004        let req = make_request("ListStateMachines", "{}");
2005        let resp = svc.list_state_machines(&req).unwrap();
2006        let b = body_json(&resp);
2007        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
2008    }
2009
2010    // ── DeleteStateMachine ──
2011
2012    #[test]
2013    fn delete_state_machine() {
2014        let svc = StepFunctionsService::new(make_state());
2015        let arn = create_sm(&svc, "del-sm");
2016
2017        let req = make_request(
2018            "DeleteStateMachine",
2019            &json!({"stateMachineArn": arn}).to_string(),
2020        );
2021        svc.delete_state_machine(&req).unwrap();
2022
2023        // Describe should fail
2024        let req = make_request(
2025            "DescribeStateMachine",
2026            &json!({"stateMachineArn": arn}).to_string(),
2027        );
2028        assert!(svc.describe_state_machine(&req).is_err());
2029    }
2030
2031    #[test]
2032    fn delete_state_machine_nonexistent_succeeds() {
2033        let svc = StepFunctionsService::new(make_state());
2034        let req = make_request(
2035            "DeleteStateMachine",
2036            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2037                .to_string(),
2038        );
2039        // AWS returns success even for nonexistent
2040        svc.delete_state_machine(&req).unwrap();
2041    }
2042
2043    // ── UpdateStateMachine ──
2044
2045    #[test]
2046    fn update_state_machine() {
2047        let svc = StepFunctionsService::new(make_state());
2048        let arn = create_sm(&svc, "upd-sm");
2049
2050        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
2051        let body = json!({
2052            "stateMachineArn": arn,
2053            "definition": new_def,
2054            "description": "updated",
2055        });
2056        let req = make_request("UpdateStateMachine", &body.to_string());
2057        let resp = svc.update_state_machine(&req).unwrap();
2058        let b = body_json(&resp);
2059        assert!(b["updateDate"].as_f64().is_some());
2060
2061        // Verify
2062        let req = make_request(
2063            "DescribeStateMachine",
2064            &json!({"stateMachineArn": arn}).to_string(),
2065        );
2066        let resp = svc.describe_state_machine(&req).unwrap();
2067        let b = body_json(&resp);
2068        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2069        assert_eq!(b["description"], "updated");
2070    }
2071
2072    #[test]
2073    fn update_state_machine_not_found() {
2074        let svc = StepFunctionsService::new(make_state());
2075        let body = json!({
2076            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2077            "definition": VALID_DEF,
2078        });
2079        let req = make_request("UpdateStateMachine", &body.to_string());
2080        let err = expect_err(svc.update_state_machine(&req));
2081        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2082    }
2083
2084    // ── StartExecution ──
2085
2086    #[tokio::test]
2087    async fn start_execution_basic() {
2088        let svc = StepFunctionsService::new(make_state());
2089        let arn = create_sm(&svc, "exec-sm");
2090
2091        let body = json!({
2092            "stateMachineArn": arn,
2093            "input": r#"{"key":"value"}"#,
2094        });
2095        let req = make_request("StartExecution", &body.to_string());
2096        let resp = svc.start_execution(&req).unwrap();
2097        let b = body_json(&resp);
2098        assert!(b["executionArn"].as_str().is_some());
2099        assert!(b["startDate"].as_f64().is_some());
2100    }
2101
2102    #[tokio::test]
2103    async fn start_execution_with_name() {
2104        let svc = StepFunctionsService::new(make_state());
2105        let arn = create_sm(&svc, "named-exec");
2106
2107        let body = json!({
2108            "stateMachineArn": arn,
2109            "name": "my-execution",
2110        });
2111        let req = make_request("StartExecution", &body.to_string());
2112        let resp = svc.start_execution(&req).unwrap();
2113        let b = body_json(&resp);
2114        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2115    }
2116
2117    #[tokio::test]
2118    async fn start_execution_sm_not_found() {
2119        let svc = StepFunctionsService::new(make_state());
2120        let body = json!({
2121            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2122        });
2123        let req = make_request("StartExecution", &body.to_string());
2124        let err = expect_err(svc.start_execution(&req));
2125        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2126    }
2127
2128    #[tokio::test]
2129    async fn start_execution_invalid_input() {
2130        let svc = StepFunctionsService::new(make_state());
2131        let arn = create_sm(&svc, "bad-input");
2132
2133        let body = json!({
2134            "stateMachineArn": arn,
2135            "input": "not json",
2136        });
2137        let req = make_request("StartExecution", &body.to_string());
2138        let err = expect_err(svc.start_execution(&req));
2139        assert!(err.to_string().contains("InvalidExecutionInput"));
2140    }
2141
2142    #[tokio::test]
2143    async fn start_execution_duplicate_name() {
2144        let svc = StepFunctionsService::new(make_state());
2145        let arn = create_sm(&svc, "dup-exec");
2146
2147        let body = json!({
2148            "stateMachineArn": arn,
2149            "name": "same-name",
2150        });
2151        let req = make_request("StartExecution", &body.to_string());
2152        svc.start_execution(&req).unwrap();
2153
2154        let req = make_request("StartExecution", &body.to_string());
2155        let err = expect_err(svc.start_execution(&req));
2156        assert!(err.to_string().contains("ExecutionAlreadyExists"));
2157    }
2158
2159    // ── DescribeExecution ──
2160
2161    #[tokio::test]
2162    async fn describe_execution_found() {
2163        let svc = StepFunctionsService::new(make_state());
2164        let sm_arn = create_sm(&svc, "desc-exec");
2165
2166        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2167        let req = make_request("StartExecution", &body.to_string());
2168        let resp = svc.start_execution(&req).unwrap();
2169        let exec_arn = body_json(&resp)["executionArn"]
2170            .as_str()
2171            .unwrap()
2172            .to_string();
2173
2174        let req = make_request(
2175            "DescribeExecution",
2176            &json!({"executionArn": exec_arn}).to_string(),
2177        );
2178        let resp = svc.describe_execution(&req).unwrap();
2179        let b = body_json(&resp);
2180        assert_eq!(b["name"], "e1");
2181        assert_eq!(b["status"], "RUNNING");
2182    }
2183
2184    #[tokio::test]
2185    async fn describe_execution_not_found() {
2186        let svc = StepFunctionsService::new(make_state());
2187        let req = make_request(
2188            "DescribeExecution",
2189            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2190                .to_string(),
2191        );
2192        let err = expect_err(svc.describe_execution(&req));
2193        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2194    }
2195
2196    // ── StopExecution ──
2197
2198    #[tokio::test]
2199    async fn stop_execution() {
2200        let svc = StepFunctionsService::new(make_state());
2201        let sm_arn = create_sm(&svc, "stop-sm");
2202
2203        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2204        let req = make_request("StartExecution", &body.to_string());
2205        let resp = svc.start_execution(&req).unwrap();
2206        let exec_arn = body_json(&resp)["executionArn"]
2207            .as_str()
2208            .unwrap()
2209            .to_string();
2210
2211        let body = json!({
2212            "executionArn": exec_arn,
2213            "error": "UserAborted",
2214            "cause": "test stop",
2215        });
2216        let req = make_request("StopExecution", &body.to_string());
2217        let resp = svc.stop_execution(&req).unwrap();
2218        let b = body_json(&resp);
2219        assert!(b["stopDate"].as_f64().is_some());
2220
2221        // Verify aborted
2222        let req = make_request(
2223            "DescribeExecution",
2224            &json!({"executionArn": exec_arn}).to_string(),
2225        );
2226        let resp = svc.describe_execution(&req).unwrap();
2227        let b = body_json(&resp);
2228        assert_eq!(b["status"], "ABORTED");
2229        assert_eq!(b["error"], "UserAborted");
2230    }
2231
2232    #[tokio::test]
2233    async fn stop_execution_not_found() {
2234        let svc = StepFunctionsService::new(make_state());
2235        let req = make_request(
2236            "StopExecution",
2237            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2238                .to_string(),
2239        );
2240        let err = expect_err(svc.stop_execution(&req));
2241        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2242    }
2243
2244    // ── ListExecutions ──
2245
2246    #[tokio::test]
2247    async fn list_executions() {
2248        let svc = StepFunctionsService::new(make_state());
2249        let sm_arn = create_sm(&svc, "list-exec");
2250
2251        for i in 0..3 {
2252            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2253            let req = make_request("StartExecution", &body.to_string());
2254            svc.start_execution(&req).unwrap();
2255        }
2256
2257        let req = make_request(
2258            "ListExecutions",
2259            &json!({"stateMachineArn": sm_arn}).to_string(),
2260        );
2261        let resp = svc.list_executions(&req).unwrap();
2262        let b = body_json(&resp);
2263        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2264    }
2265
2266    #[tokio::test]
2267    async fn list_executions_sm_not_found() {
2268        let svc = StepFunctionsService::new(make_state());
2269        let req = make_request(
2270            "ListExecutions",
2271            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2272                .to_string(),
2273        );
2274        let err = expect_err(svc.list_executions(&req));
2275        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2276    }
2277
2278    // ── GetExecutionHistory ──
2279
2280    #[tokio::test]
2281    async fn get_execution_history_not_found() {
2282        let svc = StepFunctionsService::new(make_state());
2283        let req = make_request(
2284            "GetExecutionHistory",
2285            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2286                .to_string(),
2287        );
2288        let err = expect_err(svc.get_execution_history(&req));
2289        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2290    }
2291
2292    // ── DescribeStateMachineForExecution ──
2293
2294    #[tokio::test]
2295    async fn describe_sm_for_execution() {
2296        let svc = StepFunctionsService::new(make_state());
2297        let sm_arn = create_sm(&svc, "sm-for-exec");
2298
2299        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2300        let req = make_request("StartExecution", &body.to_string());
2301        let resp = svc.start_execution(&req).unwrap();
2302        let exec_arn = body_json(&resp)["executionArn"]
2303            .as_str()
2304            .unwrap()
2305            .to_string();
2306
2307        let req = make_request(
2308            "DescribeStateMachineForExecution",
2309            &json!({"executionArn": exec_arn}).to_string(),
2310        );
2311        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2312        let b = body_json(&resp);
2313        assert_eq!(b["name"], "sm-for-exec");
2314    }
2315
2316    // ── Tags ──
2317
2318    #[test]
2319    fn tag_untag_list_tags() {
2320        let svc = StepFunctionsService::new(make_state());
2321        let arn = create_sm(&svc, "tagged-sm");
2322
2323        // Tag
2324        let body = json!({
2325            "resourceArn": arn,
2326            "tags": [{"key": "env", "value": "prod"}],
2327        });
2328        let req = make_request("TagResource", &body.to_string());
2329        svc.tag_resource(&req).unwrap();
2330
2331        // List
2332        let req = make_request(
2333            "ListTagsForResource",
2334            &json!({"resourceArn": arn}).to_string(),
2335        );
2336        let resp = svc.list_tags_for_resource(&req).unwrap();
2337        let b = body_json(&resp);
2338        let tags = b["tags"].as_array().unwrap();
2339        assert_eq!(tags.len(), 1);
2340        assert_eq!(tags[0]["key"], "env");
2341
2342        // Untag
2343        let body = json!({
2344            "resourceArn": arn,
2345            "tagKeys": ["env"],
2346        });
2347        let req = make_request("UntagResource", &body.to_string());
2348        svc.untag_resource(&req).unwrap();
2349
2350        // Verify empty
2351        let req = make_request(
2352            "ListTagsForResource",
2353            &json!({"resourceArn": arn}).to_string(),
2354        );
2355        let resp = svc.list_tags_for_resource(&req).unwrap();
2356        let b = body_json(&resp);
2357        assert!(b["tags"].as_array().unwrap().is_empty());
2358    }
2359
2360    #[test]
2361    fn tag_resource_not_found() {
2362        let svc = StepFunctionsService::new(make_state());
2363        let body = json!({
2364            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2365            "tags": [{"key": "k", "value": "v"}],
2366        });
2367        let req = make_request("TagResource", &body.to_string());
2368        let err = expect_err(svc.tag_resource(&req));
2369        assert!(err.to_string().contains("ResourceNotFound"));
2370    }
2371
2372    // ── Helper function tests ──
2373
2374    #[test]
2375    fn test_validate_name() {
2376        assert!(validate_name("valid-name").is_ok());
2377        assert!(validate_name("under_score").is_ok());
2378        assert!(validate_name("").is_err());
2379        assert!(validate_name("has spaces").is_err());
2380        assert!(validate_name(&"a".repeat(81)).is_err());
2381    }
2382
2383    #[test]
2384    fn test_validate_definition() {
2385        assert!(validate_definition(VALID_DEF).is_ok());
2386        assert!(validate_definition("not json").is_err());
2387        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
2388        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
2389    }
2390
2391    #[test]
2392    fn test_validate_arn() {
2393        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2394        assert!(validate_arn("not-an-arn").is_err());
2395    }
2396
2397    #[test]
2398    fn test_camel_to_details_key() {
2399        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2400        assert_eq!(camel_to_details_key(""), "");
2401    }
2402
2403    #[test]
2404    fn test_is_mutating_action() {
2405        assert!(is_mutating_action("CreateStateMachine"));
2406        assert!(is_mutating_action("StartExecution"));
2407        assert!(!is_mutating_action("DescribeStateMachine"));
2408        assert!(!is_mutating_action("ListStateMachines"));
2409    }
2410}