1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const SUPPORTED: &[&str] = &[
25 "CreateStateMachine",
26 "DescribeStateMachine",
27 "ListStateMachines",
28 "DeleteStateMachine",
29 "UpdateStateMachine",
30 "TagResource",
31 "UntagResource",
32 "ListTagsForResource",
33 "StartExecution",
34 "StopExecution",
35 "DescribeExecution",
36 "ListExecutions",
37 "GetExecutionHistory",
38 "DescribeStateMachineForExecution",
39 "CreateActivity",
40 "DeleteActivity",
41 "DescribeActivity",
42 "ListActivities",
43 "GetActivityTask",
44 "SendTaskFailure",
45 "SendTaskHeartbeat",
46 "SendTaskSuccess",
47 "PublishStateMachineVersion",
48 "DeleteStateMachineVersion",
49 "ListStateMachineVersions",
50 "CreateStateMachineAlias",
51 "DeleteStateMachineAlias",
52 "DescribeStateMachineAlias",
53 "ListStateMachineAliases",
54 "UpdateStateMachineAlias",
55 "DescribeMapRun",
56 "ListMapRuns",
57 "UpdateMapRun",
58 "RedriveExecution",
59 "StartSyncExecution",
60 "TestState",
61 "ValidateStateMachineDefinition",
62];
63
64pub struct StepFunctionsService {
65 state: SharedStepFunctionsState,
66 delivery: Option<Arc<DeliveryBus>>,
67 dynamodb_state: Option<SharedDynamoDbState>,
68 snapshot_store: Option<Arc<dyn SnapshotStore>>,
69 snapshot_lock: Arc<AsyncMutex<()>>,
70}
71
72impl StepFunctionsService {
73 pub fn new(state: SharedStepFunctionsState) -> Self {
74 Self {
75 state,
76 delivery: None,
77 dynamodb_state: None,
78 snapshot_store: None,
79 snapshot_lock: Arc::new(AsyncMutex::new(())),
80 }
81 }
82
83 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
84 self.delivery = Some(delivery);
85 self
86 }
87
88 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
89 self.dynamodb_state = Some(dynamodb_state);
90 self
91 }
92
93 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94 self.snapshot_store = Some(store);
95 self
96 }
97
98 async fn save_snapshot(&self) {
99 let Some(store) = self.snapshot_store.clone() else {
100 return;
101 };
102 let _guard = self.snapshot_lock.lock().await;
103 let snapshot = StepFunctionsSnapshot {
104 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
105 state: None,
106 accounts: Some(self.state.read().clone()),
107 };
108 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
109 let bytes = serde_json::to_vec(&snapshot)
110 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
111 store.save(&bytes)
112 })
113 .await;
114 match join {
115 Ok(Ok(())) => {}
116 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
117 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
118 }
119 }
120}
121
122fn is_mutating_action(action: &str) -> bool {
123 matches!(
124 action,
125 "CreateStateMachine"
126 | "DeleteStateMachine"
127 | "UpdateStateMachine"
128 | "TagResource"
129 | "UntagResource"
130 | "StartExecution"
131 | "StopExecution"
132 | "CreateActivity"
133 | "DeleteActivity"
134 | "GetActivityTask"
135 | "SendTaskFailure"
136 | "SendTaskHeartbeat"
137 | "SendTaskSuccess"
138 | "PublishStateMachineVersion"
139 | "DeleteStateMachineVersion"
140 | "CreateStateMachineAlias"
141 | "DeleteStateMachineAlias"
142 | "UpdateStateMachineAlias"
143 | "UpdateMapRun"
144 | "RedriveExecution"
145 | "StartSyncExecution"
146 )
147}
148
149#[async_trait]
150impl AwsService for StepFunctionsService {
151 fn service_name(&self) -> &str {
152 "states"
153 }
154
155 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
156 let mutates = is_mutating_action(req.action.as_str());
157 let result = match req.action.as_str() {
158 "CreateStateMachine" => self.create_state_machine(&req),
159 "DescribeStateMachine" => self.describe_state_machine(&req),
160 "ListStateMachines" => self.list_state_machines(&req),
161 "DeleteStateMachine" => self.delete_state_machine(&req),
162 "UpdateStateMachine" => self.update_state_machine(&req),
163 "TagResource" => self.tag_resource(&req),
164 "UntagResource" => self.untag_resource(&req),
165 "ListTagsForResource" => self.list_tags_for_resource(&req),
166 "StartExecution" => self.start_execution(&req),
167 "StopExecution" => self.stop_execution(&req),
168 "DescribeExecution" => self.describe_execution(&req),
169 "ListExecutions" => self.list_executions(&req),
170 "GetExecutionHistory" => self.get_execution_history(&req),
171 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
172 "CreateActivity" => self.create_activity(&req),
173 "DeleteActivity" => self.delete_activity(&req),
174 "DescribeActivity" => self.describe_activity(&req),
175 "ListActivities" => self.list_activities(&req),
176 "GetActivityTask" => self.get_activity_task(&req),
177 "SendTaskFailure" => self.send_task_failure(&req),
178 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
179 "SendTaskSuccess" => self.send_task_success(&req),
180 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
181 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
182 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
183 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
184 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
185 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
186 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
187 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
188 "DescribeMapRun" => self.describe_map_run(&req),
189 "ListMapRuns" => self.list_map_runs(&req),
190 "UpdateMapRun" => self.update_map_run(&req),
191 "RedriveExecution" => self.redrive_execution(&req),
192 "StartSyncExecution" => self.start_sync_execution(&req),
193 "TestState" => self.test_state(&req),
194 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
195 _ => Err(AwsServiceError::action_not_implemented(
196 "states",
197 &req.action,
198 )),
199 };
200 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
201 self.save_snapshot().await;
202 }
203 result
204 }
205
206 fn supported_actions(&self) -> &[&str] {
207 SUPPORTED
208 }
209}
210
211impl StepFunctionsService {
212 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
215 let body = req.json_body();
216
217 validate_required("name", &body["name"])?;
218 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
219 validate_name(name)?;
220
221 validate_required("definition", &body["definition"])?;
222 let definition = body["definition"]
223 .as_str()
224 .ok_or_else(|| missing("definition"))?;
225 validate_definition(definition)?;
226
227 validate_required("roleArn", &body["roleArn"])?;
228 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
229 validate_arn(role_arn)?;
230
231 let machine_type = if let Some(t) = body["type"].as_str() {
232 StateMachineType::parse(t).ok_or_else(|| {
233 AwsServiceError::aws_error(
234 StatusCode::BAD_REQUEST,
235 "ValidationException",
236 format!(
237 "Value '{t}' at 'type' failed to satisfy constraint: \
238 Member must satisfy enum value set: [STANDARD, EXPRESS]"
239 ),
240 )
241 })?
242 } else {
243 StateMachineType::Standard
244 };
245
246 let mut accounts = self.state.write();
247 let state = accounts.get_or_create(&req.account_id);
248 let arn = state.state_machine_arn(name);
249
250 if state.state_machines.values().any(|sm| sm.name == name) {
252 return Err(AwsServiceError::aws_error(
253 StatusCode::CONFLICT,
254 "StateMachineAlreadyExists",
255 format!("State Machine Already Exists: '{arn}'"),
256 ));
257 }
258
259 let now = Utc::now();
260 let revision_id = uuid::Uuid::new_v4().to_string();
261
262 let mut tags = HashMap::new();
263 if !body["tags"].is_null() {
264 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
265 |f| {
266 AwsServiceError::aws_error(
267 StatusCode::BAD_REQUEST,
268 "ValidationException",
269 format!("{f} must be a list"),
270 )
271 },
272 )?;
273 }
274
275 let sm = StateMachine {
276 name: name.to_string(),
277 arn: arn.clone(),
278 definition: definition.to_string(),
279 role_arn: role_arn.to_string(),
280 machine_type,
281 status: StateMachineStatus::Active,
282 creation_date: now,
283 update_date: now,
284 tags,
285 revision_id: revision_id.clone(),
286 logging_configuration: body.get("loggingConfiguration").cloned(),
287 tracing_configuration: body.get("tracingConfiguration").cloned(),
288 description: body["description"].as_str().unwrap_or("").to_string(),
289 };
290
291 state.state_machines.insert(arn.clone(), sm);
292
293 Ok(AwsResponse::ok_json(json!({
294 "stateMachineArn": arn,
295 "creationDate": now.timestamp() as f64,
296 "stateMachineVersionArn": arn,
297 })))
298 }
299
300 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = req.json_body();
302 validate_required("stateMachineArn", &body["stateMachineArn"])?;
303 let arn = body["stateMachineArn"]
304 .as_str()
305 .ok_or_else(|| missing("stateMachineArn"))?;
306 validate_arn(arn)?;
307
308 let accounts = self.state.read();
309 let empty = StepFunctionsState::new(&req.account_id, &req.region);
310 let state = accounts.get(&req.account_id).unwrap_or(&empty);
311 let sm = state
312 .state_machines
313 .get(arn)
314 .ok_or_else(|| state_machine_not_found(arn))?;
315
316 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
317 }
318
319 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
322 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
323 let next_token = body["nextToken"].as_str();
324
325 let accounts = self.state.read();
326 let empty = StepFunctionsState::new(&req.account_id, &req.region);
327 let state = accounts.get(&req.account_id).unwrap_or(&empty);
328 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
329 machines.sort_by(|a, b| a.name.cmp(&b.name));
330
331 let items: Vec<Value> = machines
332 .iter()
333 .map(|sm| {
334 json!({
335 "name": sm.name,
336 "stateMachineArn": sm.arn,
337 "type": sm.machine_type.as_str(),
338 "creationDate": sm.creation_date.timestamp() as f64,
339 })
340 })
341 .collect();
342
343 let (page, token) = paginate(&items, next_token, max_results);
344
345 let mut resp = json!({ "stateMachines": page });
346 if let Some(t) = token {
347 resp["nextToken"] = json!(t);
348 }
349 Ok(AwsResponse::ok_json(resp))
350 }
351
352 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
353 let body = req.json_body();
354 validate_required("stateMachineArn", &body["stateMachineArn"])?;
355 let arn = body["stateMachineArn"]
356 .as_str()
357 .ok_or_else(|| missing("stateMachineArn"))?;
358 validate_arn(arn)?;
359
360 let mut accounts = self.state.write();
361 let state = accounts.get_or_create(&req.account_id);
362 state.state_machines.remove(arn);
364
365 Ok(AwsResponse::ok_json(json!({})))
366 }
367
368 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
369 let body = req.json_body();
370 validate_required("stateMachineArn", &body["stateMachineArn"])?;
371 let arn = body["stateMachineArn"]
372 .as_str()
373 .ok_or_else(|| missing("stateMachineArn"))?;
374 validate_arn(arn)?;
375
376 let mut accounts = self.state.write();
377 let state = accounts.get_or_create(&req.account_id);
378 let sm = state
379 .state_machines
380 .get_mut(arn)
381 .ok_or_else(|| state_machine_not_found(arn))?;
382
383 if let Some(definition) = body["definition"].as_str() {
384 validate_definition(definition)?;
385 sm.definition = definition.to_string();
386 }
387
388 if let Some(role_arn) = body["roleArn"].as_str() {
389 validate_arn(role_arn)?;
390 sm.role_arn = role_arn.to_string();
391 }
392
393 if let Some(logging) = body.get("loggingConfiguration") {
394 sm.logging_configuration = Some(logging.clone());
395 }
396
397 if let Some(tracing) = body.get("tracingConfiguration") {
398 sm.tracing_configuration = Some(tracing.clone());
399 }
400
401 if let Some(description) = body["description"].as_str() {
402 sm.description = description.to_string();
403 }
404
405 let now = Utc::now();
406 sm.update_date = now;
407 sm.revision_id = uuid::Uuid::new_v4().to_string();
408
409 let revision_id = sm.revision_id.clone();
410 let sm_arn = sm.arn.clone();
411
412 Ok(AwsResponse::ok_json(json!({
413 "updateDate": now.timestamp() as f64,
414 "revisionId": revision_id,
415 "stateMachineVersionArn": sm_arn,
416 })))
417 }
418
419 fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
422 let body = req.json_body();
423 validate_required("stateMachineArn", &body["stateMachineArn"])?;
424 let sm_arn = body["stateMachineArn"]
425 .as_str()
426 .ok_or_else(|| missing("stateMachineArn"))?;
427 validate_arn(sm_arn)?;
428
429 let input = body["input"].as_str().map(|s| s.to_string());
430
431 if let Some(ref input_str) = input {
433 let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
434 AwsServiceError::aws_error(
435 StatusCode::BAD_REQUEST,
436 "InvalidExecutionInput",
437 "Invalid execution input: must be valid JSON".to_string(),
438 )
439 })?;
440 }
441
442 let execution_name = body["name"]
443 .as_str()
444 .map(|s| s.to_string())
445 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
446
447 if let Some(name) = body["name"].as_str() {
448 validate_name(name)?;
449 }
450
451 let mut accounts = self.state.write();
452 let state = accounts.get_or_create(&req.account_id);
453 let sm = state
454 .state_machines
455 .get(sm_arn)
456 .ok_or_else(|| state_machine_not_found(sm_arn))?;
457
458 let sm_name = sm.name.clone();
459 let definition = sm.definition.clone();
460 let exec_arn = state.execution_arn(&sm_name, &execution_name);
461
462 if state.executions.contains_key(&exec_arn) {
464 return Err(AwsServiceError::aws_error(
465 StatusCode::CONFLICT,
466 "ExecutionAlreadyExists",
467 format!("Execution Already Exists: '{exec_arn}'"),
468 ));
469 }
470
471 let now = Utc::now();
472 let execution = Execution {
473 execution_arn: exec_arn.clone(),
474 state_machine_arn: sm_arn.to_string(),
475 state_machine_name: sm_name,
476 name: execution_name,
477 status: ExecutionStatus::Running,
478 input: input.clone(),
479 output: None,
480 start_date: now,
481 stop_date: None,
482 error: None,
483 cause: None,
484 history_events: vec![],
485 };
486
487 state.executions.insert(exec_arn.clone(), execution);
488 drop(accounts);
489
490 let shared_state = self.state.clone();
492 let exec_arn_clone = exec_arn.clone();
493 let input_clone = input;
494 let delivery = self.delivery.clone();
495 let dynamodb_state = self.dynamodb_state.clone();
496 tokio::spawn(async move {
497 interpreter::execute_state_machine(
498 shared_state,
499 exec_arn_clone,
500 definition,
501 input_clone,
502 delivery,
503 dynamodb_state,
504 )
505 .await;
506 });
507
508 Ok(AwsResponse::ok_json(json!({
509 "executionArn": exec_arn,
510 "startDate": now.timestamp() as f64,
511 })))
512 }
513
514 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
515 let body = req.json_body();
516 validate_required("executionArn", &body["executionArn"])?;
517 let exec_arn = body["executionArn"]
518 .as_str()
519 .ok_or_else(|| missing("executionArn"))?;
520
521 let error = body["error"].as_str().map(|s| s.to_string());
522 let cause = body["cause"].as_str().map(|s| s.to_string());
523
524 let mut accounts = self.state.write();
525 let state = accounts.get_or_create(&req.account_id);
526 let exec = state
527 .executions
528 .get_mut(exec_arn)
529 .ok_or_else(|| execution_not_found(exec_arn))?;
530
531 if exec.status != ExecutionStatus::Running {
532 return Err(AwsServiceError::aws_error(
533 StatusCode::BAD_REQUEST,
534 "ExecutionNotRunning",
535 format!("Execution is not running: '{exec_arn}'"),
536 ));
537 }
538
539 let now = Utc::now();
540 exec.status = ExecutionStatus::Aborted;
541 exec.stop_date = Some(now);
542 exec.error = error;
543 exec.cause = cause;
544
545 Ok(AwsResponse::ok_json(json!({
546 "stopDate": now.timestamp() as f64,
547 })))
548 }
549
550 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
551 let body = req.json_body();
552 validate_required("executionArn", &body["executionArn"])?;
553 let exec_arn = body["executionArn"]
554 .as_str()
555 .ok_or_else(|| missing("executionArn"))?;
556
557 let accounts = self.state.read();
558 let empty = StepFunctionsState::new(&req.account_id, &req.region);
559 let state = accounts.get(&req.account_id).unwrap_or(&empty);
560 let exec = state
561 .executions
562 .get(exec_arn)
563 .ok_or_else(|| execution_not_found(exec_arn))?;
564
565 Ok(AwsResponse::ok_json(execution_to_json(exec)))
566 }
567
568 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
569 let body = req.json_body();
570 validate_required("stateMachineArn", &body["stateMachineArn"])?;
571 let sm_arn = body["stateMachineArn"]
572 .as_str()
573 .ok_or_else(|| missing("stateMachineArn"))?;
574 validate_arn(sm_arn)?;
575
576 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
577 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
578 let next_token = body["nextToken"].as_str();
579 let status_filter = body["statusFilter"].as_str();
580
581 let accounts = self.state.read();
582 let empty = StepFunctionsState::new(&req.account_id, &req.region);
583 let state = accounts.get(&req.account_id).unwrap_or(&empty);
584
585 if !state.state_machines.contains_key(sm_arn) {
587 return Err(state_machine_not_found(sm_arn));
588 }
589
590 let mut executions: Vec<&Execution> = state
591 .executions
592 .values()
593 .filter(|e| e.state_machine_arn == sm_arn)
594 .filter(|e| {
595 status_filter
596 .map(|sf| e.status.as_str() == sf)
597 .unwrap_or(true)
598 })
599 .collect();
600
601 executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
603
604 let items: Vec<Value> = executions
605 .iter()
606 .map(|e| {
607 let mut item = json!({
608 "executionArn": e.execution_arn,
609 "stateMachineArn": e.state_machine_arn,
610 "name": e.name,
611 "status": e.status.as_str(),
612 "startDate": e.start_date.timestamp() as f64,
613 });
614 if let Some(stop) = e.stop_date {
615 item["stopDate"] = json!(stop.timestamp() as f64);
616 }
617 item
618 })
619 .collect();
620
621 let (page, token) = paginate(&items, next_token, max_results);
622
623 let mut resp = json!({ "executions": page });
624 if let Some(t) = token {
625 resp["nextToken"] = json!(t);
626 }
627 Ok(AwsResponse::ok_json(resp))
628 }
629
630 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
631 let body = req.json_body();
632 validate_required("executionArn", &body["executionArn"])?;
633 let exec_arn = body["executionArn"]
634 .as_str()
635 .ok_or_else(|| missing("executionArn"))?;
636
637 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
638 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
639 let next_token = body["nextToken"].as_str();
640 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
641
642 let accounts = self.state.read();
643 let empty = StepFunctionsState::new(&req.account_id, &req.region);
644 let state = accounts.get(&req.account_id).unwrap_or(&empty);
645 let exec = state
646 .executions
647 .get(exec_arn)
648 .ok_or_else(|| execution_not_found(exec_arn))?;
649
650 let mut events: Vec<Value> = exec
651 .history_events
652 .iter()
653 .map(|e| {
654 json!({
655 "id": e.id,
656 "type": e.event_type,
657 "timestamp": e.timestamp.timestamp() as f64,
658 "previousEventId": e.previous_event_id,
659 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
660 })
661 })
662 .collect();
663
664 if reverse_order {
665 events.reverse();
666 }
667
668 let (page, token) = paginate(&events, next_token, max_results);
669
670 let mut resp = json!({ "events": page });
671 if let Some(t) = token {
672 resp["nextToken"] = json!(t);
673 }
674 Ok(AwsResponse::ok_json(resp))
675 }
676
677 fn describe_state_machine_for_execution(
678 &self,
679 req: &AwsRequest,
680 ) -> Result<AwsResponse, AwsServiceError> {
681 let body = req.json_body();
682 validate_required("executionArn", &body["executionArn"])?;
683 let exec_arn = body["executionArn"]
684 .as_str()
685 .ok_or_else(|| missing("executionArn"))?;
686
687 let accounts = self.state.read();
688 let empty = StepFunctionsState::new(&req.account_id, &req.region);
689 let state = accounts.get(&req.account_id).unwrap_or(&empty);
690 let exec = state
691 .executions
692 .get(exec_arn)
693 .ok_or_else(|| execution_not_found(exec_arn))?;
694
695 let sm = state
696 .state_machines
697 .get(&exec.state_machine_arn)
698 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
699
700 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
701 }
702
703 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
706 let body = req.json_body();
707 validate_required("resourceArn", &body["resourceArn"])?;
708 let arn = body["resourceArn"]
709 .as_str()
710 .ok_or_else(|| missing("resourceArn"))?;
711 validate_arn(arn)?;
712 validate_required("tags", &body["tags"])?;
713
714 let mut accounts = self.state.write();
715 let state = accounts.get_or_create(&req.account_id);
716 let sm = state
717 .state_machines
718 .get_mut(arn)
719 .ok_or_else(|| resource_not_found(arn))?;
720
721 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
722 |f| {
723 AwsServiceError::aws_error(
724 StatusCode::BAD_REQUEST,
725 "ValidationException",
726 format!("{f} must be a list"),
727 )
728 },
729 )?;
730
731 Ok(AwsResponse::ok_json(json!({})))
732 }
733
734 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735 let body = req.json_body();
736 validate_required("resourceArn", &body["resourceArn"])?;
737 let arn = body["resourceArn"]
738 .as_str()
739 .ok_or_else(|| missing("resourceArn"))?;
740 validate_arn(arn)?;
741 validate_required("tagKeys", &body["tagKeys"])?;
742
743 let mut accounts = self.state.write();
744 let state = accounts.get_or_create(&req.account_id);
745 let sm = state
746 .state_machines
747 .get_mut(arn)
748 .ok_or_else(|| resource_not_found(arn))?;
749
750 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
751 AwsServiceError::aws_error(
752 StatusCode::BAD_REQUEST,
753 "ValidationException",
754 format!("{f} must be a list"),
755 )
756 })?;
757
758 Ok(AwsResponse::ok_json(json!({})))
759 }
760
761 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762 let body = req.json_body();
763 validate_required("resourceArn", &body["resourceArn"])?;
764 let arn = body["resourceArn"]
765 .as_str()
766 .ok_or_else(|| missing("resourceArn"))?;
767 validate_arn(arn)?;
768
769 let accounts = self.state.read();
770 let empty = StepFunctionsState::new(&req.account_id, &req.region);
771 let state = accounts.get(&req.account_id).unwrap_or(&empty);
772 let sm = state
773 .state_machines
774 .get(arn)
775 .ok_or_else(|| resource_not_found(arn))?;
776
777 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
778
779 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
780 }
781
782 fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785 let body = req.json_body();
786 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
787 validate_name(name)?;
788 let mut accounts = self.state.write();
789 let state = accounts.get_or_create(&req.account_id);
790 let arn = format!(
791 "arn:aws:states:{}:{}:activity:{}",
792 state.region, state.account_id, name
793 );
794 if state.activities.contains_key(&arn) {
795 return Err(AwsServiceError::aws_error(
796 StatusCode::BAD_REQUEST,
797 "ActivityAlreadyExists",
798 format!("Activity already exists: {arn}"),
799 ));
800 }
801 let activity = crate::state::Activity {
802 name: name.to_string(),
803 arn: arn.clone(),
804 creation_date: chrono::Utc::now(),
805 tags: HashMap::new(),
806 };
807 state.activities.insert(arn.clone(), activity.clone());
808 Ok(AwsResponse::ok_json(json!({
809 "activityArn": arn,
810 "creationDate": activity.creation_date.timestamp(),
811 })))
812 }
813
814 fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815 let body = req.json_body();
816 let arn = body["activityArn"]
817 .as_str()
818 .ok_or_else(|| missing("activityArn"))?
819 .to_string();
820 let mut accounts = self.state.write();
821 let state = accounts.get_or_create(&req.account_id);
822 state.activities.remove(&arn);
823 Ok(AwsResponse::ok_json(json!({})))
824 }
825
826 fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827 let body = req.json_body();
828 let arn = body["activityArn"]
829 .as_str()
830 .ok_or_else(|| missing("activityArn"))?
831 .to_string();
832 let accounts = self.state.read();
833 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
834 let state = accounts.get(&req.account_id).unwrap_or(&empty);
835 let a = state.activities.get(&arn).ok_or_else(|| {
836 AwsServiceError::aws_error(
837 StatusCode::BAD_REQUEST,
838 "ActivityDoesNotExist",
839 format!("Activity does not exist: {arn}"),
840 )
841 })?;
842 Ok(AwsResponse::ok_json(json!({
843 "activityArn": a.arn,
844 "name": a.name,
845 "creationDate": a.creation_date.timestamp(),
846 })))
847 }
848
849 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850 let accounts = self.state.read();
851 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
852 let state = accounts.get(&req.account_id).unwrap_or(&empty);
853 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
854 activities.sort_by(|a, b| a.name.cmp(&b.name));
855 let body = json!({
856 "activities": activities.iter().map(|a| json!({
857 "activityArn": a.arn,
858 "name": a.name,
859 "creationDate": a.creation_date.timestamp(),
860 })).collect::<Vec<_>>(),
861 });
862 Ok(AwsResponse::ok_json(body))
863 }
864
865 fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
866 let body = req.json_body();
867 let arn = body["activityArn"]
868 .as_str()
869 .ok_or_else(|| missing("activityArn"))?
870 .to_string();
871 let mut accounts = self.state.write();
872 let state = accounts.get_or_create(&req.account_id);
873 if !state.activities.contains_key(&arn) {
874 return Err(AwsServiceError::aws_error(
875 StatusCode::BAD_REQUEST,
876 "ActivityDoesNotExist",
877 format!("Activity does not exist: {arn}"),
878 ));
879 }
880 let token = format!(
883 "FCToken-{}",
884 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
885 );
886 state.task_tokens.insert(
887 token.clone(),
888 crate::state::TaskTokenState {
889 activity_arn: arn,
890 status: "PENDING".to_string(),
891 output: None,
892 error: None,
893 cause: None,
894 },
895 );
896 Ok(AwsResponse::ok_json(json!({
897 "taskToken": token,
898 "input": "{}",
899 })))
900 }
901
902 fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
903 self.update_task_token(req, "SUCCEEDED")
904 }
905
906 fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
907 self.update_task_token(req, "FAILED")
908 }
909
910 fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
911 self.update_task_token(req, "HEARTBEAT")
912 }
913
914 fn update_task_token(
915 &self,
916 req: &AwsRequest,
917 new_status: &str,
918 ) -> Result<AwsResponse, AwsServiceError> {
919 let body = req.json_body();
920 let token = body["taskToken"]
921 .as_str()
922 .ok_or_else(|| missing("taskToken"))?
923 .to_string();
924 let mut accounts = self.state.write();
925 let state = accounts.get_or_create(&req.account_id);
926 let entry = state.task_tokens.get_mut(&token).ok_or_else(|| {
927 AwsServiceError::aws_error(
928 StatusCode::BAD_REQUEST,
929 "TaskDoesNotExist",
930 format!("Task does not exist: {token}"),
931 )
932 })?;
933 entry.status = new_status.to_string();
934 if new_status == "SUCCEEDED" {
935 entry.output = body["output"].as_str().map(String::from);
936 } else if new_status == "FAILED" {
937 entry.error = body["error"].as_str().map(String::from);
938 entry.cause = body["cause"].as_str().map(String::from);
939 }
940 Ok(AwsResponse::ok_json(json!({})))
941 }
942
943 fn publish_state_machine_version(
946 &self,
947 req: &AwsRequest,
948 ) -> Result<AwsResponse, AwsServiceError> {
949 let body = req.json_body();
950 let arn = body["stateMachineArn"]
951 .as_str()
952 .ok_or_else(|| missing("stateMachineArn"))?
953 .to_string();
954 let description = body["description"].as_str().unwrap_or("").to_string();
955 let mut accounts = self.state.write();
956 let state = accounts.get_or_create(&req.account_id);
957 if !state.state_machines.contains_key(&arn) {
958 return Err(state_machine_not_found(&arn));
959 }
960 let version = state
961 .state_machine_versions
962 .values()
963 .filter(|v| v.state_machine_arn == arn)
964 .map(|v| v.version)
965 .max()
966 .unwrap_or(0)
967 + 1;
968 let version_arn = format!("{arn}:{version}");
969 let v = crate::state::StateMachineVersion {
970 state_machine_arn: arn,
971 version,
972 revision_id: format!("rev-{version}"),
973 description,
974 creation_date: chrono::Utc::now(),
975 };
976 state
977 .state_machine_versions
978 .insert(version_arn.clone(), v.clone());
979 Ok(AwsResponse::ok_json(json!({
980 "stateMachineVersionArn": version_arn,
981 "creationDate": v.creation_date.timestamp(),
982 })))
983 }
984
985 fn delete_state_machine_version(
986 &self,
987 req: &AwsRequest,
988 ) -> Result<AwsResponse, AwsServiceError> {
989 let body = req.json_body();
990 let arn = body["stateMachineVersionArn"]
991 .as_str()
992 .ok_or_else(|| missing("stateMachineVersionArn"))?
993 .to_string();
994 let mut accounts = self.state.write();
995 let state = accounts.get_or_create(&req.account_id);
996 state.state_machine_versions.remove(&arn);
997 Ok(AwsResponse::ok_json(json!({})))
998 }
999
1000 fn list_state_machine_versions(
1001 &self,
1002 req: &AwsRequest,
1003 ) -> Result<AwsResponse, AwsServiceError> {
1004 let body = req.json_body();
1005 let arn = body["stateMachineArn"]
1006 .as_str()
1007 .ok_or_else(|| missing("stateMachineArn"))?
1008 .to_string();
1009 let accounts = self.state.read();
1010 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1011 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1012 let mut versions: Vec<&crate::state::StateMachineVersion> = state
1013 .state_machine_versions
1014 .values()
1015 .filter(|v| v.state_machine_arn == arn)
1016 .collect();
1017 versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1018 let resp = json!({
1019 "stateMachineVersions": versions.iter().map(|v| json!({
1020 "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1021 "creationDate": v.creation_date.timestamp(),
1022 })).collect::<Vec<_>>(),
1023 });
1024 Ok(AwsResponse::ok_json(resp))
1025 }
1026
1027 fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1028 let body = req.json_body();
1029 let name = body["name"]
1030 .as_str()
1031 .ok_or_else(|| missing("name"))?
1032 .to_string();
1033 validate_name(&name)?;
1034 let routing_cfg = body["routingConfiguration"]
1035 .as_array()
1036 .ok_or_else(|| missing("routingConfiguration"))?;
1037 let routes = parse_routing_configuration(routing_cfg)?;
1038 let parent_arn = routes[0]
1039 .state_machine_version_arn
1040 .rsplit_once(':')
1041 .map(|(parent, _)| parent.to_string())
1042 .unwrap_or_default();
1043 let alias_arn = format!("{parent_arn}:{name}");
1044 let now = chrono::Utc::now();
1045 let alias = crate::state::StateMachineAlias {
1046 name,
1047 arn: alias_arn.clone(),
1048 description: body["description"].as_str().unwrap_or("").to_string(),
1049 routing_configuration: routes,
1050 creation_date: now,
1051 update_date: now,
1052 };
1053 let mut accounts = self.state.write();
1054 let state = accounts.get_or_create(&req.account_id);
1055 state.state_machine_aliases.insert(alias_arn.clone(), alias);
1056 Ok(AwsResponse::ok_json(json!({
1057 "stateMachineAliasArn": alias_arn,
1058 "creationDate": now.timestamp(),
1059 })))
1060 }
1061
1062 fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1063 let body = req.json_body();
1064 let arn = body["stateMachineAliasArn"]
1065 .as_str()
1066 .ok_or_else(|| missing("stateMachineAliasArn"))?
1067 .to_string();
1068 let mut accounts = self.state.write();
1069 let state = accounts.get_or_create(&req.account_id);
1070 state.state_machine_aliases.remove(&arn);
1071 Ok(AwsResponse::ok_json(json!({})))
1072 }
1073
1074 fn describe_state_machine_alias(
1075 &self,
1076 req: &AwsRequest,
1077 ) -> Result<AwsResponse, AwsServiceError> {
1078 let body = req.json_body();
1079 let arn = body["stateMachineAliasArn"]
1080 .as_str()
1081 .ok_or_else(|| missing("stateMachineAliasArn"))?
1082 .to_string();
1083 let accounts = self.state.read();
1084 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1085 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1086 let alias = state
1087 .state_machine_aliases
1088 .get(&arn)
1089 .ok_or_else(|| resource_not_found(&arn))?;
1090 Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1091 }
1092
1093 fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1094 let body = req.json_body();
1095 let parent = body["stateMachineArn"]
1096 .as_str()
1097 .ok_or_else(|| missing("stateMachineArn"))?
1098 .to_string();
1099 let accounts = self.state.read();
1100 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1101 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1102 let parent_prefix = format!("{parent}:");
1105 let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1106 .state_machine_aliases
1107 .values()
1108 .filter(|a| a.arn.starts_with(&parent_prefix))
1109 .collect();
1110 aliases.sort_by(|a, b| a.name.cmp(&b.name));
1111 Ok(AwsResponse::ok_json(json!({
1112 "stateMachineAliases": aliases.iter().map(|a| json!({
1113 "stateMachineAliasArn": a.arn,
1114 "creationDate": a.creation_date.timestamp(),
1115 })).collect::<Vec<_>>(),
1116 })))
1117 }
1118
1119 fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1120 let body = req.json_body();
1121 let arn = body["stateMachineAliasArn"]
1122 .as_str()
1123 .ok_or_else(|| missing("stateMachineAliasArn"))?
1124 .to_string();
1125 let mut accounts = self.state.write();
1126 let state = accounts.get_or_create(&req.account_id);
1127 let alias = state
1128 .state_machine_aliases
1129 .get_mut(&arn)
1130 .ok_or_else(|| resource_not_found(&arn))?;
1131 if let Some(d) = body["description"].as_str() {
1132 alias.description = d.to_string();
1133 }
1134 if let Some(routes) = body["routingConfiguration"].as_array() {
1135 alias.routing_configuration = parse_routing_configuration(routes)?;
1136 }
1137 alias.update_date = chrono::Utc::now();
1138 Ok(AwsResponse::ok_json(json!({
1139 "updateDate": alias.update_date.timestamp(),
1140 })))
1141 }
1142
1143 fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1146 let body = req.json_body();
1147 let arn = body["mapRunArn"]
1148 .as_str()
1149 .ok_or_else(|| missing("mapRunArn"))?
1150 .to_string();
1151 let accounts = self.state.read();
1152 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1153 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1154 let mr = state
1155 .map_runs
1156 .get(&arn)
1157 .ok_or_else(|| resource_not_found(&arn))?;
1158 Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1159 }
1160
1161 fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1162 let body = req.json_body();
1163 let exec_arn = body["executionArn"].as_str().map(String::from);
1164 let accounts = self.state.read();
1165 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1166 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1167 let runs: Vec<&crate::state::MapRun> = state
1168 .map_runs
1169 .values()
1170 .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1171 .collect();
1172 Ok(AwsResponse::ok_json(json!({
1173 "mapRuns": runs.iter().map(|r| json!({
1174 "mapRunArn": r.map_run_arn,
1175 "executionArn": r.execution_arn,
1176 "stateMachineArn": "",
1177 "startDate": r.start_date.timestamp(),
1178 "stopDate": r.stop_date.map(|d| d.timestamp()),
1179 })).collect::<Vec<_>>(),
1180 })))
1181 }
1182
1183 fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1184 let body = req.json_body();
1185 let arn = body["mapRunArn"]
1186 .as_str()
1187 .ok_or_else(|| missing("mapRunArn"))?
1188 .to_string();
1189 let mut accounts = self.state.write();
1190 let state = accounts.get_or_create(&req.account_id);
1191 let mr = state
1192 .map_runs
1193 .get_mut(&arn)
1194 .ok_or_else(|| resource_not_found(&arn))?;
1195 if let Some(c) = body["maxConcurrency"].as_i64() {
1196 mr.max_concurrency = c as i32;
1197 }
1198 if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1199 mr.tolerated_failure_percentage = p;
1200 }
1201 if let Some(c) = body["toleratedFailureCount"].as_i64() {
1202 mr.tolerated_failure_count = c;
1203 }
1204 Ok(AwsResponse::ok_json(json!({})))
1205 }
1206
1207 fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1210 let body = req.json_body();
1211 let arn = body["executionArn"]
1212 .as_str()
1213 .ok_or_else(|| missing("executionArn"))?
1214 .to_string();
1215 let mut accounts = self.state.write();
1216 let state = accounts.get_or_create(&req.account_id);
1217 let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1218 AwsServiceError::aws_error(
1219 StatusCode::BAD_REQUEST,
1220 "ExecutionDoesNotExist",
1221 format!("Execution does not exist: {arn}"),
1222 )
1223 })?;
1224 exec.status = crate::state::ExecutionStatus::Running;
1225 exec.stop_date = None;
1226 Ok(AwsResponse::ok_json(json!({
1227 "redriveDate": chrono::Utc::now().timestamp(),
1228 })))
1229 }
1230
1231 fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1232 let body = req.json_body();
1233 let sm_arn = body["stateMachineArn"]
1234 .as_str()
1235 .ok_or_else(|| missing("stateMachineArn"))?
1236 .to_string();
1237 let input = body["input"].as_str().unwrap_or("{}").to_string();
1238 if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1239 return Err(AwsServiceError::aws_error(
1240 StatusCode::BAD_REQUEST,
1241 "InvalidExecutionInput",
1242 "Execution input is not valid JSON.",
1243 ));
1244 }
1245 let mut accounts = self.state.write();
1246 let state = accounts.get_or_create(&req.account_id);
1247 let sm = state
1248 .state_machines
1249 .get(&sm_arn)
1250 .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1251 if sm.machine_type != crate::state::StateMachineType::Express {
1252 return Err(AwsServiceError::aws_error(
1253 StatusCode::BAD_REQUEST,
1254 "StateMachineTypeNotSupported",
1255 "StartSyncExecution is only supported for EXPRESS state machines.",
1256 ));
1257 }
1258 let now = chrono::Utc::now();
1259 let exec_arn = format!(
1260 "arn:aws:states:{}:{}:express:{}:sync-{}",
1261 state.region,
1262 state.account_id,
1263 sm.name,
1264 now.timestamp_millis()
1265 );
1266 Ok(AwsResponse::ok_json(json!({
1267 "executionArn": exec_arn,
1268 "stateMachineArn": sm_arn,
1269 "name": "sync",
1270 "startDate": now.timestamp(),
1271 "stopDate": now.timestamp(),
1272 "status": "SUCCEEDED",
1273 "input": input,
1274 "output": "{}",
1275 "billingDetails": {
1276 "billedMemoryUsedInMB": 64,
1277 "billedDurationInMilliseconds": 1,
1278 },
1279 })))
1280 }
1281
1282 fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1283 let body = req.json_body();
1284 let definition = body["definition"]
1285 .as_str()
1286 .ok_or_else(|| missing("definition"))?;
1287 validate_definition(definition)?;
1288 let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1289 let input = body["input"].as_str().unwrap_or("{}").to_string();
1290 Ok(AwsResponse::ok_json(json!({
1294 "output": input,
1295 "status": "SUCCEEDED",
1296 "nextState": "End",
1297 })))
1298 }
1299
1300 fn validate_state_machine_definition(
1301 &self,
1302 req: &AwsRequest,
1303 ) -> Result<AwsResponse, AwsServiceError> {
1304 let body = req.json_body();
1305 let definition = body["definition"]
1306 .as_str()
1307 .ok_or_else(|| missing("definition"))?;
1308 match validate_definition(definition) {
1309 Ok(()) => Ok(AwsResponse::ok_json(json!({
1310 "result": "OK",
1311 "diagnostics": [],
1312 }))),
1313 Err(e) => Ok(AwsResponse::ok_json(json!({
1314 "result": "FAIL",
1315 "diagnostics": [{
1316 "severity": "ERROR",
1317 "code": "INVALID_DEFINITION",
1318 "message": e.to_string(),
1319 }],
1320 }))),
1321 }
1322 }
1323}
1324
1325fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1326 json!({
1327 "stateMachineAliasArn": alias.arn,
1328 "name": alias.name,
1329 "description": alias.description,
1330 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1331 "stateMachineVersionArn": r.state_machine_version_arn,
1332 "weight": r.weight,
1333 })).collect::<Vec<_>>(),
1334 "creationDate": alias.creation_date.timestamp(),
1335 "updateDate": alias.update_date.timestamp(),
1336 })
1337}
1338
1339fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1340 json!({
1341 "mapRunArn": mr.map_run_arn,
1342 "executionArn": mr.execution_arn,
1343 "maxConcurrency": mr.max_concurrency,
1344 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1345 "toleratedFailureCount": mr.tolerated_failure_count,
1346 "status": mr.status,
1347 "startDate": mr.start_date.timestamp(),
1348 "stopDate": mr.stop_date.map(|d| d.timestamp()),
1349 })
1350}
1351
1352fn state_machine_to_json(sm: &StateMachine) -> Value {
1355 let mut resp = json!({
1356 "name": sm.name,
1357 "stateMachineArn": sm.arn,
1358 "definition": sm.definition,
1359 "roleArn": sm.role_arn,
1360 "type": sm.machine_type.as_str(),
1361 "status": sm.status.as_str(),
1362 "creationDate": sm.creation_date.timestamp() as f64,
1363 "updateDate": sm.update_date.timestamp() as f64,
1364 "revisionId": sm.revision_id,
1365 "label": sm.name,
1366 });
1367
1368 if !sm.description.is_empty() {
1369 resp["description"] = json!(sm.description);
1370 }
1371
1372 if let Some(ref logging) = sm.logging_configuration {
1373 resp["loggingConfiguration"] = logging.clone();
1374 } else {
1375 resp["loggingConfiguration"] = json!({
1376 "level": "OFF",
1377 "includeExecutionData": false,
1378 "destinations": [],
1379 });
1380 }
1381
1382 if let Some(ref tracing) = sm.tracing_configuration {
1383 resp["tracingConfiguration"] = tracing.clone();
1384 } else {
1385 resp["tracingConfiguration"] = json!({
1386 "enabled": false,
1387 });
1388 }
1389
1390 resp
1391}
1392
1393fn missing(name: &str) -> AwsServiceError {
1394 AwsServiceError::aws_error(
1395 StatusCode::BAD_REQUEST,
1396 "ValidationException",
1397 format!("The request must contain the parameter {name}."),
1398 )
1399}
1400
1401fn state_machine_not_found(arn: &str) -> AwsServiceError {
1402 AwsServiceError::aws_error(
1403 StatusCode::BAD_REQUEST,
1404 "StateMachineDoesNotExist",
1405 format!("State Machine Does Not Exist: '{arn}'"),
1406 )
1407}
1408
1409fn resource_not_found(arn: &str) -> AwsServiceError {
1410 AwsServiceError::aws_error(
1411 StatusCode::BAD_REQUEST,
1412 "ResourceNotFound",
1413 format!("Resource not found: '{arn}'"),
1414 )
1415}
1416
1417fn parse_routing_configuration(
1422 routes: &[serde_json::Value],
1423) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1424 if routes.is_empty() || routes.len() > 2 {
1425 return Err(AwsServiceError::aws_error(
1426 StatusCode::BAD_REQUEST,
1427 "ValidationException",
1428 "routingConfiguration must contain 1 or 2 routes.",
1429 ));
1430 }
1431 let parsed: Vec<crate::state::AliasRoute> = routes
1432 .iter()
1433 .map(|r| {
1434 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1435 AwsServiceError::aws_error(
1436 StatusCode::BAD_REQUEST,
1437 "ValidationException",
1438 "routingConfiguration entries must contain stateMachineVersionArn.",
1439 )
1440 })?;
1441 let weight = r["weight"].as_i64().ok_or_else(|| {
1442 AwsServiceError::aws_error(
1443 StatusCode::BAD_REQUEST,
1444 "ValidationException",
1445 "routingConfiguration entries must contain a numeric weight.",
1446 )
1447 })?;
1448 if !(0..=100).contains(&weight) {
1449 return Err(AwsServiceError::aws_error(
1450 StatusCode::BAD_REQUEST,
1451 "ValidationException",
1452 format!("Invalid routing weight {weight}; must be 0-100."),
1453 ));
1454 }
1455 Ok(crate::state::AliasRoute {
1456 state_machine_version_arn: arn.to_string(),
1457 weight: weight as i32,
1458 })
1459 })
1460 .collect::<Result<_, _>>()?;
1461 let total: i32 = parsed.iter().map(|r| r.weight).sum();
1462 if total != 100 {
1463 return Err(AwsServiceError::aws_error(
1464 StatusCode::BAD_REQUEST,
1465 "ValidationException",
1466 format!("routingConfiguration weights must sum to 100, got {total}."),
1467 ));
1468 }
1469 Ok(parsed)
1470}
1471
1472fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1473 if name.is_empty() || name.len() > 80 {
1474 return Err(AwsServiceError::aws_error(
1475 StatusCode::BAD_REQUEST,
1476 "InvalidName",
1477 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1478 ));
1479 }
1480 if !name
1482 .chars()
1483 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1484 {
1485 return Err(AwsServiceError::aws_error(
1486 StatusCode::BAD_REQUEST,
1487 "InvalidName",
1488 format!(
1489 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1490 ),
1491 ));
1492 }
1493 Ok(())
1494}
1495
1496fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1497 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1498 AwsServiceError::aws_error(
1499 StatusCode::BAD_REQUEST,
1500 "InvalidDefinition",
1501 format!("Invalid State Machine Definition: '{e}'"),
1502 )
1503 })?;
1504
1505 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1506 return Err(AwsServiceError::aws_error(
1507 StatusCode::BAD_REQUEST,
1508 "InvalidDefinition",
1509 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1510 .to_string(),
1511 ));
1512 }
1513
1514 let states_obj = parsed
1515 .get("States")
1516 .and_then(|v| v.as_object())
1517 .ok_or_else(|| {
1518 AwsServiceError::aws_error(
1519 StatusCode::BAD_REQUEST,
1520 "InvalidDefinition",
1521 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1522 .to_string(),
1523 )
1524 })?;
1525
1526 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1527 AwsServiceError::aws_error(
1528 StatusCode::BAD_REQUEST,
1529 "InvalidDefinition",
1530 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1531 .to_string(),
1532 )
1533 })?;
1534 if !states_obj.contains_key(start_at) {
1535 return Err(AwsServiceError::aws_error(
1536 StatusCode::BAD_REQUEST,
1537 "InvalidDefinition",
1538 format!(
1539 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1540 (StartAt '{start_at}' does not reference a valid state)"
1541 ),
1542 ));
1543 }
1544
1545 Ok(())
1546}
1547
1548fn execution_not_found(arn: &str) -> AwsServiceError {
1549 AwsServiceError::aws_error(
1550 StatusCode::BAD_REQUEST,
1551 "ExecutionDoesNotExist",
1552 format!("Execution Does Not Exist: '{arn}'"),
1553 )
1554}
1555
1556fn execution_to_json(exec: &Execution) -> Value {
1557 let mut resp = json!({
1558 "executionArn": exec.execution_arn,
1559 "stateMachineArn": exec.state_machine_arn,
1560 "name": exec.name,
1561 "status": exec.status.as_str(),
1562 "startDate": exec.start_date.timestamp() as f64,
1563 });
1564
1565 if let Some(ref input) = exec.input {
1566 resp["input"] = json!(input);
1567 }
1568 if let Some(ref output) = exec.output {
1569 resp["output"] = json!(output);
1570 }
1571 if let Some(stop) = exec.stop_date {
1572 resp["stopDate"] = json!(stop.timestamp() as f64);
1573 }
1574 if let Some(ref error) = exec.error {
1575 resp["error"] = json!(error);
1576 }
1577 if let Some(ref cause) = exec.cause {
1578 resp["cause"] = json!(cause);
1579 }
1580
1581 resp
1582}
1583
1584fn camel_to_details_key(event_type: &str) -> String {
1586 let mut chars = event_type.chars();
1587 match chars.next() {
1588 None => String::new(),
1589 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1590 }
1591}
1592
1593fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1594 if !arn.starts_with("arn:") {
1595 return Err(AwsServiceError::aws_error(
1596 StatusCode::BAD_REQUEST,
1597 "InvalidArn",
1598 format!("Invalid Arn: '{arn}'"),
1599 ));
1600 }
1601 Ok(())
1602}
1603
1604pub fn start_execution_from_delivery(
1613 state: &SharedStepFunctionsState,
1614 delivery: &Option<Arc<DeliveryBus>>,
1615 dynamodb_state: &Option<SharedDynamoDbState>,
1616 state_machine_arn: &str,
1617 input: &str,
1618) {
1619 if serde_json::from_str::<serde_json::Value>(input).is_err() {
1621 tracing::warn!(
1622 state_machine_arn,
1623 "Step Functions delivery: invalid JSON input, skipping execution"
1624 );
1625 return;
1626 }
1627
1628 let execution_name = uuid::Uuid::new_v4().to_string();
1629
1630 let account_id = state_machine_arn
1632 .split(':')
1633 .nth(4)
1634 .unwrap_or("000000000000")
1635 .to_string();
1636
1637 let mut accounts = state.write();
1638 let st = accounts.get_or_create(&account_id);
1639 let sm = match st.state_machines.get(state_machine_arn) {
1640 Some(sm) => sm,
1641 None => {
1642 tracing::warn!(
1643 state_machine_arn,
1644 "Step Functions delivery: state machine not found"
1645 );
1646 return;
1647 }
1648 };
1649
1650 let sm_name = sm.name.clone();
1651 let definition = sm.definition.clone();
1652 let exec_arn = st.execution_arn(&sm_name, &execution_name);
1653
1654 let now = Utc::now();
1655 let execution = Execution {
1656 execution_arn: exec_arn.clone(),
1657 state_machine_arn: state_machine_arn.to_string(),
1658 state_machine_name: sm_name,
1659 name: execution_name,
1660 status: ExecutionStatus::Running,
1661 input: Some(input.to_string()),
1662 output: None,
1663 start_date: now,
1664 stop_date: None,
1665 error: None,
1666 cause: None,
1667 history_events: vec![],
1668 };
1669
1670 st.executions.insert(exec_arn.clone(), execution);
1671 drop(accounts);
1672
1673 let shared_state = state.clone();
1674 let delivery = delivery.clone();
1675 let dynamodb_state = dynamodb_state.clone();
1676 let input = Some(input.to_string());
1677 tokio::spawn(async move {
1678 interpreter::execute_state_machine(
1679 shared_state,
1680 exec_arn,
1681 definition,
1682 input,
1683 delivery,
1684 dynamodb_state,
1685 )
1686 .await;
1687 });
1688}
1689
1690#[cfg(test)]
1691mod tests {
1692 use super::*;
1693 use http::{HeaderMap, Method};
1694 use parking_lot::RwLock;
1695 use serde_json::Value;
1696 use std::sync::Arc;
1697
1698 fn make_state() -> SharedStepFunctionsState {
1699 Arc::new(RwLock::new(
1700 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1701 ))
1702 }
1703
1704 fn make_request(action: &str, body: &str) -> AwsRequest {
1705 AwsRequest {
1706 service: "states".to_string(),
1707 action: action.to_string(),
1708 region: "us-east-1".to_string(),
1709 account_id: "123456789012".to_string(),
1710 request_id: "test-id".to_string(),
1711 headers: HeaderMap::new(),
1712 query_params: HashMap::new(),
1713 body: body.as_bytes().to_vec().into(),
1714 path_segments: vec![],
1715 raw_path: "/".to_string(),
1716 raw_query: String::new(),
1717 method: Method::POST,
1718 is_query_protocol: false,
1719 access_key_id: None,
1720 principal: None,
1721 }
1722 }
1723
1724 fn body_json(resp: &AwsResponse) -> Value {
1725 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1726 }
1727
1728 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1729 match result {
1730 Err(e) => e,
1731 Ok(_) => panic!("expected error, got Ok"),
1732 }
1733 }
1734
1735 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1736
1737 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1738 let body = json!({
1739 "name": name,
1740 "definition": VALID_DEF,
1741 "roleArn": "arn:aws:iam::123456789012:role/test",
1742 });
1743 let req = make_request("CreateStateMachine", &body.to_string());
1744 let resp = svc.create_state_machine(&req).unwrap();
1745 let b = body_json(&resp);
1746 b["stateMachineArn"].as_str().unwrap().to_string()
1747 }
1748
1749 #[test]
1752 fn create_state_machine_basic() {
1753 let svc = StepFunctionsService::new(make_state());
1754 let arn = create_sm(&svc, "test-sm");
1755 assert!(arn.contains("test-sm"));
1756 }
1757
1758 #[test]
1759 fn create_state_machine_with_express_type() {
1760 let svc = StepFunctionsService::new(make_state());
1761 let body = json!({
1762 "name": "express-sm",
1763 "definition": VALID_DEF,
1764 "roleArn": "arn:aws:iam::123456789012:role/r",
1765 "type": "EXPRESS",
1766 });
1767 let req = make_request("CreateStateMachine", &body.to_string());
1768 let resp = svc.create_state_machine(&req).unwrap();
1769 let b = body_json(&resp);
1770 assert!(b["stateMachineArn"].as_str().is_some());
1771 }
1772
1773 #[test]
1774 fn create_state_machine_duplicate_fails() {
1775 let svc = StepFunctionsService::new(make_state());
1776 create_sm(&svc, "dup-sm");
1777 let body = json!({
1778 "name": "dup-sm",
1779 "definition": VALID_DEF,
1780 "roleArn": "arn:aws:iam::123456789012:role/r",
1781 });
1782 let req = make_request("CreateStateMachine", &body.to_string());
1783 let err = expect_err(svc.create_state_machine(&req));
1784 assert!(err.to_string().contains("StateMachineAlreadyExists"));
1785 }
1786
1787 #[test]
1788 fn create_state_machine_missing_name() {
1789 let svc = StepFunctionsService::new(make_state());
1790 let body = json!({
1791 "definition": VALID_DEF,
1792 "roleArn": "arn:aws:iam::123456789012:role/r",
1793 });
1794 let req = make_request("CreateStateMachine", &body.to_string());
1795 assert!(svc.create_state_machine(&req).is_err());
1796 }
1797
1798 #[test]
1799 fn create_state_machine_invalid_definition() {
1800 let svc = StepFunctionsService::new(make_state());
1801 let body = json!({
1802 "name": "bad-def",
1803 "definition": "not json",
1804 "roleArn": "arn:aws:iam::123456789012:role/r",
1805 });
1806 let req = make_request("CreateStateMachine", &body.to_string());
1807 let err = expect_err(svc.create_state_machine(&req));
1808 assert!(err.to_string().contains("InvalidDefinition"));
1809 }
1810
1811 #[test]
1812 fn create_state_machine_definition_missing_start_at() {
1813 let svc = StepFunctionsService::new(make_state());
1814 let body = json!({
1815 "name": "no-start",
1816 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1817 "roleArn": "arn:aws:iam::123456789012:role/r",
1818 });
1819 let req = make_request("CreateStateMachine", &body.to_string());
1820 let err = expect_err(svc.create_state_machine(&req));
1821 assert!(err.to_string().contains("InvalidDefinition"));
1822 }
1823
1824 #[test]
1825 fn create_state_machine_definition_missing_states() {
1826 let svc = StepFunctionsService::new(make_state());
1827 let body = json!({
1828 "name": "no-states",
1829 "definition": r#"{"StartAt":"S"}"#,
1830 "roleArn": "arn:aws:iam::123456789012:role/r",
1831 });
1832 let req = make_request("CreateStateMachine", &body.to_string());
1833 let err = expect_err(svc.create_state_machine(&req));
1834 assert!(err.to_string().contains("InvalidDefinition"));
1835 }
1836
1837 #[test]
1838 fn create_state_machine_definition_start_at_not_in_states() {
1839 let svc = StepFunctionsService::new(make_state());
1840 let body = json!({
1841 "name": "bad-start",
1842 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1843 "roleArn": "arn:aws:iam::123456789012:role/r",
1844 });
1845 let req = make_request("CreateStateMachine", &body.to_string());
1846 let err = expect_err(svc.create_state_machine(&req));
1847 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1848 }
1849
1850 #[test]
1851 fn create_state_machine_invalid_type() {
1852 let svc = StepFunctionsService::new(make_state());
1853 let body = json!({
1854 "name": "bad-type",
1855 "definition": VALID_DEF,
1856 "roleArn": "arn:aws:iam::123456789012:role/r",
1857 "type": "INVALID",
1858 });
1859 let req = make_request("CreateStateMachine", &body.to_string());
1860 assert!(svc.create_state_machine(&req).is_err());
1861 }
1862
1863 #[test]
1864 fn create_state_machine_invalid_arn() {
1865 let svc = StepFunctionsService::new(make_state());
1866 let body = json!({
1867 "name": "bad-arn",
1868 "definition": VALID_DEF,
1869 "roleArn": "not-an-arn",
1870 });
1871 let req = make_request("CreateStateMachine", &body.to_string());
1872 let err = expect_err(svc.create_state_machine(&req));
1873 assert!(err.to_string().contains("InvalidArn"));
1874 }
1875
1876 #[test]
1877 fn create_state_machine_invalid_name() {
1878 let svc = StepFunctionsService::new(make_state());
1879 let body = json!({
1880 "name": "has spaces!",
1881 "definition": VALID_DEF,
1882 "roleArn": "arn:aws:iam::123456789012:role/r",
1883 });
1884 let req = make_request("CreateStateMachine", &body.to_string());
1885 let err = expect_err(svc.create_state_machine(&req));
1886 assert!(err.to_string().contains("InvalidName"));
1887 }
1888
1889 #[test]
1890 fn create_state_machine_name_too_long() {
1891 let svc = StepFunctionsService::new(make_state());
1892 let long_name = "a".repeat(81);
1893 let body = json!({
1894 "name": long_name,
1895 "definition": VALID_DEF,
1896 "roleArn": "arn:aws:iam::123456789012:role/r",
1897 });
1898 let req = make_request("CreateStateMachine", &body.to_string());
1899 let err = expect_err(svc.create_state_machine(&req));
1900 assert!(err.to_string().contains("InvalidName"));
1901 }
1902
1903 #[test]
1906 fn describe_state_machine_found() {
1907 let svc = StepFunctionsService::new(make_state());
1908 let arn = create_sm(&svc, "desc-sm");
1909
1910 let req = make_request(
1911 "DescribeStateMachine",
1912 &json!({"stateMachineArn": arn}).to_string(),
1913 );
1914 let resp = svc.describe_state_machine(&req).unwrap();
1915 let b = body_json(&resp);
1916 assert_eq!(b["name"], "desc-sm");
1917 assert_eq!(b["status"], "ACTIVE");
1918 assert!(b["definition"].as_str().is_some());
1919 }
1920
1921 #[test]
1922 fn describe_state_machine_not_found() {
1923 let svc = StepFunctionsService::new(make_state());
1924 let req = make_request(
1925 "DescribeStateMachine",
1926 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1927 .to_string(),
1928 );
1929 let err = expect_err(svc.describe_state_machine(&req));
1930 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1931 }
1932
1933 #[test]
1936 fn list_state_machines_empty() {
1937 let svc = StepFunctionsService::new(make_state());
1938 let req = make_request("ListStateMachines", "{}");
1939 let resp = svc.list_state_machines(&req).unwrap();
1940 let b = body_json(&resp);
1941 assert!(b["stateMachines"].as_array().unwrap().is_empty());
1942 }
1943
1944 #[test]
1945 fn list_state_machines_returns_created() {
1946 let svc = StepFunctionsService::new(make_state());
1947 create_sm(&svc, "sm-1");
1948 create_sm(&svc, "sm-2");
1949
1950 let req = make_request("ListStateMachines", "{}");
1951 let resp = svc.list_state_machines(&req).unwrap();
1952 let b = body_json(&resp);
1953 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1954 }
1955
1956 #[test]
1959 fn delete_state_machine() {
1960 let svc = StepFunctionsService::new(make_state());
1961 let arn = create_sm(&svc, "del-sm");
1962
1963 let req = make_request(
1964 "DeleteStateMachine",
1965 &json!({"stateMachineArn": arn}).to_string(),
1966 );
1967 svc.delete_state_machine(&req).unwrap();
1968
1969 let req = make_request(
1971 "DescribeStateMachine",
1972 &json!({"stateMachineArn": arn}).to_string(),
1973 );
1974 assert!(svc.describe_state_machine(&req).is_err());
1975 }
1976
1977 #[test]
1978 fn delete_state_machine_nonexistent_succeeds() {
1979 let svc = StepFunctionsService::new(make_state());
1980 let req = make_request(
1981 "DeleteStateMachine",
1982 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1983 .to_string(),
1984 );
1985 svc.delete_state_machine(&req).unwrap();
1987 }
1988
1989 #[test]
1992 fn update_state_machine() {
1993 let svc = StepFunctionsService::new(make_state());
1994 let arn = create_sm(&svc, "upd-sm");
1995
1996 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1997 let body = json!({
1998 "stateMachineArn": arn,
1999 "definition": new_def,
2000 "description": "updated",
2001 });
2002 let req = make_request("UpdateStateMachine", &body.to_string());
2003 let resp = svc.update_state_machine(&req).unwrap();
2004 let b = body_json(&resp);
2005 assert!(b["updateDate"].as_f64().is_some());
2006
2007 let req = make_request(
2009 "DescribeStateMachine",
2010 &json!({"stateMachineArn": arn}).to_string(),
2011 );
2012 let resp = svc.describe_state_machine(&req).unwrap();
2013 let b = body_json(&resp);
2014 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2015 assert_eq!(b["description"], "updated");
2016 }
2017
2018 #[test]
2019 fn update_state_machine_not_found() {
2020 let svc = StepFunctionsService::new(make_state());
2021 let body = json!({
2022 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2023 "definition": VALID_DEF,
2024 });
2025 let req = make_request("UpdateStateMachine", &body.to_string());
2026 let err = expect_err(svc.update_state_machine(&req));
2027 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2028 }
2029
2030 #[tokio::test]
2033 async fn start_execution_basic() {
2034 let svc = StepFunctionsService::new(make_state());
2035 let arn = create_sm(&svc, "exec-sm");
2036
2037 let body = json!({
2038 "stateMachineArn": arn,
2039 "input": r#"{"key":"value"}"#,
2040 });
2041 let req = make_request("StartExecution", &body.to_string());
2042 let resp = svc.start_execution(&req).unwrap();
2043 let b = body_json(&resp);
2044 assert!(b["executionArn"].as_str().is_some());
2045 assert!(b["startDate"].as_f64().is_some());
2046 }
2047
2048 #[tokio::test]
2049 async fn start_execution_with_name() {
2050 let svc = StepFunctionsService::new(make_state());
2051 let arn = create_sm(&svc, "named-exec");
2052
2053 let body = json!({
2054 "stateMachineArn": arn,
2055 "name": "my-execution",
2056 });
2057 let req = make_request("StartExecution", &body.to_string());
2058 let resp = svc.start_execution(&req).unwrap();
2059 let b = body_json(&resp);
2060 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2061 }
2062
2063 #[tokio::test]
2064 async fn start_execution_sm_not_found() {
2065 let svc = StepFunctionsService::new(make_state());
2066 let body = json!({
2067 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2068 });
2069 let req = make_request("StartExecution", &body.to_string());
2070 let err = expect_err(svc.start_execution(&req));
2071 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2072 }
2073
2074 #[tokio::test]
2075 async fn start_execution_invalid_input() {
2076 let svc = StepFunctionsService::new(make_state());
2077 let arn = create_sm(&svc, "bad-input");
2078
2079 let body = json!({
2080 "stateMachineArn": arn,
2081 "input": "not json",
2082 });
2083 let req = make_request("StartExecution", &body.to_string());
2084 let err = expect_err(svc.start_execution(&req));
2085 assert!(err.to_string().contains("InvalidExecutionInput"));
2086 }
2087
2088 #[tokio::test]
2089 async fn start_execution_duplicate_name() {
2090 let svc = StepFunctionsService::new(make_state());
2091 let arn = create_sm(&svc, "dup-exec");
2092
2093 let body = json!({
2094 "stateMachineArn": arn,
2095 "name": "same-name",
2096 });
2097 let req = make_request("StartExecution", &body.to_string());
2098 svc.start_execution(&req).unwrap();
2099
2100 let req = make_request("StartExecution", &body.to_string());
2101 let err = expect_err(svc.start_execution(&req));
2102 assert!(err.to_string().contains("ExecutionAlreadyExists"));
2103 }
2104
2105 #[tokio::test]
2108 async fn describe_execution_found() {
2109 let svc = StepFunctionsService::new(make_state());
2110 let sm_arn = create_sm(&svc, "desc-exec");
2111
2112 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2113 let req = make_request("StartExecution", &body.to_string());
2114 let resp = svc.start_execution(&req).unwrap();
2115 let exec_arn = body_json(&resp)["executionArn"]
2116 .as_str()
2117 .unwrap()
2118 .to_string();
2119
2120 let req = make_request(
2121 "DescribeExecution",
2122 &json!({"executionArn": exec_arn}).to_string(),
2123 );
2124 let resp = svc.describe_execution(&req).unwrap();
2125 let b = body_json(&resp);
2126 assert_eq!(b["name"], "e1");
2127 assert_eq!(b["status"], "RUNNING");
2128 }
2129
2130 #[tokio::test]
2131 async fn describe_execution_not_found() {
2132 let svc = StepFunctionsService::new(make_state());
2133 let req = make_request(
2134 "DescribeExecution",
2135 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2136 .to_string(),
2137 );
2138 let err = expect_err(svc.describe_execution(&req));
2139 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2140 }
2141
2142 #[tokio::test]
2145 async fn stop_execution() {
2146 let svc = StepFunctionsService::new(make_state());
2147 let sm_arn = create_sm(&svc, "stop-sm");
2148
2149 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2150 let req = make_request("StartExecution", &body.to_string());
2151 let resp = svc.start_execution(&req).unwrap();
2152 let exec_arn = body_json(&resp)["executionArn"]
2153 .as_str()
2154 .unwrap()
2155 .to_string();
2156
2157 let body = json!({
2158 "executionArn": exec_arn,
2159 "error": "UserAborted",
2160 "cause": "test stop",
2161 });
2162 let req = make_request("StopExecution", &body.to_string());
2163 let resp = svc.stop_execution(&req).unwrap();
2164 let b = body_json(&resp);
2165 assert!(b["stopDate"].as_f64().is_some());
2166
2167 let req = make_request(
2169 "DescribeExecution",
2170 &json!({"executionArn": exec_arn}).to_string(),
2171 );
2172 let resp = svc.describe_execution(&req).unwrap();
2173 let b = body_json(&resp);
2174 assert_eq!(b["status"], "ABORTED");
2175 assert_eq!(b["error"], "UserAborted");
2176 }
2177
2178 #[tokio::test]
2179 async fn stop_execution_not_found() {
2180 let svc = StepFunctionsService::new(make_state());
2181 let req = make_request(
2182 "StopExecution",
2183 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2184 .to_string(),
2185 );
2186 let err = expect_err(svc.stop_execution(&req));
2187 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2188 }
2189
2190 #[tokio::test]
2193 async fn list_executions() {
2194 let svc = StepFunctionsService::new(make_state());
2195 let sm_arn = create_sm(&svc, "list-exec");
2196
2197 for i in 0..3 {
2198 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2199 let req = make_request("StartExecution", &body.to_string());
2200 svc.start_execution(&req).unwrap();
2201 }
2202
2203 let req = make_request(
2204 "ListExecutions",
2205 &json!({"stateMachineArn": sm_arn}).to_string(),
2206 );
2207 let resp = svc.list_executions(&req).unwrap();
2208 let b = body_json(&resp);
2209 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2210 }
2211
2212 #[tokio::test]
2213 async fn list_executions_sm_not_found() {
2214 let svc = StepFunctionsService::new(make_state());
2215 let req = make_request(
2216 "ListExecutions",
2217 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2218 .to_string(),
2219 );
2220 let err = expect_err(svc.list_executions(&req));
2221 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2222 }
2223
2224 #[tokio::test]
2227 async fn get_execution_history_not_found() {
2228 let svc = StepFunctionsService::new(make_state());
2229 let req = make_request(
2230 "GetExecutionHistory",
2231 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2232 .to_string(),
2233 );
2234 let err = expect_err(svc.get_execution_history(&req));
2235 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2236 }
2237
2238 #[tokio::test]
2241 async fn describe_sm_for_execution() {
2242 let svc = StepFunctionsService::new(make_state());
2243 let sm_arn = create_sm(&svc, "sm-for-exec");
2244
2245 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2246 let req = make_request("StartExecution", &body.to_string());
2247 let resp = svc.start_execution(&req).unwrap();
2248 let exec_arn = body_json(&resp)["executionArn"]
2249 .as_str()
2250 .unwrap()
2251 .to_string();
2252
2253 let req = make_request(
2254 "DescribeStateMachineForExecution",
2255 &json!({"executionArn": exec_arn}).to_string(),
2256 );
2257 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2258 let b = body_json(&resp);
2259 assert_eq!(b["name"], "sm-for-exec");
2260 }
2261
2262 #[test]
2265 fn tag_untag_list_tags() {
2266 let svc = StepFunctionsService::new(make_state());
2267 let arn = create_sm(&svc, "tagged-sm");
2268
2269 let body = json!({
2271 "resourceArn": arn,
2272 "tags": [{"key": "env", "value": "prod"}],
2273 });
2274 let req = make_request("TagResource", &body.to_string());
2275 svc.tag_resource(&req).unwrap();
2276
2277 let req = make_request(
2279 "ListTagsForResource",
2280 &json!({"resourceArn": arn}).to_string(),
2281 );
2282 let resp = svc.list_tags_for_resource(&req).unwrap();
2283 let b = body_json(&resp);
2284 let tags = b["tags"].as_array().unwrap();
2285 assert_eq!(tags.len(), 1);
2286 assert_eq!(tags[0]["key"], "env");
2287
2288 let body = json!({
2290 "resourceArn": arn,
2291 "tagKeys": ["env"],
2292 });
2293 let req = make_request("UntagResource", &body.to_string());
2294 svc.untag_resource(&req).unwrap();
2295
2296 let req = make_request(
2298 "ListTagsForResource",
2299 &json!({"resourceArn": arn}).to_string(),
2300 );
2301 let resp = svc.list_tags_for_resource(&req).unwrap();
2302 let b = body_json(&resp);
2303 assert!(b["tags"].as_array().unwrap().is_empty());
2304 }
2305
2306 #[test]
2307 fn tag_resource_not_found() {
2308 let svc = StepFunctionsService::new(make_state());
2309 let body = json!({
2310 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2311 "tags": [{"key": "k", "value": "v"}],
2312 });
2313 let req = make_request("TagResource", &body.to_string());
2314 let err = expect_err(svc.tag_resource(&req));
2315 assert!(err.to_string().contains("ResourceNotFound"));
2316 }
2317
2318 #[test]
2321 fn test_validate_name() {
2322 assert!(validate_name("valid-name").is_ok());
2323 assert!(validate_name("under_score").is_ok());
2324 assert!(validate_name("").is_err());
2325 assert!(validate_name("has spaces").is_err());
2326 assert!(validate_name(&"a".repeat(81)).is_err());
2327 }
2328
2329 #[test]
2330 fn test_validate_definition() {
2331 assert!(validate_definition(VALID_DEF).is_ok());
2332 assert!(validate_definition("not json").is_err());
2333 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
2336
2337 #[test]
2338 fn test_validate_arn() {
2339 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2340 assert!(validate_arn("not-an-arn").is_err());
2341 }
2342
2343 #[test]
2344 fn test_camel_to_details_key() {
2345 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2346 assert_eq!(camel_to_details_key(""), "");
2347 }
2348
2349 #[test]
2350 fn test_is_mutating_action() {
2351 assert!(is_mutating_action("CreateStateMachine"));
2352 assert!(is_mutating_action("StartExecution"));
2353 assert!(!is_mutating_action("DescribeStateMachine"));
2354 assert!(!is_mutating_action("ListStateMachines"));
2355 }
2356}