1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
/// Action names this service advertises through `supported_actions`.
///
/// Every entry here has a matching arm in the `handle` dispatch.
const SUPPORTED: &[&str] = &[
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
];
40
/// Request handler for the Step Functions (`states`) service.
pub struct StepFunctionsService {
    /// Shared per-account state that every action handler reads or writes.
    state: SharedStepFunctionsState,
    /// Optional delivery bus, handed to the interpreter when an execution starts.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional DynamoDB state, handed to the interpreter when an execution starts.
    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional store that `save_snapshot` writes serialized state to.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the duration of `save_snapshot` so snapshot writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
48
49impl StepFunctionsService {
50 pub fn new(state: SharedStepFunctionsState) -> Self {
51 Self {
52 state,
53 delivery: None,
54 dynamodb_state: None,
55 snapshot_store: None,
56 snapshot_lock: Arc::new(AsyncMutex::new(())),
57 }
58 }
59
60 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
61 self.delivery = Some(delivery);
62 self
63 }
64
65 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
66 self.dynamodb_state = Some(dynamodb_state);
67 self
68 }
69
70 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
71 self.snapshot_store = Some(store);
72 self
73 }
74
75 async fn save_snapshot(&self) {
76 let Some(store) = self.snapshot_store.clone() else {
77 return;
78 };
79 let _guard = self.snapshot_lock.lock().await;
80 let snapshot = StepFunctionsSnapshot {
81 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
82 state: None,
83 accounts: Some(self.state.read().clone()),
84 };
85 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
86 let bytes = serde_json::to_vec(&snapshot)
87 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
88 store.save(&bytes)
89 })
90 .await;
91 match join {
92 Ok(Ok(())) => {}
93 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
94 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
95 }
96 }
97}
98
/// Returns `true` when `action` is one of the actions that modify service
/// state; matching is exact and case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING: [&str; 7] = [
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
    ];
    MUTATING.iter().any(|&known| known == action)
}
111
112#[async_trait]
113impl AwsService for StepFunctionsService {
114 fn service_name(&self) -> &str {
115 "states"
116 }
117
118 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
119 let mutates = is_mutating_action(req.action.as_str());
120 let result = match req.action.as_str() {
121 "CreateStateMachine" => self.create_state_machine(&req),
122 "DescribeStateMachine" => self.describe_state_machine(&req),
123 "ListStateMachines" => self.list_state_machines(&req),
124 "DeleteStateMachine" => self.delete_state_machine(&req),
125 "UpdateStateMachine" => self.update_state_machine(&req),
126 "TagResource" => self.tag_resource(&req),
127 "UntagResource" => self.untag_resource(&req),
128 "ListTagsForResource" => self.list_tags_for_resource(&req),
129 "StartExecution" => self.start_execution(&req),
130 "StopExecution" => self.stop_execution(&req),
131 "DescribeExecution" => self.describe_execution(&req),
132 "ListExecutions" => self.list_executions(&req),
133 "GetExecutionHistory" => self.get_execution_history(&req),
134 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
135 _ => Err(AwsServiceError::action_not_implemented(
136 "states",
137 &req.action,
138 )),
139 };
140 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
141 self.save_snapshot().await;
142 }
143 result
144 }
145
146 fn supported_actions(&self) -> &[&str] {
147 SUPPORTED
148 }
149}
150
151impl StepFunctionsService {
152 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
155 let body = req.json_body();
156
157 validate_required("name", &body["name"])?;
158 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
159 validate_name(name)?;
160
161 validate_required("definition", &body["definition"])?;
162 let definition = body["definition"]
163 .as_str()
164 .ok_or_else(|| missing("definition"))?;
165 validate_definition(definition)?;
166
167 validate_required("roleArn", &body["roleArn"])?;
168 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
169 validate_arn(role_arn)?;
170
171 let machine_type = if let Some(t) = body["type"].as_str() {
172 StateMachineType::parse(t).ok_or_else(|| {
173 AwsServiceError::aws_error(
174 StatusCode::BAD_REQUEST,
175 "ValidationException",
176 format!(
177 "Value '{t}' at 'type' failed to satisfy constraint: \
178 Member must satisfy enum value set: [STANDARD, EXPRESS]"
179 ),
180 )
181 })?
182 } else {
183 StateMachineType::Standard
184 };
185
186 let mut accounts = self.state.write();
187 let state = accounts.get_or_create(&req.account_id);
188 let arn = state.state_machine_arn(name);
189
190 if state.state_machines.values().any(|sm| sm.name == name) {
192 return Err(AwsServiceError::aws_error(
193 StatusCode::CONFLICT,
194 "StateMachineAlreadyExists",
195 format!("State Machine Already Exists: '{arn}'"),
196 ));
197 }
198
199 let now = Utc::now();
200 let revision_id = uuid::Uuid::new_v4().to_string();
201
202 let mut tags = HashMap::new();
203 if !body["tags"].is_null() {
204 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
205 |f| {
206 AwsServiceError::aws_error(
207 StatusCode::BAD_REQUEST,
208 "ValidationException",
209 format!("{f} must be a list"),
210 )
211 },
212 )?;
213 }
214
215 let sm = StateMachine {
216 name: name.to_string(),
217 arn: arn.clone(),
218 definition: definition.to_string(),
219 role_arn: role_arn.to_string(),
220 machine_type,
221 status: StateMachineStatus::Active,
222 creation_date: now,
223 update_date: now,
224 tags,
225 revision_id: revision_id.clone(),
226 logging_configuration: body.get("loggingConfiguration").cloned(),
227 tracing_configuration: body.get("tracingConfiguration").cloned(),
228 description: body["description"].as_str().unwrap_or("").to_string(),
229 };
230
231 state.state_machines.insert(arn.clone(), sm);
232
233 Ok(AwsResponse::ok_json(json!({
234 "stateMachineArn": arn,
235 "creationDate": now.timestamp() as f64,
236 "stateMachineVersionArn": arn,
237 })))
238 }
239
240 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241 let body = req.json_body();
242 validate_required("stateMachineArn", &body["stateMachineArn"])?;
243 let arn = body["stateMachineArn"]
244 .as_str()
245 .ok_or_else(|| missing("stateMachineArn"))?;
246 validate_arn(arn)?;
247
248 let accounts = self.state.read();
249 let empty = StepFunctionsState::new(&req.account_id, &req.region);
250 let state = accounts.get(&req.account_id).unwrap_or(&empty);
251 let sm = state
252 .state_machines
253 .get(arn)
254 .ok_or_else(|| state_machine_not_found(arn))?;
255
256 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
257 }
258
259 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
260 let body = req.json_body();
261 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
262 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
263 let next_token = body["nextToken"].as_str();
264
265 let accounts = self.state.read();
266 let empty = StepFunctionsState::new(&req.account_id, &req.region);
267 let state = accounts.get(&req.account_id).unwrap_or(&empty);
268 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
269 machines.sort_by(|a, b| a.name.cmp(&b.name));
270
271 let items: Vec<Value> = machines
272 .iter()
273 .map(|sm| {
274 json!({
275 "name": sm.name,
276 "stateMachineArn": sm.arn,
277 "type": sm.machine_type.as_str(),
278 "creationDate": sm.creation_date.timestamp() as f64,
279 })
280 })
281 .collect();
282
283 let (page, token) = paginate(&items, next_token, max_results);
284
285 let mut resp = json!({ "stateMachines": page });
286 if let Some(t) = token {
287 resp["nextToken"] = json!(t);
288 }
289 Ok(AwsResponse::ok_json(resp))
290 }
291
292 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
293 let body = req.json_body();
294 validate_required("stateMachineArn", &body["stateMachineArn"])?;
295 let arn = body["stateMachineArn"]
296 .as_str()
297 .ok_or_else(|| missing("stateMachineArn"))?;
298 validate_arn(arn)?;
299
300 let mut accounts = self.state.write();
301 let state = accounts.get_or_create(&req.account_id);
302 state.state_machines.remove(arn);
304
305 Ok(AwsResponse::ok_json(json!({})))
306 }
307
308 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
309 let body = req.json_body();
310 validate_required("stateMachineArn", &body["stateMachineArn"])?;
311 let arn = body["stateMachineArn"]
312 .as_str()
313 .ok_or_else(|| missing("stateMachineArn"))?;
314 validate_arn(arn)?;
315
316 let mut accounts = self.state.write();
317 let state = accounts.get_or_create(&req.account_id);
318 let sm = state
319 .state_machines
320 .get_mut(arn)
321 .ok_or_else(|| state_machine_not_found(arn))?;
322
323 if let Some(definition) = body["definition"].as_str() {
324 validate_definition(definition)?;
325 sm.definition = definition.to_string();
326 }
327
328 if let Some(role_arn) = body["roleArn"].as_str() {
329 validate_arn(role_arn)?;
330 sm.role_arn = role_arn.to_string();
331 }
332
333 if let Some(logging) = body.get("loggingConfiguration") {
334 sm.logging_configuration = Some(logging.clone());
335 }
336
337 if let Some(tracing) = body.get("tracingConfiguration") {
338 sm.tracing_configuration = Some(tracing.clone());
339 }
340
341 if let Some(description) = body["description"].as_str() {
342 sm.description = description.to_string();
343 }
344
345 let now = Utc::now();
346 sm.update_date = now;
347 sm.revision_id = uuid::Uuid::new_v4().to_string();
348
349 let revision_id = sm.revision_id.clone();
350 let sm_arn = sm.arn.clone();
351
352 Ok(AwsResponse::ok_json(json!({
353 "updateDate": now.timestamp() as f64,
354 "revisionId": revision_id,
355 "stateMachineVersionArn": sm_arn,
356 })))
357 }
358
359 fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
362 let body = req.json_body();
363 validate_required("stateMachineArn", &body["stateMachineArn"])?;
364 let sm_arn = body["stateMachineArn"]
365 .as_str()
366 .ok_or_else(|| missing("stateMachineArn"))?;
367 validate_arn(sm_arn)?;
368
369 let input = body["input"].as_str().map(|s| s.to_string());
370
371 if let Some(ref input_str) = input {
373 let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
374 AwsServiceError::aws_error(
375 StatusCode::BAD_REQUEST,
376 "InvalidExecutionInput",
377 "Invalid execution input: must be valid JSON".to_string(),
378 )
379 })?;
380 }
381
382 let execution_name = body["name"]
383 .as_str()
384 .map(|s| s.to_string())
385 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
386
387 if let Some(name) = body["name"].as_str() {
388 validate_name(name)?;
389 }
390
391 let mut accounts = self.state.write();
392 let state = accounts.get_or_create(&req.account_id);
393 let sm = state
394 .state_machines
395 .get(sm_arn)
396 .ok_or_else(|| state_machine_not_found(sm_arn))?;
397
398 let sm_name = sm.name.clone();
399 let definition = sm.definition.clone();
400 let exec_arn = state.execution_arn(&sm_name, &execution_name);
401
402 if state.executions.contains_key(&exec_arn) {
404 return Err(AwsServiceError::aws_error(
405 StatusCode::CONFLICT,
406 "ExecutionAlreadyExists",
407 format!("Execution Already Exists: '{exec_arn}'"),
408 ));
409 }
410
411 let now = Utc::now();
412 let execution = Execution {
413 execution_arn: exec_arn.clone(),
414 state_machine_arn: sm_arn.to_string(),
415 state_machine_name: sm_name,
416 name: execution_name,
417 status: ExecutionStatus::Running,
418 input: input.clone(),
419 output: None,
420 start_date: now,
421 stop_date: None,
422 error: None,
423 cause: None,
424 history_events: vec![],
425 };
426
427 state.executions.insert(exec_arn.clone(), execution);
428 drop(accounts);
429
430 let shared_state = self.state.clone();
432 let exec_arn_clone = exec_arn.clone();
433 let input_clone = input;
434 let delivery = self.delivery.clone();
435 let dynamodb_state = self.dynamodb_state.clone();
436 tokio::spawn(async move {
437 interpreter::execute_state_machine(
438 shared_state,
439 exec_arn_clone,
440 definition,
441 input_clone,
442 delivery,
443 dynamodb_state,
444 )
445 .await;
446 });
447
448 Ok(AwsResponse::ok_json(json!({
449 "executionArn": exec_arn,
450 "startDate": now.timestamp() as f64,
451 })))
452 }
453
454 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455 let body = req.json_body();
456 validate_required("executionArn", &body["executionArn"])?;
457 let exec_arn = body["executionArn"]
458 .as_str()
459 .ok_or_else(|| missing("executionArn"))?;
460
461 let error = body["error"].as_str().map(|s| s.to_string());
462 let cause = body["cause"].as_str().map(|s| s.to_string());
463
464 let mut accounts = self.state.write();
465 let state = accounts.get_or_create(&req.account_id);
466 let exec = state
467 .executions
468 .get_mut(exec_arn)
469 .ok_or_else(|| execution_not_found(exec_arn))?;
470
471 if exec.status != ExecutionStatus::Running {
472 return Err(AwsServiceError::aws_error(
473 StatusCode::BAD_REQUEST,
474 "ExecutionNotRunning",
475 format!("Execution is not running: '{exec_arn}'"),
476 ));
477 }
478
479 let now = Utc::now();
480 exec.status = ExecutionStatus::Aborted;
481 exec.stop_date = Some(now);
482 exec.error = error;
483 exec.cause = cause;
484
485 Ok(AwsResponse::ok_json(json!({
486 "stopDate": now.timestamp() as f64,
487 })))
488 }
489
490 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
491 let body = req.json_body();
492 validate_required("executionArn", &body["executionArn"])?;
493 let exec_arn = body["executionArn"]
494 .as_str()
495 .ok_or_else(|| missing("executionArn"))?;
496
497 let accounts = self.state.read();
498 let empty = StepFunctionsState::new(&req.account_id, &req.region);
499 let state = accounts.get(&req.account_id).unwrap_or(&empty);
500 let exec = state
501 .executions
502 .get(exec_arn)
503 .ok_or_else(|| execution_not_found(exec_arn))?;
504
505 Ok(AwsResponse::ok_json(execution_to_json(exec)))
506 }
507
508 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509 let body = req.json_body();
510 validate_required("stateMachineArn", &body["stateMachineArn"])?;
511 let sm_arn = body["stateMachineArn"]
512 .as_str()
513 .ok_or_else(|| missing("stateMachineArn"))?;
514 validate_arn(sm_arn)?;
515
516 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
517 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
518 let next_token = body["nextToken"].as_str();
519 let status_filter = body["statusFilter"].as_str();
520
521 let accounts = self.state.read();
522 let empty = StepFunctionsState::new(&req.account_id, &req.region);
523 let state = accounts.get(&req.account_id).unwrap_or(&empty);
524
525 if !state.state_machines.contains_key(sm_arn) {
527 return Err(state_machine_not_found(sm_arn));
528 }
529
530 let mut executions: Vec<&Execution> = state
531 .executions
532 .values()
533 .filter(|e| e.state_machine_arn == sm_arn)
534 .filter(|e| {
535 status_filter
536 .map(|sf| e.status.as_str() == sf)
537 .unwrap_or(true)
538 })
539 .collect();
540
541 executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
543
544 let items: Vec<Value> = executions
545 .iter()
546 .map(|e| {
547 let mut item = json!({
548 "executionArn": e.execution_arn,
549 "stateMachineArn": e.state_machine_arn,
550 "name": e.name,
551 "status": e.status.as_str(),
552 "startDate": e.start_date.timestamp() as f64,
553 });
554 if let Some(stop) = e.stop_date {
555 item["stopDate"] = json!(stop.timestamp() as f64);
556 }
557 item
558 })
559 .collect();
560
561 let (page, token) = paginate(&items, next_token, max_results);
562
563 let mut resp = json!({ "executions": page });
564 if let Some(t) = token {
565 resp["nextToken"] = json!(t);
566 }
567 Ok(AwsResponse::ok_json(resp))
568 }
569
570 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
571 let body = req.json_body();
572 validate_required("executionArn", &body["executionArn"])?;
573 let exec_arn = body["executionArn"]
574 .as_str()
575 .ok_or_else(|| missing("executionArn"))?;
576
577 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
578 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
579 let next_token = body["nextToken"].as_str();
580 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
581
582 let accounts = self.state.read();
583 let empty = StepFunctionsState::new(&req.account_id, &req.region);
584 let state = accounts.get(&req.account_id).unwrap_or(&empty);
585 let exec = state
586 .executions
587 .get(exec_arn)
588 .ok_or_else(|| execution_not_found(exec_arn))?;
589
590 let mut events: Vec<Value> = exec
591 .history_events
592 .iter()
593 .map(|e| {
594 json!({
595 "id": e.id,
596 "type": e.event_type,
597 "timestamp": e.timestamp.timestamp() as f64,
598 "previousEventId": e.previous_event_id,
599 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
600 })
601 })
602 .collect();
603
604 if reverse_order {
605 events.reverse();
606 }
607
608 let (page, token) = paginate(&events, next_token, max_results);
609
610 let mut resp = json!({ "events": page });
611 if let Some(t) = token {
612 resp["nextToken"] = json!(t);
613 }
614 Ok(AwsResponse::ok_json(resp))
615 }
616
617 fn describe_state_machine_for_execution(
618 &self,
619 req: &AwsRequest,
620 ) -> Result<AwsResponse, AwsServiceError> {
621 let body = req.json_body();
622 validate_required("executionArn", &body["executionArn"])?;
623 let exec_arn = body["executionArn"]
624 .as_str()
625 .ok_or_else(|| missing("executionArn"))?;
626
627 let accounts = self.state.read();
628 let empty = StepFunctionsState::new(&req.account_id, &req.region);
629 let state = accounts.get(&req.account_id).unwrap_or(&empty);
630 let exec = state
631 .executions
632 .get(exec_arn)
633 .ok_or_else(|| execution_not_found(exec_arn))?;
634
635 let sm = state
636 .state_machines
637 .get(&exec.state_machine_arn)
638 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
639
640 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
641 }
642
643 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
646 let body = req.json_body();
647 validate_required("resourceArn", &body["resourceArn"])?;
648 let arn = body["resourceArn"]
649 .as_str()
650 .ok_or_else(|| missing("resourceArn"))?;
651 validate_arn(arn)?;
652 validate_required("tags", &body["tags"])?;
653
654 let mut accounts = self.state.write();
655 let state = accounts.get_or_create(&req.account_id);
656 let sm = state
657 .state_machines
658 .get_mut(arn)
659 .ok_or_else(|| resource_not_found(arn))?;
660
661 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
662 |f| {
663 AwsServiceError::aws_error(
664 StatusCode::BAD_REQUEST,
665 "ValidationException",
666 format!("{f} must be a list"),
667 )
668 },
669 )?;
670
671 Ok(AwsResponse::ok_json(json!({})))
672 }
673
674 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
675 let body = req.json_body();
676 validate_required("resourceArn", &body["resourceArn"])?;
677 let arn = body["resourceArn"]
678 .as_str()
679 .ok_or_else(|| missing("resourceArn"))?;
680 validate_arn(arn)?;
681 validate_required("tagKeys", &body["tagKeys"])?;
682
683 let mut accounts = self.state.write();
684 let state = accounts.get_or_create(&req.account_id);
685 let sm = state
686 .state_machines
687 .get_mut(arn)
688 .ok_or_else(|| resource_not_found(arn))?;
689
690 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
691 AwsServiceError::aws_error(
692 StatusCode::BAD_REQUEST,
693 "ValidationException",
694 format!("{f} must be a list"),
695 )
696 })?;
697
698 Ok(AwsResponse::ok_json(json!({})))
699 }
700
701 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
702 let body = req.json_body();
703 validate_required("resourceArn", &body["resourceArn"])?;
704 let arn = body["resourceArn"]
705 .as_str()
706 .ok_or_else(|| missing("resourceArn"))?;
707 validate_arn(arn)?;
708
709 let accounts = self.state.read();
710 let empty = StepFunctionsState::new(&req.account_id, &req.region);
711 let state = accounts.get(&req.account_id).unwrap_or(&empty);
712 let sm = state
713 .state_machines
714 .get(arn)
715 .ok_or_else(|| resource_not_found(arn))?;
716
717 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
718
719 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
720 }
721}
722
723fn state_machine_to_json(sm: &StateMachine) -> Value {
726 let mut resp = json!({
727 "name": sm.name,
728 "stateMachineArn": sm.arn,
729 "definition": sm.definition,
730 "roleArn": sm.role_arn,
731 "type": sm.machine_type.as_str(),
732 "status": sm.status.as_str(),
733 "creationDate": sm.creation_date.timestamp() as f64,
734 "updateDate": sm.update_date.timestamp() as f64,
735 "revisionId": sm.revision_id,
736 "label": sm.name,
737 });
738
739 if !sm.description.is_empty() {
740 resp["description"] = json!(sm.description);
741 }
742
743 if let Some(ref logging) = sm.logging_configuration {
744 resp["loggingConfiguration"] = logging.clone();
745 } else {
746 resp["loggingConfiguration"] = json!({
747 "level": "OFF",
748 "includeExecutionData": false,
749 "destinations": [],
750 });
751 }
752
753 if let Some(ref tracing) = sm.tracing_configuration {
754 resp["tracingConfiguration"] = tracing.clone();
755 } else {
756 resp["tracingConfiguration"] = json!({
757 "enabled": false,
758 });
759 }
760
761 resp
762}
763
764fn missing(name: &str) -> AwsServiceError {
765 AwsServiceError::aws_error(
766 StatusCode::BAD_REQUEST,
767 "ValidationException",
768 format!("The request must contain the parameter {name}."),
769 )
770}
771
772fn state_machine_not_found(arn: &str) -> AwsServiceError {
773 AwsServiceError::aws_error(
774 StatusCode::BAD_REQUEST,
775 "StateMachineDoesNotExist",
776 format!("State Machine Does Not Exist: '{arn}'"),
777 )
778}
779
780fn resource_not_found(arn: &str) -> AwsServiceError {
781 AwsServiceError::aws_error(
782 StatusCode::BAD_REQUEST,
783 "ResourceNotFound",
784 format!("Resource not found: '{arn}'"),
785 )
786}
787
788fn validate_name(name: &str) -> Result<(), AwsServiceError> {
789 if name.is_empty() || name.len() > 80 {
790 return Err(AwsServiceError::aws_error(
791 StatusCode::BAD_REQUEST,
792 "InvalidName",
793 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
794 ));
795 }
796 if !name
798 .chars()
799 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
800 {
801 return Err(AwsServiceError::aws_error(
802 StatusCode::BAD_REQUEST,
803 "InvalidName",
804 format!(
805 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
806 ),
807 ));
808 }
809 Ok(())
810}
811
812fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
813 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
814 AwsServiceError::aws_error(
815 StatusCode::BAD_REQUEST,
816 "InvalidDefinition",
817 format!("Invalid State Machine Definition: '{e}'"),
818 )
819 })?;
820
821 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
822 return Err(AwsServiceError::aws_error(
823 StatusCode::BAD_REQUEST,
824 "InvalidDefinition",
825 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
826 .to_string(),
827 ));
828 }
829
830 let states_obj = parsed
831 .get("States")
832 .and_then(|v| v.as_object())
833 .ok_or_else(|| {
834 AwsServiceError::aws_error(
835 StatusCode::BAD_REQUEST,
836 "InvalidDefinition",
837 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
838 .to_string(),
839 )
840 })?;
841
842 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
843 AwsServiceError::aws_error(
844 StatusCode::BAD_REQUEST,
845 "InvalidDefinition",
846 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
847 .to_string(),
848 )
849 })?;
850 if !states_obj.contains_key(start_at) {
851 return Err(AwsServiceError::aws_error(
852 StatusCode::BAD_REQUEST,
853 "InvalidDefinition",
854 format!(
855 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
856 (StartAt '{start_at}' does not reference a valid state)"
857 ),
858 ));
859 }
860
861 Ok(())
862}
863
864fn execution_not_found(arn: &str) -> AwsServiceError {
865 AwsServiceError::aws_error(
866 StatusCode::BAD_REQUEST,
867 "ExecutionDoesNotExist",
868 format!("Execution Does Not Exist: '{arn}'"),
869 )
870}
871
872fn execution_to_json(exec: &Execution) -> Value {
873 let mut resp = json!({
874 "executionArn": exec.execution_arn,
875 "stateMachineArn": exec.state_machine_arn,
876 "name": exec.name,
877 "status": exec.status.as_str(),
878 "startDate": exec.start_date.timestamp() as f64,
879 });
880
881 if let Some(ref input) = exec.input {
882 resp["input"] = json!(input);
883 }
884 if let Some(ref output) = exec.output {
885 resp["output"] = json!(output);
886 }
887 if let Some(stop) = exec.stop_date {
888 resp["stopDate"] = json!(stop.timestamp() as f64);
889 }
890 if let Some(ref error) = exec.error {
891 resp["error"] = json!(error);
892 }
893 if let Some(ref cause) = exec.cause {
894 resp["cause"] = json!(cause);
895 }
896
897 resp
898}
899
/// Lowercases the first character of `event_type` and leaves the rest
/// untouched, e.g. `ExecutionStarted` becomes `executionStarted`.
///
/// An empty input yields an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    let Some(first) = event_type.chars().next() else {
        return String::new();
    };
    let rest = &event_type[first.len_utf8()..];

    let mut key = String::with_capacity(event_type.len());
    // Lowercasing may expand to more than one character.
    key.extend(first.to_lowercase());
    key.push_str(rest);
    key
}
908
909fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
910 if !arn.starts_with("arn:") {
911 return Err(AwsServiceError::aws_error(
912 StatusCode::BAD_REQUEST,
913 "InvalidArn",
914 format!("Invalid Arn: '{arn}'"),
915 ));
916 }
917 Ok(())
918}
919
920pub fn start_execution_from_delivery(
929 state: &SharedStepFunctionsState,
930 delivery: &Option<Arc<DeliveryBus>>,
931 dynamodb_state: &Option<SharedDynamoDbState>,
932 state_machine_arn: &str,
933 input: &str,
934) {
935 if serde_json::from_str::<serde_json::Value>(input).is_err() {
937 tracing::warn!(
938 state_machine_arn,
939 "Step Functions delivery: invalid JSON input, skipping execution"
940 );
941 return;
942 }
943
944 let execution_name = uuid::Uuid::new_v4().to_string();
945
946 let account_id = state_machine_arn
948 .split(':')
949 .nth(4)
950 .unwrap_or("000000000000")
951 .to_string();
952
953 let mut accounts = state.write();
954 let st = accounts.get_or_create(&account_id);
955 let sm = match st.state_machines.get(state_machine_arn) {
956 Some(sm) => sm,
957 None => {
958 tracing::warn!(
959 state_machine_arn,
960 "Step Functions delivery: state machine not found"
961 );
962 return;
963 }
964 };
965
966 let sm_name = sm.name.clone();
967 let definition = sm.definition.clone();
968 let exec_arn = st.execution_arn(&sm_name, &execution_name);
969
970 let now = Utc::now();
971 let execution = Execution {
972 execution_arn: exec_arn.clone(),
973 state_machine_arn: state_machine_arn.to_string(),
974 state_machine_name: sm_name,
975 name: execution_name,
976 status: ExecutionStatus::Running,
977 input: Some(input.to_string()),
978 output: None,
979 start_date: now,
980 stop_date: None,
981 error: None,
982 cause: None,
983 history_events: vec![],
984 };
985
986 st.executions.insert(exec_arn.clone(), execution);
987 drop(accounts);
988
989 let shared_state = state.clone();
990 let delivery = delivery.clone();
991 let dynamodb_state = dynamodb_state.clone();
992 let input = Some(input.to_string());
993 tokio::spawn(async move {
994 interpreter::execute_state_machine(
995 shared_state,
996 exec_arn,
997 definition,
998 input,
999 delivery,
1000 dynamodb_state,
1001 )
1002 .await;
1003 });
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use super::*;
1009 use http::{HeaderMap, Method};
1010 use parking_lot::RwLock;
1011 use serde_json::Value;
1012 use std::sync::Arc;
1013
1014 fn make_state() -> SharedStepFunctionsState {
1015 Arc::new(RwLock::new(
1016 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1017 ))
1018 }
1019
1020 fn make_request(action: &str, body: &str) -> AwsRequest {
1021 AwsRequest {
1022 service: "states".to_string(),
1023 action: action.to_string(),
1024 region: "us-east-1".to_string(),
1025 account_id: "123456789012".to_string(),
1026 request_id: "test-id".to_string(),
1027 headers: HeaderMap::new(),
1028 query_params: HashMap::new(),
1029 body: body.as_bytes().to_vec().into(),
1030 path_segments: vec![],
1031 raw_path: "/".to_string(),
1032 raw_query: String::new(),
1033 method: Method::POST,
1034 is_query_protocol: false,
1035 access_key_id: None,
1036 principal: None,
1037 }
1038 }
1039
1040 fn body_json(resp: &AwsResponse) -> Value {
1041 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1042 }
1043
1044 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1045 match result {
1046 Err(e) => e,
1047 Ok(_) => panic!("expected error, got Ok"),
1048 }
1049 }
1050
    /// Minimal definition accepted by `validate_definition`: a single `Pass`
    /// state that is also the start state.
    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1052
