Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const SUPPORTED: &[&str] = &[
25    "CreateStateMachine",
26    "DescribeStateMachine",
27    "ListStateMachines",
28    "DeleteStateMachine",
29    "UpdateStateMachine",
30    "TagResource",
31    "UntagResource",
32    "ListTagsForResource",
33    "StartExecution",
34    "StopExecution",
35    "DescribeExecution",
36    "ListExecutions",
37    "GetExecutionHistory",
38    "DescribeStateMachineForExecution",
39    "CreateActivity",
40    "DeleteActivity",
41    "DescribeActivity",
42    "ListActivities",
43    "GetActivityTask",
44    "SendTaskFailure",
45    "SendTaskHeartbeat",
46    "SendTaskSuccess",
47    "PublishStateMachineVersion",
48    "DeleteStateMachineVersion",
49    "ListStateMachineVersions",
50    "CreateStateMachineAlias",
51    "DeleteStateMachineAlias",
52    "DescribeStateMachineAlias",
53    "ListStateMachineAliases",
54    "UpdateStateMachineAlias",
55    "DescribeMapRun",
56    "ListMapRuns",
57    "UpdateMapRun",
58    "RedriveExecution",
59    "StartSyncExecution",
60    "TestState",
61    "ValidateStateMachineDefinition",
62];
63
64pub struct StepFunctionsService {
65    state: SharedStepFunctionsState,
66    delivery: Option<Arc<DeliveryBus>>,
67    dynamodb_state: Option<SharedDynamoDbState>,
68    snapshot_store: Option<Arc<dyn SnapshotStore>>,
69    snapshot_lock: Arc<AsyncMutex<()>>,
70}
71
72impl StepFunctionsService {
73    pub fn new(state: SharedStepFunctionsState) -> Self {
74        Self {
75            state,
76            delivery: None,
77            dynamodb_state: None,
78            snapshot_store: None,
79            snapshot_lock: Arc::new(AsyncMutex::new(())),
80        }
81    }
82
83    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
84        self.delivery = Some(delivery);
85        self
86    }
87
88    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
89        self.dynamodb_state = Some(dynamodb_state);
90        self
91    }
92
93    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94        self.snapshot_store = Some(store);
95        self
96    }
97
98    async fn save_snapshot(&self) {
99        let Some(store) = self.snapshot_store.clone() else {
100            return;
101        };
102        let _guard = self.snapshot_lock.lock().await;
103        let snapshot = StepFunctionsSnapshot {
104            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
105            state: None,
106            accounts: Some(self.state.read().clone()),
107        };
108        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
109            let bytes = serde_json::to_vec(&snapshot)
110                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
111            store.save(&bytes)
112        })
113        .await;
114        match join {
115            Ok(Ok(())) => {}
116            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
117            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
118        }
119    }
120}
121
122fn is_mutating_action(action: &str) -> bool {
123    matches!(
124        action,
125        "CreateStateMachine"
126            | "DeleteStateMachine"
127            | "UpdateStateMachine"
128            | "TagResource"
129            | "UntagResource"
130            | "StartExecution"
131            | "StopExecution"
132            | "CreateActivity"
133            | "DeleteActivity"
134            | "GetActivityTask"
135            | "SendTaskFailure"
136            | "SendTaskHeartbeat"
137            | "SendTaskSuccess"
138            | "PublishStateMachineVersion"
139            | "DeleteStateMachineVersion"
140            | "CreateStateMachineAlias"
141            | "DeleteStateMachineAlias"
142            | "UpdateStateMachineAlias"
143            | "UpdateMapRun"
144            | "RedriveExecution"
145            | "StartSyncExecution"
146    )
147}
148
149#[async_trait]
150impl AwsService for StepFunctionsService {
151    fn service_name(&self) -> &str {
152        "states"
153    }
154
155    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
156        let mutates = is_mutating_action(req.action.as_str());
157        let result = match req.action.as_str() {
158            "CreateStateMachine" => self.create_state_machine(&req),
159            "DescribeStateMachine" => self.describe_state_machine(&req),
160            "ListStateMachines" => self.list_state_machines(&req),
161            "DeleteStateMachine" => self.delete_state_machine(&req),
162            "UpdateStateMachine" => self.update_state_machine(&req),
163            "TagResource" => self.tag_resource(&req),
164            "UntagResource" => self.untag_resource(&req),
165            "ListTagsForResource" => self.list_tags_for_resource(&req),
166            "StartExecution" => self.start_execution(&req),
167            "StopExecution" => self.stop_execution(&req),
168            "DescribeExecution" => self.describe_execution(&req),
169            "ListExecutions" => self.list_executions(&req),
170            "GetExecutionHistory" => self.get_execution_history(&req),
171            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
172            "CreateActivity" => self.create_activity(&req),
173            "DeleteActivity" => self.delete_activity(&req),
174            "DescribeActivity" => self.describe_activity(&req),
175            "ListActivities" => self.list_activities(&req),
176            "GetActivityTask" => self.get_activity_task(&req),
177            "SendTaskFailure" => self.send_task_failure(&req),
178            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
179            "SendTaskSuccess" => self.send_task_success(&req),
180            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
181            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
182            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
183            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
184            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
185            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
186            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
187            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
188            "DescribeMapRun" => self.describe_map_run(&req),
189            "ListMapRuns" => self.list_map_runs(&req),
190            "UpdateMapRun" => self.update_map_run(&req),
191            "RedriveExecution" => self.redrive_execution(&req),
192            "StartSyncExecution" => self.start_sync_execution(&req),
193            "TestState" => self.test_state(&req),
194            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
195            _ => Err(AwsServiceError::action_not_implemented(
196                "states",
197                &req.action,
198            )),
199        };
200        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
201            self.save_snapshot().await;
202        }
203        result
204    }
205
206    fn supported_actions(&self) -> &[&str] {
207        SUPPORTED
208    }
209}
210
211impl StepFunctionsService {
212    // ─── State Machine CRUD ─────────────────────────────────────────────
213
214    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
215        let body = req.json_body();
216
217        validate_required("name", &body["name"])?;
218        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
219        validate_name(name)?;
220
221        validate_required("definition", &body["definition"])?;
222        let definition = body["definition"]
223            .as_str()
224            .ok_or_else(|| missing("definition"))?;
225        validate_definition(definition)?;
226
227        validate_required("roleArn", &body["roleArn"])?;
228        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
229        validate_arn(role_arn)?;
230
231        let machine_type = if let Some(t) = body["type"].as_str() {
232            StateMachineType::parse(t).ok_or_else(|| {
233                AwsServiceError::aws_error(
234                    StatusCode::BAD_REQUEST,
235                    "ValidationException",
236                    format!(
237                        "Value '{t}' at 'type' failed to satisfy constraint: \
238                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
239                    ),
240                )
241            })?
242        } else {
243            StateMachineType::Standard
244        };
245
246        let mut accounts = self.state.write();
247        let state = accounts.get_or_create(&req.account_id);
248        let arn = state.state_machine_arn(name);
249
250        // Check if name already exists
251        if state.state_machines.values().any(|sm| sm.name == name) {
252            return Err(AwsServiceError::aws_error(
253                StatusCode::CONFLICT,
254                "StateMachineAlreadyExists",
255                format!("State Machine Already Exists: '{arn}'"),
256            ));
257        }
258
259        let now = Utc::now();
260        let revision_id = uuid::Uuid::new_v4().to_string();
261
262        let mut tags = HashMap::new();
263        if !body["tags"].is_null() {
264            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
265                |f| {
266                    AwsServiceError::aws_error(
267                        StatusCode::BAD_REQUEST,
268                        "ValidationException",
269                        format!("{f} must be a list"),
270                    )
271                },
272            )?;
273        }
274
275        let sm = StateMachine {
276            name: name.to_string(),
277            arn: arn.clone(),
278            definition: definition.to_string(),
279            role_arn: role_arn.to_string(),
280            machine_type,
281            status: StateMachineStatus::Active,
282            creation_date: now,
283            update_date: now,
284            tags,
285            revision_id: revision_id.clone(),
286            logging_configuration: body.get("loggingConfiguration").cloned(),
287            tracing_configuration: body.get("tracingConfiguration").cloned(),
288            description: body["description"].as_str().unwrap_or("").to_string(),
289        };
290
291        state.state_machines.insert(arn.clone(), sm);
292
293        Ok(AwsResponse::ok_json(json!({
294            "stateMachineArn": arn,
295            "creationDate": now.timestamp() as f64,
296            "stateMachineVersionArn": arn,
297        })))
298    }
299
300    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = req.json_body();
302        validate_required("stateMachineArn", &body["stateMachineArn"])?;
303        let arn = body["stateMachineArn"]
304            .as_str()
305            .ok_or_else(|| missing("stateMachineArn"))?;
306        validate_arn(arn)?;
307
308        let accounts = self.state.read();
309        let empty = StepFunctionsState::new(&req.account_id, &req.region);
310        let state = accounts.get(&req.account_id).unwrap_or(&empty);
311        let sm = state
312            .state_machines
313            .get(arn)
314            .ok_or_else(|| state_machine_not_found(arn))?;
315
316        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
317    }
318
319    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
322        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
323        let next_token = body["nextToken"].as_str();
324
325        let accounts = self.state.read();
326        let empty = StepFunctionsState::new(&req.account_id, &req.region);
327        let state = accounts.get(&req.account_id).unwrap_or(&empty);
328        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
329        machines.sort_by(|a, b| a.name.cmp(&b.name));
330
331        let items: Vec<Value> = machines
332            .iter()
333            .map(|sm| {
334                json!({
335                    "name": sm.name,
336                    "stateMachineArn": sm.arn,
337                    "type": sm.machine_type.as_str(),
338                    "creationDate": sm.creation_date.timestamp() as f64,
339                })
340            })
341            .collect();
342
343        let (page, token) = paginate(&items, next_token, max_results);
344
345        let mut resp = json!({ "stateMachines": page });
346        if let Some(t) = token {
347            resp["nextToken"] = json!(t);
348        }
349        Ok(AwsResponse::ok_json(resp))
350    }
351
352    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
353        let body = req.json_body();
354        validate_required("stateMachineArn", &body["stateMachineArn"])?;
355        let arn = body["stateMachineArn"]
356            .as_str()
357            .ok_or_else(|| missing("stateMachineArn"))?;
358        validate_arn(arn)?;
359
360        let mut accounts = self.state.write();
361        let state = accounts.get_or_create(&req.account_id);
362        // AWS returns success even if it doesn't exist
363        state.state_machines.remove(arn);
364
365        Ok(AwsResponse::ok_json(json!({})))
366    }
367
368    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
369        let body = req.json_body();
370        validate_required("stateMachineArn", &body["stateMachineArn"])?;
371        let arn = body["stateMachineArn"]
372            .as_str()
373            .ok_or_else(|| missing("stateMachineArn"))?;
374        validate_arn(arn)?;
375
376        let mut accounts = self.state.write();
377        let state = accounts.get_or_create(&req.account_id);
378        let sm = state
379            .state_machines
380            .get_mut(arn)
381            .ok_or_else(|| state_machine_not_found(arn))?;
382
383        if let Some(definition) = body["definition"].as_str() {
384            validate_definition(definition)?;
385            sm.definition = definition.to_string();
386        }
387
388        if let Some(role_arn) = body["roleArn"].as_str() {
389            validate_arn(role_arn)?;
390            sm.role_arn = role_arn.to_string();
391        }
392
393        if let Some(logging) = body.get("loggingConfiguration") {
394            sm.logging_configuration = Some(logging.clone());
395        }
396
397        if let Some(tracing) = body.get("tracingConfiguration") {
398            sm.tracing_configuration = Some(tracing.clone());
399        }
400
401        if let Some(description) = body["description"].as_str() {
402            sm.description = description.to_string();
403        }
404
405        let now = Utc::now();
406        sm.update_date = now;
407        sm.revision_id = uuid::Uuid::new_v4().to_string();
408
409        let revision_id = sm.revision_id.clone();
410        let sm_arn = sm.arn.clone();
411
412        Ok(AwsResponse::ok_json(json!({
413            "updateDate": now.timestamp() as f64,
414            "revisionId": revision_id,
415            "stateMachineVersionArn": sm_arn,
416        })))
417    }
418
419    // ─── Execution Lifecycle ──────────────────────────────────────────
420
421    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
422        let body = req.json_body();
423        validate_required("stateMachineArn", &body["stateMachineArn"])?;
424        let sm_arn = body["stateMachineArn"]
425            .as_str()
426            .ok_or_else(|| missing("stateMachineArn"))?;
427        validate_arn(sm_arn)?;
428
429        let input = body["input"].as_str().map(|s| s.to_string());
430
431        // Validate input is valid JSON if provided
432        if let Some(ref input_str) = input {
433            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
434                AwsServiceError::aws_error(
435                    StatusCode::BAD_REQUEST,
436                    "InvalidExecutionInput",
437                    "Invalid execution input: must be valid JSON".to_string(),
438                )
439            })?;
440        }
441
442        let execution_name = body["name"]
443            .as_str()
444            .map(|s| s.to_string())
445            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
446
447        if let Some(name) = body["name"].as_str() {
448            validate_name(name)?;
449        }
450
451        let mut accounts = self.state.write();
452        let state = accounts.get_or_create(&req.account_id);
453        let sm = state
454            .state_machines
455            .get(sm_arn)
456            .ok_or_else(|| state_machine_not_found(sm_arn))?;
457
458        let sm_name = sm.name.clone();
459        let definition = sm.definition.clone();
460        let exec_arn = state.execution_arn(&sm_name, &execution_name);
461
462        // Check for duplicate execution name
463        if state.executions.contains_key(&exec_arn) {
464            return Err(AwsServiceError::aws_error(
465                StatusCode::CONFLICT,
466                "ExecutionAlreadyExists",
467                format!("Execution Already Exists: '{exec_arn}'"),
468            ));
469        }
470
471        let now = Utc::now();
472        let execution = Execution {
473            execution_arn: exec_arn.clone(),
474            state_machine_arn: sm_arn.to_string(),
475            state_machine_name: sm_name,
476            name: execution_name,
477            status: ExecutionStatus::Running,
478            input: input.clone(),
479            output: None,
480            start_date: now,
481            stop_date: None,
482            error: None,
483            cause: None,
484            history_events: vec![],
485        };
486
487        state.executions.insert(exec_arn.clone(), execution);
488        drop(accounts);
489
490        // Spawn async execution
491        let shared_state = self.state.clone();
492        let exec_arn_clone = exec_arn.clone();
493        let input_clone = input;
494        let delivery = self.delivery.clone();
495        let dynamodb_state = self.dynamodb_state.clone();
496        tokio::spawn(async move {
497            interpreter::execute_state_machine(
498                shared_state,
499                exec_arn_clone,
500                definition,
501                input_clone,
502                delivery,
503                dynamodb_state,
504            )
505            .await;
506        });
507
508        Ok(AwsResponse::ok_json(json!({
509            "executionArn": exec_arn,
510            "startDate": now.timestamp() as f64,
511        })))
512    }
513
514    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
515        let body = req.json_body();
516        validate_required("executionArn", &body["executionArn"])?;
517        let exec_arn = body["executionArn"]
518            .as_str()
519            .ok_or_else(|| missing("executionArn"))?;
520
521        let error = body["error"].as_str().map(|s| s.to_string());
522        let cause = body["cause"].as_str().map(|s| s.to_string());
523
524        let mut accounts = self.state.write();
525        let state = accounts.get_or_create(&req.account_id);
526        let exec = state
527            .executions
528            .get_mut(exec_arn)
529            .ok_or_else(|| execution_not_found(exec_arn))?;
530
531        if exec.status != ExecutionStatus::Running {
532            return Err(AwsServiceError::aws_error(
533                StatusCode::BAD_REQUEST,
534                "ExecutionNotRunning",
535                format!("Execution is not running: '{exec_arn}'"),
536            ));
537        }
538
539        let now = Utc::now();
540        exec.status = ExecutionStatus::Aborted;
541        exec.stop_date = Some(now);
542        exec.error = error;
543        exec.cause = cause;
544
545        Ok(AwsResponse::ok_json(json!({
546            "stopDate": now.timestamp() as f64,
547        })))
548    }
549
550    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
551        let body = req.json_body();
552        validate_required("executionArn", &body["executionArn"])?;
553        let exec_arn = body["executionArn"]
554            .as_str()
555            .ok_or_else(|| missing("executionArn"))?;
556
557        let accounts = self.state.read();
558        let empty = StepFunctionsState::new(&req.account_id, &req.region);
559        let state = accounts.get(&req.account_id).unwrap_or(&empty);
560        let exec = state
561            .executions
562            .get(exec_arn)
563            .ok_or_else(|| execution_not_found(exec_arn))?;
564
565        Ok(AwsResponse::ok_json(execution_to_json(exec)))
566    }
567
568    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
569        let body = req.json_body();
570        validate_required("stateMachineArn", &body["stateMachineArn"])?;
571        let sm_arn = body["stateMachineArn"]
572            .as_str()
573            .ok_or_else(|| missing("stateMachineArn"))?;
574        validate_arn(sm_arn)?;
575
576        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
577        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
578        let next_token = body["nextToken"].as_str();
579        let status_filter = body["statusFilter"].as_str();
580
581        let accounts = self.state.read();
582        let empty = StepFunctionsState::new(&req.account_id, &req.region);
583        let state = accounts.get(&req.account_id).unwrap_or(&empty);
584
585        // Verify state machine exists
586        if !state.state_machines.contains_key(sm_arn) {
587            return Err(state_machine_not_found(sm_arn));
588        }
589
590        let mut executions: Vec<&Execution> = state
591            .executions
592            .values()
593            .filter(|e| e.state_machine_arn == sm_arn)
594            .filter(|e| {
595                status_filter
596                    .map(|sf| e.status.as_str() == sf)
597                    .unwrap_or(true)
598            })
599            .collect();
600
601        // Sort by start date descending (most recent first)
602        executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
603
604        let items: Vec<Value> = executions
605            .iter()
606            .map(|e| {
607                let mut item = json!({
608                    "executionArn": e.execution_arn,
609                    "stateMachineArn": e.state_machine_arn,
610                    "name": e.name,
611                    "status": e.status.as_str(),
612                    "startDate": e.start_date.timestamp() as f64,
613                });
614                if let Some(stop) = e.stop_date {
615                    item["stopDate"] = json!(stop.timestamp() as f64);
616                }
617                item
618            })
619            .collect();
620
621        let (page, token) = paginate(&items, next_token, max_results);
622
623        let mut resp = json!({ "executions": page });
624        if let Some(t) = token {
625            resp["nextToken"] = json!(t);
626        }
627        Ok(AwsResponse::ok_json(resp))
628    }
629
630    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
631        let body = req.json_body();
632        validate_required("executionArn", &body["executionArn"])?;
633        let exec_arn = body["executionArn"]
634            .as_str()
635            .ok_or_else(|| missing("executionArn"))?;
636
637        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
638        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
639        let next_token = body["nextToken"].as_str();
640        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
641
642        let accounts = self.state.read();
643        let empty = StepFunctionsState::new(&req.account_id, &req.region);
644        let state = accounts.get(&req.account_id).unwrap_or(&empty);
645        let exec = state
646            .executions
647            .get(exec_arn)
648            .ok_or_else(|| execution_not_found(exec_arn))?;
649
650        let mut events: Vec<Value> = exec
651            .history_events
652            .iter()
653            .map(|e| {
654                json!({
655                    "id": e.id,
656                    "type": e.event_type,
657                    "timestamp": e.timestamp.timestamp() as f64,
658                    "previousEventId": e.previous_event_id,
659                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
660                })
661            })
662            .collect();
663
664        if reverse_order {
665            events.reverse();
666        }
667
668        let (page, token) = paginate(&events, next_token, max_results);
669
670        let mut resp = json!({ "events": page });
671        if let Some(t) = token {
672            resp["nextToken"] = json!(t);
673        }
674        Ok(AwsResponse::ok_json(resp))
675    }
676
677    fn describe_state_machine_for_execution(
678        &self,
679        req: &AwsRequest,
680    ) -> Result<AwsResponse, AwsServiceError> {
681        let body = req.json_body();
682        validate_required("executionArn", &body["executionArn"])?;
683        let exec_arn = body["executionArn"]
684            .as_str()
685            .ok_or_else(|| missing("executionArn"))?;
686
687        let accounts = self.state.read();
688        let empty = StepFunctionsState::new(&req.account_id, &req.region);
689        let state = accounts.get(&req.account_id).unwrap_or(&empty);
690        let exec = state
691            .executions
692            .get(exec_arn)
693            .ok_or_else(|| execution_not_found(exec_arn))?;
694
695        let sm = state
696            .state_machines
697            .get(&exec.state_machine_arn)
698            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
699
700        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
701    }
702
703    // ─── Tagging ────────────────────────────────────────────────────────
704
705    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
706        let body = req.json_body();
707        validate_required("resourceArn", &body["resourceArn"])?;
708        let arn = body["resourceArn"]
709            .as_str()
710            .ok_or_else(|| missing("resourceArn"))?;
711        validate_arn(arn)?;
712        validate_required("tags", &body["tags"])?;
713
714        let mut accounts = self.state.write();
715        let state = accounts.get_or_create(&req.account_id);
716        let sm = state
717            .state_machines
718            .get_mut(arn)
719            .ok_or_else(|| resource_not_found(arn))?;
720
721        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
722            |f| {
723                AwsServiceError::aws_error(
724                    StatusCode::BAD_REQUEST,
725                    "ValidationException",
726                    format!("{f} must be a list"),
727                )
728            },
729        )?;
730
731        Ok(AwsResponse::ok_json(json!({})))
732    }
733
734    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735        let body = req.json_body();
736        validate_required("resourceArn", &body["resourceArn"])?;
737        let arn = body["resourceArn"]
738            .as_str()
739            .ok_or_else(|| missing("resourceArn"))?;
740        validate_arn(arn)?;
741        validate_required("tagKeys", &body["tagKeys"])?;
742
743        let mut accounts = self.state.write();
744        let state = accounts.get_or_create(&req.account_id);
745        let sm = state
746            .state_machines
747            .get_mut(arn)
748            .ok_or_else(|| resource_not_found(arn))?;
749
750        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
751            AwsServiceError::aws_error(
752                StatusCode::BAD_REQUEST,
753                "ValidationException",
754                format!("{f} must be a list"),
755            )
756        })?;
757
758        Ok(AwsResponse::ok_json(json!({})))
759    }
760
761    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762        let body = req.json_body();
763        validate_required("resourceArn", &body["resourceArn"])?;
764        let arn = body["resourceArn"]
765            .as_str()
766            .ok_or_else(|| missing("resourceArn"))?;
767        validate_arn(arn)?;
768
769        let accounts = self.state.read();
770        let empty = StepFunctionsState::new(&req.account_id, &req.region);
771        let state = accounts.get(&req.account_id).unwrap_or(&empty);
772        let sm = state
773            .state_machines
774            .get(arn)
775            .ok_or_else(|| resource_not_found(arn))?;
776
777        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
778
779        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
780    }
781
782    // ─── Activities ─────────────────────────────────────────────────────
783
784    fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785        let body = req.json_body();
786        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
787        validate_name(name)?;
788        let mut accounts = self.state.write();
789        let state = accounts.get_or_create(&req.account_id);
790        let arn = format!(
791            "arn:aws:states:{}:{}:activity:{}",
792            state.region, state.account_id, name
793        );
794        if state.activities.contains_key(&arn) {
795            return Err(AwsServiceError::aws_error(
796                StatusCode::BAD_REQUEST,
797                "ActivityAlreadyExists",
798                format!("Activity already exists: {arn}"),
799            ));
800        }
801        let activity = crate::state::Activity {
802            name: name.to_string(),
803            arn: arn.clone(),
804            creation_date: chrono::Utc::now(),
805            tags: HashMap::new(),
806        };
807        state.activities.insert(arn.clone(), activity.clone());
808        Ok(AwsResponse::ok_json(json!({
809            "activityArn": arn,
810            "creationDate": activity.creation_date.timestamp(),
811        })))
812    }
813
814    fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815        let body = req.json_body();
816        let arn = body["activityArn"]
817            .as_str()
818            .ok_or_else(|| missing("activityArn"))?
819            .to_string();
820        let mut accounts = self.state.write();
821        let state = accounts.get_or_create(&req.account_id);
822        state.activities.remove(&arn);
823        Ok(AwsResponse::ok_json(json!({})))
824    }
825
826    fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827        let body = req.json_body();
828        let arn = body["activityArn"]
829            .as_str()
830            .ok_or_else(|| missing("activityArn"))?
831            .to_string();
832        let accounts = self.state.read();
833        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
834        let state = accounts.get(&req.account_id).unwrap_or(&empty);
835        let a = state.activities.get(&arn).ok_or_else(|| {
836            AwsServiceError::aws_error(
837                StatusCode::BAD_REQUEST,
838                "ActivityDoesNotExist",
839                format!("Activity does not exist: {arn}"),
840            )
841        })?;
842        Ok(AwsResponse::ok_json(json!({
843            "activityArn": a.arn,
844            "name": a.name,
845            "creationDate": a.creation_date.timestamp(),
846        })))
847    }
848
849    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850        let accounts = self.state.read();
851        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
852        let state = accounts.get(&req.account_id).unwrap_or(&empty);
853        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
854        activities.sort_by(|a, b| a.name.cmp(&b.name));
855        let body = json!({
856            "activities": activities.iter().map(|a| json!({
857                "activityArn": a.arn,
858                "name": a.name,
859                "creationDate": a.creation_date.timestamp(),
860            })).collect::<Vec<_>>(),
861        });
862        Ok(AwsResponse::ok_json(body))
863    }
864
865    fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
866        let body = req.json_body();
867        let arn = body["activityArn"]
868            .as_str()
869            .ok_or_else(|| missing("activityArn"))?
870            .to_string();
871        let mut accounts = self.state.write();
872        let state = accounts.get_or_create(&req.account_id);
873        if !state.activities.contains_key(&arn) {
874            return Err(AwsServiceError::aws_error(
875                StatusCode::BAD_REQUEST,
876                "ActivityDoesNotExist",
877                format!("Activity does not exist: {arn}"),
878            ));
879        }
880        // Mint a synthetic task token tied to this activity. No worker
881        // pool to dispatch from in fakecloud, so we return an empty input.
882        let token = format!(
883            "FCToken-{}",
884            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
885        );
886        state.task_tokens.insert(
887            token.clone(),
888            crate::state::TaskTokenState {
889                activity_arn: arn,
890                status: "PENDING".to_string(),
891                output: None,
892                error: None,
893                cause: None,
894            },
895        );
896        Ok(AwsResponse::ok_json(json!({
897            "taskToken": token,
898            "input": "{}",
899        })))
900    }
901
902    fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
903        self.update_task_token(req, "SUCCEEDED")
904    }
905
906    fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
907        self.update_task_token(req, "FAILED")
908    }
909
910    fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
911        self.update_task_token(req, "HEARTBEAT")
912    }
913
914    fn update_task_token(
915        &self,
916        req: &AwsRequest,
917        new_status: &str,
918    ) -> Result<AwsResponse, AwsServiceError> {
919        let body = req.json_body();
920        let token = body["taskToken"]
921            .as_str()
922            .ok_or_else(|| missing("taskToken"))?
923            .to_string();
924        let mut accounts = self.state.write();
925        let state = accounts.get_or_create(&req.account_id);
926        let entry = state.task_tokens.get_mut(&token).ok_or_else(|| {
927            AwsServiceError::aws_error(
928                StatusCode::BAD_REQUEST,
929                "TaskDoesNotExist",
930                format!("Task does not exist: {token}"),
931            )
932        })?;
933        entry.status = new_status.to_string();
934        if new_status == "SUCCEEDED" {
935            entry.output = body["output"].as_str().map(String::from);
936        } else if new_status == "FAILED" {
937            entry.error = body["error"].as_str().map(String::from);
938            entry.cause = body["cause"].as_str().map(String::from);
939        }
940        Ok(AwsResponse::ok_json(json!({})))
941    }
942
943    // ─── State machine versions / aliases ───────────────────────────────
944
945    fn publish_state_machine_version(
946        &self,
947        req: &AwsRequest,
948    ) -> Result<AwsResponse, AwsServiceError> {
949        let body = req.json_body();
950        let arn = body["stateMachineArn"]
951            .as_str()
952            .ok_or_else(|| missing("stateMachineArn"))?
953            .to_string();
954        let description = body["description"].as_str().unwrap_or("").to_string();
955        let mut accounts = self.state.write();
956        let state = accounts.get_or_create(&req.account_id);
957        if !state.state_machines.contains_key(&arn) {
958            return Err(state_machine_not_found(&arn));
959        }
960        let version = state
961            .state_machine_versions
962            .values()
963            .filter(|v| v.state_machine_arn == arn)
964            .map(|v| v.version)
965            .max()
966            .unwrap_or(0)
967            + 1;
968        let version_arn = format!("{arn}:{version}");
969        let v = crate::state::StateMachineVersion {
970            state_machine_arn: arn,
971            version,
972            revision_id: format!("rev-{version}"),
973            description,
974            creation_date: chrono::Utc::now(),
975        };
976        state
977            .state_machine_versions
978            .insert(version_arn.clone(), v.clone());
979        Ok(AwsResponse::ok_json(json!({
980            "stateMachineVersionArn": version_arn,
981            "creationDate": v.creation_date.timestamp(),
982        })))
983    }
984
985    fn delete_state_machine_version(
986        &self,
987        req: &AwsRequest,
988    ) -> Result<AwsResponse, AwsServiceError> {
989        let body = req.json_body();
990        let arn = body["stateMachineVersionArn"]
991            .as_str()
992            .ok_or_else(|| missing("stateMachineVersionArn"))?
993            .to_string();
994        let mut accounts = self.state.write();
995        let state = accounts.get_or_create(&req.account_id);
996        state.state_machine_versions.remove(&arn);
997        Ok(AwsResponse::ok_json(json!({})))
998    }
999
1000    fn list_state_machine_versions(
1001        &self,
1002        req: &AwsRequest,
1003    ) -> Result<AwsResponse, AwsServiceError> {
1004        let body = req.json_body();
1005        let arn = body["stateMachineArn"]
1006            .as_str()
1007            .ok_or_else(|| missing("stateMachineArn"))?
1008            .to_string();
1009        let accounts = self.state.read();
1010        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1011        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1012        let mut versions: Vec<&crate::state::StateMachineVersion> = state
1013            .state_machine_versions
1014            .values()
1015            .filter(|v| v.state_machine_arn == arn)
1016            .collect();
1017        versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1018        let resp = json!({
1019            "stateMachineVersions": versions.iter().map(|v| json!({
1020                "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1021                "creationDate": v.creation_date.timestamp(),
1022            })).collect::<Vec<_>>(),
1023        });
1024        Ok(AwsResponse::ok_json(resp))
1025    }
1026
1027    fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1028        let body = req.json_body();
1029        let name = body["name"]
1030            .as_str()
1031            .ok_or_else(|| missing("name"))?
1032            .to_string();
1033        validate_name(&name)?;
1034        let routing_cfg = body["routingConfiguration"]
1035            .as_array()
1036            .ok_or_else(|| missing("routingConfiguration"))?;
1037        let routes = parse_routing_configuration(routing_cfg)?;
1038        let parent_arn = routes[0]
1039            .state_machine_version_arn
1040            .rsplit_once(':')
1041            .map(|(parent, _)| parent.to_string())
1042            .unwrap_or_default();
1043        let alias_arn = format!("{parent_arn}:{name}");
1044        let now = chrono::Utc::now();
1045        let alias = crate::state::StateMachineAlias {
1046            name,
1047            arn: alias_arn.clone(),
1048            description: body["description"].as_str().unwrap_or("").to_string(),
1049            routing_configuration: routes,
1050            creation_date: now,
1051            update_date: now,
1052        };
1053        let mut accounts = self.state.write();
1054        let state = accounts.get_or_create(&req.account_id);
1055        state.state_machine_aliases.insert(alias_arn.clone(), alias);
1056        Ok(AwsResponse::ok_json(json!({
1057            "stateMachineAliasArn": alias_arn,
1058            "creationDate": now.timestamp(),
1059        })))
1060    }
1061
1062    fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1063        let body = req.json_body();
1064        let arn = body["stateMachineAliasArn"]
1065            .as_str()
1066            .ok_or_else(|| missing("stateMachineAliasArn"))?
1067            .to_string();
1068        let mut accounts = self.state.write();
1069        let state = accounts.get_or_create(&req.account_id);
1070        state.state_machine_aliases.remove(&arn);
1071        Ok(AwsResponse::ok_json(json!({})))
1072    }
1073
1074    fn describe_state_machine_alias(
1075        &self,
1076        req: &AwsRequest,
1077    ) -> Result<AwsResponse, AwsServiceError> {
1078        let body = req.json_body();
1079        let arn = body["stateMachineAliasArn"]
1080            .as_str()
1081            .ok_or_else(|| missing("stateMachineAliasArn"))?
1082            .to_string();
1083        let accounts = self.state.read();
1084        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1085        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1086        let alias = state
1087            .state_machine_aliases
1088            .get(&arn)
1089            .ok_or_else(|| resource_not_found(&arn))?;
1090        Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1091    }
1092
1093    fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1094        let body = req.json_body();
1095        let parent = body["stateMachineArn"]
1096            .as_str()
1097            .ok_or_else(|| missing("stateMachineArn"))?
1098            .to_string();
1099        let accounts = self.state.read();
1100        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1101        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1102        // Anchor the prefix on the alias separator so a state machine
1103        // named `foo` doesn't pull in aliases for `foobar`.
1104        let parent_prefix = format!("{parent}:");
1105        let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1106            .state_machine_aliases
1107            .values()
1108            .filter(|a| a.arn.starts_with(&parent_prefix))
1109            .collect();
1110        aliases.sort_by(|a, b| a.name.cmp(&b.name));
1111        Ok(AwsResponse::ok_json(json!({
1112            "stateMachineAliases": aliases.iter().map(|a| json!({
1113                "stateMachineAliasArn": a.arn,
1114                "creationDate": a.creation_date.timestamp(),
1115            })).collect::<Vec<_>>(),
1116        })))
1117    }
1118
1119    fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1120        let body = req.json_body();
1121        let arn = body["stateMachineAliasArn"]
1122            .as_str()
1123            .ok_or_else(|| missing("stateMachineAliasArn"))?
1124            .to_string();
1125        let mut accounts = self.state.write();
1126        let state = accounts.get_or_create(&req.account_id);
1127        let alias = state
1128            .state_machine_aliases
1129            .get_mut(&arn)
1130            .ok_or_else(|| resource_not_found(&arn))?;
1131        if let Some(d) = body["description"].as_str() {
1132            alias.description = d.to_string();
1133        }
1134        if let Some(routes) = body["routingConfiguration"].as_array() {
1135            alias.routing_configuration = parse_routing_configuration(routes)?;
1136        }
1137        alias.update_date = chrono::Utc::now();
1138        Ok(AwsResponse::ok_json(json!({
1139            "updateDate": alias.update_date.timestamp(),
1140        })))
1141    }
1142
1143    // ─── Map runs ───────────────────────────────────────────────────────
1144
1145    fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1146        let body = req.json_body();
1147        let arn = body["mapRunArn"]
1148            .as_str()
1149            .ok_or_else(|| missing("mapRunArn"))?
1150            .to_string();
1151        let accounts = self.state.read();
1152        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1153        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1154        let mr = state
1155            .map_runs
1156            .get(&arn)
1157            .ok_or_else(|| resource_not_found(&arn))?;
1158        Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1159    }
1160
1161    fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1162        let body = req.json_body();
1163        let exec_arn = body["executionArn"].as_str().map(String::from);
1164        let accounts = self.state.read();
1165        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1166        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1167        let runs: Vec<&crate::state::MapRun> = state
1168            .map_runs
1169            .values()
1170            .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1171            .collect();
1172        Ok(AwsResponse::ok_json(json!({
1173            "mapRuns": runs.iter().map(|r| json!({
1174                "mapRunArn": r.map_run_arn,
1175                "executionArn": r.execution_arn,
1176                "stateMachineArn": "",
1177                "startDate": r.start_date.timestamp(),
1178                "stopDate": r.stop_date.map(|d| d.timestamp()),
1179            })).collect::<Vec<_>>(),
1180        })))
1181    }
1182
1183    fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1184        let body = req.json_body();
1185        let arn = body["mapRunArn"]
1186            .as_str()
1187            .ok_or_else(|| missing("mapRunArn"))?
1188            .to_string();
1189        let mut accounts = self.state.write();
1190        let state = accounts.get_or_create(&req.account_id);
1191        let mr = state
1192            .map_runs
1193            .get_mut(&arn)
1194            .ok_or_else(|| resource_not_found(&arn))?;
1195        if let Some(c) = body["maxConcurrency"].as_i64() {
1196            mr.max_concurrency = c as i32;
1197        }
1198        if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1199            mr.tolerated_failure_percentage = p;
1200        }
1201        if let Some(c) = body["toleratedFailureCount"].as_i64() {
1202            mr.tolerated_failure_count = c;
1203        }
1204        Ok(AwsResponse::ok_json(json!({})))
1205    }
1206
1207    // ─── Execution lifecycle extras ─────────────────────────────────────
1208
1209    fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1210        let body = req.json_body();
1211        let arn = body["executionArn"]
1212            .as_str()
1213            .ok_or_else(|| missing("executionArn"))?
1214            .to_string();
1215        let mut accounts = self.state.write();
1216        let state = accounts.get_or_create(&req.account_id);
1217        let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1218            AwsServiceError::aws_error(
1219                StatusCode::BAD_REQUEST,
1220                "ExecutionDoesNotExist",
1221                format!("Execution does not exist: {arn}"),
1222            )
1223        })?;
1224        exec.status = crate::state::ExecutionStatus::Running;
1225        exec.stop_date = None;
1226        Ok(AwsResponse::ok_json(json!({
1227            "redriveDate": chrono::Utc::now().timestamp(),
1228        })))
1229    }
1230
1231    fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1232        let body = req.json_body();
1233        let sm_arn = body["stateMachineArn"]
1234            .as_str()
1235            .ok_or_else(|| missing("stateMachineArn"))?
1236            .to_string();
1237        let input = body["input"].as_str().unwrap_or("{}").to_string();
1238        if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1239            return Err(AwsServiceError::aws_error(
1240                StatusCode::BAD_REQUEST,
1241                "InvalidExecutionInput",
1242                "Execution input is not valid JSON.",
1243            ));
1244        }
1245        let mut accounts = self.state.write();
1246        let state = accounts.get_or_create(&req.account_id);
1247        let sm = state
1248            .state_machines
1249            .get(&sm_arn)
1250            .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1251        if sm.machine_type != crate::state::StateMachineType::Express {
1252            return Err(AwsServiceError::aws_error(
1253                StatusCode::BAD_REQUEST,
1254                "StateMachineTypeNotSupported",
1255                "StartSyncExecution is only supported for EXPRESS state machines.",
1256            ));
1257        }
1258        let now = chrono::Utc::now();
1259        let exec_arn = format!(
1260            "arn:aws:states:{}:{}:express:{}:sync-{}",
1261            state.region,
1262            state.account_id,
1263            sm.name,
1264            now.timestamp_millis()
1265        );
1266        Ok(AwsResponse::ok_json(json!({
1267            "executionArn": exec_arn,
1268            "stateMachineArn": sm_arn,
1269            "name": "sync",
1270            "startDate": now.timestamp(),
1271            "stopDate": now.timestamp(),
1272            "status": "SUCCEEDED",
1273            "input": input,
1274            "output": "{}",
1275            "billingDetails": {
1276                "billedMemoryUsedInMB": 64,
1277                "billedDurationInMilliseconds": 1,
1278            },
1279        })))
1280    }
1281
1282    fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1283        let body = req.json_body();
1284        let definition = body["definition"]
1285            .as_str()
1286            .ok_or_else(|| missing("definition"))?;
1287        validate_definition(definition)?;
1288        let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1289        let input = body["input"].as_str().unwrap_or("{}").to_string();
1290        // Echo input back as output. Real Step Functions actually
1291        // simulates the state; our emulator reports SUCCEEDED so callers
1292        // can wire the integration test scaffolding.
1293        Ok(AwsResponse::ok_json(json!({
1294            "output": input,
1295            "status": "SUCCEEDED",
1296            "nextState": "End",
1297        })))
1298    }
1299
1300    fn validate_state_machine_definition(
1301        &self,
1302        req: &AwsRequest,
1303    ) -> Result<AwsResponse, AwsServiceError> {
1304        let body = req.json_body();
1305        let definition = body["definition"]
1306            .as_str()
1307            .ok_or_else(|| missing("definition"))?;
1308        match validate_definition(definition) {
1309            Ok(()) => Ok(AwsResponse::ok_json(json!({
1310                "result": "OK",
1311                "diagnostics": [],
1312            }))),
1313            Err(e) => Ok(AwsResponse::ok_json(json!({
1314                "result": "FAIL",
1315                "diagnostics": [{
1316                    "severity": "ERROR",
1317                    "code": "INVALID_DEFINITION",
1318                    "message": e.to_string(),
1319                }],
1320            }))),
1321        }
1322    }
1323}
1324
1325fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1326    json!({
1327        "stateMachineAliasArn": alias.arn,
1328        "name": alias.name,
1329        "description": alias.description,
1330        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1331            "stateMachineVersionArn": r.state_machine_version_arn,
1332            "weight": r.weight,
1333        })).collect::<Vec<_>>(),
1334        "creationDate": alias.creation_date.timestamp(),
1335        "updateDate": alias.update_date.timestamp(),
1336    })
1337}
1338
1339fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1340    json!({
1341        "mapRunArn": mr.map_run_arn,
1342        "executionArn": mr.execution_arn,
1343        "maxConcurrency": mr.max_concurrency,
1344        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1345        "toleratedFailureCount": mr.tolerated_failure_count,
1346        "status": mr.status,
1347        "startDate": mr.start_date.timestamp(),
1348        "stopDate": mr.stop_date.map(|d| d.timestamp()),
1349    })
1350}
1351
1352// ─── Helpers ────────────────────────────────────────────────────────────
1353
1354fn state_machine_to_json(sm: &StateMachine) -> Value {
1355    let mut resp = json!({
1356        "name": sm.name,
1357        "stateMachineArn": sm.arn,
1358        "definition": sm.definition,
1359        "roleArn": sm.role_arn,
1360        "type": sm.machine_type.as_str(),
1361        "status": sm.status.as_str(),
1362        "creationDate": sm.creation_date.timestamp() as f64,
1363        "updateDate": sm.update_date.timestamp() as f64,
1364        "revisionId": sm.revision_id,
1365        "label": sm.name,
1366    });
1367
1368    if !sm.description.is_empty() {
1369        resp["description"] = json!(sm.description);
1370    }
1371
1372    if let Some(ref logging) = sm.logging_configuration {
1373        resp["loggingConfiguration"] = logging.clone();
1374    } else {
1375        resp["loggingConfiguration"] = json!({
1376            "level": "OFF",
1377            "includeExecutionData": false,
1378            "destinations": [],
1379        });
1380    }
1381
1382    if let Some(ref tracing) = sm.tracing_configuration {
1383        resp["tracingConfiguration"] = tracing.clone();
1384    } else {
1385        resp["tracingConfiguration"] = json!({
1386            "enabled": false,
1387        });
1388    }
1389
1390    resp
1391}
1392
1393fn missing(name: &str) -> AwsServiceError {
1394    AwsServiceError::aws_error(
1395        StatusCode::BAD_REQUEST,
1396        "ValidationException",
1397        format!("The request must contain the parameter {name}."),
1398    )
1399}
1400
1401fn state_machine_not_found(arn: &str) -> AwsServiceError {
1402    AwsServiceError::aws_error(
1403        StatusCode::BAD_REQUEST,
1404        "StateMachineDoesNotExist",
1405        format!("State Machine Does Not Exist: '{arn}'"),
1406    )
1407}
1408
1409fn resource_not_found(arn: &str) -> AwsServiceError {
1410    AwsServiceError::aws_error(
1411        StatusCode::BAD_REQUEST,
1412        "ResourceNotFound",
1413        format!("Resource not found: '{arn}'"),
1414    )
1415}
1416
1417/// Parse + validate an alias `routingConfiguration` array.
1418///
1419/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
1420/// route must include `stateMachineVersionArn`.
1421fn parse_routing_configuration(
1422    routes: &[serde_json::Value],
1423) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1424    if routes.is_empty() || routes.len() > 2 {
1425        return Err(AwsServiceError::aws_error(
1426            StatusCode::BAD_REQUEST,
1427            "ValidationException",
1428            "routingConfiguration must contain 1 or 2 routes.",
1429        ));
1430    }
1431    let parsed: Vec<crate::state::AliasRoute> = routes
1432        .iter()
1433        .map(|r| {
1434            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1435                AwsServiceError::aws_error(
1436                    StatusCode::BAD_REQUEST,
1437                    "ValidationException",
1438                    "routingConfiguration entries must contain stateMachineVersionArn.",
1439                )
1440            })?;
1441            let weight = r["weight"].as_i64().ok_or_else(|| {
1442                AwsServiceError::aws_error(
1443                    StatusCode::BAD_REQUEST,
1444                    "ValidationException",
1445                    "routingConfiguration entries must contain a numeric weight.",
1446                )
1447            })?;
1448            if !(0..=100).contains(&weight) {
1449                return Err(AwsServiceError::aws_error(
1450                    StatusCode::BAD_REQUEST,
1451                    "ValidationException",
1452                    format!("Invalid routing weight {weight}; must be 0-100."),
1453                ));
1454            }
1455            Ok(crate::state::AliasRoute {
1456                state_machine_version_arn: arn.to_string(),
1457                weight: weight as i32,
1458            })
1459        })
1460        .collect::<Result<_, _>>()?;
1461    let total: i32 = parsed.iter().map(|r| r.weight).sum();
1462    if total != 100 {
1463        return Err(AwsServiceError::aws_error(
1464            StatusCode::BAD_REQUEST,
1465            "ValidationException",
1466            format!("routingConfiguration weights must sum to 100, got {total}."),
1467        ));
1468    }
1469    Ok(parsed)
1470}
1471
1472fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1473    if name.is_empty() || name.len() > 80 {
1474        return Err(AwsServiceError::aws_error(
1475            StatusCode::BAD_REQUEST,
1476            "InvalidName",
1477            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1478        ));
1479    }
1480    // Only allow alphanumeric, hyphens, and underscores
1481    if !name
1482        .chars()
1483        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1484    {
1485        return Err(AwsServiceError::aws_error(
1486            StatusCode::BAD_REQUEST,
1487            "InvalidName",
1488            format!(
1489                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1490            ),
1491        ));
1492    }
1493    Ok(())
1494}
1495
1496fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1497    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1498        AwsServiceError::aws_error(
1499            StatusCode::BAD_REQUEST,
1500            "InvalidDefinition",
1501            format!("Invalid State Machine Definition: '{e}'"),
1502        )
1503    })?;
1504
1505    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1506        return Err(AwsServiceError::aws_error(
1507            StatusCode::BAD_REQUEST,
1508            "InvalidDefinition",
1509            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1510                .to_string(),
1511        ));
1512    }
1513
1514    let states_obj = parsed
1515        .get("States")
1516        .and_then(|v| v.as_object())
1517        .ok_or_else(|| {
1518            AwsServiceError::aws_error(
1519                StatusCode::BAD_REQUEST,
1520                "InvalidDefinition",
1521                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1522                    .to_string(),
1523            )
1524        })?;
1525
1526    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1527        AwsServiceError::aws_error(
1528            StatusCode::BAD_REQUEST,
1529            "InvalidDefinition",
1530            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1531                .to_string(),
1532        )
1533    })?;
1534    if !states_obj.contains_key(start_at) {
1535        return Err(AwsServiceError::aws_error(
1536            StatusCode::BAD_REQUEST,
1537            "InvalidDefinition",
1538            format!(
1539                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1540                 (StartAt '{start_at}' does not reference a valid state)"
1541            ),
1542        ));
1543    }
1544
1545    Ok(())
1546}
1547
1548fn execution_not_found(arn: &str) -> AwsServiceError {
1549    AwsServiceError::aws_error(
1550        StatusCode::BAD_REQUEST,
1551        "ExecutionDoesNotExist",
1552        format!("Execution Does Not Exist: '{arn}'"),
1553    )
1554}
1555
1556fn execution_to_json(exec: &Execution) -> Value {
1557    let mut resp = json!({
1558        "executionArn": exec.execution_arn,
1559        "stateMachineArn": exec.state_machine_arn,
1560        "name": exec.name,
1561        "status": exec.status.as_str(),
1562        "startDate": exec.start_date.timestamp() as f64,
1563    });
1564
1565    if let Some(ref input) = exec.input {
1566        resp["input"] = json!(input);
1567    }
1568    if let Some(ref output) = exec.output {
1569        resp["output"] = json!(output);
1570    }
1571    if let Some(stop) = exec.stop_date {
1572        resp["stopDate"] = json!(stop.timestamp() as f64);
1573    }
1574    if let Some(ref error) = exec.error {
1575        resp["error"] = json!(error);
1576    }
1577    if let Some(ref cause) = exec.cause {
1578        resp["cause"] = json!(cause);
1579    }
1580
1581    resp
1582}
1583
1584/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
1585fn camel_to_details_key(event_type: &str) -> String {
1586    let mut chars = event_type.chars();
1587    match chars.next() {
1588        None => String::new(),
1589        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1590    }
1591}
1592
1593fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1594    if !arn.starts_with("arn:") {
1595        return Err(AwsServiceError::aws_error(
1596            StatusCode::BAD_REQUEST,
1597            "InvalidArn",
1598            format!("Invalid Arn: '{arn}'"),
1599        ));
1600    }
1601    Ok(())
1602}
1603
1604/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1605///
1606/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1607/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1608/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1609///
1610/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1611/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1612pub fn start_execution_from_delivery(
1613    state: &SharedStepFunctionsState,
1614    delivery: &Option<Arc<DeliveryBus>>,
1615    dynamodb_state: &Option<SharedDynamoDbState>,
1616    state_machine_arn: &str,
1617    input: &str,
1618) {
1619    // Validate input is valid JSON
1620    if serde_json::from_str::<serde_json::Value>(input).is_err() {
1621        tracing::warn!(
1622            state_machine_arn,
1623            "Step Functions delivery: invalid JSON input, skipping execution"
1624        );
1625        return;
1626    }
1627
1628    let execution_name = uuid::Uuid::new_v4().to_string();
1629
1630    // Extract account_id from the state machine ARN
1631    let account_id = state_machine_arn
1632        .split(':')
1633        .nth(4)
1634        .unwrap_or("000000000000")
1635        .to_string();
1636
1637    let mut accounts = state.write();
1638    let st = accounts.get_or_create(&account_id);
1639    let sm = match st.state_machines.get(state_machine_arn) {
1640        Some(sm) => sm,
1641        None => {
1642            tracing::warn!(
1643                state_machine_arn,
1644                "Step Functions delivery: state machine not found"
1645            );
1646            return;
1647        }
1648    };
1649
1650    let sm_name = sm.name.clone();
1651    let definition = sm.definition.clone();
1652    let exec_arn = st.execution_arn(&sm_name, &execution_name);
1653
1654    let now = Utc::now();
1655    let execution = Execution {
1656        execution_arn: exec_arn.clone(),
1657        state_machine_arn: state_machine_arn.to_string(),
1658        state_machine_name: sm_name,
1659        name: execution_name,
1660        status: ExecutionStatus::Running,
1661        input: Some(input.to_string()),
1662        output: None,
1663        start_date: now,
1664        stop_date: None,
1665        error: None,
1666        cause: None,
1667        history_events: vec![],
1668    };
1669
1670    st.executions.insert(exec_arn.clone(), execution);
1671    drop(accounts);
1672
1673    let shared_state = state.clone();
1674    let delivery = delivery.clone();
1675    let dynamodb_state = dynamodb_state.clone();
1676    let input = Some(input.to_string());
1677    tokio::spawn(async move {
1678        interpreter::execute_state_machine(
1679            shared_state,
1680            exec_arn,
1681            definition,
1682            input,
1683            delivery,
1684            dynamodb_state,
1685        )
1686        .await;
1687    });
1688}
1689
1690#[cfg(test)]
1691mod tests {
1692    use super::*;
1693    use http::{HeaderMap, Method};
1694    use parking_lot::RwLock;
1695    use serde_json::Value;
1696    use std::sync::Arc;
1697
1698    fn make_state() -> SharedStepFunctionsState {
1699        Arc::new(RwLock::new(
1700            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1701        ))
1702    }
1703
1704    fn make_request(action: &str, body: &str) -> AwsRequest {
1705        AwsRequest {
1706            service: "states".to_string(),
1707            action: action.to_string(),
1708            region: "us-east-1".to_string(),
1709            account_id: "123456789012".to_string(),
1710            request_id: "test-id".to_string(),
1711            headers: HeaderMap::new(),
1712            query_params: HashMap::new(),
1713            body: body.as_bytes().to_vec().into(),
1714            path_segments: vec![],
1715            raw_path: "/".to_string(),
1716            raw_query: String::new(),
1717            method: Method::POST,
1718            is_query_protocol: false,
1719            access_key_id: None,
1720            principal: None,
1721        }
1722    }
1723
1724    fn body_json(resp: &AwsResponse) -> Value {
1725        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1726    }
1727
1728    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1729        match result {
1730            Err(e) => e,
1731            Ok(_) => panic!("expected error, got Ok"),
1732        }
1733    }
1734
1735    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1736
1737    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1738        let body = json!({
1739            "name": name,
1740            "definition": VALID_DEF,
1741            "roleArn": "arn:aws:iam::123456789012:role/test",
1742        });
1743        let req = make_request("CreateStateMachine", &body.to_string());
1744        let resp = svc.create_state_machine(&req).unwrap();
1745        let b = body_json(&resp);
1746        b["stateMachineArn"].as_str().unwrap().to_string()
1747    }
1748
1749    // ── CreateStateMachine ──
1750
1751    #[test]
1752    fn create_state_machine_basic() {
1753        let svc = StepFunctionsService::new(make_state());
1754        let arn = create_sm(&svc, "test-sm");
1755        assert!(arn.contains("test-sm"));
1756    }
1757
1758    #[test]
1759    fn create_state_machine_with_express_type() {
1760        let svc = StepFunctionsService::new(make_state());
1761        let body = json!({
1762            "name": "express-sm",
1763            "definition": VALID_DEF,
1764            "roleArn": "arn:aws:iam::123456789012:role/r",
1765            "type": "EXPRESS",
1766        });
1767        let req = make_request("CreateStateMachine", &body.to_string());
1768        let resp = svc.create_state_machine(&req).unwrap();
1769        let b = body_json(&resp);
1770        assert!(b["stateMachineArn"].as_str().is_some());
1771    }
1772
1773    #[test]
1774    fn create_state_machine_duplicate_fails() {
1775        let svc = StepFunctionsService::new(make_state());
1776        create_sm(&svc, "dup-sm");
1777        let body = json!({
1778            "name": "dup-sm",
1779            "definition": VALID_DEF,
1780            "roleArn": "arn:aws:iam::123456789012:role/r",
1781        });
1782        let req = make_request("CreateStateMachine", &body.to_string());
1783        let err = expect_err(svc.create_state_machine(&req));
1784        assert!(err.to_string().contains("StateMachineAlreadyExists"));
1785    }
1786
1787    #[test]
1788    fn create_state_machine_missing_name() {
1789        let svc = StepFunctionsService::new(make_state());
1790        let body = json!({
1791            "definition": VALID_DEF,
1792            "roleArn": "arn:aws:iam::123456789012:role/r",
1793        });
1794        let req = make_request("CreateStateMachine", &body.to_string());
1795        assert!(svc.create_state_machine(&req).is_err());
1796    }
1797
1798    #[test]
1799    fn create_state_machine_invalid_definition() {
1800        let svc = StepFunctionsService::new(make_state());
1801        let body = json!({
1802            "name": "bad-def",
1803            "definition": "not json",
1804            "roleArn": "arn:aws:iam::123456789012:role/r",
1805        });
1806        let req = make_request("CreateStateMachine", &body.to_string());
1807        let err = expect_err(svc.create_state_machine(&req));
1808        assert!(err.to_string().contains("InvalidDefinition"));
1809    }
1810
1811    #[test]
1812    fn create_state_machine_definition_missing_start_at() {
1813        let svc = StepFunctionsService::new(make_state());
1814        let body = json!({
1815            "name": "no-start",
1816            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1817            "roleArn": "arn:aws:iam::123456789012:role/r",
1818        });
1819        let req = make_request("CreateStateMachine", &body.to_string());
1820        let err = expect_err(svc.create_state_machine(&req));
1821        assert!(err.to_string().contains("InvalidDefinition"));
1822    }
1823
1824    #[test]
1825    fn create_state_machine_definition_missing_states() {
1826        let svc = StepFunctionsService::new(make_state());
1827        let body = json!({
1828            "name": "no-states",
1829            "definition": r#"{"StartAt":"S"}"#,
1830            "roleArn": "arn:aws:iam::123456789012:role/r",
1831        });
1832        let req = make_request("CreateStateMachine", &body.to_string());
1833        let err = expect_err(svc.create_state_machine(&req));
1834        assert!(err.to_string().contains("InvalidDefinition"));
1835    }
1836
1837    #[test]
1838    fn create_state_machine_definition_start_at_not_in_states() {
1839        let svc = StepFunctionsService::new(make_state());
1840        let body = json!({
1841            "name": "bad-start",
1842            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1843            "roleArn": "arn:aws:iam::123456789012:role/r",
1844        });
1845        let req = make_request("CreateStateMachine", &body.to_string());
1846        let err = expect_err(svc.create_state_machine(&req));
1847        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1848    }
1849
1850    #[test]
1851    fn create_state_machine_invalid_type() {
1852        let svc = StepFunctionsService::new(make_state());
1853        let body = json!({
1854            "name": "bad-type",
1855            "definition": VALID_DEF,
1856            "roleArn": "arn:aws:iam::123456789012:role/r",
1857            "type": "INVALID",
1858        });
1859        let req = make_request("CreateStateMachine", &body.to_string());
1860        assert!(svc.create_state_machine(&req).is_err());
1861    }
1862
1863    #[test]
1864    fn create_state_machine_invalid_arn() {
1865        let svc = StepFunctionsService::new(make_state());
1866        let body = json!({
1867            "name": "bad-arn",
1868            "definition": VALID_DEF,
1869            "roleArn": "not-an-arn",
1870        });
1871        let req = make_request("CreateStateMachine", &body.to_string());
1872        let err = expect_err(svc.create_state_machine(&req));
1873        assert!(err.to_string().contains("InvalidArn"));
1874    }
1875
1876    #[test]
1877    fn create_state_machine_invalid_name() {
1878        let svc = StepFunctionsService::new(make_state());
1879        let body = json!({
1880            "name": "has spaces!",
1881            "definition": VALID_DEF,
1882            "roleArn": "arn:aws:iam::123456789012:role/r",
1883        });
1884        let req = make_request("CreateStateMachine", &body.to_string());
1885        let err = expect_err(svc.create_state_machine(&req));
1886        assert!(err.to_string().contains("InvalidName"));
1887    }
1888
1889    #[test]
1890    fn create_state_machine_name_too_long() {
1891        let svc = StepFunctionsService::new(make_state());
1892        let long_name = "a".repeat(81);
1893        let body = json!({
1894            "name": long_name,
1895            "definition": VALID_DEF,
1896            "roleArn": "arn:aws:iam::123456789012:role/r",
1897        });
1898        let req = make_request("CreateStateMachine", &body.to_string());
1899        let err = expect_err(svc.create_state_machine(&req));
1900        assert!(err.to_string().contains("InvalidName"));
1901    }
1902
1903    // ── DescribeStateMachine ──
1904
1905    #[test]
1906    fn describe_state_machine_found() {
1907        let svc = StepFunctionsService::new(make_state());
1908        let arn = create_sm(&svc, "desc-sm");
1909
1910        let req = make_request(
1911            "DescribeStateMachine",
1912            &json!({"stateMachineArn": arn}).to_string(),
1913        );
1914        let resp = svc.describe_state_machine(&req).unwrap();
1915        let b = body_json(&resp);
1916        assert_eq!(b["name"], "desc-sm");
1917        assert_eq!(b["status"], "ACTIVE");
1918        assert!(b["definition"].as_str().is_some());
1919    }
1920
1921    #[test]
1922    fn describe_state_machine_not_found() {
1923        let svc = StepFunctionsService::new(make_state());
1924        let req = make_request(
1925            "DescribeStateMachine",
1926            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1927                .to_string(),
1928        );
1929        let err = expect_err(svc.describe_state_machine(&req));
1930        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1931    }
1932
1933    // ── ListStateMachines ──
1934
1935    #[test]
1936    fn list_state_machines_empty() {
1937        let svc = StepFunctionsService::new(make_state());
1938        let req = make_request("ListStateMachines", "{}");
1939        let resp = svc.list_state_machines(&req).unwrap();
1940        let b = body_json(&resp);
1941        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1942    }
1943
1944    #[test]
1945    fn list_state_machines_returns_created() {
1946        let svc = StepFunctionsService::new(make_state());
1947        create_sm(&svc, "sm-1");
1948        create_sm(&svc, "sm-2");
1949
1950        let req = make_request("ListStateMachines", "{}");
1951        let resp = svc.list_state_machines(&req).unwrap();
1952        let b = body_json(&resp);
1953        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1954    }
1955
1956    // ── DeleteStateMachine ──
1957
1958    #[test]
1959    fn delete_state_machine() {
1960        let svc = StepFunctionsService::new(make_state());
1961        let arn = create_sm(&svc, "del-sm");
1962
1963        let req = make_request(
1964            "DeleteStateMachine",
1965            &json!({"stateMachineArn": arn}).to_string(),
1966        );
1967        svc.delete_state_machine(&req).unwrap();
1968
1969        // Describe should fail
1970        let req = make_request(
1971            "DescribeStateMachine",
1972            &json!({"stateMachineArn": arn}).to_string(),
1973        );
1974        assert!(svc.describe_state_machine(&req).is_err());
1975    }
1976
1977    #[test]
1978    fn delete_state_machine_nonexistent_succeeds() {
1979        let svc = StepFunctionsService::new(make_state());
1980        let req = make_request(
1981            "DeleteStateMachine",
1982            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1983                .to_string(),
1984        );
1985        // AWS returns success even for nonexistent
1986        svc.delete_state_machine(&req).unwrap();
1987    }
1988
1989    // ── UpdateStateMachine ──
1990
1991    #[test]
1992    fn update_state_machine() {
1993        let svc = StepFunctionsService::new(make_state());
1994        let arn = create_sm(&svc, "upd-sm");
1995
1996        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1997        let body = json!({
1998            "stateMachineArn": arn,
1999            "definition": new_def,
2000            "description": "updated",
2001        });
2002        let req = make_request("UpdateStateMachine", &body.to_string());
2003        let resp = svc.update_state_machine(&req).unwrap();
2004        let b = body_json(&resp);
2005        assert!(b["updateDate"].as_f64().is_some());
2006
2007        // Verify
2008        let req = make_request(
2009            "DescribeStateMachine",
2010            &json!({"stateMachineArn": arn}).to_string(),
2011        );
2012        let resp = svc.describe_state_machine(&req).unwrap();
2013        let b = body_json(&resp);
2014        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2015        assert_eq!(b["description"], "updated");
2016    }
2017
2018    #[test]
2019    fn update_state_machine_not_found() {
2020        let svc = StepFunctionsService::new(make_state());
2021        let body = json!({
2022            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2023            "definition": VALID_DEF,
2024        });
2025        let req = make_request("UpdateStateMachine", &body.to_string());
2026        let err = expect_err(svc.update_state_machine(&req));
2027        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2028    }
2029
2030    // ── StartExecution ──
2031
2032    #[tokio::test]
2033    async fn start_execution_basic() {
2034        let svc = StepFunctionsService::new(make_state());
2035        let arn = create_sm(&svc, "exec-sm");
2036
2037        let body = json!({
2038            "stateMachineArn": arn,
2039            "input": r#"{"key":"value"}"#,
2040        });
2041        let req = make_request("StartExecution", &body.to_string());
2042        let resp = svc.start_execution(&req).unwrap();
2043        let b = body_json(&resp);
2044        assert!(b["executionArn"].as_str().is_some());
2045        assert!(b["startDate"].as_f64().is_some());
2046    }
2047
2048    #[tokio::test]
2049    async fn start_execution_with_name() {
2050        let svc = StepFunctionsService::new(make_state());
2051        let arn = create_sm(&svc, "named-exec");
2052
2053        let body = json!({
2054            "stateMachineArn": arn,
2055            "name": "my-execution",
2056        });
2057        let req = make_request("StartExecution", &body.to_string());
2058        let resp = svc.start_execution(&req).unwrap();
2059        let b = body_json(&resp);
2060        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2061    }
2062
2063    #[tokio::test]
2064    async fn start_execution_sm_not_found() {
2065        let svc = StepFunctionsService::new(make_state());
2066        let body = json!({
2067            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2068        });
2069        let req = make_request("StartExecution", &body.to_string());
2070        let err = expect_err(svc.start_execution(&req));
2071        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2072    }
2073
2074    #[tokio::test]
2075    async fn start_execution_invalid_input() {
2076        let svc = StepFunctionsService::new(make_state());
2077        let arn = create_sm(&svc, "bad-input");
2078
2079        let body = json!({
2080            "stateMachineArn": arn,
2081            "input": "not json",
2082        });
2083        let req = make_request("StartExecution", &body.to_string());
2084        let err = expect_err(svc.start_execution(&req));
2085        assert!(err.to_string().contains("InvalidExecutionInput"));
2086    }
2087
2088    #[tokio::test]
2089    async fn start_execution_duplicate_name() {
2090        let svc = StepFunctionsService::new(make_state());
2091        let arn = create_sm(&svc, "dup-exec");
2092
2093        let body = json!({
2094            "stateMachineArn": arn,
2095            "name": "same-name",
2096        });
2097        let req = make_request("StartExecution", &body.to_string());
2098        svc.start_execution(&req).unwrap();
2099
2100        let req = make_request("StartExecution", &body.to_string());
2101        let err = expect_err(svc.start_execution(&req));
2102        assert!(err.to_string().contains("ExecutionAlreadyExists"));
2103    }
2104
2105    // ── DescribeExecution ──
2106
2107    #[tokio::test]
2108    async fn describe_execution_found() {
2109        let svc = StepFunctionsService::new(make_state());
2110        let sm_arn = create_sm(&svc, "desc-exec");
2111
2112        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2113        let req = make_request("StartExecution", &body.to_string());
2114        let resp = svc.start_execution(&req).unwrap();
2115        let exec_arn = body_json(&resp)["executionArn"]
2116            .as_str()
2117            .unwrap()
2118            .to_string();
2119
2120        let req = make_request(
2121            "DescribeExecution",
2122            &json!({"executionArn": exec_arn}).to_string(),
2123        );
2124        let resp = svc.describe_execution(&req).unwrap();
2125        let b = body_json(&resp);
2126        assert_eq!(b["name"], "e1");
2127        assert_eq!(b["status"], "RUNNING");
2128    }
2129
2130    #[tokio::test]
2131    async fn describe_execution_not_found() {
2132        let svc = StepFunctionsService::new(make_state());
2133        let req = make_request(
2134            "DescribeExecution",
2135            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2136                .to_string(),
2137        );
2138        let err = expect_err(svc.describe_execution(&req));
2139        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2140    }
2141
2142    // ── StopExecution ──
2143
2144    #[tokio::test]
2145    async fn stop_execution() {
2146        let svc = StepFunctionsService::new(make_state());
2147        let sm_arn = create_sm(&svc, "stop-sm");
2148
2149        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2150        let req = make_request("StartExecution", &body.to_string());
2151        let resp = svc.start_execution(&req).unwrap();
2152        let exec_arn = body_json(&resp)["executionArn"]
2153            .as_str()
2154            .unwrap()
2155            .to_string();
2156
2157        let body = json!({
2158            "executionArn": exec_arn,
2159            "error": "UserAborted",
2160            "cause": "test stop",
2161        });
2162        let req = make_request("StopExecution", &body.to_string());
2163        let resp = svc.stop_execution(&req).unwrap();
2164        let b = body_json(&resp);
2165        assert!(b["stopDate"].as_f64().is_some());
2166
2167        // Verify aborted
2168        let req = make_request(
2169            "DescribeExecution",
2170            &json!({"executionArn": exec_arn}).to_string(),
2171        );
2172        let resp = svc.describe_execution(&req).unwrap();
2173        let b = body_json(&resp);
2174        assert_eq!(b["status"], "ABORTED");
2175        assert_eq!(b["error"], "UserAborted");
2176    }
2177
2178    #[tokio::test]
2179    async fn stop_execution_not_found() {
2180        let svc = StepFunctionsService::new(make_state());
2181        let req = make_request(
2182            "StopExecution",
2183            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2184                .to_string(),
2185        );
2186        let err = expect_err(svc.stop_execution(&req));
2187        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2188    }
2189
2190    // ── ListExecutions ──
2191
2192    #[tokio::test]
2193    async fn list_executions() {
2194        let svc = StepFunctionsService::new(make_state());
2195        let sm_arn = create_sm(&svc, "list-exec");
2196
2197        for i in 0..3 {
2198            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2199            let req = make_request("StartExecution", &body.to_string());
2200            svc.start_execution(&req).unwrap();
2201        }
2202
2203        let req = make_request(
2204            "ListExecutions",
2205            &json!({"stateMachineArn": sm_arn}).to_string(),
2206        );
2207        let resp = svc.list_executions(&req).unwrap();
2208        let b = body_json(&resp);
2209        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2210    }
2211
2212    #[tokio::test]
2213    async fn list_executions_sm_not_found() {
2214        let svc = StepFunctionsService::new(make_state());
2215        let req = make_request(
2216            "ListExecutions",
2217            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2218                .to_string(),
2219        );
2220        let err = expect_err(svc.list_executions(&req));
2221        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2222    }
2223
2224    // ── GetExecutionHistory ──
2225
2226    #[tokio::test]
2227    async fn get_execution_history_not_found() {
2228        let svc = StepFunctionsService::new(make_state());
2229        let req = make_request(
2230            "GetExecutionHistory",
2231            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2232                .to_string(),
2233        );
2234        let err = expect_err(svc.get_execution_history(&req));
2235        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2236    }
2237
2238    // ── DescribeStateMachineForExecution ──
2239
2240    #[tokio::test]
2241    async fn describe_sm_for_execution() {
2242        let svc = StepFunctionsService::new(make_state());
2243        let sm_arn = create_sm(&svc, "sm-for-exec");
2244
2245        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2246        let req = make_request("StartExecution", &body.to_string());
2247        let resp = svc.start_execution(&req).unwrap();
2248        let exec_arn = body_json(&resp)["executionArn"]
2249            .as_str()
2250            .unwrap()
2251            .to_string();
2252
2253        let req = make_request(
2254            "DescribeStateMachineForExecution",
2255            &json!({"executionArn": exec_arn}).to_string(),
2256        );
2257        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2258        let b = body_json(&resp);
2259        assert_eq!(b["name"], "sm-for-exec");
2260    }
2261
2262    // ── Tags ──
2263
2264    #[test]
2265    fn tag_untag_list_tags() {
2266        let svc = StepFunctionsService::new(make_state());
2267        let arn = create_sm(&svc, "tagged-sm");
2268
2269        // Tag
2270        let body = json!({
2271            "resourceArn": arn,
2272            "tags": [{"key": "env", "value": "prod"}],
2273        });
2274        let req = make_request("TagResource", &body.to_string());
2275        svc.tag_resource(&req).unwrap();
2276
2277        // List
2278        let req = make_request(
2279            "ListTagsForResource",
2280            &json!({"resourceArn": arn}).to_string(),
2281        );
2282        let resp = svc.list_tags_for_resource(&req).unwrap();
2283        let b = body_json(&resp);
2284        let tags = b["tags"].as_array().unwrap();
2285        assert_eq!(tags.len(), 1);
2286        assert_eq!(tags[0]["key"], "env");
2287
2288        // Untag
2289        let body = json!({
2290            "resourceArn": arn,
2291            "tagKeys": ["env"],
2292        });
2293        let req = make_request("UntagResource", &body.to_string());
2294        svc.untag_resource(&req).unwrap();
2295
2296        // Verify empty
2297        let req = make_request(
2298            "ListTagsForResource",
2299            &json!({"resourceArn": arn}).to_string(),
2300        );
2301        let resp = svc.list_tags_for_resource(&req).unwrap();
2302        let b = body_json(&resp);
2303        assert!(b["tags"].as_array().unwrap().is_empty());
2304    }
2305
2306    #[test]
2307    fn tag_resource_not_found() {
2308        let svc = StepFunctionsService::new(make_state());
2309        let body = json!({
2310            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2311            "tags": [{"key": "k", "value": "v"}],
2312        });
2313        let req = make_request("TagResource", &body.to_string());
2314        let err = expect_err(svc.tag_resource(&req));
2315        assert!(err.to_string().contains("ResourceNotFound"));
2316    }
2317
2318    // ── Helper function tests ──
2319
2320    #[test]
2321    fn test_validate_name() {
2322        assert!(validate_name("valid-name").is_ok());
2323        assert!(validate_name("under_score").is_ok());
2324        assert!(validate_name("").is_err());
2325        assert!(validate_name("has spaces").is_err());
2326        assert!(validate_name(&"a".repeat(81)).is_err());
2327    }
2328
2329    #[test]
2330    fn test_validate_definition() {
2331        assert!(validate_definition(VALID_DEF).is_ok());
2332        assert!(validate_definition("not json").is_err());
2333        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
2334        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
2335    }
2336
2337    #[test]
2338    fn test_validate_arn() {
2339        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2340        assert!(validate_arn("not-an-arn").is_err());
2341    }
2342
2343    #[test]
2344    fn test_camel_to_details_key() {
2345        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2346        assert_eq!(camel_to_details_key(""), "");
2347    }
2348
2349    #[test]
2350    fn test_is_mutating_action() {
2351        assert!(is_mutating_action("CreateStateMachine"));
2352        assert!(is_mutating_action("StartExecution"));
2353        assert!(!is_mutating_action("DescribeStateMachine"));
2354        assert!(!is_mutating_action("ListStateMachines"));
2355    }
2356}