Skip to main content

fakecloud_stepfunctions/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::pagination::paginate;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_core::validation::*;
13use fakecloud_dynamodb::state::SharedDynamoDbState;
14
15use crate::interpreter;
16use crate::state::{
17    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
18    StateMachineType,
19};
20
21const SUPPORTED: &[&str] = &[
22    "CreateStateMachine",
23    "DescribeStateMachine",
24    "ListStateMachines",
25    "DeleteStateMachine",
26    "UpdateStateMachine",
27    "TagResource",
28    "UntagResource",
29    "ListTagsForResource",
30    "StartExecution",
31    "StopExecution",
32    "DescribeExecution",
33    "ListExecutions",
34    "GetExecutionHistory",
35    "DescribeStateMachineForExecution",
36];
37
38pub struct StepFunctionsService {
39    state: SharedStepFunctionsState,
40    delivery: Option<Arc<DeliveryBus>>,
41    dynamodb_state: Option<SharedDynamoDbState>,
42}
43
44impl StepFunctionsService {
45    pub fn new(state: SharedStepFunctionsState) -> Self {
46        Self {
47            state,
48            delivery: None,
49            dynamodb_state: None,
50        }
51    }
52
53    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
54        self.delivery = Some(delivery);
55        self
56    }
57
58    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
59        self.dynamodb_state = Some(dynamodb_state);
60        self
61    }
62}
63
64#[async_trait]
65impl AwsService for StepFunctionsService {
66    fn service_name(&self) -> &str {
67        "states"
68    }
69
70    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
71        match req.action.as_str() {
72            "CreateStateMachine" => self.create_state_machine(&req),
73            "DescribeStateMachine" => self.describe_state_machine(&req),
74            "ListStateMachines" => self.list_state_machines(&req),
75            "DeleteStateMachine" => self.delete_state_machine(&req),
76            "UpdateStateMachine" => self.update_state_machine(&req),
77            "TagResource" => self.tag_resource(&req),
78            "UntagResource" => self.untag_resource(&req),
79            "ListTagsForResource" => self.list_tags_for_resource(&req),
80            "StartExecution" => self.start_execution(&req),
81            "StopExecution" => self.stop_execution(&req),
82            "DescribeExecution" => self.describe_execution(&req),
83            "ListExecutions" => self.list_executions(&req),
84            "GetExecutionHistory" => self.get_execution_history(&req),
85            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
86            _ => Err(AwsServiceError::action_not_implemented(
87                "states",
88                &req.action,
89            )),
90        }
91    }
92
93    fn supported_actions(&self) -> &[&str] {
94        SUPPORTED
95    }
96}
97
98impl StepFunctionsService {
99    // ─── State Machine CRUD ─────────────────────────────────────────────
100
101    fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
102        let body = req.json_body();
103
104        validate_required("name", &body["name"])?;
105        let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
106        validate_name(name)?;
107
108        validate_required("definition", &body["definition"])?;
109        let definition = body["definition"]
110            .as_str()
111            .ok_or_else(|| missing("definition"))?;
112        validate_definition(definition)?;
113
114        validate_required("roleArn", &body["roleArn"])?;
115        let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
116        validate_arn(role_arn)?;
117
118        let machine_type = if let Some(t) = body["type"].as_str() {
119            StateMachineType::parse(t).ok_or_else(|| {
120                AwsServiceError::aws_error(
121                    StatusCode::BAD_REQUEST,
122                    "ValidationException",
123                    format!(
124                        "Value '{t}' at 'type' failed to satisfy constraint: \
125                         Member must satisfy enum value set: [STANDARD, EXPRESS]"
126                    ),
127                )
128            })?
129        } else {
130            StateMachineType::Standard
131        };
132
133        let mut state = self.state.write();
134        let arn = state.state_machine_arn(name);
135
136        // Check if name already exists
137        if state.state_machines.values().any(|sm| sm.name == name) {
138            return Err(AwsServiceError::aws_error(
139                StatusCode::CONFLICT,
140                "StateMachineAlreadyExists",
141                format!("State Machine Already Exists: '{arn}'"),
142            ));
143        }
144
145        let now = Utc::now();
146        let revision_id = uuid::Uuid::new_v4().to_string();
147
148        let mut tags = HashMap::new();
149        if !body["tags"].is_null() {
150            fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
151                |f| {
152                    AwsServiceError::aws_error(
153                        StatusCode::BAD_REQUEST,
154                        "ValidationException",
155                        format!("{f} must be a list"),
156                    )
157                },
158            )?;
159        }
160
161        let sm = StateMachine {
162            name: name.to_string(),
163            arn: arn.clone(),
164            definition: definition.to_string(),
165            role_arn: role_arn.to_string(),
166            machine_type,
167            status: StateMachineStatus::Active,
168            creation_date: now,
169            update_date: now,
170            tags,
171            revision_id: revision_id.clone(),
172            logging_configuration: body.get("loggingConfiguration").cloned(),
173            tracing_configuration: body.get("tracingConfiguration").cloned(),
174            description: body["description"].as_str().unwrap_or("").to_string(),
175        };
176
177        state.state_machines.insert(arn.clone(), sm);
178
179        Ok(AwsResponse::ok_json(json!({
180            "stateMachineArn": arn,
181            "creationDate": now.timestamp() as f64,
182            "stateMachineVersionArn": arn,
183        })))
184    }
185
186    fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
187        let body = req.json_body();
188        validate_required("stateMachineArn", &body["stateMachineArn"])?;
189        let arn = body["stateMachineArn"]
190            .as_str()
191            .ok_or_else(|| missing("stateMachineArn"))?;
192        validate_arn(arn)?;
193
194        let state = self.state.read();
195        let sm = state
196            .state_machines
197            .get(arn)
198            .ok_or_else(|| state_machine_not_found(arn))?;
199
200        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
201    }
202
203    fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
204        let body = req.json_body();
205        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
206        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
207        let next_token = body["nextToken"].as_str();
208
209        let state = self.state.read();
210        let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
211        machines.sort_by(|a, b| a.name.cmp(&b.name));
212
213        let items: Vec<Value> = machines
214            .iter()
215            .map(|sm| {
216                json!({
217                    "name": sm.name,
218                    "stateMachineArn": sm.arn,
219                    "type": sm.machine_type.as_str(),
220                    "creationDate": sm.creation_date.timestamp() as f64,
221                })
222            })
223            .collect();
224
225        let (page, token) = paginate(&items, next_token, max_results);
226
227        let mut resp = json!({ "stateMachines": page });
228        if let Some(t) = token {
229            resp["nextToken"] = json!(t);
230        }
231        Ok(AwsResponse::ok_json(resp))
232    }
233
234    fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
235        let body = req.json_body();
236        validate_required("stateMachineArn", &body["stateMachineArn"])?;
237        let arn = body["stateMachineArn"]
238            .as_str()
239            .ok_or_else(|| missing("stateMachineArn"))?;
240        validate_arn(arn)?;
241
242        let mut state = self.state.write();
243        // AWS returns success even if it doesn't exist
244        state.state_machines.remove(arn);
245
246        Ok(AwsResponse::ok_json(json!({})))
247    }
248
249    fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
250        let body = req.json_body();
251        validate_required("stateMachineArn", &body["stateMachineArn"])?;
252        let arn = body["stateMachineArn"]
253            .as_str()
254            .ok_or_else(|| missing("stateMachineArn"))?;
255        validate_arn(arn)?;
256
257        let mut state = self.state.write();
258        let sm = state
259            .state_machines
260            .get_mut(arn)
261            .ok_or_else(|| state_machine_not_found(arn))?;
262
263        if let Some(definition) = body["definition"].as_str() {
264            validate_definition(definition)?;
265            sm.definition = definition.to_string();
266        }
267
268        if let Some(role_arn) = body["roleArn"].as_str() {
269            validate_arn(role_arn)?;
270            sm.role_arn = role_arn.to_string();
271        }
272
273        if let Some(logging) = body.get("loggingConfiguration") {
274            sm.logging_configuration = Some(logging.clone());
275        }
276
277        if let Some(tracing) = body.get("tracingConfiguration") {
278            sm.tracing_configuration = Some(tracing.clone());
279        }
280
281        if let Some(description) = body["description"].as_str() {
282            sm.description = description.to_string();
283        }
284
285        let now = Utc::now();
286        sm.update_date = now;
287        sm.revision_id = uuid::Uuid::new_v4().to_string();
288
289        let revision_id = sm.revision_id.clone();
290        let sm_arn = sm.arn.clone();
291
292        Ok(AwsResponse::ok_json(json!({
293            "updateDate": now.timestamp() as f64,
294            "revisionId": revision_id,
295            "stateMachineVersionArn": sm_arn,
296        })))
297    }
298
299    // ─── Execution Lifecycle ──────────────────────────────────────────
300
301    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
302        let body = req.json_body();
303        validate_required("stateMachineArn", &body["stateMachineArn"])?;
304        let sm_arn = body["stateMachineArn"]
305            .as_str()
306            .ok_or_else(|| missing("stateMachineArn"))?;
307        validate_arn(sm_arn)?;
308
309        let input = body["input"].as_str().map(|s| s.to_string());
310
311        // Validate input is valid JSON if provided
312        if let Some(ref input_str) = input {
313            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
314                AwsServiceError::aws_error(
315                    StatusCode::BAD_REQUEST,
316                    "InvalidExecutionInput",
317                    "Invalid execution input: must be valid JSON".to_string(),
318                )
319            })?;
320        }
321
322        let execution_name = body["name"]
323            .as_str()
324            .map(|s| s.to_string())
325            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
326
327        if let Some(name) = body["name"].as_str() {
328            validate_name(name)?;
329        }
330
331        let mut state = self.state.write();
332        let sm = state
333            .state_machines
334            .get(sm_arn)
335            .ok_or_else(|| state_machine_not_found(sm_arn))?;
336
337        let sm_name = sm.name.clone();
338        let definition = sm.definition.clone();
339        let exec_arn = state.execution_arn(&sm_name, &execution_name);
340
341        // Check for duplicate execution name
342        if state.executions.contains_key(&exec_arn) {
343            return Err(AwsServiceError::aws_error(
344                StatusCode::CONFLICT,
345                "ExecutionAlreadyExists",
346                format!("Execution Already Exists: '{exec_arn}'"),
347            ));
348        }
349
350        let now = Utc::now();
351        let execution = Execution {
352            execution_arn: exec_arn.clone(),
353            state_machine_arn: sm_arn.to_string(),
354            state_machine_name: sm_name,
355            name: execution_name,
356            status: ExecutionStatus::Running,
357            input: input.clone(),
358            output: None,
359            start_date: now,
360            stop_date: None,
361            error: None,
362            cause: None,
363            history_events: vec![],
364        };
365
366        state.executions.insert(exec_arn.clone(), execution);
367        drop(state);
368
369        // Spawn async execution
370        let shared_state = self.state.clone();
371        let exec_arn_clone = exec_arn.clone();
372        let input_clone = input;
373        let delivery = self.delivery.clone();
374        let dynamodb_state = self.dynamodb_state.clone();
375        tokio::spawn(async move {
376            interpreter::execute_state_machine(
377                shared_state,
378                exec_arn_clone,
379                definition,
380                input_clone,
381                delivery,
382                dynamodb_state,
383            )
384            .await;
385        });
386
387        Ok(AwsResponse::ok_json(json!({
388            "executionArn": exec_arn,
389            "startDate": now.timestamp() as f64,
390        })))
391    }
392
393    fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
394        let body = req.json_body();
395        validate_required("executionArn", &body["executionArn"])?;
396        let exec_arn = body["executionArn"]
397            .as_str()
398            .ok_or_else(|| missing("executionArn"))?;
399
400        let error = body["error"].as_str().map(|s| s.to_string());
401        let cause = body["cause"].as_str().map(|s| s.to_string());
402
403        let mut state = self.state.write();
404        let exec = state
405            .executions
406            .get_mut(exec_arn)
407            .ok_or_else(|| execution_not_found(exec_arn))?;
408
409        if exec.status != ExecutionStatus::Running {
410            return Err(AwsServiceError::aws_error(
411                StatusCode::BAD_REQUEST,
412                "ExecutionNotRunning",
413                format!("Execution is not running: '{exec_arn}'"),
414            ));
415        }
416
417        let now = Utc::now();
418        exec.status = ExecutionStatus::Aborted;
419        exec.stop_date = Some(now);
420        exec.error = error;
421        exec.cause = cause;
422
423        Ok(AwsResponse::ok_json(json!({
424            "stopDate": now.timestamp() as f64,
425        })))
426    }
427
428    fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
429        let body = req.json_body();
430        validate_required("executionArn", &body["executionArn"])?;
431        let exec_arn = body["executionArn"]
432            .as_str()
433            .ok_or_else(|| missing("executionArn"))?;
434
435        let state = self.state.read();
436        let exec = state
437            .executions
438            .get(exec_arn)
439            .ok_or_else(|| execution_not_found(exec_arn))?;
440
441        Ok(AwsResponse::ok_json(execution_to_json(exec)))
442    }
443
444    fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
445        let body = req.json_body();
446        validate_required("stateMachineArn", &body["stateMachineArn"])?;
447        let sm_arn = body["stateMachineArn"]
448            .as_str()
449            .ok_or_else(|| missing("stateMachineArn"))?;
450        validate_arn(sm_arn)?;
451
452        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
453        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
454        let next_token = body["nextToken"].as_str();
455        let status_filter = body["statusFilter"].as_str();
456
457        let state = self.state.read();
458
459        // Verify state machine exists
460        if !state.state_machines.contains_key(sm_arn) {
461            return Err(state_machine_not_found(sm_arn));
462        }
463
464        let mut executions: Vec<&Execution> = state
465            .executions
466            .values()
467            .filter(|e| e.state_machine_arn == sm_arn)
468            .filter(|e| {
469                status_filter
470                    .map(|sf| e.status.as_str() == sf)
471                    .unwrap_or(true)
472            })
473            .collect();
474
475        // Sort by start date descending (most recent first)
476        executions.sort_by(|a, b| b.start_date.cmp(&a.start_date));
477
478        let items: Vec<Value> = executions
479            .iter()
480            .map(|e| {
481                let mut item = json!({
482                    "executionArn": e.execution_arn,
483                    "stateMachineArn": e.state_machine_arn,
484                    "name": e.name,
485                    "status": e.status.as_str(),
486                    "startDate": e.start_date.timestamp() as f64,
487                });
488                if let Some(stop) = e.stop_date {
489                    item["stopDate"] = json!(stop.timestamp() as f64);
490                }
491                item
492            })
493            .collect();
494
495        let (page, token) = paginate(&items, next_token, max_results);
496
497        let mut resp = json!({ "executions": page });
498        if let Some(t) = token {
499            resp["nextToken"] = json!(t);
500        }
501        Ok(AwsResponse::ok_json(resp))
502    }
503
504    fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
505        let body = req.json_body();
506        validate_required("executionArn", &body["executionArn"])?;
507        let exec_arn = body["executionArn"]
508            .as_str()
509            .ok_or_else(|| missing("executionArn"))?;
510
511        let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
512        validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
513        let next_token = body["nextToken"].as_str();
514        let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
515
516        let state = self.state.read();
517        let exec = state
518            .executions
519            .get(exec_arn)
520            .ok_or_else(|| execution_not_found(exec_arn))?;
521
522        let mut events: Vec<Value> = exec
523            .history_events
524            .iter()
525            .map(|e| {
526                json!({
527                    "id": e.id,
528                    "type": e.event_type,
529                    "timestamp": e.timestamp.timestamp() as f64,
530                    "previousEventId": e.previous_event_id,
531                    format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
532                })
533            })
534            .collect();
535
536        if reverse_order {
537            events.reverse();
538        }
539
540        let (page, token) = paginate(&events, next_token, max_results);
541
542        let mut resp = json!({ "events": page });
543        if let Some(t) = token {
544            resp["nextToken"] = json!(t);
545        }
546        Ok(AwsResponse::ok_json(resp))
547    }
548
549    fn describe_state_machine_for_execution(
550        &self,
551        req: &AwsRequest,
552    ) -> Result<AwsResponse, AwsServiceError> {
553        let body = req.json_body();
554        validate_required("executionArn", &body["executionArn"])?;
555        let exec_arn = body["executionArn"]
556            .as_str()
557            .ok_or_else(|| missing("executionArn"))?;
558
559        let state = self.state.read();
560        let exec = state
561            .executions
562            .get(exec_arn)
563            .ok_or_else(|| execution_not_found(exec_arn))?;
564
565        let sm = state
566            .state_machines
567            .get(&exec.state_machine_arn)
568            .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
569
570        Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
571    }
572
573    // ─── Tagging ────────────────────────────────────────────────────────
574
575    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
576        let body = req.json_body();
577        validate_required("resourceArn", &body["resourceArn"])?;
578        let arn = body["resourceArn"]
579            .as_str()
580            .ok_or_else(|| missing("resourceArn"))?;
581        validate_arn(arn)?;
582        validate_required("tags", &body["tags"])?;
583
584        let mut state = self.state.write();
585        let sm = state
586            .state_machines
587            .get_mut(arn)
588            .ok_or_else(|| resource_not_found(arn))?;
589
590        fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
591            |f| {
592                AwsServiceError::aws_error(
593                    StatusCode::BAD_REQUEST,
594                    "ValidationException",
595                    format!("{f} must be a list"),
596                )
597            },
598        )?;
599
600        Ok(AwsResponse::ok_json(json!({})))
601    }
602
603    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
604        let body = req.json_body();
605        validate_required("resourceArn", &body["resourceArn"])?;
606        let arn = body["resourceArn"]
607            .as_str()
608            .ok_or_else(|| missing("resourceArn"))?;
609        validate_arn(arn)?;
610        validate_required("tagKeys", &body["tagKeys"])?;
611
612        let mut state = self.state.write();
613        let sm = state
614            .state_machines
615            .get_mut(arn)
616            .ok_or_else(|| resource_not_found(arn))?;
617
618        fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
619            AwsServiceError::aws_error(
620                StatusCode::BAD_REQUEST,
621                "ValidationException",
622                format!("{f} must be a list"),
623            )
624        })?;
625
626        Ok(AwsResponse::ok_json(json!({})))
627    }
628
629    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
630        let body = req.json_body();
631        validate_required("resourceArn", &body["resourceArn"])?;
632        let arn = body["resourceArn"]
633            .as_str()
634            .ok_or_else(|| missing("resourceArn"))?;
635        validate_arn(arn)?;
636
637        let state = self.state.read();
638        let sm = state
639            .state_machines
640            .get(arn)
641            .ok_or_else(|| resource_not_found(arn))?;
642
643        let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
644
645        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
646    }
647}
648
649// ─── Helpers ────────────────────────────────────────────────────────────
650
651fn state_machine_to_json(sm: &StateMachine) -> Value {
652    let mut resp = json!({
653        "name": sm.name,
654        "stateMachineArn": sm.arn,
655        "definition": sm.definition,
656        "roleArn": sm.role_arn,
657        "type": sm.machine_type.as_str(),
658        "status": sm.status.as_str(),
659        "creationDate": sm.creation_date.timestamp() as f64,
660        "updateDate": sm.update_date.timestamp() as f64,
661        "revisionId": sm.revision_id,
662        "label": sm.name,
663    });
664
665    if !sm.description.is_empty() {
666        resp["description"] = json!(sm.description);
667    }
668
669    if let Some(ref logging) = sm.logging_configuration {
670        resp["loggingConfiguration"] = logging.clone();
671    } else {
672        resp["loggingConfiguration"] = json!({
673            "level": "OFF",
674            "includeExecutionData": false,
675            "destinations": [],
676        });
677    }
678
679    if let Some(ref tracing) = sm.tracing_configuration {
680        resp["tracingConfiguration"] = tracing.clone();
681    } else {
682        resp["tracingConfiguration"] = json!({
683            "enabled": false,
684        });
685    }
686
687    resp
688}
689
690fn missing(name: &str) -> AwsServiceError {
691    AwsServiceError::aws_error(
692        StatusCode::BAD_REQUEST,
693        "ValidationException",
694        format!("The request must contain the parameter {name}."),
695    )
696}
697
698fn state_machine_not_found(arn: &str) -> AwsServiceError {
699    AwsServiceError::aws_error(
700        StatusCode::BAD_REQUEST,
701        "StateMachineDoesNotExist",
702        format!("State Machine Does Not Exist: '{arn}'"),
703    )
704}
705
706fn resource_not_found(arn: &str) -> AwsServiceError {
707    AwsServiceError::aws_error(
708        StatusCode::BAD_REQUEST,
709        "ResourceNotFound",
710        format!("Resource not found: '{arn}'"),
711    )
712}
713
714fn validate_name(name: &str) -> Result<(), AwsServiceError> {
715    if name.is_empty() || name.len() > 80 {
716        return Err(AwsServiceError::aws_error(
717            StatusCode::BAD_REQUEST,
718            "InvalidName",
719            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
720        ));
721    }
722    // Only allow alphanumeric, hyphens, and underscores
723    if !name
724        .chars()
725        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
726    {
727        return Err(AwsServiceError::aws_error(
728            StatusCode::BAD_REQUEST,
729            "InvalidName",
730            format!(
731                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
732            ),
733        ));
734    }
735    Ok(())
736}
737
738fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
739    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
740        AwsServiceError::aws_error(
741            StatusCode::BAD_REQUEST,
742            "InvalidDefinition",
743            format!("Invalid State Machine Definition: '{e}'"),
744        )
745    })?;
746
747    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
748        return Err(AwsServiceError::aws_error(
749            StatusCode::BAD_REQUEST,
750            "InvalidDefinition",
751            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
752                .to_string(),
753        ));
754    }
755
756    let states = parsed.get("States").and_then(|v| v.as_object());
757    if states.is_none() {
758        return Err(AwsServiceError::aws_error(
759            StatusCode::BAD_REQUEST,
760            "InvalidDefinition",
761            "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
762                .to_string(),
763        ));
764    }
765
766    let start_at = parsed["StartAt"].as_str().unwrap();
767    let states_obj = states.unwrap();
768    if !states_obj.contains_key(start_at) {
769        return Err(AwsServiceError::aws_error(
770            StatusCode::BAD_REQUEST,
771            "InvalidDefinition",
772            format!(
773                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
774                 (StartAt '{start_at}' does not reference a valid state)"
775            ),
776        ));
777    }
778
779    Ok(())
780}
781
782fn execution_not_found(arn: &str) -> AwsServiceError {
783    AwsServiceError::aws_error(
784        StatusCode::BAD_REQUEST,
785        "ExecutionDoesNotExist",
786        format!("Execution Does Not Exist: '{arn}'"),
787    )
788}
789
790fn execution_to_json(exec: &Execution) -> Value {
791    let mut resp = json!({
792        "executionArn": exec.execution_arn,
793        "stateMachineArn": exec.state_machine_arn,
794        "name": exec.name,
795        "status": exec.status.as_str(),
796        "startDate": exec.start_date.timestamp() as f64,
797    });
798
799    if let Some(ref input) = exec.input {
800        resp["input"] = json!(input);
801    }
802    if let Some(ref output) = exec.output {
803        resp["output"] = json!(output);
804    }
805    if let Some(stop) = exec.stop_date {
806        resp["stopDate"] = json!(stop.timestamp() as f64);
807    }
808    if let Some(ref error) = exec.error {
809        resp["error"] = json!(error);
810    }
811    if let Some(ref cause) = exec.cause {
812        resp["cause"] = json!(cause);
813    }
814
815    resp
816}
817
818/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
819fn camel_to_details_key(event_type: &str) -> String {
820    let mut chars = event_type.chars();
821    match chars.next() {
822        None => String::new(),
823        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
824    }
825}
826
827fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
828    if !arn.starts_with("arn:") {
829        return Err(AwsServiceError::aws_error(
830            StatusCode::BAD_REQUEST,
831            "InvalidArn",
832            format!("Invalid Arn: '{arn}'"),
833        ));
834    }
835    Ok(())
836}