Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const SUPPORTED: &[&str] = &[
25    "CreateStateMachine",
26    "DescribeStateMachine",
27    "ListStateMachines",
28    "DeleteStateMachine",
29    "UpdateStateMachine",
30    "TagResource",
31    "UntagResource",
32    "ListTagsForResource",
33    "StartExecution",
34    "StopExecution",
35    "DescribeExecution",
36    "ListExecutions",
37    "GetExecutionHistory",
38    "DescribeStateMachineForExecution",
39];
40
41pub struct StepFunctionsService {
42    state: SharedStepFunctionsState,
43    delivery: Option<Arc<DeliveryBus>>,
44    dynamodb_state: Option<SharedDynamoDbState>,
45    snapshot_store: Option<Arc<dyn SnapshotStore>>,
46    snapshot_lock: Arc<AsyncMutex<()>>,
47}
48
49impl StepFunctionsService {
50    pub fn new(state: SharedStepFunctionsState) -> Self {
51        Self {
52            state,
53            delivery: None,
54            dynamodb_state: None,
55            snapshot_store: None,
56            snapshot_lock: Arc::new(AsyncMutex::new(())),
57        }
58    }
59
60    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
61        self.delivery = Some(delivery);
62        self
63    }
64
65    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
66        self.dynamodb_state = Some(dynamodb_state);
67        self
68    }
69
70    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
71        self.snapshot_store = Some(store);
72        self
73    }
74
75    async fn save_snapshot(&self) {
76        let Some(store) = self.snapshot_store.clone() else {
77            return;
78        };
79        let _guard = self.snapshot_lock.lock().await;
80        let snapshot = StepFunctionsSnapshot {
81            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
82            state: None,
83            accounts: Some(self.state.read().clone()),
84        };
85        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
86            let bytes = serde_json::to_vec(&snapshot)
87                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
88            store.save(&bytes)
89        })
90        .await;
91        match join {
92            Ok(Ok(())) => {}
93            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
94            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
95        }
96    }
97}
98
99fn is_mutating_action(action: &str) -> bool {
100    matches!(
101        action,
102        "CreateStateMachine"
103            | "DeleteStateMachine"
104            | "UpdateStateMachine"
105            | "TagResource"
106            | "UntagResource"
107            | "StartExecution"
108            | "StopExecution"
109    )
110}
111
112#[async_trait]
113impl AwsService for StepFunctionsService {
114    fn service_name(&self) -> &str {
115        "states"
116    }
117
118    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
119        let mutates = is_mutating_action(req.action.as_str());
120        let result = match req.action.as_str() {
121            "CreateStateMachine" => self.create_state_machine(&req),
122            "DescribeStateMachine" => self.describe_state_machine(&req),
123            "ListStateMachines" => self.list_state_machines(&req),
124            "DeleteStateMachine" => self.delete_state_machine(&req),
125            "UpdateStateMachine" => self.update_state_machine(&req),
126            "TagResource" => self.tag_resource(&req),
127            "UntagResource" => self.untag_resource(&req),
128            "ListTagsForResource" => self.list_tags_for_resource(&req),
129            "StartExecution" => self.start_execution(&req),
130            "StopExecution" => self.stop_execution(&req),
131            "DescribeExecution" => self.describe_execution(&req),
132            "ListExecutions" => self.list_executions(&req),
133            "GetExecutionHistory" => self.get_execution_history(&req),
134            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
135            _ => Err(AwsServiceError::action_not_implemented(
136                "states",
137                &req.action,
138            )),
139        };
140        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
141            self.save_snapshot().await;
142        }
143        result
144    }
145
146    fn supported_actions(&self) -> &[&str] {
147        SUPPORTED
148    }
149}
150
151impl StepFunctionsService {
152    // ─── State Machine CRUD ─────────────────────────────────────────────
153
154    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
155        let body = req.json_body();
156
157        validate_required("name", &body["name"])?;
158        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
159        validate_name(name)?;
160
161        validate_required("definition", &body["definition"])?;
162        let definition = body["definition"]
163            .as_str()
164            .ok_or_else(|| missing("definition"))?;
165        validate_definition(definition)?;
166
167        validate_required("roleArn", &body["roleArn"])?;
168        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
169        validate_arn(role_arn)?;
170
171        let machine_type = if let Some(t) = body["type"].as_str() {
172            StateMachineType::parse(t).ok_or_else(|| {
173                AwsServiceError::aws_error(
174                    StatusCode::BAD_REQUEST,
175                    "ValidationException",
176                    format!(
177                        "Value '{t}' at 'type' failed to satisfy constraint: \
178                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
179                    ),
180                )
181            })?
182        } else {
183            StateMachineType::Standard
184        };
185
186        let mut accounts = self.state.write();
187        let state = accounts.get_or_create(&req.account_id);
188        let arn = state.state_machine_arn(name);
189
190        // Check if name already exists
191        if state.state_machines.values().any(|sm| sm.name == name) {
192            return Err(AwsServiceError::aws_error(
193                StatusCode::CONFLICT,
194                "StateMachineAlreadyExists",
195                format!("State Machine Already Exists: '{arn}'"),
196            ));
197        }
198
199        let now = Utc::now();
200        let revision_id = uuid::Uuid::new_v4().to_string();
201
202        let mut tags = HashMap::new();
203        if !body["tags"].is_null() {
204            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
205                |f| {
206                    AwsServiceError::aws_error(
207                        StatusCode::BAD_REQUEST,
208                        "ValidationException",
209                        format!("{f} must be a list"),
210                    )
211                },
212            )?;
213        }
214
215        let sm = StateMachine {
216            name: name.to_string(),
217            arn: arn.clone(),
218            definition: definition.to_string(),
219            role_arn: role_arn.to_string(),
220            machine_type,
221            status: StateMachineStatus::Active,
222            creation_date: now,
223            update_date: now,
224            tags,
225            revision_id: revision_id.clone(),
226            logging_configuration: body.get("loggingConfiguration").cloned(),
227            tracing_configuration: body.get("tracingConfiguration").cloned(),
228            description: body["description"].as_str().unwrap_or("").to_string(),
229        };
230
231        state.state_machines.insert(arn.clone(), sm);
232
233        Ok(AwsResponse::ok_json(json!({
234            "stateMachineArn": arn,
235            "creationDate": now.timestamp() as f64,
236            "stateMachineVersionArn": arn,
237        })))
238    }
239
240    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241        let body = req.json_body();
242        validate_required("stateMachineArn", &body["stateMachineArn"])?;
243        let arn = body["stateMachineArn"]
244            .as_str()
245            .ok_or_else(|| missing("stateMachineArn"))?;
246        validate_arn(arn)?;
247
248        let accounts = self.state.read();
249        let empty = StepFunctionsState::new(&req.account_id, &req.region);
250        let state = accounts.get(&req.account_id).unwrap_or(&empty);
251        let sm = state
252            .state_machines
253            .get(arn)
254            .ok_or_else(|| state_machine_not_found(arn))?;
255
256        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
257    }
258
259    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
260        let body = req.json_body();
261        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
262        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
263        let next_token = body["nextToken"].as_str();
264
265        let accounts = self.state.read();
266        let empty = StepFunctionsState::new(&req.account_id, &req.region);
267        let state = accounts.get(&req.account_id).unwrap_or(&empty);
268        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
269        machines.sort_by(|a, b| a.name.cmp(&b.name));
270
271        let items: Vec<Value> = machines
272            .iter()
273            .map(|sm| {
274                json!({
275                    "name": sm.name,
276                    "stateMachineArn": sm.arn,
277                    "type": sm.machine_type.as_str(),
278                    "creationDate": sm.creation_date.timestamp() as f64,
279                })
280            })
281            .collect();
282
283        let (page, token) = paginate(&items, next_token, max_results);
284
285        let mut resp = json!({ "stateMachines": page });
286        if let Some(t) = token {
287            resp["nextToken"] = json!(t);
288        }
289        Ok(AwsResponse::ok_json(resp))
290    }
291
292    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
293        let body = req.json_body();
294        validate_required("stateMachineArn", &body["stateMachineArn"])?;
295        let arn = body["stateMachineArn"]
296            .as_str()
297            .ok_or_else(|| missing("stateMachineArn"))?;
298        validate_arn(arn)?;
299
300        let mut accounts = self.state.write();
301        let state = accounts.get_or_create(&req.account_id);
302        // AWS returns success even if it doesn't exist
303        state.state_machines.remove(arn);
304
305        Ok(AwsResponse::ok_json(json!({})))
306    }
307
308    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
309        let body = req.json_body();
310        validate_required("stateMachineArn", &body["stateMachineArn"])?;
311        let arn = body["stateMachineArn"]
312            .as_str()
313            .ok_or_else(|| missing("stateMachineArn"))?;
314        validate_arn(arn)?;
315
316        let mut accounts = self.state.write();
317        let state = accounts.get_or_create(&req.account_id);
318        let sm = state
319            .state_machines
320            .get_mut(arn)
321            .ok_or_else(|| state_machine_not_found(arn))?;
322
323        if let Some(definition) = body["definition"].as_str() {
324            validate_definition(definition)?;
325            sm.definition = definition.to_string();
326        }
327
328        if let Some(role_arn) = body["roleArn"].as_str() {
329            validate_arn(role_arn)?;
330            sm.role_arn = role_arn.to_string();
331        }
332
333        if let Some(logging) = body.get("loggingConfiguration") {
334            sm.logging_configuration = Some(logging.clone());
335        }
336
337        if let Some(tracing) = body.get("tracingConfiguration") {
338            sm.tracing_configuration = Some(tracing.clone());
339        }
340
341        if let Some(description) = body["description"].as_str() {
342            sm.description = description.to_string();
343        }
344
345        let now = Utc::now();
346        sm.update_date = now;
347        sm.revision_id = uuid::Uuid::new_v4().to_string();
348
349        let revision_id = sm.revision_id.clone();
350        let sm_arn = sm.arn.clone();
351
352        Ok(AwsResponse::ok_json(json!({
353            "updateDate": now.timestamp() as f64,
354            "revisionId": revision_id,
355            "stateMachineVersionArn": sm_arn,
356        })))
357    }
358
359    // ─── Execution Lifecycle ──────────────────────────────────────────
360
361    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
362        let body = req.json_body();
363        validate_required("stateMachineArn", &body["stateMachineArn"])?;
364        let sm_arn = body["stateMachineArn"]
365            .as_str()
366            .ok_or_else(|| missing("stateMachineArn"))?;
367        validate_arn(sm_arn)?;
368
369        let input = body["input"].as_str().map(|s| s.to_string());
370
371        // Validate input is valid JSON if provided
372        if let Some(ref input_str) = input {
373            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
374                AwsServiceError::aws_error(
375                    StatusCode::BAD_REQUEST,
376                    "InvalidExecutionInput",
377                    "Invalid execution input: must be valid JSON".to_string(),
378                )
379            })?;
380        }
381
382        let execution_name = body["name"]
383            .as_str()
384            .map(|s| s.to_string())
385            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
386
387        if let Some(name) = body["name"].as_str() {
388            validate_name(name)?;
389        }
390
391        let mut accounts = self.state.write();
392        let state = accounts.get_or_create(&req.account_id);
393        let sm = state
394            .state_machines
395            .get(sm_arn)
396            .ok_or_else(|| state_machine_not_found(sm_arn))?;
397
398        let sm_name = sm.name.clone();
399        let definition = sm.definition.clone();
400        let exec_arn = state.execution_arn(&sm_name, &execution_name);
401
402        // Check for duplicate execution name
403        if state.executions.contains_key(&exec_arn) {
404            return Err(AwsServiceError::aws_error(
405                StatusCode::CONFLICT,
406                "ExecutionAlreadyExists",
407                format!("Execution Already Exists: '{exec_arn}'"),
408            ));
409        }
410
411        let now = Utc::now();
412        let execution = Execution {
413            execution_arn: exec_arn.clone(),
414            state_machine_arn: sm_arn.to_string(),
415            state_machine_name: sm_name,
416            name: execution_name,
417            status: ExecutionStatus::Running,
418            input: input.clone(),
419            output: None,
420            start_date: now,
421            stop_date: None,
422            error: None,
423            cause: None,
424            history_events: vec![],
425        };
426
427        state.executions.insert(exec_arn.clone(), execution);
428        drop(accounts);
429
430        // Spawn async execution
431        let shared_state = self.state.clone();
432        let exec_arn_clone = exec_arn.clone();
433        let input_clone = input;
434        let delivery = self.delivery.clone();
435        let dynamodb_state = self.dynamodb_state.clone();
436        tokio::spawn(async move {
437            interpreter::execute_state_machine(
438                shared_state,
439                exec_arn_clone,
440                definition,
441                input_clone,
442                delivery,
443                dynamodb_state,
444            )
445            .await;
446        });
447
448        Ok(AwsResponse::ok_json(json!({
449            "executionArn": exec_arn,
450            "startDate": now.timestamp() as f64,
451        })))
452    }
453
454    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455        let body = req.json_body();
456        validate_required("executionArn", &body["executionArn"])?;
457        let exec_arn = body["executionArn"]
458            .as_str()
459            .ok_or_else(|| missing("executionArn"))?;
460
461        let error = body["error"].as_str().map(|s| s.to_string());
462        let cause = body["cause"].as_str().map(|s| s.to_string());
463
464        let mut accounts = self.state.write();
465        let state = accounts.get_or_create(&req.account_id);
466        let exec = state
467            .executions
468            .get_mut(exec_arn)
469            .ok_or_else(|| execution_not_found(exec_arn))?;
470
471        if exec.status != ExecutionStatus::Running {
472            return Err(AwsServiceError::aws_error(
473                StatusCode::BAD_REQUEST,
474                "ExecutionNotRunning",
475                format!("Execution is not running: '{exec_arn}'"),
476            ));
477        }
478
479        let now = Utc::now();
480        exec.status = ExecutionStatus::Aborted;
481        exec.stop_date = Some(now);
482        exec.error = error;
483        exec.cause = cause;
484
485        Ok(AwsResponse::ok_json(json!({
486            "stopDate": now.timestamp() as f64,
487        })))
488    }
489
490    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
491        let body = req.json_body();
492        validate_required("executionArn", &body["executionArn"])?;
493        let exec_arn = body["executionArn"]
494            .as_str()
495            .ok_or_else(|| missing("executionArn"))?;
496
497        let accounts = self.state.read();
498        let empty = StepFunctionsState::new(&req.account_id, &req.region);
499        let state = accounts.get(&req.account_id).unwrap_or(&empty);
500        let exec = state
501            .executions
502            .get(exec_arn)
503            .ok_or_else(|| execution_not_found(exec_arn))?;
504
505        Ok(AwsResponse::ok_json(execution_to_json(exec)))
506    }
507
508    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509        let body = req.json_body();
510        validate_required("stateMachineArn", &body["stateMachineArn"])?;
511        let sm_arn = body["stateMachineArn"]
512            .as_str()
513            .ok_or_else(|| missing("stateMachineArn"))?;
514        validate_arn(sm_arn)?;
515
516        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
517        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
518        let next_token = body["nextToken"].as_str();
519        let status_filter = body["statusFilter"].as_str();
520
521        let accounts = self.state.read();
522        let empty = StepFunctionsState::new(&req.account_id, &req.region);
523        let state = accounts.get(&req.account_id).unwrap_or(&empty);
524
525        // Verify state machine exists
526        if !state.state_machines.contains_key(sm_arn) {
527            return Err(state_machine_not_found(sm_arn));
528        }
529
530        let mut executions: Vec<&Execution> = state
531            .executions
532            .values()
533            .filter(|e| e.state_machine_arn == sm_arn)
534            .filter(|e| {
535                status_filter
536                    .map(|sf| e.status.as_str() == sf)
537                    .unwrap_or(true)
538            })
539            .collect();
540
541        // Sort by start date descending (most recent first)
542        executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
543
544        let items: Vec<Value> = executions
545            .iter()
546            .map(|e| {
547                let mut item = json!({
548                    "executionArn": e.execution_arn,
549                    "stateMachineArn": e.state_machine_arn,
550                    "name": e.name,
551                    "status": e.status.as_str(),
552                    "startDate": e.start_date.timestamp() as f64,
553                });
554                if let Some(stop) = e.stop_date {
555                    item["stopDate"] = json!(stop.timestamp() as f64);
556                }
557                item
558            })
559            .collect();
560
561        let (page, token) = paginate(&items, next_token, max_results);
562
563        let mut resp = json!({ "executions": page });
564        if let Some(t) = token {
565            resp["nextToken"] = json!(t);
566        }
567        Ok(AwsResponse::ok_json(resp))
568    }
569
570    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
571        let body = req.json_body();
572        validate_required("executionArn", &body["executionArn"])?;
573        let exec_arn = body["executionArn"]
574            .as_str()
575            .ok_or_else(|| missing("executionArn"))?;
576
577        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
578        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
579        let next_token = body["nextToken"].as_str();
580        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
581
582        let accounts = self.state.read();
583        let empty = StepFunctionsState::new(&req.account_id, &req.region);
584        let state = accounts.get(&req.account_id).unwrap_or(&empty);
585        let exec = state
586            .executions
587            .get(exec_arn)
588            .ok_or_else(|| execution_not_found(exec_arn))?;
589
590        let mut events: Vec<Value> = exec
591            .history_events
592            .iter()
593            .map(|e| {
594                json!({
595                    "id": e.id,
596                    "type": e.event_type,
597                    "timestamp": e.timestamp.timestamp() as f64,
598                    "previousEventId": e.previous_event_id,
599                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
600                })
601            })
602            .collect();
603
604        if reverse_order {
605            events.reverse();
606        }
607
608        let (page, token) = paginate(&events, next_token, max_results);
609
610        let mut resp = json!({ "events": page });
611        if let Some(t) = token {
612            resp["nextToken"] = json!(t);
613        }
614        Ok(AwsResponse::ok_json(resp))
615    }
616
617    fn describe_state_machine_for_execution(
618        &self,
619        req: &AwsRequest,
620    ) -> Result<AwsResponse, AwsServiceError> {
621        let body = req.json_body();
622        validate_required("executionArn", &body["executionArn"])?;
623        let exec_arn = body["executionArn"]
624            .as_str()
625            .ok_or_else(|| missing("executionArn"))?;
626
627        let accounts = self.state.read();
628        let empty = StepFunctionsState::new(&req.account_id, &req.region);
629        let state = accounts.get(&req.account_id).unwrap_or(&empty);
630        let exec = state
631            .executions
632            .get(exec_arn)
633            .ok_or_else(|| execution_not_found(exec_arn))?;
634
635        let sm = state
636            .state_machines
637            .get(&exec.state_machine_arn)
638            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
639
640        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
641    }
642
643    // ─── Tagging ────────────────────────────────────────────────────────
644
645    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
646        let body = req.json_body();
647        validate_required("resourceArn", &body["resourceArn"])?;
648        let arn = body["resourceArn"]
649            .as_str()
650            .ok_or_else(|| missing("resourceArn"))?;
651        validate_arn(arn)?;
652        validate_required("tags", &body["tags"])?;
653
654        let mut accounts = self.state.write();
655        let state = accounts.get_or_create(&req.account_id);
656        let sm = state
657            .state_machines
658            .get_mut(arn)
659            .ok_or_else(|| resource_not_found(arn))?;
660
661        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
662            |f| {
663                AwsServiceError::aws_error(
664                    StatusCode::BAD_REQUEST,
665                    "ValidationException",
666                    format!("{f} must be a list"),
667                )
668            },
669        )?;
670
671        Ok(AwsResponse::ok_json(json!({})))
672    }
673
674    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
675        let body = req.json_body();
676        validate_required("resourceArn", &body["resourceArn"])?;
677        let arn = body["resourceArn"]
678            .as_str()
679            .ok_or_else(|| missing("resourceArn"))?;
680        validate_arn(arn)?;
681        validate_required("tagKeys", &body["tagKeys"])?;
682
683        let mut accounts = self.state.write();
684        let state = accounts.get_or_create(&req.account_id);
685        let sm = state
686            .state_machines
687            .get_mut(arn)
688            .ok_or_else(|| resource_not_found(arn))?;
689
690        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
691            AwsServiceError::aws_error(
692                StatusCode::BAD_REQUEST,
693                "ValidationException",
694                format!("{f} must be a list"),
695            )
696        })?;
697
698        Ok(AwsResponse::ok_json(json!({})))
699    }
700
701    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
702        let body = req.json_body();
703        validate_required("resourceArn", &body["resourceArn"])?;
704        let arn = body["resourceArn"]
705            .as_str()
706            .ok_or_else(|| missing("resourceArn"))?;
707        validate_arn(arn)?;
708
709        let accounts = self.state.read();
710        let empty = StepFunctionsState::new(&req.account_id, &req.region);
711        let state = accounts.get(&req.account_id).unwrap_or(&empty);
712        let sm = state
713            .state_machines
714            .get(arn)
715            .ok_or_else(|| resource_not_found(arn))?;
716
717        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
718
719        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
720    }
721}
722
723// ─── Helpers ────────────────────────────────────────────────────────────
724
725fn state_machine_to_json(sm: &StateMachine) -> Value {
726    let mut resp = json!({
727        "name": sm.name,
728        "stateMachineArn": sm.arn,
729        "definition": sm.definition,
730        "roleArn": sm.role_arn,
731        "type": sm.machine_type.as_str(),
732        "status": sm.status.as_str(),
733        "creationDate": sm.creation_date.timestamp() as f64,
734        "updateDate": sm.update_date.timestamp() as f64,
735        "revisionId": sm.revision_id,
736        "label": sm.name,
737    });
738
739    if !sm.description.is_empty() {
740        resp["description"] = json!(sm.description);
741    }
742
743    if let Some(ref logging) = sm.logging_configuration {
744        resp["loggingConfiguration"] = logging.clone();
745    } else {
746        resp["loggingConfiguration"] = json!({
747            "level": "OFF",
748            "includeExecutionData": false,
749            "destinations": [],
750        });
751    }
752
753    if let Some(ref tracing) = sm.tracing_configuration {
754        resp["tracingConfiguration"] = tracing.clone();
755    } else {
756        resp["tracingConfiguration"] = json!({
757            "enabled": false,
758        });
759    }
760
761    resp
762}
763
764fn missing(name: &str) -> AwsServiceError {
765    AwsServiceError::aws_error(
766        StatusCode::BAD_REQUEST,
767        "ValidationException",
768        format!("The request must contain the parameter {name}."),
769    )
770}
771
772fn state_machine_not_found(arn: &str) -> AwsServiceError {
773    AwsServiceError::aws_error(
774        StatusCode::BAD_REQUEST,
775        "StateMachineDoesNotExist",
776        format!("State Machine Does Not Exist: '{arn}'"),
777    )
778}
779
780fn resource_not_found(arn: &str) -> AwsServiceError {
781    AwsServiceError::aws_error(
782        StatusCode::BAD_REQUEST,
783        "ResourceNotFound",
784        format!("Resource not found: '{arn}'"),
785    )
786}
787
788fn validate_name(name: &str) -> Result<(), AwsServiceError> {
789    if name.is_empty() || name.len() > 80 {
790        return Err(AwsServiceError::aws_error(
791            StatusCode::BAD_REQUEST,
792            "InvalidName",
793            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
794        ));
795    }
796    // Only allow alphanumeric, hyphens, and underscores
797    if !name
798        .chars()
799        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
800    {
801        return Err(AwsServiceError::aws_error(
802            StatusCode::BAD_REQUEST,
803            "InvalidName",
804            format!(
805                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
806            ),
807        ));
808    }
809    Ok(())
810}
811
812fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
813    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
814        AwsServiceError::aws_error(
815            StatusCode::BAD_REQUEST,
816            "InvalidDefinition",
817            format!("Invalid State Machine Definition: '{e}'"),
818        )
819    })?;
820
821    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
822        return Err(AwsServiceError::aws_error(
823            StatusCode::BAD_REQUEST,
824            "InvalidDefinition",
825            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
826                .to_string(),
827        ));
828    }
829
830    let states_obj = parsed
831        .get("States")
832        .and_then(|v| v.as_object())
833        .ok_or_else(|| {
834            AwsServiceError::aws_error(
835                StatusCode::BAD_REQUEST,
836                "InvalidDefinition",
837                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
838                    .to_string(),
839            )
840        })?;
841
842    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
843        AwsServiceError::aws_error(
844            StatusCode::BAD_REQUEST,
845            "InvalidDefinition",
846            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
847                .to_string(),
848        )
849    })?;
850    if !states_obj.contains_key(start_at) {
851        return Err(AwsServiceError::aws_error(
852            StatusCode::BAD_REQUEST,
853            "InvalidDefinition",
854            format!(
855                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
856                 (StartAt '{start_at}' does not reference a valid state)"
857            ),
858        ));
859    }
860
861    Ok(())
862}
863
864fn execution_not_found(arn: &str) -> AwsServiceError {
865    AwsServiceError::aws_error(
866        StatusCode::BAD_REQUEST,
867        "ExecutionDoesNotExist",
868        format!("Execution Does Not Exist: '{arn}'"),
869    )
870}
871
872fn execution_to_json(exec: &Execution) -> Value {
873    let mut resp = json!({
874        "executionArn": exec.execution_arn,
875        "stateMachineArn": exec.state_machine_arn,
876        "name": exec.name,
877        "status": exec.status.as_str(),
878        "startDate": exec.start_date.timestamp() as f64,
879    });
880
881    if let Some(ref input) = exec.input {
882        resp["input"] = json!(input);
883    }
884    if let Some(ref output) = exec.output {
885        resp["output"] = json!(output);
886    }
887    if let Some(stop) = exec.stop_date {
888        resp["stopDate"] = json!(stop.timestamp() as f64);
889    }
890    if let Some(ref error) = exec.error {
891        resp["error"] = json!(error);
892    }
893    if let Some(ref cause) = exec.cause {
894        resp["cause"] = json!(cause);
895    }
896
897    resp
898}
899
900/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
901fn camel_to_details_key(event_type: &str) -> String {
902    let mut chars = event_type.chars();
903    match chars.next() {
904        None => String::new(),
905        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
906    }
907}
908
909fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
910    if !arn.starts_with("arn:") {
911        return Err(AwsServiceError::aws_error(
912            StatusCode::BAD_REQUEST,
913            "InvalidArn",
914            format!("Invalid Arn: '{arn}'"),
915        ));
916    }
917    Ok(())
918}
919
920/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
921///
922/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
923/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
924/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
925///
926/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
927/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
928pub fn start_execution_from_delivery(
929    state: &SharedStepFunctionsState,
930    delivery: &Option<Arc<DeliveryBus>>,
931    dynamodb_state: &Option<SharedDynamoDbState>,
932    state_machine_arn: &str,
933    input: &str,
934) {
935    // Validate input is valid JSON
936    if serde_json::from_str::<serde_json::Value>(input).is_err() {
937        tracing::warn!(
938            state_machine_arn,
939            "Step Functions delivery: invalid JSON input, skipping execution"
940        );
941        return;
942    }
943
944    let execution_name = uuid::Uuid::new_v4().to_string();
945
946    // Extract account_id from the state machine ARN
947    let account_id = state_machine_arn
948        .split(':')
949        .nth(4)
950        .unwrap_or("000000000000")
951        .to_string();
952
953    let mut accounts = state.write();
954    let st = accounts.get_or_create(&account_id);
955    let sm = match st.state_machines.get(state_machine_arn) {
956        Some(sm) => sm,
957        None => {
958            tracing::warn!(
959                state_machine_arn,
960                "Step Functions delivery: state machine not found"
961            );
962            return;
963        }
964    };
965
966    let sm_name = sm.name.clone();
967    let definition = sm.definition.clone();
968    let exec_arn = st.execution_arn(&sm_name, &execution_name);
969
970    let now = Utc::now();
971    let execution = Execution {
972        execution_arn: exec_arn.clone(),
973        state_machine_arn: state_machine_arn.to_string(),
974        state_machine_name: sm_name,
975        name: execution_name,
976        status: ExecutionStatus::Running,
977        input: Some(input.to_string()),
978        output: None,
979        start_date: now,
980        stop_date: None,
981        error: None,
982        cause: None,
983        history_events: vec![],
984    };
985
986    st.executions.insert(exec_arn.clone(), execution);
987    drop(accounts);
988
989    let shared_state = state.clone();
990    let delivery = delivery.clone();
991    let dynamodb_state = dynamodb_state.clone();
992    let input = Some(input.to_string());
993    tokio::spawn(async move {
994        interpreter::execute_state_machine(
995            shared_state,
996            exec_arn,
997            definition,
998            input,
999            delivery,
1000            dynamodb_state,
1001        )
1002        .await;
1003    });
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use super::*;
1009    use http::{HeaderMap, Method};
1010    use parking_lot::RwLock;
1011    use serde_json::Value;
1012    use std::sync::Arc;
1013
1014    fn make_state() -> SharedStepFunctionsState {
1015        Arc::new(RwLock::new(
1016            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1017        ))
1018    }
1019
1020    fn make_request(action: &str, body: &str) -> AwsRequest {
1021        AwsRequest {
1022            service: "states".to_string(),
1023            action: action.to_string(),
1024            region: "us-east-1".to_string(),
1025            account_id: "123456789012".to_string(),
1026            request_id: "test-id".to_string(),
1027            headers: HeaderMap::new(),
1028            query_params: HashMap::new(),
1029            body: body.as_bytes().to_vec().into(),
1030            path_segments: vec![],
1031            raw_path: "/".to_string(),
1032            raw_query: String::new(),
1033            method: Method::POST,
1034            is_query_protocol: false,
1035            access_key_id: None,
1036            principal: None,
1037        }
1038    }
1039
1040    fn body_json(resp: &AwsResponse) -> Value {
1041        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1042    }
1043
1044    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1045        match result {
1046            Err(e) => e,
1047            Ok(_) => panic!("expected error, got Ok"),
1048        }
1049    }
1050
1051    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1052
1053    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1054        let body = json!({
1055            "name": name,
1056            "definition": VALID_DEF,
1057            "roleArn": "arn:aws:iam::123456789012:role/test",
1058        });
1059        let req = make_request("CreateStateMachine", &body.to_string());
1060        let resp = svc.create_state_machine(&req).unwrap();
1061        let b = body_json(&resp);
1062        b["stateMachineArn"].as_str().unwrap().to_string()
1063    }
1064
1065    // ── CreateStateMachine ──
1066
1067    #[test]
1068    fn create_state_machine_basic() {
1069        let svc = StepFunctionsService::new(make_state());
1070        let arn = create_sm(&svc, "test-sm");
1071        assert!(arn.contains("test-sm"));
1072    }
1073
1074    #[test]
1075    fn create_state_machine_with_express_type() {
1076        let svc = StepFunctionsService::new(make_state());
1077        let body = json!({
1078            "name": "express-sm",
1079            "definition": VALID_DEF,
1080            "roleArn": "arn:aws:iam::123456789012:role/r",
1081            "type": "EXPRESS",
1082        });
1083        let req = make_request("CreateStateMachine", &body.to_string());
1084        let resp = svc.create_state_machine(&req).unwrap();
1085        let b = body_json(&resp);
1086        assert!(b["stateMachineArn"].as_str().is_some());
1087    }
1088
1089    #[test]
1090    fn create_state_machine_duplicate_fails() {
1091        let svc = StepFunctionsService::new(make_state());
1092        create_sm(&svc, "dup-sm");
1093        let body = json!({
1094            "name": "dup-sm",
1095            "definition": VALID_DEF,
1096            "roleArn": "arn:aws:iam::123456789012:role/r",
1097        });
1098        let req = make_request("CreateStateMachine", &body.to_string());
1099        let err = expect_err(svc.create_state_machine(&req));
1100        assert!(err.to_string().contains("StateMachineAlreadyExists"));
1101    }
1102
1103    #[test]
1104    fn create_state_machine_missing_name() {
1105        let svc = StepFunctionsService::new(make_state());
1106        let body = json!({
1107            "definition": VALID_DEF,
1108            "roleArn": "arn:aws:iam::123456789012:role/r",
1109        });
1110        let req = make_request("CreateStateMachine", &body.to_string());
1111        assert!(svc.create_state_machine(&req).is_err());
1112    }
1113
1114    #[test]
1115    fn create_state_machine_invalid_definition() {
1116        let svc = StepFunctionsService::new(make_state());
1117        let body = json!({
1118            "name": "bad-def",
1119            "definition": "not json",
1120            "roleArn": "arn:aws:iam::123456789012:role/r",
1121        });
1122        let req = make_request("CreateStateMachine", &body.to_string());
1123        let err = expect_err(svc.create_state_machine(&req));
1124        assert!(err.to_string().contains("InvalidDefinition"));
1125    }
1126
1127    #[test]
1128    fn create_state_machine_definition_missing_start_at() {
1129        let svc = StepFunctionsService::new(make_state());
1130        let body = json!({
1131            "name": "no-start",
1132            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1133            "roleArn": "arn:aws:iam::123456789012:role/r",
1134        });
1135        let req = make_request("CreateStateMachine", &body.to_string());
1136        let err = expect_err(svc.create_state_machine(&req));
1137        assert!(err.to_string().contains("InvalidDefinition"));
1138    }
1139
1140    #[test]
1141    fn create_state_machine_definition_missing_states() {
1142        let svc = StepFunctionsService::new(make_state());
1143        let body = json!({
1144            "name": "no-states",
1145            "definition": r#"{"StartAt":"S"}"#,
1146            "roleArn": "arn:aws:iam::123456789012:role/r",
1147        });
1148        let req = make_request("CreateStateMachine", &body.to_string());
1149        let err = expect_err(svc.create_state_machine(&req));
1150        assert!(err.to_string().contains("InvalidDefinition"));
1151    }
1152
1153    #[test]
1154    fn create_state_machine_definition_start_at_not_in_states() {
1155        let svc = StepFunctionsService::new(make_state());
1156        let body = json!({
1157            "name": "bad-start",
1158            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1159            "roleArn": "arn:aws:iam::123456789012:role/r",
1160        });
1161        let req = make_request("CreateStateMachine", &body.to_string());
1162        let err = expect_err(svc.create_state_machine(&req));
1163        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1164    }
1165
1166    #[test]
1167    fn create_state_machine_invalid_type() {
1168        let svc = StepFunctionsService::new(make_state());
1169        let body = json!({
1170            "name": "bad-type",
1171            "definition": VALID_DEF,
1172            "roleArn": "arn:aws:iam::123456789012:role/r",
1173            "type": "INVALID",
1174        });
1175        let req = make_request("CreateStateMachine", &body.to_string());
1176        assert!(svc.create_state_machine(&req).is_err());
1177    }
1178
1179    #[test]
1180    fn create_state_machine_invalid_arn() {
1181        let svc = StepFunctionsService::new(make_state());
1182        let body = json!({
1183            "name": "bad-arn",
1184            "definition": VALID_DEF,
1185            "roleArn": "not-an-arn",
1186        });
1187        let req = make_request("CreateStateMachine", &body.to_string());
1188        let err = expect_err(svc.create_state_machine(&req));
1189        assert!(err.to_string().contains("InvalidArn"));
1190    }
1191
1192    #[test]
1193    fn create_state_machine_invalid_name() {
1194        let svc = StepFunctionsService::new(make_state());
1195        let body = json!({
1196            "name": "has spaces!",
1197            "definition": VALID_DEF,
1198            "roleArn": "arn:aws:iam::123456789012:role/r",
1199        });
1200        let req = make_request("CreateStateMachine", &body.to_string());
1201        let err = expect_err(svc.create_state_machine(&req));
1202        assert!(err.to_string().contains("InvalidName"));
1203    }
1204
1205    #[test]
1206    fn create_state_machine_name_too_long() {
1207        let svc = StepFunctionsService::new(make_state());
1208        let long_name = "a".repeat(81);
1209        let body = json!({
1210            "name": long_name,
1211            "definition": VALID_DEF,
1212            "roleArn": "arn:aws:iam::123456789012:role/r",
1213        });
1214        let req = make_request("CreateStateMachine", &body.to_string());
1215        let err = expect_err(svc.create_state_machine(&req));
1216        assert!(err.to_string().contains("InvalidName"));
1217    }
1218
1219    // ── DescribeStateMachine ──
1220
1221    #[test]
1222    fn describe_state_machine_found() {
1223        let svc = StepFunctionsService::new(make_state());
1224        let arn = create_sm(&svc, "desc-sm");
1225
1226        let req = make_request(
1227            "DescribeStateMachine",
1228            &json!({"stateMachineArn": arn}).to_string(),
1229        );
1230        let resp = svc.describe_state_machine(&req).unwrap();
1231        let b = body_json(&resp);
1232        assert_eq!(b["name"], "desc-sm");
1233        assert_eq!(b["status"], "ACTIVE");
1234        assert!(b["definition"].as_str().is_some());
1235    }
1236
1237    #[test]
1238    fn describe_state_machine_not_found() {
1239        let svc = StepFunctionsService::new(make_state());
1240        let req = make_request(
1241            "DescribeStateMachine",
1242            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1243                .to_string(),
1244        );
1245        let err = expect_err(svc.describe_state_machine(&req));
1246        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1247    }
1248
1249    // ── ListStateMachines ──
1250
1251    #[test]
1252    fn list_state_machines_empty() {
1253        let svc = StepFunctionsService::new(make_state());
1254        let req = make_request("ListStateMachines", "{}");
1255        let resp = svc.list_state_machines(&req).unwrap();
1256        let b = body_json(&resp);
1257        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1258    }
1259
1260    #[test]
1261    fn list_state_machines_returns_created() {
1262        let svc = StepFunctionsService::new(make_state());
1263        create_sm(&svc, "sm-1");
1264        create_sm(&svc, "sm-2");
1265
1266        let req = make_request("ListStateMachines", "{}");
1267        let resp = svc.list_state_machines(&req).unwrap();
1268        let b = body_json(&resp);
1269        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1270    }
1271
1272    // ── DeleteStateMachine ──
1273
1274    #[test]
1275    fn delete_state_machine() {
1276        let svc = StepFunctionsService::new(make_state());
1277        let arn = create_sm(&svc, "del-sm");
1278
1279        let req = make_request(
1280            "DeleteStateMachine",
1281            &json!({"stateMachineArn": arn}).to_string(),
1282        );
1283        svc.delete_state_machine(&req).unwrap();
1284
1285        // Describe should fail
1286        let req = make_request(
1287            "DescribeStateMachine",
1288            &json!({"stateMachineArn": arn}).to_string(),
1289        );
1290        assert!(svc.describe_state_machine(&req).is_err());
1291    }
1292
1293    #[test]
1294    fn delete_state_machine_nonexistent_succeeds() {
1295        let svc = StepFunctionsService::new(make_state());
1296        let req = make_request(
1297            "DeleteStateMachine",
1298            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1299                .to_string(),
1300        );
1301        // AWS returns success even for nonexistent
1302        svc.delete_state_machine(&req).unwrap();
1303    }
1304
1305    // ── UpdateStateMachine ──
1306
1307    #[test]
1308    fn update_state_machine() {
1309        let svc = StepFunctionsService::new(make_state());
1310        let arn = create_sm(&svc, "upd-sm");
1311
1312        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1313        let body = json!({
1314            "stateMachineArn": arn,
1315            "definition": new_def,
1316            "description": "updated",
1317        });
1318        let req = make_request("UpdateStateMachine", &body.to_string());
1319        let resp = svc.update_state_machine(&req).unwrap();
1320        let b = body_json(&resp);
1321        assert!(b["updateDate"].as_f64().is_some());
1322
1323        // Verify
1324        let req = make_request(
1325            "DescribeStateMachine",
1326            &json!({"stateMachineArn": arn}).to_string(),
1327        );
1328        let resp = svc.describe_state_machine(&req).unwrap();
1329        let b = body_json(&resp);
1330        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1331        assert_eq!(b["description"], "updated");
1332    }
1333
1334    #[test]
1335    fn update_state_machine_not_found() {
1336        let svc = StepFunctionsService::new(make_state());
1337        let body = json!({
1338            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1339            "definition": VALID_DEF,
1340        });
1341        let req = make_request("UpdateStateMachine", &body.to_string());
1342        let err = expect_err(svc.update_state_machine(&req));
1343        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1344    }
1345
1346    // ── StartExecution ──
1347
1348    #[tokio::test]
1349    async fn start_execution_basic() {
1350        let svc = StepFunctionsService::new(make_state());
1351        let arn = create_sm(&svc, "exec-sm");
1352
1353        let body = json!({
1354            "stateMachineArn": arn,
1355            "input": r#"{"key":"value"}"#,
1356        });
1357        let req = make_request("StartExecution", &body.to_string());
1358        let resp = svc.start_execution(&req).unwrap();
1359        let b = body_json(&resp);
1360        assert!(b["executionArn"].as_str().is_some());
1361        assert!(b["startDate"].as_f64().is_some());
1362    }
1363
1364    #[tokio::test]
1365    async fn start_execution_with_name() {
1366        let svc = StepFunctionsService::new(make_state());
1367        let arn = create_sm(&svc, "named-exec");
1368
1369        let body = json!({
1370            "stateMachineArn": arn,
1371            "name": "my-execution",
1372        });
1373        let req = make_request("StartExecution", &body.to_string());
1374        let resp = svc.start_execution(&req).unwrap();
1375        let b = body_json(&resp);
1376        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1377    }
1378
1379    #[tokio::test]
1380    async fn start_execution_sm_not_found() {
1381        let svc = StepFunctionsService::new(make_state());
1382        let body = json!({
1383            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1384        });
1385        let req = make_request("StartExecution", &body.to_string());
1386        let err = expect_err(svc.start_execution(&req));
1387        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1388    }
1389
1390    #[tokio::test]
1391    async fn start_execution_invalid_input() {
1392        let svc = StepFunctionsService::new(make_state());
1393        let arn = create_sm(&svc, "bad-input");
1394
1395        let body = json!({
1396            "stateMachineArn": arn,
1397            "input": "not json",
1398        });
1399        let req = make_request("StartExecution", &body.to_string());
1400        let err = expect_err(svc.start_execution(&req));
1401        assert!(err.to_string().contains("InvalidExecutionInput"));
1402    }
1403
1404    #[tokio::test]
1405    async fn start_execution_duplicate_name() {
1406        let svc = StepFunctionsService::new(make_state());
1407        let arn = create_sm(&svc, "dup-exec");
1408
1409        let body = json!({
1410            "stateMachineArn": arn,
1411            "name": "same-name",
1412        });
1413        let req = make_request("StartExecution", &body.to_string());
1414        svc.start_execution(&req).unwrap();
1415
1416        let req = make_request("StartExecution", &body.to_string());
1417        let err = expect_err(svc.start_execution(&req));
1418        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1419    }
1420
1421    // ── DescribeExecution ──
1422
1423    #[tokio::test]
1424    async fn describe_execution_found() {
1425        let svc = StepFunctionsService::new(make_state());
1426        let sm_arn = create_sm(&svc, "desc-exec");
1427
1428        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1429        let req = make_request("StartExecution", &body.to_string());
1430        let resp = svc.start_execution(&req).unwrap();
1431        let exec_arn = body_json(&resp)["executionArn"]
1432            .as_str()
1433            .unwrap()
1434            .to_string();
1435
1436        let req = make_request(
1437            "DescribeExecution",
1438            &json!({"executionArn": exec_arn}).to_string(),
1439        );
1440        let resp = svc.describe_execution(&req).unwrap();
1441        let b = body_json(&resp);
1442        assert_eq!(b["name"], "e1");
1443        assert_eq!(b["status"], "RUNNING");
1444    }
1445
1446    #[tokio::test]
1447    async fn describe_execution_not_found() {
1448        let svc = StepFunctionsService::new(make_state());
1449        let req = make_request(
1450            "DescribeExecution",
1451            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1452                .to_string(),
1453        );
1454        let err = expect_err(svc.describe_execution(&req));
1455        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1456    }
1457
1458    // ── StopExecution ──
1459
1460    #[tokio::test]
1461    async fn stop_execution() {
1462        let svc = StepFunctionsService::new(make_state());
1463        let sm_arn = create_sm(&svc, "stop-sm");
1464
1465        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1466        let req = make_request("StartExecution", &body.to_string());
1467        let resp = svc.start_execution(&req).unwrap();
1468        let exec_arn = body_json(&resp)["executionArn"]
1469            .as_str()
1470            .unwrap()
1471            .to_string();
1472
1473        let body = json!({
1474            "executionArn": exec_arn,
1475            "error": "UserAborted",
1476            "cause": "test stop",
1477        });
1478        let req = make_request("StopExecution", &body.to_string());
1479        let resp = svc.stop_execution(&req).unwrap();
1480        let b = body_json(&resp);
1481        assert!(b["stopDate"].as_f64().is_some());
1482
1483        // Verify aborted
1484        let req = make_request(
1485            "DescribeExecution",
1486            &json!({"executionArn": exec_arn}).to_string(),
1487        );
1488        let resp = svc.describe_execution(&req).unwrap();
1489        let b = body_json(&resp);
1490        assert_eq!(b["status"], "ABORTED");
1491        assert_eq!(b["error"], "UserAborted");
1492    }
1493
1494    #[tokio::test]
1495    async fn stop_execution_not_found() {
1496        let svc = StepFunctionsService::new(make_state());
1497        let req = make_request(
1498            "StopExecution",
1499            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1500                .to_string(),
1501        );
1502        let err = expect_err(svc.stop_execution(&req));
1503        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1504    }
1505
1506    // ── ListExecutions ──
1507
1508    #[tokio::test]
1509    async fn list_executions() {
1510        let svc = StepFunctionsService::new(make_state());
1511        let sm_arn = create_sm(&svc, "list-exec");
1512
1513        for i in 0..3 {
1514            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1515            let req = make_request("StartExecution", &body.to_string());
1516            svc.start_execution(&req).unwrap();
1517        }
1518
1519        let req = make_request(
1520            "ListExecutions",
1521            &json!({"stateMachineArn": sm_arn}).to_string(),
1522        );
1523        let resp = svc.list_executions(&req).unwrap();
1524        let b = body_json(&resp);
1525        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1526    }
1527
1528    #[tokio::test]
1529    async fn list_executions_sm_not_found() {
1530        let svc = StepFunctionsService::new(make_state());
1531        let req = make_request(
1532            "ListExecutions",
1533            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1534                .to_string(),
1535        );
1536        let err = expect_err(svc.list_executions(&req));
1537        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1538    }
1539
1540    // ── GetExecutionHistory ──
1541
1542    #[tokio::test]
1543    async fn get_execution_history_not_found() {
1544        let svc = StepFunctionsService::new(make_state());
1545        let req = make_request(
1546            "GetExecutionHistory",
1547            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1548                .to_string(),
1549        );
1550        let err = expect_err(svc.get_execution_history(&req));
1551        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1552    }
1553
1554    // ── DescribeStateMachineForExecution ──
1555
1556    #[tokio::test]
1557    async fn describe_sm_for_execution() {
1558        let svc = StepFunctionsService::new(make_state());
1559        let sm_arn = create_sm(&svc, "sm-for-exec");
1560
1561        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1562        let req = make_request("StartExecution", &body.to_string());
1563        let resp = svc.start_execution(&req).unwrap();
1564        let exec_arn = body_json(&resp)["executionArn"]
1565            .as_str()
1566            .unwrap()
1567            .to_string();
1568
1569        let req = make_request(
1570            "DescribeStateMachineForExecution",
1571            &json!({"executionArn": exec_arn}).to_string(),
1572        );
1573        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1574        let b = body_json(&resp);
1575        assert_eq!(b["name"], "sm-for-exec");
1576    }
1577
1578    // ── Tags ──
1579
1580    #[test]
1581    fn tag_untag_list_tags() {
1582        let svc = StepFunctionsService::new(make_state());
1583        let arn = create_sm(&svc, "tagged-sm");
1584
1585        // Tag
1586        let body = json!({
1587            "resourceArn": arn,
1588            "tags": [{"key": "env", "value": "prod"}],
1589        });
1590        let req = make_request("TagResource", &body.to_string());
1591        svc.tag_resource(&req).unwrap();
1592
1593        // List
1594        let req = make_request(
1595            "ListTagsForResource",
1596            &json!({"resourceArn": arn}).to_string(),
1597        );
1598        let resp = svc.list_tags_for_resource(&req).unwrap();
1599        let b = body_json(&resp);
1600        let tags = b["tags"].as_array().unwrap();
1601        assert_eq!(tags.len(), 1);
1602        assert_eq!(tags[0]["key"], "env");
1603
1604        // Untag
1605        let body = json!({
1606            "resourceArn": arn,
1607            "tagKeys": ["env"],
1608        });
1609        let req = make_request("UntagResource", &body.to_string());
1610        svc.untag_resource(&req).unwrap();
1611
1612        // Verify empty
1613        let req = make_request(
1614            "ListTagsForResource",
1615            &json!({"resourceArn": arn}).to_string(),
1616        );
1617        let resp = svc.list_tags_for_resource(&req).unwrap();
1618        let b = body_json(&resp);
1619        assert!(b["tags"].as_array().unwrap().is_empty());
1620    }
1621
1622    #[test]
1623    fn tag_resource_not_found() {
1624        let svc = StepFunctionsService::new(make_state());
1625        let body = json!({
1626            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1627            "tags": [{"key": "k", "value": "v"}],
1628        });
1629        let req = make_request("TagResource", &body.to_string());
1630        let err = expect_err(svc.tag_resource(&req));
1631        assert!(err.to_string().contains("ResourceNotFound"));
1632    }
1633
1634    // ── Helper function tests ──
1635
1636    #[test]
1637    fn test_validate_name() {
1638        assert!(validate_name("valid-name").is_ok());
1639        assert!(validate_name("under_score").is_ok());
1640        assert!(validate_name("").is_err());
1641        assert!(validate_name("has spaces").is_err());
1642        assert!(validate_name(&"a".repeat(81)).is_err());
1643    }
1644
1645    #[test]
1646    fn test_validate_definition() {
1647        assert!(validate_definition(VALID_DEF).is_ok());
1648        assert!(validate_definition("not json").is_err());
1649        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
1650        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
1651    }
1652
1653    #[test]
1654    fn test_validate_arn() {
1655        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1656        assert!(validate_arn("not-an-arn").is_err());
1657    }
1658
1659    #[test]
1660    fn test_camel_to_details_key() {
1661        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1662        assert_eq!(camel_to_details_key(""), "");
1663    }
1664
1665    #[test]
1666    fn test_is_mutating_action() {
1667        assert!(is_mutating_action("CreateStateMachine"));
1668        assert!(is_mutating_action("StartExecution"));
1669        assert!(!is_mutating_action("DescribeStateMachine"));
1670        assert!(!is_mutating_action("ListStateMachines"));
1671    }
1672}