1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::pagination::paginate;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_core::validation::*;
13use fakecloud_dynamodb::state::SharedDynamoDbState;
14
15use crate::interpreter;
16use crate::state::{
17 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
18 StateMachineType,
19};
20
/// Action names implemented by this service.
///
/// Returned verbatim by `supported_actions`; each entry has a matching arm in
/// the dispatch `match` inside `handle`.
const SUPPORTED: &[&str] = &[
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
];
37
/// Request handler for the `states` service.
///
/// Holds the shared Step Functions state plus two optional collaborators that
/// are handed to the interpreter whenever an execution is started.
pub struct StepFunctionsService {
    /// Shared store of state machines and executions.
    state: SharedStepFunctionsState,
    /// Optional delivery bus; `None` until set through `with_delivery`.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional DynamoDB state handle; `None` until set through `with_dynamodb`.
    dynamodb_state: Option<SharedDynamoDbState>,
}
43
44impl StepFunctionsService {
45 pub fn new(state: SharedStepFunctionsState) -> Self {
46 Self {
47 state,
48 delivery: None,
49 dynamodb_state: None,
50 }
51 }
52
53 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
54 self.delivery = Some(delivery);
55 self
56 }
57
58 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
59 self.dynamodb_state = Some(dynamodb_state);
60 self
61 }
62}
63
64#[async_trait]
65impl AwsService for StepFunctionsService {
66 fn service_name(&self) -> &str {
67 "states"
68 }
69
70 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
71 match req.action.as_str() {
72 "CreateStateMachine" => self.create_state_machine(&req),
73 "DescribeStateMachine" => self.describe_state_machine(&req),
74 "ListStateMachines" => self.list_state_machines(&req),
75 "DeleteStateMachine" => self.delete_state_machine(&req),
76 "UpdateStateMachine" => self.update_state_machine(&req),
77 "TagResource" => self.tag_resource(&req),
78 "UntagResource" => self.untag_resource(&req),
79 "ListTagsForResource" => self.list_tags_for_resource(&req),
80 "StartExecution" => self.start_execution(&req),
81 "StopExecution" => self.stop_execution(&req),
82 "DescribeExecution" => self.describe_execution(&req),
83 "ListExecutions" => self.list_executions(&req),
84 "GetExecutionHistory" => self.get_execution_history(&req),
85 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
86 _ => Err(AwsServiceError::action_not_implemented(
87 "states",
88 &req.action,
89 )),
90 }
91 }
92
93 fn supported_actions(&self) -> &[&str] {
94 SUPPORTED
95 }
96}
97
98impl StepFunctionsService {
99 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
102 let body = req.json_body();
103
104 validate_required("name", &body["name"])?;
105 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
106 validate_name(name)?;
107
108 validate_required("definition", &body["definition"])?;
109 let definition = body["definition"]
110 .as_str()
111 .ok_or_else(|| missing("definition"))?;
112 validate_definition(definition)?;
113
114 validate_required("roleArn", &body["roleArn"])?;
115 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
116 validate_arn(role_arn)?;
117
118 let machine_type = if let Some(t) = body["type"].as_str() {
119 StateMachineType::parse(t).ok_or_else(|| {
120 AwsServiceError::aws_error(
121 StatusCode::BAD_REQUEST,
122 "ValidationException",
123 format!(
124 "Value '{t}' at 'type' failed to satisfy constraint: \
125 Member must satisfy enum value set: [STANDARD, EXPRESS]"
126 ),
127 )
128 })?
129 } else {
130 StateMachineType::Standard
131 };
132
133 let mut state = self.state.write();
134 let arn = state.state_machine_arn(name);
135
136 if state.state_machines.values().any(|sm| sm.name == name) {
138 return Err(AwsServiceError::aws_error(
139 StatusCode::CONFLICT,
140 "StateMachineAlreadyExists",
141 format!("State Machine Already Exists: '{arn}'"),
142 ));
143 }
144
145 let now = Utc::now();
146 let revision_id = uuid::Uuid::new_v4().to_string();
147
148 let mut tags = HashMap::new();
149 if !body["tags"].is_null() {
150 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
151 |f| {
152 AwsServiceError::aws_error(
153 StatusCode::BAD_REQUEST,
154 "ValidationException",
155 format!("{f} must be a list"),
156 )
157 },
158 )?;
159 }
160
161 let sm = StateMachine {
162 name: name.to_string(),
163 arn: arn.clone(),
164 definition: definition.to_string(),
165 role_arn: role_arn.to_string(),
166 machine_type,
167 status: StateMachineStatus::Active,
168 creation_date: now,
169 update_date: now,
170 tags,
171 revision_id: revision_id.clone(),
172 logging_configuration: body.get("loggingConfiguration").cloned(),
173 tracing_configuration: body.get("tracingConfiguration").cloned(),
174 description: body["description"].as_str().unwrap_or("").to_string(),
175 };
176
177 state.state_machines.insert(arn.clone(), sm);
178
179 Ok(AwsResponse::ok_json(json!({
180 "stateMachineArn": arn,
181 "creationDate": now.timestamp() as f64,
182 "stateMachineVersionArn": arn,
183 })))
184 }
185
186 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
187 let body = req.json_body();
188 validate_required("stateMachineArn", &body["stateMachineArn"])?;
189 let arn = body["stateMachineArn"]
190 .as_str()
191 .ok_or_else(|| missing("stateMachineArn"))?;
192 validate_arn(arn)?;
193
194 let state = self.state.read();
195 let sm = state
196 .state_machines
197 .get(arn)
198 .ok_or_else(|| state_machine_not_found(arn))?;
199
200 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
201 }
202
203 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
204 let body = req.json_body();
205 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
206 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
207 let next_token = body["nextToken"].as_str();
208
209 let state = self.state.read();
210 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
211 machines.sort_by(|a, b| a.name.cmp(&b.name));
212
213 let items: Vec<Value> = machines
214 .iter()
215 .map(|sm| {
216 json!({
217 "name": sm.name,
218 "stateMachineArn": sm.arn,
219 "type": sm.machine_type.as_str(),
220 "creationDate": sm.creation_date.timestamp() as f64,
221 })
222 })
223 .collect();
224
225 let (page, token) = paginate(&items, next_token, max_results);
226
227 let mut resp = json!({ "stateMachines": page });
228 if let Some(t) = token {
229 resp["nextToken"] = json!(t);
230 }
231 Ok(AwsResponse::ok_json(resp))
232 }
233
234 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
235 let body = req.json_body();
236 validate_required("stateMachineArn", &body["stateMachineArn"])?;
237 let arn = body["stateMachineArn"]
238 .as_str()
239 .ok_or_else(|| missing("stateMachineArn"))?;
240 validate_arn(arn)?;
241
242 let mut state = self.state.write();
243 state.state_machines.remove(arn);
245
246 Ok(AwsResponse::ok_json(json!({})))
247 }
248
249 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
250 let body = req.json_body();
251 validate_required("stateMachineArn", &body["stateMachineArn"])?;
252 let arn = body["stateMachineArn"]
253 .as_str()
254 .ok_or_else(|| missing("stateMachineArn"))?;
255 validate_arn(arn)?;
256
257 let mut state = self.state.write();
258 let sm = state
259 .state_machines
260 .get_mut(arn)
261 .ok_or_else(|| state_machine_not_found(arn))?;
262
263 if let Some(definition) = body["definition"].as_str() {
264 validate_definition(definition)?;
265 sm.definition = definition.to_string();
266 }
267
268 if let Some(role_arn) = body["roleArn"].as_str() {
269 validate_arn(role_arn)?;
270 sm.role_arn = role_arn.to_string();
271 }
272
273 if let Some(logging) = body.get("loggingConfiguration") {
274 sm.logging_configuration = Some(logging.clone());
275 }
276
277 if let Some(tracing) = body.get("tracingConfiguration") {
278 sm.tracing_configuration = Some(tracing.clone());
279 }
280
281 if let Some(description) = body["description"].as_str() {
282 sm.description = description.to_string();
283 }
284
285 let now = Utc::now();
286 sm.update_date = now;
287 sm.revision_id = uuid::Uuid::new_v4().to_string();
288
289 let revision_id = sm.revision_id.clone();
290 let sm_arn = sm.arn.clone();
291
292 Ok(AwsResponse::ok_json(json!({
293 "updateDate": now.timestamp() as f64,
294 "revisionId": revision_id,
295 "stateMachineVersionArn": sm_arn,
296 })))
297 }
298
299 fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
302 let body = req.json_body();
303 validate_required("stateMachineArn", &body["stateMachineArn"])?;
304 let sm_arn = body["stateMachineArn"]
305 .as_str()
306 .ok_or_else(|| missing("stateMachineArn"))?;
307 validate_arn(sm_arn)?;
308
309 let input = body["input"].as_str().map(|s| s.to_string());
310
311 if let Some(ref input_str) = input {
313 let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
314 AwsServiceError::aws_error(
315 StatusCode::BAD_REQUEST,
316 "InvalidExecutionInput",
317 "Invalid execution input: must be valid JSON".to_string(),
318 )
319 })?;
320 }
321
322 let execution_name = body["name"]
323 .as_str()
324 .map(|s| s.to_string())
325 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
326
327 if let Some(name) = body["name"].as_str() {
328 validate_name(name)?;
329 }
330
331 let mut state = self.state.write();
332 let sm = state
333 .state_machines
334 .get(sm_arn)
335 .ok_or_else(|| state_machine_not_found(sm_arn))?;
336
337 let sm_name = sm.name.clone();
338 let definition = sm.definition.clone();
339 let exec_arn = state.execution_arn(&sm_name, &execution_name);
340
341 if state.executions.contains_key(&exec_arn) {
343 return Err(AwsServiceError::aws_error(
344 StatusCode::CONFLICT,
345 "ExecutionAlreadyExists",
346 format!("Execution Already Exists: '{exec_arn}'"),
347 ));
348 }
349
350 let now = Utc::now();
351 let execution = Execution {
352 execution_arn: exec_arn.clone(),
353 state_machine_arn: sm_arn.to_string(),
354 state_machine_name: sm_name,
355 name: execution_name,
356 status: ExecutionStatus::Running,
357 input: input.clone(),
358 output: None,
359 start_date: now,
360 stop_date: None,
361 error: None,
362 cause: None,
363 history_events: vec![],
364 };
365
366 state.executions.insert(exec_arn.clone(), execution);
367 drop(state);
368
369 let shared_state = self.state.clone();
371 let exec_arn_clone = exec_arn.clone();
372 let input_clone = input;
373 let delivery = self.delivery.clone();
374 let dynamodb_state = self.dynamodb_state.clone();
375 tokio::spawn(async move {
376 interpreter::execute_state_machine(
377 shared_state,
378 exec_arn_clone,
379 definition,
380 input_clone,
381 delivery,
382 dynamodb_state,
383 )
384 .await;
385 });
386
387 Ok(AwsResponse::ok_json(json!({
388 "executionArn": exec_arn,
389 "startDate": now.timestamp() as f64,
390 })))
391 }
392
393 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
394 let body = req.json_body();
395 validate_required("executionArn", &body["executionArn"])?;
396 let exec_arn = body["executionArn"]
397 .as_str()
398 .ok_or_else(|| missing("executionArn"))?;
399
400 let error = body["error"].as_str().map(|s| s.to_string());
401 let cause = body["cause"].as_str().map(|s| s.to_string());
402
403 let mut state = self.state.write();
404 let exec = state
405 .executions
406 .get_mut(exec_arn)
407 .ok_or_else(|| execution_not_found(exec_arn))?;
408
409 if exec.status != ExecutionStatus::Running {
410 return Err(AwsServiceError::aws_error(
411 StatusCode::BAD_REQUEST,
412 "ExecutionNotRunning",
413 format!("Execution is not running: '{exec_arn}'"),
414 ));
415 }
416
417 let now = Utc::now();
418 exec.status = ExecutionStatus::Aborted;
419 exec.stop_date = Some(now);
420 exec.error = error;
421 exec.cause = cause;
422
423 Ok(AwsResponse::ok_json(json!({
424 "stopDate": now.timestamp() as f64,
425 })))
426 }
427
428 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
429 let body = req.json_body();
430 validate_required("executionArn", &body["executionArn"])?;
431 let exec_arn = body["executionArn"]
432 .as_str()
433 .ok_or_else(|| missing("executionArn"))?;
434
435 let state = self.state.read();
436 let exec = state
437 .executions
438 .get(exec_arn)
439 .ok_or_else(|| execution_not_found(exec_arn))?;
440
441 Ok(AwsResponse::ok_json(execution_to_json(exec)))
442 }
443
444 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
445 let body = req.json_body();
446 validate_required("stateMachineArn", &body["stateMachineArn"])?;
447 let sm_arn = body["stateMachineArn"]
448 .as_str()
449 .ok_or_else(|| missing("stateMachineArn"))?;
450 validate_arn(sm_arn)?;
451
452 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
453 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
454 let next_token = body["nextToken"].as_str();
455 let status_filter = body["statusFilter"].as_str();
456
457 let state = self.state.read();
458
459 if !state.state_machines.contains_key(sm_arn) {
461 return Err(state_machine_not_found(sm_arn));
462 }
463
464 let mut executions: Vec<&Execution> = state
465 .executions
466 .values()
467 .filter(|e| e.state_machine_arn == sm_arn)
468 .filter(|e| {
469 status_filter
470 .map(|sf| e.status.as_str() == sf)
471 .unwrap_or(true)
472 })
473 .collect();
474
475 executions.sort_by(|a, b| b.start_date.cmp(&a.start_date));
477
478 let items: Vec<Value> = executions
479 .iter()
480 .map(|e| {
481 let mut item = json!({
482 "executionArn": e.execution_arn,
483 "stateMachineArn": e.state_machine_arn,
484 "name": e.name,
485 "status": e.status.as_str(),
486 "startDate": e.start_date.timestamp() as f64,
487 });
488 if let Some(stop) = e.stop_date {
489 item["stopDate"] = json!(stop.timestamp() as f64);
490 }
491 item
492 })
493 .collect();
494
495 let (page, token) = paginate(&items, next_token, max_results);
496
497 let mut resp = json!({ "executions": page });
498 if let Some(t) = token {
499 resp["nextToken"] = json!(t);
500 }
501 Ok(AwsResponse::ok_json(resp))
502 }
503
504 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
505 let body = req.json_body();
506 validate_required("executionArn", &body["executionArn"])?;
507 let exec_arn = body["executionArn"]
508 .as_str()
509 .ok_or_else(|| missing("executionArn"))?;
510
511 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
512 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
513 let next_token = body["nextToken"].as_str();
514 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
515
516 let state = self.state.read();
517 let exec = state
518 .executions
519 .get(exec_arn)
520 .ok_or_else(|| execution_not_found(exec_arn))?;
521
522 let mut events: Vec<Value> = exec
523 .history_events
524 .iter()
525 .map(|e| {
526 json!({
527 "id": e.id,
528 "type": e.event_type,
529 "timestamp": e.timestamp.timestamp() as f64,
530 "previousEventId": e.previous_event_id,
531 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
532 })
533 })
534 .collect();
535
536 if reverse_order {
537 events.reverse();
538 }
539
540 let (page, token) = paginate(&events, next_token, max_results);
541
542 let mut resp = json!({ "events": page });
543 if let Some(t) = token {
544 resp["nextToken"] = json!(t);
545 }
546 Ok(AwsResponse::ok_json(resp))
547 }
548
549 fn describe_state_machine_for_execution(
550 &self,
551 req: &AwsRequest,
552 ) -> Result<AwsResponse, AwsServiceError> {
553 let body = req.json_body();
554 validate_required("executionArn", &body["executionArn"])?;
555 let exec_arn = body["executionArn"]
556 .as_str()
557 .ok_or_else(|| missing("executionArn"))?;
558
559 let state = self.state.read();
560 let exec = state
561 .executions
562 .get(exec_arn)
563 .ok_or_else(|| execution_not_found(exec_arn))?;
564
565 let sm = state
566 .state_machines
567 .get(&exec.state_machine_arn)
568 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
569
570 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
571 }
572
573 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
576 let body = req.json_body();
577 validate_required("resourceArn", &body["resourceArn"])?;
578 let arn = body["resourceArn"]
579 .as_str()
580 .ok_or_else(|| missing("resourceArn"))?;
581 validate_arn(arn)?;
582 validate_required("tags", &body["tags"])?;
583
584 let mut state = self.state.write();
585 let sm = state
586 .state_machines
587 .get_mut(arn)
588 .ok_or_else(|| resource_not_found(arn))?;
589
590 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
591 |f| {
592 AwsServiceError::aws_error(
593 StatusCode::BAD_REQUEST,
594 "ValidationException",
595 format!("{f} must be a list"),
596 )
597 },
598 )?;
599
600 Ok(AwsResponse::ok_json(json!({})))
601 }
602
603 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
604 let body = req.json_body();
605 validate_required("resourceArn", &body["resourceArn"])?;
606 let arn = body["resourceArn"]
607 .as_str()
608 .ok_or_else(|| missing("resourceArn"))?;
609 validate_arn(arn)?;
610 validate_required("tagKeys", &body["tagKeys"])?;
611
612 let mut state = self.state.write();
613 let sm = state
614 .state_machines
615 .get_mut(arn)
616 .ok_or_else(|| resource_not_found(arn))?;
617
618 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
619 AwsServiceError::aws_error(
620 StatusCode::BAD_REQUEST,
621 "ValidationException",
622 format!("{f} must be a list"),
623 )
624 })?;
625
626 Ok(AwsResponse::ok_json(json!({})))
627 }
628
629 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
630 let body = req.json_body();
631 validate_required("resourceArn", &body["resourceArn"])?;
632 let arn = body["resourceArn"]
633 .as_str()
634 .ok_or_else(|| missing("resourceArn"))?;
635 validate_arn(arn)?;
636
637 let state = self.state.read();
638 let sm = state
639 .state_machines
640 .get(arn)
641 .ok_or_else(|| resource_not_found(arn))?;
642
643 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
644
645 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
646 }
647}
648
649fn state_machine_to_json(sm: &StateMachine) -> Value {
652 let mut resp = json!({
653 "name": sm.name,
654 "stateMachineArn": sm.arn,
655 "definition": sm.definition,
656 "roleArn": sm.role_arn,
657 "type": sm.machine_type.as_str(),
658 "status": sm.status.as_str(),
659 "creationDate": sm.creation_date.timestamp() as f64,
660 "updateDate": sm.update_date.timestamp() as f64,
661 "revisionId": sm.revision_id,
662 "label": sm.name,
663 });
664
665 if !sm.description.is_empty() {
666 resp["description"] = json!(sm.description);
667 }
668
669 if let Some(ref logging) = sm.logging_configuration {
670 resp["loggingConfiguration"] = logging.clone();
671 } else {
672 resp["loggingConfiguration"] = json!({
673 "level": "OFF",
674 "includeExecutionData": false,
675 "destinations": [],
676 });
677 }
678
679 if let Some(ref tracing) = sm.tracing_configuration {
680 resp["tracingConfiguration"] = tracing.clone();
681 } else {
682 resp["tracingConfiguration"] = json!({
683 "enabled": false,
684 });
685 }
686
687 resp
688}
689
690fn missing(name: &str) -> AwsServiceError {
691 AwsServiceError::aws_error(
692 StatusCode::BAD_REQUEST,
693 "ValidationException",
694 format!("The request must contain the parameter {name}."),
695 )
696}
697
698fn state_machine_not_found(arn: &str) -> AwsServiceError {
699 AwsServiceError::aws_error(
700 StatusCode::BAD_REQUEST,
701 "StateMachineDoesNotExist",
702 format!("State Machine Does Not Exist: '{arn}'"),
703 )
704}
705
706fn resource_not_found(arn: &str) -> AwsServiceError {
707 AwsServiceError::aws_error(
708 StatusCode::BAD_REQUEST,
709 "ResourceNotFound",
710 format!("Resource not found: '{arn}'"),
711 )
712}
713
714fn validate_name(name: &str) -> Result<(), AwsServiceError> {
715 if name.is_empty() || name.len() > 80 {
716 return Err(AwsServiceError::aws_error(
717 StatusCode::BAD_REQUEST,
718 "InvalidName",
719 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
720 ));
721 }
722 if !name
724 .chars()
725 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
726 {
727 return Err(AwsServiceError::aws_error(
728 StatusCode::BAD_REQUEST,
729 "InvalidName",
730 format!(
731 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
732 ),
733 ));
734 }
735 Ok(())
736}
737
738fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
739 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
740 AwsServiceError::aws_error(
741 StatusCode::BAD_REQUEST,
742 "InvalidDefinition",
743 format!("Invalid State Machine Definition: '{e}'"),
744 )
745 })?;
746
747 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
748 return Err(AwsServiceError::aws_error(
749 StatusCode::BAD_REQUEST,
750 "InvalidDefinition",
751 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
752 .to_string(),
753 ));
754 }
755
756 let states = parsed.get("States").and_then(|v| v.as_object());
757 if states.is_none() {
758 return Err(AwsServiceError::aws_error(
759 StatusCode::BAD_REQUEST,
760 "InvalidDefinition",
761 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
762 .to_string(),
763 ));
764 }
765
766 let start_at = parsed["StartAt"].as_str().unwrap();
767 let states_obj = states.unwrap();
768 if !states_obj.contains_key(start_at) {
769 return Err(AwsServiceError::aws_error(
770 StatusCode::BAD_REQUEST,
771 "InvalidDefinition",
772 format!(
773 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
774 (StartAt '{start_at}' does not reference a valid state)"
775 ),
776 ));
777 }
778
779 Ok(())
780}
781
782fn execution_not_found(arn: &str) -> AwsServiceError {
783 AwsServiceError::aws_error(
784 StatusCode::BAD_REQUEST,
785 "ExecutionDoesNotExist",
786 format!("Execution Does Not Exist: '{arn}'"),
787 )
788}
789
790fn execution_to_json(exec: &Execution) -> Value {
791 let mut resp = json!({
792 "executionArn": exec.execution_arn,
793 "stateMachineArn": exec.state_machine_arn,
794 "name": exec.name,
795 "status": exec.status.as_str(),
796 "startDate": exec.start_date.timestamp() as f64,
797 });
798
799 if let Some(ref input) = exec.input {
800 resp["input"] = json!(input);
801 }
802 if let Some(ref output) = exec.output {
803 resp["output"] = json!(output);
804 }
805 if let Some(stop) = exec.stop_date {
806 resp["stopDate"] = json!(stop.timestamp() as f64);
807 }
808 if let Some(ref error) = exec.error {
809 resp["error"] = json!(error);
810 }
811 if let Some(ref cause) = exec.cause {
812 resp["cause"] = json!(cause);
813 }
814
815 resp
816}
817
/// Derives the prefix of the `<prefix>EventDetails` field for a history
/// event type.
///
/// Every `<Kind>StateEntered` / `<Kind>StateExited` event (for example
/// `TaskStateEntered` or `PassStateExited`) maps to the shared `stateEntered`
/// / `stateExited` prefix, because the AWS `HistoryEvent` shape exposes a
/// single `stateEnteredEventDetails` / `stateExitedEventDetails` field for
/// all state kinds. Any other event type has only its first character
/// lower-cased (`ExecutionStarted` becomes `executionStarted`). An empty
/// input yields an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    // State transition events share one details field regardless of state kind.
    if event_type.ends_with("StateEntered") {
        return "stateEntered".to_string();
    }
    if event_type.ends_with("StateExited") {
        return "stateExited".to_string();
    }

    let mut chars = event_type.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => first.to_lowercase().chain(chars).collect(),
    }
}
826
827fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
828 if !arn.starts_with("arn:") {
829 return Err(AwsServiceError::aws_error(
830 StatusCode::BAD_REQUEST,
831 "InvalidArn",
832 format!("Invalid Arn: '{arn}'"),
833 ));
834 }
835 Ok(())
836}