1053 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1054 let body = json!({
1055 "name": name,
1056 "definition": VALID_DEF,
1057 "roleArn": "arn:aws:iam::123456789012:role/test",
1058 });
1059 let req = make_request("CreateStateMachine", &body.to_string());
1060 let resp = svc.create_state_machine(&req).unwrap();
1061 let b = body_json(&resp);
1062 b["stateMachineArn"].as_str().unwrap().to_string()
1063 }
1064
1065 #[test]
1068 fn create_state_machine_basic() {
1069 let svc = StepFunctionsService::new(make_state());
1070 let arn = create_sm(&svc, "test-sm");
1071 assert!(arn.contains("test-sm"));
1072 }
1073
1074 #[test]
1075 fn create_state_machine_with_express_type() {
1076 let svc = StepFunctionsService::new(make_state());
1077 let body = json!({
1078 "name": "express-sm",
1079 "definition": VALID_DEF,
1080 "roleArn": "arn:aws:iam::123456789012:role/r",
1081 "type": "EXPRESS",
1082 });
1083 let req = make_request("CreateStateMachine", &body.to_string());
1084 let resp = svc.create_state_machine(&req).unwrap();
1085 let b = body_json(&resp);
1086 assert!(b["stateMachineArn"].as_str().is_some());
1087 }
1088
/// A second create with an already-used name is rejected with
/// `StateMachineAlreadyExists`.
#[test]
fn create_state_machine_duplicate_fails() {
    let service = StepFunctionsService::new(make_state());
    create_sm(&service, "dup-sm");

    let payload = json!({
        "name": "dup-sm",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineAlreadyExists"));
}
1102
/// A create request without a `name` field returns an error.
#[test]
fn create_state_machine_missing_name() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let outcome = service.create_state_machine(&request);
    assert!(outcome.is_err());
}
1113
/// A definition that is not JSON is rejected with `InvalidDefinition`.
#[test]
fn create_state_machine_invalid_definition() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-def",
        "definition": "not json",
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
1126
/// A definition lacking `StartAt` is rejected with `InvalidDefinition`.
#[test]
fn create_state_machine_definition_missing_start_at() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "no-start",
        "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
1139
/// A definition lacking `States` is rejected with `InvalidDefinition`.
#[test]
fn create_state_machine_definition_missing_states() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "no-states",
        "definition": r#"{"StartAt":"S"}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
1152
/// A `StartAt` naming a state absent from `States` is rejected with an
/// error mentioning `MISSING_TRANSITION_TARGET`.
#[test]
fn create_state_machine_definition_start_at_not_in_states() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-start",
        "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("MISSING_TRANSITION_TARGET"));
}
1165
/// An unrecognised `type` value makes the create call return an error.
#[test]
fn create_state_machine_invalid_type() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-type",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
        "type": "INVALID",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let outcome = service.create_state_machine(&request);
    assert!(outcome.is_err());
}
1178
/// A `roleArn` that is not an ARN is rejected with `InvalidArn`.
#[test]
fn create_state_machine_invalid_arn() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-arn",
        "definition": VALID_DEF,
        "roleArn": "not-an-arn",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidArn"));
}
1191
/// A name containing spaces and punctuation is rejected with `InvalidName`.
#[test]
fn create_state_machine_invalid_name() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "has spaces!",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidName"));
}
1204
/// An 81-character name is rejected with `InvalidName`.
#[test]
fn create_state_machine_name_too_long() {
    let service = StepFunctionsService::new(make_state());
    let oversized = "a".repeat(81);
    let payload = json!({
        "name": oversized,
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidName"));
}
1218
/// Describing a freshly created machine reports its name, an `ACTIVE`
/// status and a string definition.
#[test]
fn describe_state_machine_found() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "desc-sm");

    let payload = json!({"stateMachineArn": machine_arn}).to_string();
    let request = make_request("DescribeStateMachine", &payload);
    let described = body_json(&service.describe_state_machine(&request).unwrap());

    assert_eq!(described["name"], "desc-sm");
    assert_eq!(described["status"], "ACTIVE");
    assert!(described["definition"].is_string());
}
1236
/// Describing an unknown ARN fails with `StateMachineDoesNotExist`.
#[test]
fn describe_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DescribeStateMachine", &payload);
    let message = expect_err(service.describe_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
1248
/// Listing on a fresh service returns an empty `stateMachines` array.
#[test]
fn list_state_machines_empty() {
    let service = StepFunctionsService::new(make_state());
    let request = make_request("ListStateMachines", "{}");
    let listing = body_json(&service.list_state_machines(&request).unwrap());
    let machines = listing["stateMachines"].as_array().unwrap();
    assert!(machines.is_empty());
}
1259
/// After creating two machines, listing returns exactly two entries.
#[test]
fn list_state_machines_returns_created() {
    let service = StepFunctionsService::new(make_state());
    create_sm(&service, "sm-1");
    create_sm(&service, "sm-2");

    let request = make_request("ListStateMachines", "{}");
    let listing = body_json(&service.list_state_machines(&request).unwrap());
    let machines = listing["stateMachines"].as_array().unwrap();
    assert_eq!(machines.len(), 2);
}
1271
/// Once a machine has been deleted, describing it returns an error.
#[test]
fn delete_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "del-sm");
    // Both calls take the same single-field body.
    let arn_payload = json!({"stateMachineArn": machine_arn}).to_string();

    let delete_request = make_request("DeleteStateMachine", &arn_payload);
    service.delete_state_machine(&delete_request).unwrap();

    let describe_request = make_request("DescribeStateMachine", &arn_payload);
    let outcome = service.describe_state_machine(&describe_request);
    assert!(outcome.is_err());
}
1292
/// Deleting an ARN that was never created still returns `Ok`.
#[test]
fn delete_state_machine_nonexistent_succeeds() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DeleteStateMachine", &payload);
    service.delete_state_machine(&request).unwrap();
}
1304
/// Updating definition and description returns a numeric `updateDate`, and
/// a follow-up describe shows the new values.
#[test]
fn update_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "upd-sm");

    let replacement = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
    let update_payload = json!({
        "stateMachineArn": machine_arn,
        "definition": replacement,
        "description": "updated",
    })
    .to_string();
    let update_request = make_request("UpdateStateMachine", &update_payload);
    let updated = body_json(&service.update_state_machine(&update_request).unwrap());
    assert!(updated["updateDate"].as_f64().is_some());

    let describe_payload = json!({"stateMachineArn": machine_arn}).to_string();
    let describe_request = make_request("DescribeStateMachine", &describe_payload);
    let described = body_json(&service.describe_state_machine(&describe_request).unwrap());
    let stored_definition = described["definition"].as_str().unwrap();
    assert!(stored_definition.contains("NewPass"));
    assert_eq!(described["description"], "updated");
}
1333
/// Updating an unknown ARN fails with `StateMachineDoesNotExist`.
#[test]
fn update_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "definition": VALID_DEF,
    })
    .to_string();
    let request = make_request("UpdateStateMachine", &payload);
    let message = expect_err(service.update_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
1345
1346 #[tokio::test]
1349 async fn start_execution_basic() {
1350 let svc = StepFunctionsService::new(make_state());
1351 let arn = create_sm(&svc, "exec-sm");
1352
1353 let body = json!({
1354 "stateMachineArn": arn,
1355 "input": r#"{"key":"value"}"#,
1356 });
1357 let req = make_request("StartExecution", &body.to_string());
1358 let resp = svc.start_execution(&req).unwrap();
1359 let b = body_json(&resp);
1360 assert!(b["executionArn"].as_str().is_some());
1361 assert!(b["startDate"].as_f64().is_some());
1362 }
1363
1364 #[tokio::test]
1365 async fn start_execution_with_name() {
1366 let svc = StepFunctionsService::new(make_state());
1367 let arn = create_sm(&svc, "named-exec");
1368
1369 let body = json!({
1370 "stateMachineArn": arn,
1371 "name": "my-execution",
1372 });
1373 let req = make_request("StartExecution", &body.to_string());
1374 let resp = svc.start_execution(&req).unwrap();
1375 let b = body_json(&resp);
1376 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1377 }
1378
1379 #[tokio::test]
1380 async fn start_execution_sm_not_found() {
1381 let svc = StepFunctionsService::new(make_state());
1382 let body = json!({
1383 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1384 });
1385 let req = make_request("StartExecution", &body.to_string());
1386 let err = expect_err(svc.start_execution(&req));
1387 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1388 }
1389
1390 #[tokio::test]
1391 async fn start_execution_invalid_input() {
1392 let svc = StepFunctionsService::new(make_state());
1393 let arn = create_sm(&svc, "bad-input");
1394
1395 let body = json!({
1396 "stateMachineArn": arn,
1397 "input": "not json",
1398 });
1399 let req = make_request("StartExecution", &body.to_string());
1400 let err = expect_err(svc.start_execution(&req));
1401 assert!(err.to_string().contains("InvalidExecutionInput"));
1402 }
1403
1404 #[tokio::test]
1405 async fn start_execution_duplicate_name() {
1406 let svc = StepFunctionsService::new(make_state());
1407 let arn = create_sm(&svc, "dup-exec");
1408
1409 let body = json!({
1410 "stateMachineArn": arn,
1411 "name": "same-name",
1412 });
1413 let req = make_request("StartExecution", &body.to_string());
1414 svc.start_execution(&req).unwrap();
1415
1416 let req = make_request("StartExecution", &body.to_string());
1417 let err = expect_err(svc.start_execution(&req));
1418 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1419 }
1420
1421 #[tokio::test]
1424 async fn describe_execution_found() {
1425 let svc = StepFunctionsService::new(make_state());
1426 let sm_arn = create_sm(&svc, "desc-exec");
1427
1428 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1429 let req = make_request("StartExecution", &body.to_string());
1430 let resp = svc.start_execution(&req).unwrap();
1431 let exec_arn = body_json(&resp)["executionArn"]
1432 .as_str()
1433 .unwrap()
1434 .to_string();
1435
1436 let req = make_request(
1437 "DescribeExecution",
1438 &json!({"executionArn": exec_arn}).to_string(),
1439 );
1440 let resp = svc.describe_execution(&req).unwrap();
1441 let b = body_json(&resp);
1442 assert_eq!(b["name"], "e1");
1443 assert_eq!(b["status"], "RUNNING");
1444 }
1445
1446 #[tokio::test]
1447 async fn describe_execution_not_found() {
1448 let svc = StepFunctionsService::new(make_state());
1449 let req = make_request(
1450 "DescribeExecution",
1451 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1452 .to_string(),
1453 );
1454 let err = expect_err(svc.describe_execution(&req));
1455 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1456 }
1457
1458 #[tokio::test]
1461 async fn stop_execution() {
1462 let svc = StepFunctionsService::new(make_state());
1463 let sm_arn = create_sm(&svc, "stop-sm");
1464
1465 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1466 let req = make_request("StartExecution", &body.to_string());
1467 let resp = svc.start_execution(&req).unwrap();
1468 let exec_arn = body_json(&resp)["executionArn"]
1469 .as_str()
1470 .unwrap()
1471 .to_string();
1472
1473 let body = json!({
1474 "executionArn": exec_arn,
1475 "error": "UserAborted",
1476 "cause": "test stop",
1477 });
1478 let req = make_request("StopExecution", &body.to_string());
1479 let resp = svc.stop_execution(&req).unwrap();
1480 let b = body_json(&resp);
1481 assert!(b["stopDate"].as_f64().is_some());
1482
1483 let req = make_request(
1485 "DescribeExecution",
1486 &json!({"executionArn": exec_arn}).to_string(),
1487 );
1488 let resp = svc.describe_execution(&req).unwrap();
1489 let b = body_json(&resp);
1490 assert_eq!(b["status"], "ABORTED");
1491 assert_eq!(b["error"], "UserAborted");
1492 }
1493
1494 #[tokio::test]
1495 async fn stop_execution_not_found() {
1496 let svc = StepFunctionsService::new(make_state());
1497 let req = make_request(
1498 "StopExecution",
1499 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1500 .to_string(),
1501 );
1502 let err = expect_err(svc.stop_execution(&req));
1503 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1504 }
1505
1506 #[tokio::test]
1509 async fn list_executions() {
1510 let svc = StepFunctionsService::new(make_state());
1511 let sm_arn = create_sm(&svc, "list-exec");
1512
1513 for i in 0..3 {
1514 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1515 let req = make_request("StartExecution", &body.to_string());
1516 svc.start_execution(&req).unwrap();
1517 }
1518
1519 let req = make_request(
1520 "ListExecutions",
1521 &json!({"stateMachineArn": sm_arn}).to_string(),
1522 );
1523 let resp = svc.list_executions(&req).unwrap();
1524 let b = body_json(&resp);
1525 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1526 }
1527
1528 #[tokio::test]
1529 async fn list_executions_sm_not_found() {
1530 let svc = StepFunctionsService::new(make_state());
1531 let req = make_request(
1532 "ListExecutions",
1533 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1534 .to_string(),
1535 );
1536 let err = expect_err(svc.list_executions(&req));
1537 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1538 }
1539
1540 #[tokio::test]
1543 async fn get_execution_history_not_found() {
1544 let svc = StepFunctionsService::new(make_state());
1545 let req = make_request(
1546 "GetExecutionHistory",
1547 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1548 .to_string(),
1549 );
1550 let err = expect_err(svc.get_execution_history(&req));
1551 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1552 }
1553
1554 #[tokio::test]
1557 async fn describe_sm_for_execution() {
1558 let svc = StepFunctionsService::new(make_state());
1559 let sm_arn = create_sm(&svc, "sm-for-exec");
1560
1561 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1562 let req = make_request("StartExecution", &body.to_string());
1563 let resp = svc.start_execution(&req).unwrap();
1564 let exec_arn = body_json(&resp)["executionArn"]
1565 .as_str()
1566 .unwrap()
1567 .to_string();
1568
1569 let req = make_request(
1570 "DescribeStateMachineForExecution",
1571 &json!({"executionArn": exec_arn}).to_string(),
1572 );
1573 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1574 let b = body_json(&resp);
1575 assert_eq!(b["name"], "sm-for-exec");
1576 }
1577
/// Round trip: tag a machine, see the tag listed, untag it, see an empty
/// tag list.
#[test]
fn tag_untag_list_tags() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "tagged-sm");
    // Both list calls take the same single-field body.
    let list_payload = json!({"resourceArn": machine_arn}).to_string();

    let tag_payload = json!({
        "resourceArn": machine_arn,
        "tags": [{"key": "env", "value": "prod"}],
    })
    .to_string();
    let tag_request = make_request("TagResource", &tag_payload);
    service.tag_resource(&tag_request).unwrap();

    let list_request = make_request("ListTagsForResource", &list_payload);
    let listed = body_json(&service.list_tags_for_resource(&list_request).unwrap());
    let tags = listed["tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0]["key"], "env");

    let untag_payload = json!({
        "resourceArn": machine_arn,
        "tagKeys": ["env"],
    })
    .to_string();
    let untag_request = make_request("UntagResource", &untag_payload);
    service.untag_resource(&untag_request).unwrap();

    let list_request = make_request("ListTagsForResource", &list_payload);
    let listed = body_json(&service.list_tags_for_resource(&list_request).unwrap());
    let remaining = listed["tags"].as_array().unwrap();
    assert!(remaining.is_empty());
}
1621
/// Tagging an unknown resource ARN fails with `ResourceNotFound`.
#[test]
fn tag_resource_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "tags": [{"key": "k", "value": "v"}],
    })
    .to_string();
    let request = make_request("TagResource", &payload);
    let message = expect_err(service.tag_resource(&request)).to_string();
    assert!(message.contains("ResourceNotFound"));
}
1633
/// `validate_name` accepts hyphenated and underscored names and rejects
/// the empty string, names with spaces, and an 81-character name.
#[test]
fn test_validate_name() {
    for accepted in ["valid-name", "under_score"].iter().copied() {
        assert!(validate_name(accepted).is_ok());
    }

    let oversized = "a".repeat(81);
    for rejected in ["", "has spaces", oversized.as_str()].iter().copied() {
        assert!(validate_name(rejected).is_err());
    }
}
1644
/// `validate_definition` accepts [`VALID_DEF`] and rejects non-JSON text,
/// a document without `StartAt`, and a document without `States`.
#[test]
fn test_validate_definition() {
    assert!(validate_definition(VALID_DEF).is_ok());

    let rejected = ["not json", r#"{"States":{}}"#, r#"{"StartAt":"S"}"#];
    for definition in rejected.iter().copied() {
        assert!(validate_definition(definition).is_err());
    }
}
1652
/// `validate_arn` accepts an `arn:`-prefixed string and rejects a plain one.
#[test]
fn test_validate_arn() {
    let well_formed = validate_arn("arn:aws:states:us-east-1:123:sm:test");
    assert!(well_formed.is_ok());

    let malformed = validate_arn("not-an-arn");
    assert!(malformed.is_err());
}
1658
/// `camel_to_details_key` lower-cases the leading letter of
/// `PassStateEntered` and maps the empty string to itself.
#[test]
fn test_camel_to_details_key() {
    let cases = [("PassStateEntered", "passStateEntered"), ("", "")];
    for (input, expected) in cases.iter().copied() {
        assert_eq!(camel_to_details_key(input), expected);
    }
}
1664
/// `is_mutating_action` is true for create/start actions and false for
/// describe/list actions.
#[test]
fn test_is_mutating_action() {
    for action in ["CreateStateMachine", "StartExecution"].iter().copied() {
        assert!(is_mutating_action(action));
    }
    for action in ["DescribeStateMachine", "ListStateMachines"].iter().copied() {
        assert!(!is_mutating_action(action));
    }
}
1672}