Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26    "CreateStateMachine",
27    "DescribeStateMachine",
28    "ListStateMachines",
29    "DeleteStateMachine",
30    "UpdateStateMachine",
31    "TagResource",
32    "UntagResource",
33    "ListTagsForResource",
34    "StartExecution",
35    "StopExecution",
36    "DescribeExecution",
37    "ListExecutions",
38    "GetExecutionHistory",
39    "DescribeStateMachineForExecution",
40    "CreateActivity",
41    "DeleteActivity",
42    "DescribeActivity",
43    "ListActivities",
44    "GetActivityTask",
45    "SendTaskFailure",
46    "SendTaskHeartbeat",
47    "SendTaskSuccess",
48    "PublishStateMachineVersion",
49    "DeleteStateMachineVersion",
50    "ListStateMachineVersions",
51    "CreateStateMachineAlias",
52    "DeleteStateMachineAlias",
53    "DescribeStateMachineAlias",
54    "ListStateMachineAliases",
55    "UpdateStateMachineAlias",
56    "DescribeMapRun",
57    "ListMapRuns",
58    "UpdateMapRun",
59    "RedriveExecution",
60    "StartSyncExecution",
61    "TestState",
62    "ValidateStateMachineDefinition",
63];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73    state: SharedStepFunctionsState,
74    delivery: Option<Arc<DeliveryBus>>,
75    dynamodb_state: Option<SharedDynamoDbState>,
76    registry: Option<SharedServiceRegistry>,
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81impl StepFunctionsService {
82    pub fn new(state: SharedStepFunctionsState) -> Self {
83        Self {
84            state,
85            delivery: None,
86            dynamodb_state: None,
87            registry: None,
88            snapshot_store: None,
89            snapshot_lock: Arc::new(AsyncMutex::new(())),
90        }
91    }
92
93    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
94        self.delivery = Some(delivery);
95        self
96    }
97
98    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
99        self.dynamodb_state = Some(dynamodb_state);
100        self
101    }
102
103    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
104    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
105    /// has been registered; until then the interpreter falls back to its
106    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
107    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
108        self.registry = Some(registry);
109        self
110    }
111
112    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
113        self.snapshot_store = Some(store);
114        self
115    }
116
117    async fn save_snapshot(&self) {
118        let Some(store) = self.snapshot_store.clone() else {
119            return;
120        };
121        let _guard = self.snapshot_lock.lock().await;
122        let snapshot = StepFunctionsSnapshot {
123            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
124            state: None,
125            accounts: Some(self.state.read().clone()),
126        };
127        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
128            let bytes = serde_json::to_vec(&snapshot)
129                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
130            store.save(&bytes)
131        })
132        .await;
133        match join {
134            Ok(Ok(())) => {}
135            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
136            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
137        }
138    }
139}
140
141fn is_mutating_action(action: &str) -> bool {
142    matches!(
143        action,
144        "CreateStateMachine"
145            | "DeleteStateMachine"
146            | "UpdateStateMachine"
147            | "TagResource"
148            | "UntagResource"
149            | "StartExecution"
150            | "StopExecution"
151            | "CreateActivity"
152            | "DeleteActivity"
153            | "GetActivityTask"
154            | "SendTaskFailure"
155            | "SendTaskHeartbeat"
156            | "SendTaskSuccess"
157            | "PublishStateMachineVersion"
158            | "DeleteStateMachineVersion"
159            | "CreateStateMachineAlias"
160            | "DeleteStateMachineAlias"
161            | "UpdateStateMachineAlias"
162            | "UpdateMapRun"
163            | "RedriveExecution"
164            | "StartSyncExecution"
165    )
166}
167
168#[async_trait]
169impl AwsService for StepFunctionsService {
170    fn service_name(&self) -> &str {
171        "states"
172    }
173
174    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175        let mutates = is_mutating_action(req.action.as_str());
176        let result = match req.action.as_str() {
177            "CreateStateMachine" => self.create_state_machine(&req),
178            "DescribeStateMachine" => self.describe_state_machine(&req),
179            "ListStateMachines" => self.list_state_machines(&req),
180            "DeleteStateMachine" => self.delete_state_machine(&req),
181            "UpdateStateMachine" => self.update_state_machine(&req),
182            "TagResource" => self.tag_resource(&req),
183            "UntagResource" => self.untag_resource(&req),
184            "ListTagsForResource" => self.list_tags_for_resource(&req),
185            "StartExecution" => self.start_execution(&req),
186            "StopExecution" => self.stop_execution(&req),
187            "DescribeExecution" => self.describe_execution(&req),
188            "ListExecutions" => self.list_executions(&req),
189            "GetExecutionHistory" => self.get_execution_history(&req),
190            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
191            "CreateActivity" => self.create_activity(&req),
192            "DeleteActivity" => self.delete_activity(&req),
193            "DescribeActivity" => self.describe_activity(&req),
194            "ListActivities" => self.list_activities(&req),
195            "GetActivityTask" => self.get_activity_task(&req).await,
196            "SendTaskFailure" => self.send_task_failure(&req),
197            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
198            "SendTaskSuccess" => self.send_task_success(&req),
199            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
200            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
201            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
202            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
203            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
204            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
205            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
206            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
207            "DescribeMapRun" => self.describe_map_run(&req),
208            "ListMapRuns" => self.list_map_runs(&req),
209            "UpdateMapRun" => self.update_map_run(&req),
210            "RedriveExecution" => self.redrive_execution(&req),
211            "StartSyncExecution" => self.start_sync_execution(&req).await,
212            "TestState" => self.test_state(&req),
213            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
214            _ => Err(AwsServiceError::action_not_implemented(
215                "states",
216                &req.action,
217            )),
218        };
219        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220            self.save_snapshot().await;
221        }
222        result
223    }
224
225    fn supported_actions(&self) -> &[&str] {
226        SUPPORTED
227    }
228}
229
230impl StepFunctionsService {
231    // ─── State Machine CRUD ─────────────────────────────────────────────
232
233    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
234        let body = req.json_body();
235
236        validate_required("name", &body["name"])?;
237        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
238        validate_name(name)?;
239
240        validate_required("definition", &body["definition"])?;
241        let definition = body["definition"]
242            .as_str()
243            .ok_or_else(|| missing("definition"))?;
244        validate_definition(definition)?;
245
246        validate_required("roleArn", &body["roleArn"])?;
247        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
248        validate_arn(role_arn)?;
249
250        let machine_type = if let Some(t) = body["type"].as_str() {
251            StateMachineType::parse(t).ok_or_else(|| {
252                AwsServiceError::aws_error(
253                    StatusCode::BAD_REQUEST,
254                    "ValidationException",
255                    format!(
256                        "Value '{t}' at 'type' failed to satisfy constraint: \
257                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
258                    ),
259                )
260            })?
261        } else {
262            StateMachineType::Standard
263        };
264
265        let mut accounts = self.state.write();
266        let state = accounts.get_or_create(&req.account_id);
267        let arn = state.state_machine_arn(name);
268
269        // Check if name already exists
270        if state.state_machines.values().any(|sm| sm.name == name) {
271            return Err(AwsServiceError::aws_error(
272                StatusCode::CONFLICT,
273                "StateMachineAlreadyExists",
274                format!("State Machine Already Exists: '{arn}'"),
275            ));
276        }
277
278        let now = Utc::now();
279        let revision_id = uuid::Uuid::new_v4().to_string();
280
281        let mut tags = BTreeMap::new();
282        if !body["tags"].is_null() {
283            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
284                |f| {
285                    AwsServiceError::aws_error(
286                        StatusCode::BAD_REQUEST,
287                        "ValidationException",
288                        format!("{f} must be a list"),
289                    )
290                },
291            )?;
292        }
293
294        let sm = StateMachine {
295            name: name.to_string(),
296            arn: arn.clone(),
297            definition: definition.to_string(),
298            role_arn: role_arn.to_string(),
299            machine_type,
300            status: StateMachineStatus::Active,
301            creation_date: now,
302            update_date: now,
303            tags,
304            revision_id: revision_id.clone(),
305            logging_configuration: body.get("loggingConfiguration").cloned(),
306            tracing_configuration: body.get("tracingConfiguration").cloned(),
307            description: body["description"].as_str().unwrap_or("").to_string(),
308        };
309
310        state.state_machines.insert(arn.clone(), sm);
311
312        Ok(AwsResponse::ok_json(json!({
313            "stateMachineArn": arn,
314            "creationDate": now.timestamp() as f64,
315            "stateMachineVersionArn": arn,
316        })))
317    }
318
319    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        validate_required("stateMachineArn", &body["stateMachineArn"])?;
322        let arn = body["stateMachineArn"]
323            .as_str()
324            .ok_or_else(|| missing("stateMachineArn"))?;
325        validate_arn(arn)?;
326
327        let accounts = self.state.read();
328        let empty = StepFunctionsState::new(&req.account_id, &req.region);
329        let state = accounts.get(&req.account_id).unwrap_or(&empty);
330        let sm = state
331            .state_machines
332            .get(arn)
333            .ok_or_else(|| state_machine_not_found(arn))?;
334
335        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
336    }
337
338    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339        let body = req.json_body();
340        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
341        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
342        let next_token = body["nextToken"].as_str();
343
344        let accounts = self.state.read();
345        let empty = StepFunctionsState::new(&req.account_id, &req.region);
346        let state = accounts.get(&req.account_id).unwrap_or(&empty);
347        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
348        machines.sort_by(|a, b| a.name.cmp(&b.name));
349
350        let items: Vec<Value> = machines
351            .iter()
352            .map(|sm| {
353                json!({
354                    "name": sm.name,
355                    "stateMachineArn": sm.arn,
356                    "type": sm.machine_type.as_str(),
357                    "creationDate": sm.creation_date.timestamp() as f64,
358                })
359            })
360            .collect();
361
362        let (page, token) = paginate(&items, next_token, max_results);
363
364        let mut resp = json!({ "stateMachines": page });
365        if let Some(t) = token {
366            resp["nextToken"] = json!(t);
367        }
368        Ok(AwsResponse::ok_json(resp))
369    }
370
371    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372        let body = req.json_body();
373        validate_required("stateMachineArn", &body["stateMachineArn"])?;
374        let arn = body["stateMachineArn"]
375            .as_str()
376            .ok_or_else(|| missing("stateMachineArn"))?;
377        validate_arn(arn)?;
378
379        let mut accounts = self.state.write();
380        let state = accounts.get_or_create(&req.account_id);
381        // AWS returns success even if it doesn't exist
382        state.state_machines.remove(arn);
383
384        Ok(AwsResponse::ok_json(json!({})))
385    }
386
387    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
388        let body = req.json_body();
389        validate_required("stateMachineArn", &body["stateMachineArn"])?;
390        let arn = body["stateMachineArn"]
391            .as_str()
392            .ok_or_else(|| missing("stateMachineArn"))?;
393        validate_arn(arn)?;
394
395        let mut accounts = self.state.write();
396        let state = accounts.get_or_create(&req.account_id);
397        let sm = state
398            .state_machines
399            .get_mut(arn)
400            .ok_or_else(|| state_machine_not_found(arn))?;
401
402        if let Some(definition) = body["definition"].as_str() {
403            validate_definition(definition)?;
404            sm.definition = definition.to_string();
405        }
406
407        if let Some(role_arn) = body["roleArn"].as_str() {
408            validate_arn(role_arn)?;
409            sm.role_arn = role_arn.to_string();
410        }
411
412        if let Some(logging) = body.get("loggingConfiguration") {
413            sm.logging_configuration = Some(logging.clone());
414        }
415
416        if let Some(tracing) = body.get("tracingConfiguration") {
417            sm.tracing_configuration = Some(tracing.clone());
418        }
419
420        if let Some(description) = body["description"].as_str() {
421            sm.description = description.to_string();
422        }
423
424        let now = Utc::now();
425        sm.update_date = now;
426        sm.revision_id = uuid::Uuid::new_v4().to_string();
427
428        let revision_id = sm.revision_id.clone();
429        let sm_arn = sm.arn.clone();
430
431        Ok(AwsResponse::ok_json(json!({
432            "updateDate": now.timestamp() as f64,
433            "revisionId": revision_id,
434            "stateMachineVersionArn": sm_arn,
435        })))
436    }
437
438    // ─── Execution Lifecycle ──────────────────────────────────────────
439
440    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
441        let body = req.json_body();
442        validate_required("stateMachineArn", &body["stateMachineArn"])?;
443        let sm_arn = body["stateMachineArn"]
444            .as_str()
445            .ok_or_else(|| missing("stateMachineArn"))?;
446        validate_arn(sm_arn)?;
447
448        let input = body["input"].as_str().map(|s| s.to_string());
449
450        // Validate input is valid JSON if provided
451        if let Some(ref input_str) = input {
452            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
453                AwsServiceError::aws_error(
454                    StatusCode::BAD_REQUEST,
455                    "InvalidExecutionInput",
456                    "Invalid execution input: must be valid JSON".to_string(),
457                )
458            })?;
459        }
460
461        let execution_name = body["name"]
462            .as_str()
463            .map(|s| s.to_string())
464            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
465
466        if let Some(name) = body["name"].as_str() {
467            validate_name(name)?;
468        }
469
470        let mut accounts = self.state.write();
471        let state = accounts.get_or_create(&req.account_id);
472        let sm = state
473            .state_machines
474            .get(sm_arn)
475            .ok_or_else(|| state_machine_not_found(sm_arn))?;
476
477        let sm_name = sm.name.clone();
478        let definition = sm.definition.clone();
479        let exec_arn = state.execution_arn(&sm_name, &execution_name);
480
481        // Check for duplicate execution name
482        if state.executions.contains_key(&exec_arn) {
483            return Err(AwsServiceError::aws_error(
484                StatusCode::CONFLICT,
485                "ExecutionAlreadyExists",
486                format!("Execution Already Exists: '{exec_arn}'"),
487            ));
488        }
489
490        let now = Utc::now();
491        let execution = Execution {
492            execution_arn: exec_arn.clone(),
493            state_machine_arn: sm_arn.to_string(),
494            state_machine_name: sm_name,
495            name: execution_name,
496            status: ExecutionStatus::Running,
497            input: input.clone(),
498            output: None,
499            start_date: now,
500            stop_date: None,
501            error: None,
502            cause: None,
503            history_events: vec![],
504            parent_execution_arn: None,
505            is_sync: false,
506            billed_duration_ms: None,
507            billed_memory_mb: None,
508        };
509
510        state.executions.insert(exec_arn.clone(), execution);
511        let logging_config = sm.logging_configuration.clone();
512        drop(accounts);
513
514        // Spawn async execution
515        let shared_state = self.state.clone();
516        let exec_arn_clone = exec_arn.clone();
517        let input_clone = input;
518        let delivery = self.delivery.clone();
519        let dynamodb_state = self.dynamodb_state.clone();
520        let registry = self.registry.clone();
521        tokio::spawn(async move {
522            interpreter::execute_state_machine(
523                shared_state,
524                exec_arn_clone,
525                definition,
526                input_clone,
527                delivery,
528                dynamodb_state,
529                registry,
530                logging_config,
531            )
532            .await;
533        });
534
535        Ok(AwsResponse::ok_json(json!({
536            "executionArn": exec_arn,
537            "startDate": now.timestamp() as f64,
538        })))
539    }
540
541    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
542        let body = req.json_body();
543        validate_required("executionArn", &body["executionArn"])?;
544        let exec_arn = body["executionArn"]
545            .as_str()
546            .ok_or_else(|| missing("executionArn"))?;
547
548        let error = body["error"].as_str().map(|s| s.to_string());
549        let cause = body["cause"].as_str().map(|s| s.to_string());
550
551        let mut accounts = self.state.write();
552        let state = accounts.get_or_create(&req.account_id);
553        let exec = state
554            .executions
555            .get_mut(exec_arn)
556            .ok_or_else(|| execution_not_found(exec_arn))?;
557
558        if exec.status != ExecutionStatus::Running {
559            return Err(AwsServiceError::aws_error(
560                StatusCode::BAD_REQUEST,
561                "ExecutionNotRunning",
562                format!("Execution is not running: '{exec_arn}'"),
563            ));
564        }
565
566        let now = Utc::now();
567        exec.status = ExecutionStatus::Aborted;
568        exec.stop_date = Some(now);
569        exec.error = error;
570        exec.cause = cause;
571
572        Ok(AwsResponse::ok_json(json!({
573            "stopDate": now.timestamp() as f64,
574        })))
575    }
576
577    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
578        let body = req.json_body();
579        validate_required("executionArn", &body["executionArn"])?;
580        let exec_arn = body["executionArn"]
581            .as_str()
582            .ok_or_else(|| missing("executionArn"))?;
583
584        let accounts = self.state.read();
585        let empty = StepFunctionsState::new(&req.account_id, &req.region);
586        let state = accounts.get(&req.account_id).unwrap_or(&empty);
587        let exec = state
588            .executions
589            .get(exec_arn)
590            .ok_or_else(|| execution_not_found(exec_arn))?;
591
592        Ok(AwsResponse::ok_json(execution_to_json(exec)))
593    }
594
595    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
596        let body = req.json_body();
597        validate_required("stateMachineArn", &body["stateMachineArn"])?;
598        let sm_arn = body["stateMachineArn"]
599            .as_str()
600            .ok_or_else(|| missing("stateMachineArn"))?;
601        validate_arn(sm_arn)?;
602
603        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
604        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
605        let next_token = body["nextToken"].as_str();
606        let status_filter = body["statusFilter"].as_str();
607
608        let accounts = self.state.read();
609        let empty = StepFunctionsState::new(&req.account_id, &req.region);
610        let state = accounts.get(&req.account_id).unwrap_or(&empty);
611
612        // Verify state machine exists
613        if !state.state_machines.contains_key(sm_arn) {
614            return Err(state_machine_not_found(sm_arn));
615        }
616
617        let mut executions: Vec<&Execution> = state
618            .executions
619            .values()
620            .filter(|e| e.state_machine_arn == sm_arn)
621            .filter(|e| {
622                status_filter
623                    .map(|sf| e.status.as_str() == sf)
624                    .unwrap_or(true)
625            })
626            .collect();
627
628        // Sort by start date descending (most recent first)
629        executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
630
631        let items: Vec<Value> = executions
632            .iter()
633            .map(|e| {
634                let mut item = json!({
635                    "executionArn": e.execution_arn,
636                    "stateMachineArn": e.state_machine_arn,
637                    "name": e.name,
638                    "status": e.status.as_str(),
639                    "startDate": e.start_date.timestamp() as f64,
640                });
641                if let Some(stop) = e.stop_date {
642                    item["stopDate"] = json!(stop.timestamp() as f64);
643                }
644                item
645            })
646            .collect();
647
648        let (page, token) = paginate(&items, next_token, max_results);
649
650        let mut resp = json!({ "executions": page });
651        if let Some(t) = token {
652            resp["nextToken"] = json!(t);
653        }
654        Ok(AwsResponse::ok_json(resp))
655    }
656
657    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
658        let body = req.json_body();
659        validate_required("executionArn", &body["executionArn"])?;
660        let exec_arn = body["executionArn"]
661            .as_str()
662            .ok_or_else(|| missing("executionArn"))?;
663
664        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
665        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
666        let next_token = body["nextToken"].as_str();
667        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
668
669        let accounts = self.state.read();
670        let empty = StepFunctionsState::new(&req.account_id, &req.region);
671        let state = accounts.get(&req.account_id).unwrap_or(&empty);
672        let exec = state
673            .executions
674            .get(exec_arn)
675            .ok_or_else(|| execution_not_found(exec_arn))?;
676
677        let mut events: Vec<Value> = exec
678            .history_events
679            .iter()
680            .map(|e| {
681                json!({
682                    "id": e.id,
683                    "type": e.event_type,
684                    "timestamp": e.timestamp.timestamp() as f64,
685                    "previousEventId": e.previous_event_id,
686                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
687                })
688            })
689            .collect();
690
691        if reverse_order {
692            events.reverse();
693        }
694
695        let (page, token) = paginate(&events, next_token, max_results);
696
697        let mut resp = json!({ "events": page });
698        if let Some(t) = token {
699            resp["nextToken"] = json!(t);
700        }
701        Ok(AwsResponse::ok_json(resp))
702    }
703
704    fn describe_state_machine_for_execution(
705        &self,
706        req: &AwsRequest,
707    ) -> Result<AwsResponse, AwsServiceError> {
708        let body = req.json_body();
709        validate_required("executionArn", &body["executionArn"])?;
710        let exec_arn = body["executionArn"]
711            .as_str()
712            .ok_or_else(|| missing("executionArn"))?;
713
714        let accounts = self.state.read();
715        let empty = StepFunctionsState::new(&req.account_id, &req.region);
716        let state = accounts.get(&req.account_id).unwrap_or(&empty);
717        let exec = state
718            .executions
719            .get(exec_arn)
720            .ok_or_else(|| execution_not_found(exec_arn))?;
721
722        let sm = state
723            .state_machines
724            .get(&exec.state_machine_arn)
725            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
726
727        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
728    }
729
730    // ─── Tagging ────────────────────────────────────────────────────────
731
732    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
733        let body = req.json_body();
734        validate_required("resourceArn", &body["resourceArn"])?;
735        let arn = body["resourceArn"]
736            .as_str()
737            .ok_or_else(|| missing("resourceArn"))?;
738        validate_arn(arn)?;
739        validate_required("tags", &body["tags"])?;
740
741        let mut accounts = self.state.write();
742        let state = accounts.get_or_create(&req.account_id);
743        let sm = state
744            .state_machines
745            .get_mut(arn)
746            .ok_or_else(|| resource_not_found(arn))?;
747
748        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
749            |f| {
750                AwsServiceError::aws_error(
751                    StatusCode::BAD_REQUEST,
752                    "ValidationException",
753                    format!("{f} must be a list"),
754                )
755            },
756        )?;
757
758        Ok(AwsResponse::ok_json(json!({})))
759    }
760
761    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762        let body = req.json_body();
763        validate_required("resourceArn", &body["resourceArn"])?;
764        let arn = body["resourceArn"]
765            .as_str()
766            .ok_or_else(|| missing("resourceArn"))?;
767        validate_arn(arn)?;
768        validate_required("tagKeys", &body["tagKeys"])?;
769
770        let mut accounts = self.state.write();
771        let state = accounts.get_or_create(&req.account_id);
772        let sm = state
773            .state_machines
774            .get_mut(arn)
775            .ok_or_else(|| resource_not_found(arn))?;
776
777        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
778            AwsServiceError::aws_error(
779                StatusCode::BAD_REQUEST,
780                "ValidationException",
781                format!("{f} must be a list"),
782            )
783        })?;
784
785        Ok(AwsResponse::ok_json(json!({})))
786    }
787
788    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
789        let body = req.json_body();
790        validate_required("resourceArn", &body["resourceArn"])?;
791        let arn = body["resourceArn"]
792            .as_str()
793            .ok_or_else(|| missing("resourceArn"))?;
794        validate_arn(arn)?;
795
796        let accounts = self.state.read();
797        let empty = StepFunctionsState::new(&req.account_id, &req.region);
798        let state = accounts.get(&req.account_id).unwrap_or(&empty);
799        let sm = state
800            .state_machines
801            .get(arn)
802            .ok_or_else(|| resource_not_found(arn))?;
803
804        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
805
806        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
807    }
808
809    // ─── Activities ─────────────────────────────────────────────────────
810
811    fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
812        let body = req.json_body();
813        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
814        validate_name(name)?;
815        let mut accounts = self.state.write();
816        let state = accounts.get_or_create(&req.account_id);
817        let arn = format!(
818            "arn:aws:states:{}:{}:activity:{}",
819            state.region, state.account_id, name
820        );
821        if state.activities.contains_key(&arn) {
822            return Err(AwsServiceError::aws_error(
823                StatusCode::BAD_REQUEST,
824                "ActivityAlreadyExists",
825                format!("Activity already exists: {arn}"),
826            ));
827        }
828        let activity = crate::state::Activity {
829            name: name.to_string(),
830            arn: arn.clone(),
831            creation_date: chrono::Utc::now(),
832            tags: BTreeMap::new(),
833        };
834        state.activities.insert(arn.clone(), activity.clone());
835        Ok(AwsResponse::ok_json(json!({
836            "activityArn": arn,
837            "creationDate": activity.creation_date.timestamp(),
838        })))
839    }
840
841    fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
842        let body = req.json_body();
843        let arn = body["activityArn"]
844            .as_str()
845            .ok_or_else(|| missing("activityArn"))?
846            .to_string();
847        let mut accounts = self.state.write();
848        let state = accounts.get_or_create(&req.account_id);
849        state.activities.remove(&arn);
850        Ok(AwsResponse::ok_json(json!({})))
851    }
852
853    fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
854        let body = req.json_body();
855        let arn = body["activityArn"]
856            .as_str()
857            .ok_or_else(|| missing("activityArn"))?
858            .to_string();
859        let accounts = self.state.read();
860        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
861        let state = accounts.get(&req.account_id).unwrap_or(&empty);
862        let a = state.activities.get(&arn).ok_or_else(|| {
863            AwsServiceError::aws_error(
864                StatusCode::BAD_REQUEST,
865                "ActivityDoesNotExist",
866                format!("Activity does not exist: {arn}"),
867            )
868        })?;
869        Ok(AwsResponse::ok_json(json!({
870            "activityArn": a.arn,
871            "name": a.name,
872            "creationDate": a.creation_date.timestamp(),
873        })))
874    }
875
876    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
877        let accounts = self.state.read();
878        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
879        let state = accounts.get(&req.account_id).unwrap_or(&empty);
880        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
881        activities.sort_by(|a, b| a.name.cmp(&b.name));
882        let body = json!({
883            "activities": activities.iter().map(|a| json!({
884                "activityArn": a.arn,
885                "name": a.name,
886                "creationDate": a.creation_date.timestamp(),
887            })).collect::<Vec<_>>(),
888        });
889        Ok(AwsResponse::ok_json(body))
890    }
891
892    async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
893        let body = req.json_body();
894        let arn = body["activityArn"]
895            .as_str()
896            .ok_or_else(|| missing("activityArn"))?
897            .to_string();
898        // Activity must exist before we'll accept long-poll calls.
899        {
900            let accounts = self.state.read();
901            let state = accounts
902                .get(&req.account_id)
903                .ok_or_else(|| activity_not_found(&arn))?;
904            if !state.activities.contains_key(&arn) {
905                return Err(activity_not_found(&arn));
906            }
907        }
908
909        // AWS GetActivityTask blocks up to 60s. fakecloud defaults to 5s
910        // so test suites don't stall when no worker is feeding the queue.
911        let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
912            .ok()
913            .and_then(|s| s.parse().ok())
914            .unwrap_or(5);
915        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
916
917        loop {
918            // Try to dequeue oldest PENDING token for this activity.
919            {
920                let mut accounts = self.state.write();
921                let state = accounts.get_or_create(&req.account_id);
922                let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
923                    .task_tokens
924                    .iter()
925                    .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
926                    .map(|(k, t)| (k.clone(), t.created_at))
927                    .collect();
928                candidates.sort_by_key(|c| c.1);
929                if let Some((token, _)) = candidates.into_iter().next() {
930                    let now = chrono::Utc::now();
931                    let entry = state.task_tokens.get_mut(&token).expect("just looked up");
932                    entry.status = "IN_PROGRESS".to_string();
933                    entry.last_heartbeat_at = Some(now);
934                    let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
935                    return Ok(AwsResponse::ok_json(json!({
936                        "taskToken": token,
937                        "input": input,
938                    })));
939                }
940            }
941            if std::time::Instant::now() >= deadline {
942                // No task available in window — return empty token (matches
943                // AWS behavior).
944                return Ok(AwsResponse::ok_json(json!({
945                    "taskToken": "",
946                    "input": "",
947                })));
948            }
949            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
950        }
951    }
952
953    fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
954        self.update_task_token(req, "SUCCEEDED")
955    }
956
957    fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
958        self.update_task_token(req, "FAILED")
959    }
960
961    fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
962        // Heartbeats only refresh `last_heartbeat_at`; they don't change
963        // the task's lifecycle status. The interpreter's heartbeat-timeout
964        // check reads `last_heartbeat_at` to decide whether to fail the
965        // task with `States.HeartbeatTimeout`.
966        let body = req.json_body();
967        let token = body["taskToken"]
968            .as_str()
969            .ok_or_else(|| missing("taskToken"))?
970            .to_string();
971        let mut accounts = self.state.write();
972        let state = accounts.get_or_create(&req.account_id);
973        let entry = state
974            .task_tokens
975            .get_mut(&token)
976            .ok_or_else(|| task_does_not_exist(&token))?;
977        entry.last_heartbeat_at = Some(chrono::Utc::now());
978        Ok(AwsResponse::ok_json(json!({})))
979    }
980
981    fn update_task_token(
982        &self,
983        req: &AwsRequest,
984        new_status: &str,
985    ) -> Result<AwsResponse, AwsServiceError> {
986        let body = req.json_body();
987        let token = body["taskToken"]
988            .as_str()
989            .ok_or_else(|| missing("taskToken"))?
990            .to_string();
991        let mut accounts = self.state.write();
992        let state = accounts.get_or_create(&req.account_id);
993        let entry = state
994            .task_tokens
995            .get_mut(&token)
996            .ok_or_else(|| task_does_not_exist(&token))?;
997        entry.status = new_status.to_string();
998        if new_status == "SUCCEEDED" {
999            entry.output = body["output"].as_str().map(String::from);
1000        } else if new_status == "FAILED" {
1001            entry.error = body["error"].as_str().map(String::from);
1002            entry.cause = body["cause"].as_str().map(String::from);
1003        }
1004        Ok(AwsResponse::ok_json(json!({})))
1005    }
1006
1007    // ─── State machine versions / aliases ───────────────────────────────
1008
1009    fn publish_state_machine_version(
1010        &self,
1011        req: &AwsRequest,
1012    ) -> Result<AwsResponse, AwsServiceError> {
1013        let body = req.json_body();
1014        let arn = body["stateMachineArn"]
1015            .as_str()
1016            .ok_or_else(|| missing("stateMachineArn"))?
1017            .to_string();
1018        let description = body["description"].as_str().unwrap_or("").to_string();
1019        let mut accounts = self.state.write();
1020        let state = accounts.get_or_create(&req.account_id);
1021        if !state.state_machines.contains_key(&arn) {
1022            return Err(state_machine_not_found(&arn));
1023        }
1024        let version = state
1025            .state_machine_versions
1026            .values()
1027            .filter(|v| v.state_machine_arn == arn)
1028            .map(|v| v.version)
1029            .max()
1030            .unwrap_or(0)
1031            + 1;
1032        let version_arn = format!("{arn}:{version}");
1033        let v = crate::state::StateMachineVersion {
1034            state_machine_arn: arn,
1035            version,
1036            revision_id: format!("rev-{version}"),
1037            description,
1038            creation_date: chrono::Utc::now(),
1039        };
1040        state
1041            .state_machine_versions
1042            .insert(version_arn.clone(), v.clone());
1043        Ok(AwsResponse::ok_json(json!({
1044            "stateMachineVersionArn": version_arn,
1045            "creationDate": v.creation_date.timestamp(),
1046        })))
1047    }
1048
1049    fn delete_state_machine_version(
1050        &self,
1051        req: &AwsRequest,
1052    ) -> Result<AwsResponse, AwsServiceError> {
1053        let body = req.json_body();
1054        let arn = body["stateMachineVersionArn"]
1055            .as_str()
1056            .ok_or_else(|| missing("stateMachineVersionArn"))?
1057            .to_string();
1058        let mut accounts = self.state.write();
1059        let state = accounts.get_or_create(&req.account_id);
1060        state.state_machine_versions.remove(&arn);
1061        Ok(AwsResponse::ok_json(json!({})))
1062    }
1063
1064    fn list_state_machine_versions(
1065        &self,
1066        req: &AwsRequest,
1067    ) -> Result<AwsResponse, AwsServiceError> {
1068        let body = req.json_body();
1069        let arn = body["stateMachineArn"]
1070            .as_str()
1071            .ok_or_else(|| missing("stateMachineArn"))?
1072            .to_string();
1073        let accounts = self.state.read();
1074        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1075        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1076        let mut versions: Vec<&crate::state::StateMachineVersion> = state
1077            .state_machine_versions
1078            .values()
1079            .filter(|v| v.state_machine_arn == arn)
1080            .collect();
1081        versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1082        let resp = json!({
1083            "stateMachineVersions": versions.iter().map(|v| json!({
1084                "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1085                "creationDate": v.creation_date.timestamp(),
1086            })).collect::<Vec<_>>(),
1087        });
1088        Ok(AwsResponse::ok_json(resp))
1089    }
1090
1091    fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1092        let body = req.json_body();
1093        let name = body["name"]
1094            .as_str()
1095            .ok_or_else(|| missing("name"))?
1096            .to_string();
1097        validate_name(&name)?;
1098        let routing_cfg = body["routingConfiguration"]
1099            .as_array()
1100            .ok_or_else(|| missing("routingConfiguration"))?;
1101        let routes = parse_routing_configuration(routing_cfg)?;
1102        let parent_arn = routes[0]
1103            .state_machine_version_arn
1104            .rsplit_once(':')
1105            .map(|(parent, _)| parent.to_string())
1106            .unwrap_or_default();
1107        let alias_arn = format!("{parent_arn}:{name}");
1108        let now = chrono::Utc::now();
1109        let alias = crate::state::StateMachineAlias {
1110            name,
1111            arn: alias_arn.clone(),
1112            description: body["description"].as_str().unwrap_or("").to_string(),
1113            routing_configuration: routes,
1114            creation_date: now,
1115            update_date: now,
1116        };
1117        let mut accounts = self.state.write();
1118        let state = accounts.get_or_create(&req.account_id);
1119        state.state_machine_aliases.insert(alias_arn.clone(), alias);
1120        Ok(AwsResponse::ok_json(json!({
1121            "stateMachineAliasArn": alias_arn,
1122            "creationDate": now.timestamp(),
1123        })))
1124    }
1125
1126    fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1127        let body = req.json_body();
1128        let arn = body["stateMachineAliasArn"]
1129            .as_str()
1130            .ok_or_else(|| missing("stateMachineAliasArn"))?
1131            .to_string();
1132        let mut accounts = self.state.write();
1133        let state = accounts.get_or_create(&req.account_id);
1134        state.state_machine_aliases.remove(&arn);
1135        Ok(AwsResponse::ok_json(json!({})))
1136    }
1137
1138    fn describe_state_machine_alias(
1139        &self,
1140        req: &AwsRequest,
1141    ) -> Result<AwsResponse, AwsServiceError> {
1142        let body = req.json_body();
1143        let arn = body["stateMachineAliasArn"]
1144            .as_str()
1145            .ok_or_else(|| missing("stateMachineAliasArn"))?
1146            .to_string();
1147        let accounts = self.state.read();
1148        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1149        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1150        let alias = state
1151            .state_machine_aliases
1152            .get(&arn)
1153            .ok_or_else(|| resource_not_found(&arn))?;
1154        Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1155    }
1156
1157    fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158        let body = req.json_body();
1159        let parent = body["stateMachineArn"]
1160            .as_str()
1161            .ok_or_else(|| missing("stateMachineArn"))?
1162            .to_string();
1163        let accounts = self.state.read();
1164        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1165        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1166        // Anchor the prefix on the alias separator so a state machine
1167        // named `foo` doesn't pull in aliases for `foobar`.
1168        let parent_prefix = format!("{parent}:");
1169        let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1170            .state_machine_aliases
1171            .values()
1172            .filter(|a| a.arn.starts_with(&parent_prefix))
1173            .collect();
1174        aliases.sort_by(|a, b| a.name.cmp(&b.name));
1175        Ok(AwsResponse::ok_json(json!({
1176            "stateMachineAliases": aliases.iter().map(|a| json!({
1177                "stateMachineAliasArn": a.arn,
1178                "creationDate": a.creation_date.timestamp(),
1179            })).collect::<Vec<_>>(),
1180        })))
1181    }
1182
1183    fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1184        let body = req.json_body();
1185        let arn = body["stateMachineAliasArn"]
1186            .as_str()
1187            .ok_or_else(|| missing("stateMachineAliasArn"))?
1188            .to_string();
1189        let mut accounts = self.state.write();
1190        let state = accounts.get_or_create(&req.account_id);
1191        let alias = state
1192            .state_machine_aliases
1193            .get_mut(&arn)
1194            .ok_or_else(|| resource_not_found(&arn))?;
1195        if let Some(d) = body["description"].as_str() {
1196            alias.description = d.to_string();
1197        }
1198        if let Some(routes) = body["routingConfiguration"].as_array() {
1199            alias.routing_configuration = parse_routing_configuration(routes)?;
1200        }
1201        alias.update_date = chrono::Utc::now();
1202        Ok(AwsResponse::ok_json(json!({
1203            "updateDate": alias.update_date.timestamp(),
1204        })))
1205    }
1206
1207    // ─── Map runs ───────────────────────────────────────────────────────
1208
1209    fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1210        let body = req.json_body();
1211        let arn = body["mapRunArn"]
1212            .as_str()
1213            .ok_or_else(|| missing("mapRunArn"))?
1214            .to_string();
1215        let accounts = self.state.read();
1216        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1217        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1218        let mr = state
1219            .map_runs
1220            .get(&arn)
1221            .ok_or_else(|| resource_not_found(&arn))?;
1222        Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1223    }
1224
1225    fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1226        let body = req.json_body();
1227        let exec_arn = body["executionArn"].as_str().map(String::from);
1228        let accounts = self.state.read();
1229        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1230        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1231        let runs: Vec<&crate::state::MapRun> = state
1232            .map_runs
1233            .values()
1234            .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1235            .collect();
1236        Ok(AwsResponse::ok_json(json!({
1237            "mapRuns": runs.iter().map(|r| json!({
1238                "mapRunArn": r.map_run_arn,
1239                "executionArn": r.execution_arn,
1240                "stateMachineArn": "",
1241                "startDate": r.start_date.timestamp(),
1242                "stopDate": r.stop_date.map(|d| d.timestamp()),
1243            })).collect::<Vec<_>>(),
1244        })))
1245    }
1246
1247    fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1248        let body = req.json_body();
1249        let arn = body["mapRunArn"]
1250            .as_str()
1251            .ok_or_else(|| missing("mapRunArn"))?
1252            .to_string();
1253        let mut accounts = self.state.write();
1254        let state = accounts.get_or_create(&req.account_id);
1255        let mr = state
1256            .map_runs
1257            .get_mut(&arn)
1258            .ok_or_else(|| resource_not_found(&arn))?;
1259        if let Some(c) = body["maxConcurrency"].as_i64() {
1260            mr.max_concurrency = c as i32;
1261        }
1262        if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1263            mr.tolerated_failure_percentage = p;
1264        }
1265        if let Some(c) = body["toleratedFailureCount"].as_i64() {
1266            mr.tolerated_failure_count = c;
1267        }
1268        Ok(AwsResponse::ok_json(json!({})))
1269    }
1270
1271    // ─── Execution lifecycle extras ─────────────────────────────────────
1272
1273    fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274        let body = req.json_body();
1275        let arn = body["executionArn"]
1276            .as_str()
1277            .ok_or_else(|| missing("executionArn"))?
1278            .to_string();
1279        let mut accounts = self.state.write();
1280        let state = accounts.get_or_create(&req.account_id);
1281        let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1282            AwsServiceError::aws_error(
1283                StatusCode::BAD_REQUEST,
1284                "ExecutionDoesNotExist",
1285                format!("Execution does not exist: {arn}"),
1286            )
1287        })?;
1288        exec.status = crate::state::ExecutionStatus::Running;
1289        exec.stop_date = None;
1290        Ok(AwsResponse::ok_json(json!({
1291            "redriveDate": chrono::Utc::now().timestamp(),
1292        })))
1293    }
1294
1295    async fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1296        let body = req.json_body();
1297        let sm_arn = body["stateMachineArn"]
1298            .as_str()
1299            .ok_or_else(|| missing("stateMachineArn"))?
1300            .to_string();
1301        let input = body["input"].as_str().unwrap_or("{}").to_string();
1302        if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1303            return Err(AwsServiceError::aws_error(
1304                StatusCode::BAD_REQUEST,
1305                "InvalidExecutionInput",
1306                "Execution input is not valid JSON.",
1307            ));
1308        }
1309        let (exec_arn, definition, logging_config) = {
1310            let mut accounts = self.state.write();
1311            let state = accounts.get_or_create(&req.account_id);
1312            let sm = state
1313                .state_machines
1314                .get(&sm_arn)
1315                .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1316            if sm.machine_type != crate::state::StateMachineType::Express {
1317                return Err(AwsServiceError::aws_error(
1318                    StatusCode::BAD_REQUEST,
1319                    "StateMachineTypeNotSupported",
1320                    "StartSyncExecution is only supported for EXPRESS state machines.",
1321                ));
1322            }
1323            let now = chrono::Utc::now();
1324            let exec_name = format!("sync-{}", now.timestamp_millis());
1325            let exec_arn = format!(
1326                "arn:aws:states:{}:{}:express:{}:{}",
1327                state.region, state.account_id, sm.name, exec_name
1328            );
1329            let execution = Execution {
1330                execution_arn: exec_arn.clone(),
1331                state_machine_arn: sm_arn.clone(),
1332                state_machine_name: sm.name.clone(),
1333                name: exec_name.clone(),
1334                status: ExecutionStatus::Running,
1335                input: Some(input.clone()),
1336                output: None,
1337                start_date: now,
1338                stop_date: None,
1339                error: None,
1340                cause: None,
1341                history_events: vec![],
1342                parent_execution_arn: None,
1343                is_sync: true,
1344                billed_duration_ms: None,
1345                billed_memory_mb: None,
1346            };
1347            state.executions.insert(exec_arn.clone(), execution);
1348            (
1349                exec_arn,
1350                sm.definition.clone(),
1351                sm.logging_configuration.clone(),
1352            )
1353        };
1354
1355        interpreter::execute_state_machine(
1356            self.state.clone(),
1357            exec_arn.clone(),
1358            definition,
1359            Some(input),
1360            self.delivery.clone(),
1361            self.dynamodb_state.clone(),
1362            self.registry.clone(),
1363            logging_config,
1364        )
1365        .await;
1366
1367        // Persist billing details on the stored execution so introspection
1368        // endpoints can replay the same numbers later.
1369        {
1370            let mut accounts = self.state.write();
1371            if let Some(state) = accounts.get_mut(&req.account_id) {
1372                if let Some(exec) = state.executions.get_mut(&exec_arn) {
1373                    let duration_ms = exec
1374                        .stop_date
1375                        .map_or(0, |stop| (stop - exec.start_date).num_milliseconds())
1376                        .max(0);
1377                    exec.billed_duration_ms = Some(duration_ms);
1378                    exec.billed_memory_mb = Some(64);
1379                }
1380            }
1381        }
1382
1383        let accounts = self.state.read();
1384        let state = accounts.get(&req.account_id).unwrap();
1385        let exec = state
1386            .executions
1387            .get(&exec_arn)
1388            .ok_or_else(|| execution_not_found(&exec_arn))?;
1389
1390        let mut resp = json!({
1391            "executionArn": exec.execution_arn,
1392            "stateMachineArn": exec.state_machine_arn,
1393            "name": exec.name,
1394            "startDate": exec.start_date.timestamp(),
1395            "stopDate": exec.stop_date.map(|d| d.timestamp()),
1396            "status": exec.status.as_str(),
1397            "input": exec.input.as_deref().unwrap_or("{}"),
1398        });
1399
1400        if let Some(ref output) = exec.output {
1401            resp["output"] = json!(output);
1402        }
1403        if let Some(ref error) = exec.error {
1404            resp["error"] = json!(error);
1405        }
1406        if let Some(ref cause) = exec.cause {
1407            resp["cause"] = json!(cause);
1408        }
1409
1410        let duration_ms = exec
1411            .stop_date
1412            .map_or(0, |stop| (stop - exec.start_date).num_milliseconds());
1413        resp["billingDetails"] = json!({
1414            "billedMemoryUsedInMB": 64,
1415            "billedDurationInMilliseconds": duration_ms.max(0),
1416        });
1417
1418        Ok(AwsResponse::ok_json(resp))
1419    }
1420
1421    fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1422        let body = req.json_body();
1423        let definition = body["definition"]
1424            .as_str()
1425            .ok_or_else(|| missing("definition"))?;
1426        validate_definition(definition)?;
1427        let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1428        let input = body["input"].as_str().unwrap_or("{}").to_string();
1429        // Echo input back as output. Real Step Functions actually
1430        // simulates the state; our emulator reports SUCCEEDED so callers
1431        // can wire the integration test scaffolding.
1432        Ok(AwsResponse::ok_json(json!({
1433            "output": input,
1434            "status": "SUCCEEDED",
1435            "nextState": "End",
1436        })))
1437    }
1438
1439    fn validate_state_machine_definition(
1440        &self,
1441        req: &AwsRequest,
1442    ) -> Result<AwsResponse, AwsServiceError> {
1443        let body = req.json_body();
1444        let definition = body["definition"]
1445            .as_str()
1446            .ok_or_else(|| missing("definition"))?;
1447        match validate_definition(definition) {
1448            Ok(()) => Ok(AwsResponse::ok_json(json!({
1449                "result": "OK",
1450                "diagnostics": [],
1451            }))),
1452            Err(e) => Ok(AwsResponse::ok_json(json!({
1453                "result": "FAIL",
1454                "diagnostics": [{
1455                    "severity": "ERROR",
1456                    "code": "INVALID_DEFINITION",
1457                    "message": e.to_string(),
1458                }],
1459            }))),
1460        }
1461    }
1462}
1463
1464fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1465    json!({
1466        "stateMachineAliasArn": alias.arn,
1467        "name": alias.name,
1468        "description": alias.description,
1469        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1470            "stateMachineVersionArn": r.state_machine_version_arn,
1471            "weight": r.weight,
1472        })).collect::<Vec<_>>(),
1473        "creationDate": alias.creation_date.timestamp(),
1474        "updateDate": alias.update_date.timestamp(),
1475    })
1476}
1477
1478fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1479    json!({
1480        "mapRunArn": mr.map_run_arn,
1481        "executionArn": mr.execution_arn,
1482        "maxConcurrency": mr.max_concurrency,
1483        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1484        "toleratedFailureCount": mr.tolerated_failure_count,
1485        "status": mr.status,
1486        "startDate": mr.start_date.timestamp(),
1487        "stopDate": mr.stop_date.map(|d| d.timestamp()),
1488    })
1489}
1490
1491// ─── Helpers ────────────────────────────────────────────────────────────
1492
1493fn state_machine_to_json(sm: &StateMachine) -> Value {
1494    let mut resp = json!({
1495        "name": sm.name,
1496        "stateMachineArn": sm.arn,
1497        "definition": sm.definition,
1498        "roleArn": sm.role_arn,
1499        "type": sm.machine_type.as_str(),
1500        "status": sm.status.as_str(),
1501        "creationDate": sm.creation_date.timestamp() as f64,
1502        "updateDate": sm.update_date.timestamp() as f64,
1503        "revisionId": sm.revision_id,
1504        "label": sm.name,
1505    });
1506
1507    if !sm.description.is_empty() {
1508        resp["description"] = json!(sm.description);
1509    }
1510
1511    if let Some(ref logging) = sm.logging_configuration {
1512        resp["loggingConfiguration"] = logging.clone();
1513    } else {
1514        resp["loggingConfiguration"] = json!({
1515            "level": "OFF",
1516            "includeExecutionData": false,
1517            "destinations": [],
1518        });
1519    }
1520
1521    if let Some(ref tracing) = sm.tracing_configuration {
1522        resp["tracingConfiguration"] = tracing.clone();
1523    } else {
1524        resp["tracingConfiguration"] = json!({
1525            "enabled": false,
1526        });
1527    }
1528
1529    resp
1530}
1531
1532fn missing(name: &str) -> AwsServiceError {
1533    AwsServiceError::aws_error(
1534        StatusCode::BAD_REQUEST,
1535        "ValidationException",
1536        format!("The request must contain the parameter {name}."),
1537    )
1538}
1539
1540fn state_machine_not_found(arn: &str) -> AwsServiceError {
1541    AwsServiceError::aws_error(
1542        StatusCode::BAD_REQUEST,
1543        "StateMachineDoesNotExist",
1544        format!("State Machine Does Not Exist: '{arn}'"),
1545    )
1546}
1547
1548fn activity_not_found(arn: &str) -> AwsServiceError {
1549    AwsServiceError::aws_error(
1550        StatusCode::BAD_REQUEST,
1551        "ActivityDoesNotExist",
1552        format!("Activity does not exist: {arn}"),
1553    )
1554}
1555
1556fn task_does_not_exist(token: &str) -> AwsServiceError {
1557    AwsServiceError::aws_error(
1558        StatusCode::BAD_REQUEST,
1559        "TaskDoesNotExist",
1560        format!("Task does not exist: {token}"),
1561    )
1562}
1563
1564fn resource_not_found(arn: &str) -> AwsServiceError {
1565    AwsServiceError::aws_error(
1566        StatusCode::BAD_REQUEST,
1567        "ResourceNotFound",
1568        format!("Resource not found: '{arn}'"),
1569    )
1570}
1571
1572/// Parse + validate an alias `routingConfiguration` array.
1573///
1574/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
1575/// route must include `stateMachineVersionArn`.
1576fn parse_routing_configuration(
1577    routes: &[serde_json::Value],
1578) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1579    if routes.is_empty() || routes.len() > 2 {
1580        return Err(AwsServiceError::aws_error(
1581            StatusCode::BAD_REQUEST,
1582            "ValidationException",
1583            "routingConfiguration must contain 1 or 2 routes.",
1584        ));
1585    }
1586    let parsed: Vec<crate::state::AliasRoute> = routes
1587        .iter()
1588        .map(|r| {
1589            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1590                AwsServiceError::aws_error(
1591                    StatusCode::BAD_REQUEST,
1592                    "ValidationException",
1593                    "routingConfiguration entries must contain stateMachineVersionArn.",
1594                )
1595            })?;
1596            let weight = r["weight"].as_i64().ok_or_else(|| {
1597                AwsServiceError::aws_error(
1598                    StatusCode::BAD_REQUEST,
1599                    "ValidationException",
1600                    "routingConfiguration entries must contain a numeric weight.",
1601                )
1602            })?;
1603            if !(0..=100).contains(&weight) {
1604                return Err(AwsServiceError::aws_error(
1605                    StatusCode::BAD_REQUEST,
1606                    "ValidationException",
1607                    format!("Invalid routing weight {weight}; must be 0-100."),
1608                ));
1609            }
1610            Ok(crate::state::AliasRoute {
1611                state_machine_version_arn: arn.to_string(),
1612                weight: weight as i32,
1613            })
1614        })
1615        .collect::<Result<_, _>>()?;
1616    let total: i32 = parsed.iter().map(|r| r.weight).sum();
1617    if total != 100 {
1618        return Err(AwsServiceError::aws_error(
1619            StatusCode::BAD_REQUEST,
1620            "ValidationException",
1621            format!("routingConfiguration weights must sum to 100, got {total}."),
1622        ));
1623    }
1624    Ok(parsed)
1625}
1626
1627fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1628    if name.is_empty() || name.len() > 80 {
1629        return Err(AwsServiceError::aws_error(
1630            StatusCode::BAD_REQUEST,
1631            "InvalidName",
1632            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1633        ));
1634    }
1635    // Only allow alphanumeric, hyphens, and underscores
1636    if !name
1637        .chars()
1638        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1639    {
1640        return Err(AwsServiceError::aws_error(
1641            StatusCode::BAD_REQUEST,
1642            "InvalidName",
1643            format!(
1644                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1645            ),
1646        ));
1647    }
1648    Ok(())
1649}
1650
1651fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1652    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1653        AwsServiceError::aws_error(
1654            StatusCode::BAD_REQUEST,
1655            "InvalidDefinition",
1656            format!("Invalid State Machine Definition: '{e}'"),
1657        )
1658    })?;
1659
1660    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1661        return Err(AwsServiceError::aws_error(
1662            StatusCode::BAD_REQUEST,
1663            "InvalidDefinition",
1664            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1665                .to_string(),
1666        ));
1667    }
1668
1669    let states_obj = parsed
1670        .get("States")
1671        .and_then(|v| v.as_object())
1672        .ok_or_else(|| {
1673            AwsServiceError::aws_error(
1674                StatusCode::BAD_REQUEST,
1675                "InvalidDefinition",
1676                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1677                    .to_string(),
1678            )
1679        })?;
1680
1681    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1682        AwsServiceError::aws_error(
1683            StatusCode::BAD_REQUEST,
1684            "InvalidDefinition",
1685            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1686                .to_string(),
1687        )
1688    })?;
1689    if !states_obj.contains_key(start_at) {
1690        return Err(AwsServiceError::aws_error(
1691            StatusCode::BAD_REQUEST,
1692            "InvalidDefinition",
1693            format!(
1694                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1695                 (StartAt '{start_at}' does not reference a valid state)"
1696            ),
1697        ));
1698    }
1699
1700    Ok(())
1701}
1702
1703fn execution_not_found(arn: &str) -> AwsServiceError {
1704    AwsServiceError::aws_error(
1705        StatusCode::BAD_REQUEST,
1706        "ExecutionDoesNotExist",
1707        format!("Execution Does Not Exist: '{arn}'"),
1708    )
1709}
1710
1711fn execution_to_json(exec: &Execution) -> Value {
1712    let mut resp = json!({
1713        "executionArn": exec.execution_arn,
1714        "stateMachineArn": exec.state_machine_arn,
1715        "name": exec.name,
1716        "status": exec.status.as_str(),
1717        "startDate": exec.start_date.timestamp() as f64,
1718    });
1719
1720    if let Some(ref input) = exec.input {
1721        resp["input"] = json!(input);
1722    }
1723    if let Some(ref output) = exec.output {
1724        resp["output"] = json!(output);
1725    }
1726    if let Some(stop) = exec.stop_date {
1727        resp["stopDate"] = json!(stop.timestamp() as f64);
1728    }
1729    if let Some(ref error) = exec.error {
1730        resp["error"] = json!(error);
1731    }
1732    if let Some(ref cause) = exec.cause {
1733        resp["cause"] = json!(cause);
1734    }
1735
1736    resp
1737}
1738
1739/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
1740fn camel_to_details_key(event_type: &str) -> String {
1741    let mut chars = event_type.chars();
1742    match chars.next() {
1743        None => String::new(),
1744        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1745    }
1746}
1747
1748fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1749    if !arn.starts_with("arn:") {
1750        return Err(AwsServiceError::aws_error(
1751            StatusCode::BAD_REQUEST,
1752            "InvalidArn",
1753            format!("Invalid Arn: '{arn}'"),
1754        ));
1755    }
1756    Ok(())
1757}
1758
1759/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1760///
1761/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1762/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1763/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
1764///
1765/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
1766/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
1767pub fn start_execution_from_delivery(
1768    state: &SharedStepFunctionsState,
1769    delivery: &Option<Arc<DeliveryBus>>,
1770    dynamodb_state: &Option<SharedDynamoDbState>,
1771    registry: &Option<SharedServiceRegistry>,
1772    state_machine_arn: &str,
1773    input: &str,
1774) {
1775    // Validate input is valid JSON
1776    if serde_json::from_str::<serde_json::Value>(input).is_err() {
1777        tracing::warn!(
1778            state_machine_arn,
1779            "Step Functions delivery: invalid JSON input, skipping execution"
1780        );
1781        return;
1782    }
1783
1784    let execution_name = uuid::Uuid::new_v4().to_string();
1785
1786    // Extract account_id from the state machine ARN
1787    let account_id = state_machine_arn
1788        .split(':')
1789        .nth(4)
1790        .unwrap_or("000000000000")
1791        .to_string();
1792
1793    let mut accounts = state.write();
1794    let st = accounts.get_or_create(&account_id);
1795    let sm = match st.state_machines.get(state_machine_arn) {
1796        Some(sm) => sm,
1797        None => {
1798            tracing::warn!(
1799                state_machine_arn,
1800                "Step Functions delivery: state machine not found"
1801            );
1802            return;
1803        }
1804    };
1805
1806    let sm_name = sm.name.clone();
1807    let definition = sm.definition.clone();
1808    let exec_arn = st.execution_arn(&sm_name, &execution_name);
1809
1810    let now = Utc::now();
1811    let execution = Execution {
1812        execution_arn: exec_arn.clone(),
1813        state_machine_arn: state_machine_arn.to_string(),
1814        state_machine_name: sm_name,
1815        name: execution_name,
1816        status: ExecutionStatus::Running,
1817        input: Some(input.to_string()),
1818        output: None,
1819        start_date: now,
1820        stop_date: None,
1821        error: None,
1822        cause: None,
1823        history_events: vec![],
1824        parent_execution_arn: None,
1825        is_sync: false,
1826        billed_duration_ms: None,
1827        billed_memory_mb: None,
1828    };
1829
1830    st.executions.insert(exec_arn.clone(), execution);
1831    let logging_config = sm.logging_configuration.clone();
1832    drop(accounts);
1833
1834    let shared_state = state.clone();
1835    let delivery = delivery.clone();
1836    let dynamodb_state = dynamodb_state.clone();
1837    let registry = registry.clone();
1838    let input = Some(input.to_string());
1839    tokio::spawn(async move {
1840        interpreter::execute_state_machine(
1841            shared_state,
1842            exec_arn,
1843            definition,
1844            input,
1845            delivery,
1846            dynamodb_state,
1847            registry,
1848            logging_config,
1849        )
1850        .await;
1851    });
1852}
1853
1854#[cfg(test)]
1855mod tests {
1856    use super::*;
1857    use http::{HeaderMap, Method};
1858    use parking_lot::RwLock;
1859    use serde_json::Value;
1860    use std::collections::HashMap;
1861    use std::sync::Arc;
1862
1863    fn make_state() -> SharedStepFunctionsState {
1864        Arc::new(RwLock::new(
1865            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1866        ))
1867    }
1868
1869    fn make_request(action: &str, body: &str) -> AwsRequest {
1870        AwsRequest {
1871            service: "states".to_string(),
1872            action: action.to_string(),
1873            region: "us-east-1".to_string(),
1874            account_id: "123456789012".to_string(),
1875            request_id: "test-id".to_string(),
1876            headers: HeaderMap::new(),
1877            query_params: HashMap::new(),
1878            body: body.as_bytes().to_vec().into(),
1879            body_stream: parking_lot::Mutex::new(None),
1880            path_segments: vec![],
1881            raw_path: "/".to_string(),
1882            raw_query: String::new(),
1883            method: Method::POST,
1884            is_query_protocol: false,
1885            access_key_id: None,
1886            principal: None,
1887        }
1888    }
1889
1890    fn body_json(resp: &AwsResponse) -> Value {
1891        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1892    }
1893
1894    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1895        match result {
1896            Err(e) => e,
1897            Ok(_) => panic!("expected error, got Ok"),
1898        }
1899    }
1900
1901    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1902
1903    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1904        let body = json!({
1905            "name": name,
1906            "definition": VALID_DEF,
1907            "roleArn": "arn:aws:iam::123456789012:role/test",
1908        });
1909        let req = make_request("CreateStateMachine", &body.to_string());
1910        let resp = svc.create_state_machine(&req).unwrap();
1911        let b = body_json(&resp);
1912        b["stateMachineArn"].as_str().unwrap().to_string()
1913    }
1914
1915    // ── CreateStateMachine ──
1916
1917    #[test]
1918    fn create_state_machine_basic() {
1919        let svc = StepFunctionsService::new(make_state());
1920        let arn = create_sm(&svc, "test-sm");
1921        assert!(arn.contains("test-sm"));
1922    }
1923
1924    #[test]
1925    fn create_state_machine_with_express_type() {
1926        let svc = StepFunctionsService::new(make_state());
1927        let body = json!({
1928            "name": "express-sm",
1929            "definition": VALID_DEF,
1930            "roleArn": "arn:aws:iam::123456789012:role/r",
1931            "type": "EXPRESS",
1932        });
1933        let req = make_request("CreateStateMachine", &body.to_string());
1934        let resp = svc.create_state_machine(&req).unwrap();
1935        let b = body_json(&resp);
1936        assert!(b["stateMachineArn"].as_str().is_some());
1937    }
1938
1939    #[test]
1940    fn create_state_machine_duplicate_fails() {
1941        let svc = StepFunctionsService::new(make_state());
1942        create_sm(&svc, "dup-sm");
1943        let body = json!({
1944            "name": "dup-sm",
1945            "definition": VALID_DEF,
1946            "roleArn": "arn:aws:iam::123456789012:role/r",
1947        });
1948        let req = make_request("CreateStateMachine", &body.to_string());
1949        let err = expect_err(svc.create_state_machine(&req));
1950        assert!(err.to_string().contains("StateMachineAlreadyExists"));
1951    }
1952
1953    #[test]
1954    fn create_state_machine_missing_name() {
1955        let svc = StepFunctionsService::new(make_state());
1956        let body = json!({
1957            "definition": VALID_DEF,
1958            "roleArn": "arn:aws:iam::123456789012:role/r",
1959        });
1960        let req = make_request("CreateStateMachine", &body.to_string());
1961        assert!(svc.create_state_machine(&req).is_err());
1962    }
1963
1964    #[test]
1965    fn create_state_machine_invalid_definition() {
1966        let svc = StepFunctionsService::new(make_state());
1967        let body = json!({
1968            "name": "bad-def",
1969            "definition": "not json",
1970            "roleArn": "arn:aws:iam::123456789012:role/r",
1971        });
1972        let req = make_request("CreateStateMachine", &body.to_string());
1973        let err = expect_err(svc.create_state_machine(&req));
1974        assert!(err.to_string().contains("InvalidDefinition"));
1975    }
1976
1977    #[test]
1978    fn create_state_machine_definition_missing_start_at() {
1979        let svc = StepFunctionsService::new(make_state());
1980        let body = json!({
1981            "name": "no-start",
1982            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1983            "roleArn": "arn:aws:iam::123456789012:role/r",
1984        });
1985        let req = make_request("CreateStateMachine", &body.to_string());
1986        let err = expect_err(svc.create_state_machine(&req));
1987        assert!(err.to_string().contains("InvalidDefinition"));
1988    }
1989
1990    #[test]
1991    fn create_state_machine_definition_missing_states() {
1992        let svc = StepFunctionsService::new(make_state());
1993        let body = json!({
1994            "name": "no-states",
1995            "definition": r#"{"StartAt":"S"}"#,
1996            "roleArn": "arn:aws:iam::123456789012:role/r",
1997        });
1998        let req = make_request("CreateStateMachine", &body.to_string());
1999        let err = expect_err(svc.create_state_machine(&req));
2000        assert!(err.to_string().contains("InvalidDefinition"));
2001    }
2002
2003    #[test]
2004    fn create_state_machine_definition_start_at_not_in_states() {
2005        let svc = StepFunctionsService::new(make_state());
2006        let body = json!({
2007            "name": "bad-start",
2008            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
2009            "roleArn": "arn:aws:iam::123456789012:role/r",
2010        });
2011        let req = make_request("CreateStateMachine", &body.to_string());
2012        let err = expect_err(svc.create_state_machine(&req));
2013        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
2014    }
2015
2016    #[test]
2017    fn create_state_machine_invalid_type() {
2018        let svc = StepFunctionsService::new(make_state());
2019        let body = json!({
2020            "name": "bad-type",
2021            "definition": VALID_DEF,
2022            "roleArn": "arn:aws:iam::123456789012:role/r",
2023            "type": "INVALID",
2024        });
2025        let req = make_request("CreateStateMachine", &body.to_string());
2026        assert!(svc.create_state_machine(&req).is_err());
2027    }
2028
2029    #[test]
2030    fn create_state_machine_invalid_arn() {
2031        let svc = StepFunctionsService::new(make_state());
2032        let body = json!({
2033            "name": "bad-arn",
2034            "definition": VALID_DEF,
2035            "roleArn": "not-an-arn",
2036        });
2037        let req = make_request("CreateStateMachine", &body.to_string());
2038        let err = expect_err(svc.create_state_machine(&req));
2039        assert!(err.to_string().contains("InvalidArn"));
2040    }
2041
2042    #[test]
2043    fn create_state_machine_invalid_name() {
2044        let svc = StepFunctionsService::new(make_state());
2045        let body = json!({
2046            "name": "has spaces!",
2047            "definition": VALID_DEF,
2048            "roleArn": "arn:aws:iam::123456789012:role/r",
2049        });
2050        let req = make_request("CreateStateMachine", &body.to_string());
2051        let err = expect_err(svc.create_state_machine(&req));
2052        assert!(err.to_string().contains("InvalidName"));
2053    }
2054
2055    #[test]
2056    fn create_state_machine_name_too_long() {
2057        let svc = StepFunctionsService::new(make_state());
2058        let long_name = "a".repeat(81);
2059        let body = json!({
2060            "name": long_name,
2061            "definition": VALID_DEF,
2062            "roleArn": "arn:aws:iam::123456789012:role/r",
2063        });
2064        let req = make_request("CreateStateMachine", &body.to_string());
2065        let err = expect_err(svc.create_state_machine(&req));
2066        assert!(err.to_string().contains("InvalidName"));
2067    }
2068
2069    // ── DescribeStateMachine ──
2070
2071    #[test]
2072    fn describe_state_machine_found() {
2073        let svc = StepFunctionsService::new(make_state());
2074        let arn = create_sm(&svc, "desc-sm");
2075
2076        let req = make_request(
2077            "DescribeStateMachine",
2078            &json!({"stateMachineArn": arn}).to_string(),
2079        );
2080        let resp = svc.describe_state_machine(&req).unwrap();
2081        let b = body_json(&resp);
2082        assert_eq!(b["name"], "desc-sm");
2083        assert_eq!(b["status"], "ACTIVE");
2084        assert!(b["definition"].as_str().is_some());
2085    }
2086
2087    #[test]
2088    fn describe_state_machine_not_found() {
2089        let svc = StepFunctionsService::new(make_state());
2090        let req = make_request(
2091            "DescribeStateMachine",
2092            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2093                .to_string(),
2094        );
2095        let err = expect_err(svc.describe_state_machine(&req));
2096        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2097    }
2098
2099    // ── ListStateMachines ──
2100
2101    #[test]
2102    fn list_state_machines_empty() {
2103        let svc = StepFunctionsService::new(make_state());
2104        let req = make_request("ListStateMachines", "{}");
2105        let resp = svc.list_state_machines(&req).unwrap();
2106        let b = body_json(&resp);
2107        assert!(b["stateMachines"].as_array().unwrap().is_empty());
2108    }
2109
2110    #[test]
2111    fn list_state_machines_returns_created() {
2112        let svc = StepFunctionsService::new(make_state());
2113        create_sm(&svc, "sm-1");
2114        create_sm(&svc, "sm-2");
2115
2116        let req = make_request("ListStateMachines", "{}");
2117        let resp = svc.list_state_machines(&req).unwrap();
2118        let b = body_json(&resp);
2119        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
2120    }
2121
2122    // ── DeleteStateMachine ──
2123
2124    #[test]
2125    fn delete_state_machine() {
2126        let svc = StepFunctionsService::new(make_state());
2127        let arn = create_sm(&svc, "del-sm");
2128
2129        let req = make_request(
2130            "DeleteStateMachine",
2131            &json!({"stateMachineArn": arn}).to_string(),
2132        );
2133        svc.delete_state_machine(&req).unwrap();
2134
2135        // Describe should fail
2136        let req = make_request(
2137            "DescribeStateMachine",
2138            &json!({"stateMachineArn": arn}).to_string(),
2139        );
2140        assert!(svc.describe_state_machine(&req).is_err());
2141    }
2142
2143    #[test]
2144    fn delete_state_machine_nonexistent_succeeds() {
2145        let svc = StepFunctionsService::new(make_state());
2146        let req = make_request(
2147            "DeleteStateMachine",
2148            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2149                .to_string(),
2150        );
2151        // AWS returns success even for nonexistent
2152        svc.delete_state_machine(&req).unwrap();
2153    }
2154
2155    // ── UpdateStateMachine ──
2156
2157    #[test]
2158    fn update_state_machine() {
2159        let svc = StepFunctionsService::new(make_state());
2160        let arn = create_sm(&svc, "upd-sm");
2161
2162        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
2163        let body = json!({
2164            "stateMachineArn": arn,
2165            "definition": new_def,
2166            "description": "updated",
2167        });
2168        let req = make_request("UpdateStateMachine", &body.to_string());
2169        let resp = svc.update_state_machine(&req).unwrap();
2170        let b = body_json(&resp);
2171        assert!(b["updateDate"].as_f64().is_some());
2172
2173        // Verify
2174        let req = make_request(
2175            "DescribeStateMachine",
2176            &json!({"stateMachineArn": arn}).to_string(),
2177        );
2178        let resp = svc.describe_state_machine(&req).unwrap();
2179        let b = body_json(&resp);
2180        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2181        assert_eq!(b["description"], "updated");
2182    }
2183
2184    #[test]
2185    fn update_state_machine_not_found() {
2186        let svc = StepFunctionsService::new(make_state());
2187        let body = json!({
2188            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2189            "definition": VALID_DEF,
2190        });
2191        let req = make_request("UpdateStateMachine", &body.to_string());
2192        let err = expect_err(svc.update_state_machine(&req));
2193        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2194    }
2195
2196    // ── StartExecution ──
2197
2198    #[tokio::test]
2199    async fn start_execution_basic() {
2200        let svc = StepFunctionsService::new(make_state());
2201        let arn = create_sm(&svc, "exec-sm");
2202
2203        let body = json!({
2204            "stateMachineArn": arn,
2205            "input": r#"{"key":"value"}"#,
2206        });
2207        let req = make_request("StartExecution", &body.to_string());
2208        let resp = svc.start_execution(&req).unwrap();
2209        let b = body_json(&resp);
2210        assert!(b["executionArn"].as_str().is_some());
2211        assert!(b["startDate"].as_f64().is_some());
2212    }
2213
2214    #[tokio::test]
2215    async fn start_execution_with_name() {
2216        let svc = StepFunctionsService::new(make_state());
2217        let arn = create_sm(&svc, "named-exec");
2218
2219        let body = json!({
2220            "stateMachineArn": arn,
2221            "name": "my-execution",
2222        });
2223        let req = make_request("StartExecution", &body.to_string());
2224        let resp = svc.start_execution(&req).unwrap();
2225        let b = body_json(&resp);
2226        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2227    }
2228
2229    #[tokio::test]
2230    async fn start_execution_sm_not_found() {
2231        let svc = StepFunctionsService::new(make_state());
2232        let body = json!({
2233            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2234        });
2235        let req = make_request("StartExecution", &body.to_string());
2236        let err = expect_err(svc.start_execution(&req));
2237        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2238    }
2239
2240    #[tokio::test]
2241    async fn start_execution_invalid_input() {
2242        let svc = StepFunctionsService::new(make_state());
2243        let arn = create_sm(&svc, "bad-input");
2244
2245        let body = json!({
2246            "stateMachineArn": arn,
2247            "input": "not json",
2248        });
2249        let req = make_request("StartExecution", &body.to_string());
2250        let err = expect_err(svc.start_execution(&req));
2251        assert!(err.to_string().contains("InvalidExecutionInput"));
2252    }
2253
2254    #[tokio::test]
2255    async fn start_execution_duplicate_name() {
2256        let svc = StepFunctionsService::new(make_state());
2257        let arn = create_sm(&svc, "dup-exec");
2258
2259        let body = json!({
2260            "stateMachineArn": arn,
2261            "name": "same-name",
2262        });
2263        let req = make_request("StartExecution", &body.to_string());
2264        svc.start_execution(&req).unwrap();
2265
2266        let req = make_request("StartExecution", &body.to_string());
2267        let err = expect_err(svc.start_execution(&req));
2268        assert!(err.to_string().contains("ExecutionAlreadyExists"));
2269    }
2270
2271    // ── DescribeExecution ──
2272
2273    #[tokio::test]
2274    async fn describe_execution_found() {
2275        let svc = StepFunctionsService::new(make_state());
2276        let sm_arn = create_sm(&svc, "desc-exec");
2277
2278        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2279        let req = make_request("StartExecution", &body.to_string());
2280        let resp = svc.start_execution(&req).unwrap();
2281        let exec_arn = body_json(&resp)["executionArn"]
2282            .as_str()
2283            .unwrap()
2284            .to_string();
2285
2286        let req = make_request(
2287            "DescribeExecution",
2288            &json!({"executionArn": exec_arn}).to_string(),
2289        );
2290        let resp = svc.describe_execution(&req).unwrap();
2291        let b = body_json(&resp);
2292        assert_eq!(b["name"], "e1");
2293        assert_eq!(b["status"], "RUNNING");
2294    }
2295
2296    #[tokio::test]
2297    async fn describe_execution_not_found() {
2298        let svc = StepFunctionsService::new(make_state());
2299        let req = make_request(
2300            "DescribeExecution",
2301            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2302                .to_string(),
2303        );
2304        let err = expect_err(svc.describe_execution(&req));
2305        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2306    }
2307
2308    // ── StopExecution ──
2309
2310    #[tokio::test]
2311    async fn stop_execution() {
2312        let svc = StepFunctionsService::new(make_state());
2313        let sm_arn = create_sm(&svc, "stop-sm");
2314
2315        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2316        let req = make_request("StartExecution", &body.to_string());
2317        let resp = svc.start_execution(&req).unwrap();
2318        let exec_arn = body_json(&resp)["executionArn"]
2319            .as_str()
2320            .unwrap()
2321            .to_string();
2322
2323        let body = json!({
2324            "executionArn": exec_arn,
2325            "error": "UserAborted",
2326            "cause": "test stop",
2327        });
2328        let req = make_request("StopExecution", &body.to_string());
2329        let resp = svc.stop_execution(&req).unwrap();
2330        let b = body_json(&resp);
2331        assert!(b["stopDate"].as_f64().is_some());
2332
2333        // Verify aborted
2334        let req = make_request(
2335            "DescribeExecution",
2336            &json!({"executionArn": exec_arn}).to_string(),
2337        );
2338        let resp = svc.describe_execution(&req).unwrap();
2339        let b = body_json(&resp);
2340        assert_eq!(b["status"], "ABORTED");
2341        assert_eq!(b["error"], "UserAborted");
2342    }
2343
2344    #[tokio::test]
2345    async fn stop_execution_not_found() {
2346        let svc = StepFunctionsService::new(make_state());
2347        let req = make_request(
2348            "StopExecution",
2349            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2350                .to_string(),
2351        );
2352        let err = expect_err(svc.stop_execution(&req));
2353        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2354    }
2355
2356    // ── ListExecutions ──
2357
2358    #[tokio::test]
2359    async fn list_executions() {
2360        let svc = StepFunctionsService::new(make_state());
2361        let sm_arn = create_sm(&svc, "list-exec");
2362
2363        for i in 0..3 {
2364            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2365            let req = make_request("StartExecution", &body.to_string());
2366            svc.start_execution(&req).unwrap();
2367        }
2368
2369        let req = make_request(
2370            "ListExecutions",
2371            &json!({"stateMachineArn": sm_arn}).to_string(),
2372        );
2373        let resp = svc.list_executions(&req).unwrap();
2374        let b = body_json(&resp);
2375        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2376    }
2377
2378    #[tokio::test]
2379    async fn list_executions_sm_not_found() {
2380        let svc = StepFunctionsService::new(make_state());
2381        let req = make_request(
2382            "ListExecutions",
2383            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2384                .to_string(),
2385        );
2386        let err = expect_err(svc.list_executions(&req));
2387        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2388    }
2389
2390    // ── GetExecutionHistory ──
2391
2392    #[tokio::test]
2393    async fn get_execution_history_not_found() {
2394        let svc = StepFunctionsService::new(make_state());
2395        let req = make_request(
2396            "GetExecutionHistory",
2397            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2398                .to_string(),
2399        );
2400        let err = expect_err(svc.get_execution_history(&req));
2401        assert!(err.to_string().contains("ExecutionDoesNotExist"));
2402    }
2403
2404    // ── DescribeStateMachineForExecution ──
2405
2406    #[tokio::test]
2407    async fn describe_sm_for_execution() {
2408        let svc = StepFunctionsService::new(make_state());
2409        let sm_arn = create_sm(&svc, "sm-for-exec");
2410
2411        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2412        let req = make_request("StartExecution", &body.to_string());
2413        let resp = svc.start_execution(&req).unwrap();
2414        let exec_arn = body_json(&resp)["executionArn"]
2415            .as_str()
2416            .unwrap()
2417            .to_string();
2418
2419        let req = make_request(
2420            "DescribeStateMachineForExecution",
2421            &json!({"executionArn": exec_arn}).to_string(),
2422        );
2423        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2424        let b = body_json(&resp);
2425        assert_eq!(b["name"], "sm-for-exec");
2426    }
2427
2428    // ── Tags ──
2429
2430    #[test]
2431    fn tag_untag_list_tags() {
2432        let svc = StepFunctionsService::new(make_state());
2433        let arn = create_sm(&svc, "tagged-sm");
2434
2435        // Tag
2436        let body = json!({
2437            "resourceArn": arn,
2438            "tags": [{"key": "env", "value": "prod"}],
2439        });
2440        let req = make_request("TagResource", &body.to_string());
2441        svc.tag_resource(&req).unwrap();
2442
2443        // List
2444        let req = make_request(
2445            "ListTagsForResource",
2446            &json!({"resourceArn": arn}).to_string(),
2447        );
2448        let resp = svc.list_tags_for_resource(&req).unwrap();
2449        let b = body_json(&resp);
2450        let tags = b["tags"].as_array().unwrap();
2451        assert_eq!(tags.len(), 1);
2452        assert_eq!(tags[0]["key"], "env");
2453
2454        // Untag
2455        let body = json!({
2456            "resourceArn": arn,
2457            "tagKeys": ["env"],
2458        });
2459        let req = make_request("UntagResource", &body.to_string());
2460        svc.untag_resource(&req).unwrap();
2461
2462        // Verify empty
2463        let req = make_request(
2464            "ListTagsForResource",
2465            &json!({"resourceArn": arn}).to_string(),
2466        );
2467        let resp = svc.list_tags_for_resource(&req).unwrap();
2468        let b = body_json(&resp);
2469        assert!(b["tags"].as_array().unwrap().is_empty());
2470    }
2471
2472    #[test]
2473    fn tag_resource_not_found() {
2474        let svc = StepFunctionsService::new(make_state());
2475        let body = json!({
2476            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2477            "tags": [{"key": "k", "value": "v"}],
2478        });
2479        let req = make_request("TagResource", &body.to_string());
2480        let err = expect_err(svc.tag_resource(&req));
2481        assert!(err.to_string().contains("ResourceNotFound"));
2482    }
2483
2484    // ── Helper function tests ──
2485
2486    #[test]
2487    fn test_validate_name() {
2488        assert!(validate_name("valid-name").is_ok());
2489        assert!(validate_name("under_score").is_ok());
2490        assert!(validate_name("").is_err());
2491        assert!(validate_name("has spaces").is_err());
2492        assert!(validate_name(&"a".repeat(81)).is_err());
2493    }
2494
2495    #[test]
2496    fn test_validate_definition() {
2497        assert!(validate_definition(VALID_DEF).is_ok());
2498        assert!(validate_definition("not json").is_err());
2499        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
2500        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
2501    }
2502
2503    #[test]
2504    fn test_validate_arn() {
2505        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2506        assert!(validate_arn("not-an-arn").is_err());
2507    }
2508
2509    #[test]
2510    fn test_camel_to_details_key() {
2511        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2512        assert_eq!(camel_to_details_key(""), "");
2513    }
2514
2515    #[test]
2516    fn test_is_mutating_action() {
2517        assert!(is_mutating_action("CreateStateMachine"));
2518        assert!(is_mutating_action("StartExecution"));
2519        assert!(!is_mutating_action("DescribeStateMachine"));
2520        assert!(!is_mutating_action("ListStateMachines"));
2521    }
2522
2523    // ── StartSyncExecution ──
2524
2525    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
2526        let body = json!({
2527            "name": name,
2528            "definition": VALID_DEF,
2529            "roleArn": "arn:aws:iam::123456789012:role/test",
2530            "type": "EXPRESS",
2531        });
2532        let req = make_request("CreateStateMachine", &body.to_string());
2533        let resp = svc.create_state_machine(&req).unwrap();
2534        let b = body_json(&resp);
2535        b["stateMachineArn"].as_str().unwrap().to_string()
2536    }
2537
2538    #[tokio::test]
2539    async fn start_sync_execution_basic() {
2540        let svc = StepFunctionsService::new(make_state());
2541        let arn = create_express_sm(&svc, "sync-sm");
2542
2543        let body = json!({
2544            "stateMachineArn": arn,
2545            "input": r#"{"key":"value"}"#,
2546        });
2547        let req = make_request("StartSyncExecution", &body.to_string());
2548        let resp = svc.start_sync_execution(&req).await.unwrap();
2549        let b = body_json(&resp);
2550        assert!(b["executionArn"]
2551            .as_str()
2552            .unwrap()
2553            .contains("express:sync-sm"));
2554        assert_eq!(b["stateMachineArn"], arn);
2555        assert_eq!(b["status"], "SUCCEEDED");
2556        assert!(b["startDate"].as_i64().is_some());
2557        assert!(b["stopDate"].as_i64().is_some());
2558        assert!(b["output"].as_str().is_some());
2559        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
2560            .as_i64()
2561            .is_some());
2562    }
2563
2564    #[tokio::test]
2565    async fn start_sync_execution_not_express() {
2566        let svc = StepFunctionsService::new(make_state());
2567        let arn = create_sm(&svc, "std-sm");
2568
2569        let body = json!({"stateMachineArn": arn});
2570        let req = make_request("StartSyncExecution", &body.to_string());
2571        let err = expect_err(svc.start_sync_execution(&req).await);
2572        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
2573    }
2574
2575    #[tokio::test]
2576    async fn start_sync_execution_sm_not_found() {
2577        let svc = StepFunctionsService::new(make_state());
2578        let body = json!({
2579            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2580        });
2581        let req = make_request("StartSyncExecution", &body.to_string());
2582        let err = expect_err(svc.start_sync_execution(&req).await);
2583        assert!(err.to_string().contains("StateMachineDoesNotExist"));
2584    }
2585
2586    #[tokio::test]
2587    async fn start_sync_execution_records_introspection_fields() {
2588        let svc = StepFunctionsService::new(make_state());
2589        let arn = create_express_sm(&svc, "sync-introspect");
2590
2591        let body = json!({"stateMachineArn": arn, "input": "{}"});
2592        let req = make_request("StartSyncExecution", &body.to_string());
2593        let resp = svc.start_sync_execution(&req).await.unwrap();
2594        let b = body_json(&resp);
2595        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
2596
2597        let accounts = svc.state.read();
2598        let state = accounts.get("123456789012").unwrap();
2599        let stored = state
2600            .executions
2601            .get(&exec_arn)
2602            .expect("sync execution should be persisted for introspection");
2603        assert!(stored.is_sync, "sync executions must be marked is_sync");
2604        assert_eq!(stored.billed_memory_mb, Some(64));
2605        assert!(
2606            stored.billed_duration_ms.is_some(),
2607            "billed_duration_ms must be populated after sync run"
2608        );
2609        assert!(
2610            stored.parent_execution_arn.is_none(),
2611            "top-level sync execution has no parent"
2612        );
2613    }
2614
2615    #[tokio::test]
2616    async fn start_sync_execution_invalid_input() {
2617        let svc = StepFunctionsService::new(make_state());
2618        let arn = create_express_sm(&svc, "bad-input-sync");
2619
2620        let body = json!({
2621            "stateMachineArn": arn,
2622            "input": "not json",
2623        });
2624        let req = make_request("StartSyncExecution", &body.to_string());
2625        let err = expect_err(svc.start_sync_execution(&req).await);
2626        assert!(err.to_string().contains("InvalidExecutionInput"));
2627    }
2628}