1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14use fakecloud_dynamodb::state::SharedDynamoDbState;
15use fakecloud_persistence::SnapshotStore;
16
17use crate::interpreter;
18use crate::state::{
19 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
20 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
21 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const SUPPORTED: &[&str] = &[
25 "CreateStateMachine",
26 "DescribeStateMachine",
27 "ListStateMachines",
28 "DeleteStateMachine",
29 "UpdateStateMachine",
30 "TagResource",
31 "UntagResource",
32 "ListTagsForResource",
33 "StartExecution",
34 "StopExecution",
35 "DescribeExecution",
36 "ListExecutions",
37 "GetExecutionHistory",
38 "DescribeStateMachineForExecution",
39 "CreateActivity",
40 "DeleteActivity",
41 "DescribeActivity",
42 "ListActivities",
43 "GetActivityTask",
44 "SendTaskFailure",
45 "SendTaskHeartbeat",
46 "SendTaskSuccess",
47 "PublishStateMachineVersion",
48 "DeleteStateMachineVersion",
49 "ListStateMachineVersions",
50 "CreateStateMachineAlias",
51 "DeleteStateMachineAlias",
52 "DescribeStateMachineAlias",
53 "ListStateMachineAliases",
54 "UpdateStateMachineAlias",
55 "DescribeMapRun",
56 "ListMapRuns",
57 "UpdateMapRun",
58 "RedriveExecution",
59 "StartSyncExecution",
60 "TestState",
61 "ValidateStateMachineDefinition",
62];
63
64pub struct StepFunctionsService {
65 state: SharedStepFunctionsState,
66 delivery: Option<Arc<DeliveryBus>>,
67 dynamodb_state: Option<SharedDynamoDbState>,
68 snapshot_store: Option<Arc<dyn SnapshotStore>>,
69 snapshot_lock: Arc<AsyncMutex<()>>,
70}
71
72impl StepFunctionsService {
73 pub fn new(state: SharedStepFunctionsState) -> Self {
74 Self {
75 state,
76 delivery: None,
77 dynamodb_state: None,
78 snapshot_store: None,
79 snapshot_lock: Arc::new(AsyncMutex::new(())),
80 }
81 }
82
83 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
84 self.delivery = Some(delivery);
85 self
86 }
87
88 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
89 self.dynamodb_state = Some(dynamodb_state);
90 self
91 }
92
93 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94 self.snapshot_store = Some(store);
95 self
96 }
97
98 async fn save_snapshot(&self) {
99 let Some(store) = self.snapshot_store.clone() else {
100 return;
101 };
102 let _guard = self.snapshot_lock.lock().await;
103 let snapshot = StepFunctionsSnapshot {
104 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
105 state: None,
106 accounts: Some(self.state.read().clone()),
107 };
108 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
109 let bytes = serde_json::to_vec(&snapshot)
110 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
111 store.save(&bytes)
112 })
113 .await;
114 match join {
115 Ok(Ok(())) => {}
116 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
117 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
118 }
119 }
120}
121
122fn is_mutating_action(action: &str) -> bool {
123 matches!(
124 action,
125 "CreateStateMachine"
126 | "DeleteStateMachine"
127 | "UpdateStateMachine"
128 | "TagResource"
129 | "UntagResource"
130 | "StartExecution"
131 | "StopExecution"
132 | "CreateActivity"
133 | "DeleteActivity"
134 | "GetActivityTask"
135 | "SendTaskFailure"
136 | "SendTaskHeartbeat"
137 | "SendTaskSuccess"
138 | "PublishStateMachineVersion"
139 | "DeleteStateMachineVersion"
140 | "CreateStateMachineAlias"
141 | "DeleteStateMachineAlias"
142 | "UpdateStateMachineAlias"
143 | "UpdateMapRun"
144 | "RedriveExecution"
145 | "StartSyncExecution"
146 )
147}
148
149#[async_trait]
150impl AwsService for StepFunctionsService {
151 fn service_name(&self) -> &str {
152 "states"
153 }
154
155 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
156 let mutates = is_mutating_action(req.action.as_str());
157 let result = match req.action.as_str() {
158 "CreateStateMachine" => self.create_state_machine(&req),
159 "DescribeStateMachine" => self.describe_state_machine(&req),
160 "ListStateMachines" => self.list_state_machines(&req),
161 "DeleteStateMachine" => self.delete_state_machine(&req),
162 "UpdateStateMachine" => self.update_state_machine(&req),
163 "TagResource" => self.tag_resource(&req),
164 "UntagResource" => self.untag_resource(&req),
165 "ListTagsForResource" => self.list_tags_for_resource(&req),
166 "StartExecution" => self.start_execution(&req),
167 "StopExecution" => self.stop_execution(&req),
168 "DescribeExecution" => self.describe_execution(&req),
169 "ListExecutions" => self.list_executions(&req),
170 "GetExecutionHistory" => self.get_execution_history(&req),
171 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
172 "CreateActivity" => self.create_activity(&req),
173 "DeleteActivity" => self.delete_activity(&req),
174 "DescribeActivity" => self.describe_activity(&req),
175 "ListActivities" => self.list_activities(&req),
176 "GetActivityTask" => self.get_activity_task(&req).await,
177 "SendTaskFailure" => self.send_task_failure(&req),
178 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
179 "SendTaskSuccess" => self.send_task_success(&req),
180 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
181 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
182 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
183 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
184 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
185 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
186 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
187 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
188 "DescribeMapRun" => self.describe_map_run(&req),
189 "ListMapRuns" => self.list_map_runs(&req),
190 "UpdateMapRun" => self.update_map_run(&req),
191 "RedriveExecution" => self.redrive_execution(&req),
192 "StartSyncExecution" => self.start_sync_execution(&req),
193 "TestState" => self.test_state(&req),
194 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
195 _ => Err(AwsServiceError::action_not_implemented(
196 "states",
197 &req.action,
198 )),
199 };
200 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
201 self.save_snapshot().await;
202 }
203 result
204 }
205
206 fn supported_actions(&self) -> &[&str] {
207 SUPPORTED
208 }
209}
210
211impl StepFunctionsService {
212 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
215 let body = req.json_body();
216
217 validate_required("name", &body["name"])?;
218 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
219 validate_name(name)?;
220
221 validate_required("definition", &body["definition"])?;
222 let definition = body["definition"]
223 .as_str()
224 .ok_or_else(|| missing("definition"))?;
225 validate_definition(definition)?;
226
227 validate_required("roleArn", &body["roleArn"])?;
228 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
229 validate_arn(role_arn)?;
230
231 let machine_type = if let Some(t) = body["type"].as_str() {
232 StateMachineType::parse(t).ok_or_else(|| {
233 AwsServiceError::aws_error(
234 StatusCode::BAD_REQUEST,
235 "ValidationException",
236 format!(
237 "Value '{t}' at 'type' failed to satisfy constraint: \
238 Member must satisfy enum value set: [STANDARD, EXPRESS]"
239 ),
240 )
241 })?
242 } else {
243 StateMachineType::Standard
244 };
245
246 let mut accounts = self.state.write();
247 let state = accounts.get_or_create(&req.account_id);
248 let arn = state.state_machine_arn(name);
249
250 if state.state_machines.values().any(|sm| sm.name == name) {
252 return Err(AwsServiceError::aws_error(
253 StatusCode::CONFLICT,
254 "StateMachineAlreadyExists",
255 format!("State Machine Already Exists: '{arn}'"),
256 ));
257 }
258
259 let now = Utc::now();
260 let revision_id = uuid::Uuid::new_v4().to_string();
261
262 let mut tags = HashMap::new();
263 if !body["tags"].is_null() {
264 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
265 |f| {
266 AwsServiceError::aws_error(
267 StatusCode::BAD_REQUEST,
268 "ValidationException",
269 format!("{f} must be a list"),
270 )
271 },
272 )?;
273 }
274
275 let sm = StateMachine {
276 name: name.to_string(),
277 arn: arn.clone(),
278 definition: definition.to_string(),
279 role_arn: role_arn.to_string(),
280 machine_type,
281 status: StateMachineStatus::Active,
282 creation_date: now,
283 update_date: now,
284 tags,
285 revision_id: revision_id.clone(),
286 logging_configuration: body.get("loggingConfiguration").cloned(),
287 tracing_configuration: body.get("tracingConfiguration").cloned(),
288 description: body["description"].as_str().unwrap_or("").to_string(),
289 };
290
291 state.state_machines.insert(arn.clone(), sm);
292
293 Ok(AwsResponse::ok_json(json!({
294 "stateMachineArn": arn,
295 "creationDate": now.timestamp() as f64,
296 "stateMachineVersionArn": arn,
297 })))
298 }
299
300 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = req.json_body();
302 validate_required("stateMachineArn", &body["stateMachineArn"])?;
303 let arn = body["stateMachineArn"]
304 .as_str()
305 .ok_or_else(|| missing("stateMachineArn"))?;
306 validate_arn(arn)?;
307
308 let accounts = self.state.read();
309 let empty = StepFunctionsState::new(&req.account_id, &req.region);
310 let state = accounts.get(&req.account_id).unwrap_or(&empty);
311 let sm = state
312 .state_machines
313 .get(arn)
314 .ok_or_else(|| state_machine_not_found(arn))?;
315
316 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
317 }
318
319 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
322 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
323 let next_token = body["nextToken"].as_str();
324
325 let accounts = self.state.read();
326 let empty = StepFunctionsState::new(&req.account_id, &req.region);
327 let state = accounts.get(&req.account_id).unwrap_or(&empty);
328 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
329 machines.sort_by(|a, b| a.name.cmp(&b.name));
330
331 let items: Vec<Value> = machines
332 .iter()
333 .map(|sm| {
334 json!({
335 "name": sm.name,
336 "stateMachineArn": sm.arn,
337 "type": sm.machine_type.as_str(),
338 "creationDate": sm.creation_date.timestamp() as f64,
339 })
340 })
341 .collect();
342
343 let (page, token) = paginate(&items, next_token, max_results);
344
345 let mut resp = json!({ "stateMachines": page });
346 if let Some(t) = token {
347 resp["nextToken"] = json!(t);
348 }
349 Ok(AwsResponse::ok_json(resp))
350 }
351
352 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
353 let body = req.json_body();
354 validate_required("stateMachineArn", &body["stateMachineArn"])?;
355 let arn = body["stateMachineArn"]
356 .as_str()
357 .ok_or_else(|| missing("stateMachineArn"))?;
358 validate_arn(arn)?;
359
360 let mut accounts = self.state.write();
361 let state = accounts.get_or_create(&req.account_id);
362 state.state_machines.remove(arn);
364
365 Ok(AwsResponse::ok_json(json!({})))
366 }
367
368 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
369 let body = req.json_body();
370 validate_required("stateMachineArn", &body["stateMachineArn"])?;
371 let arn = body["stateMachineArn"]
372 .as_str()
373 .ok_or_else(|| missing("stateMachineArn"))?;
374 validate_arn(arn)?;
375
376 let mut accounts = self.state.write();
377 let state = accounts.get_or_create(&req.account_id);
378 let sm = state
379 .state_machines
380 .get_mut(arn)
381 .ok_or_else(|| state_machine_not_found(arn))?;
382
383 if let Some(definition) = body["definition"].as_str() {
384 validate_definition(definition)?;
385 sm.definition = definition.to_string();
386 }
387
388 if let Some(role_arn) = body["roleArn"].as_str() {
389 validate_arn(role_arn)?;
390 sm.role_arn = role_arn.to_string();
391 }
392
393 if let Some(logging) = body.get("loggingConfiguration") {
394 sm.logging_configuration = Some(logging.clone());
395 }
396
397 if let Some(tracing) = body.get("tracingConfiguration") {
398 sm.tracing_configuration = Some(tracing.clone());
399 }
400
401 if let Some(description) = body["description"].as_str() {
402 sm.description = description.to_string();
403 }
404
405 let now = Utc::now();
406 sm.update_date = now;
407 sm.revision_id = uuid::Uuid::new_v4().to_string();
408
409 let revision_id = sm.revision_id.clone();
410 let sm_arn = sm.arn.clone();
411
412 Ok(AwsResponse::ok_json(json!({
413 "updateDate": now.timestamp() as f64,
414 "revisionId": revision_id,
415 "stateMachineVersionArn": sm_arn,
416 })))
417 }
418
419 fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
422 let body = req.json_body();
423 validate_required("stateMachineArn", &body["stateMachineArn"])?;
424 let sm_arn = body["stateMachineArn"]
425 .as_str()
426 .ok_or_else(|| missing("stateMachineArn"))?;
427 validate_arn(sm_arn)?;
428
429 let input = body["input"].as_str().map(|s| s.to_string());
430
431 if let Some(ref input_str) = input {
433 let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
434 AwsServiceError::aws_error(
435 StatusCode::BAD_REQUEST,
436 "InvalidExecutionInput",
437 "Invalid execution input: must be valid JSON".to_string(),
438 )
439 })?;
440 }
441
442 let execution_name = body["name"]
443 .as_str()
444 .map(|s| s.to_string())
445 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
446
447 if let Some(name) = body["name"].as_str() {
448 validate_name(name)?;
449 }
450
451 let mut accounts = self.state.write();
452 let state = accounts.get_or_create(&req.account_id);
453 let sm = state
454 .state_machines
455 .get(sm_arn)
456 .ok_or_else(|| state_machine_not_found(sm_arn))?;
457
458 let sm_name = sm.name.clone();
459 let definition = sm.definition.clone();
460 let exec_arn = state.execution_arn(&sm_name, &execution_name);
461
462 if state.executions.contains_key(&exec_arn) {
464 return Err(AwsServiceError::aws_error(
465 StatusCode::CONFLICT,
466 "ExecutionAlreadyExists",
467 format!("Execution Already Exists: '{exec_arn}'"),
468 ));
469 }
470
471 let now = Utc::now();
472 let execution = Execution {
473 execution_arn: exec_arn.clone(),
474 state_machine_arn: sm_arn.to_string(),
475 state_machine_name: sm_name,
476 name: execution_name,
477 status: ExecutionStatus::Running,
478 input: input.clone(),
479 output: None,
480 start_date: now,
481 stop_date: None,
482 error: None,
483 cause: None,
484 history_events: vec![],
485 };
486
487 state.executions.insert(exec_arn.clone(), execution);
488 drop(accounts);
489
490 let shared_state = self.state.clone();
492 let exec_arn_clone = exec_arn.clone();
493 let input_clone = input;
494 let delivery = self.delivery.clone();
495 let dynamodb_state = self.dynamodb_state.clone();
496 tokio::spawn(async move {
497 interpreter::execute_state_machine(
498 shared_state,
499 exec_arn_clone,
500 definition,
501 input_clone,
502 delivery,
503 dynamodb_state,
504 )
505 .await;
506 });
507
508 Ok(AwsResponse::ok_json(json!({
509 "executionArn": exec_arn,
510 "startDate": now.timestamp() as f64,
511 })))
512 }
513
514 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
515 let body = req.json_body();
516 validate_required("executionArn", &body["executionArn"])?;
517 let exec_arn = body["executionArn"]
518 .as_str()
519 .ok_or_else(|| missing("executionArn"))?;
520
521 let error = body["error"].as_str().map(|s| s.to_string());
522 let cause = body["cause"].as_str().map(|s| s.to_string());
523
524 let mut accounts = self.state.write();
525 let state = accounts.get_or_create(&req.account_id);
526 let exec = state
527 .executions
528 .get_mut(exec_arn)
529 .ok_or_else(|| execution_not_found(exec_arn))?;
530
531 if exec.status != ExecutionStatus::Running {
532 return Err(AwsServiceError::aws_error(
533 StatusCode::BAD_REQUEST,
534 "ExecutionNotRunning",
535 format!("Execution is not running: '{exec_arn}'"),
536 ));
537 }
538
539 let now = Utc::now();
540 exec.status = ExecutionStatus::Aborted;
541 exec.stop_date = Some(now);
542 exec.error = error;
543 exec.cause = cause;
544
545 Ok(AwsResponse::ok_json(json!({
546 "stopDate": now.timestamp() as f64,
547 })))
548 }
549
550 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
551 let body = req.json_body();
552 validate_required("executionArn", &body["executionArn"])?;
553 let exec_arn = body["executionArn"]
554 .as_str()
555 .ok_or_else(|| missing("executionArn"))?;
556
557 let accounts = self.state.read();
558 let empty = StepFunctionsState::new(&req.account_id, &req.region);
559 let state = accounts.get(&req.account_id).unwrap_or(&empty);
560 let exec = state
561 .executions
562 .get(exec_arn)
563 .ok_or_else(|| execution_not_found(exec_arn))?;
564
565 Ok(AwsResponse::ok_json(execution_to_json(exec)))
566 }
567
568 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
569 let body = req.json_body();
570 validate_required("stateMachineArn", &body["stateMachineArn"])?;
571 let sm_arn = body["stateMachineArn"]
572 .as_str()
573 .ok_or_else(|| missing("stateMachineArn"))?;
574 validate_arn(sm_arn)?;
575
576 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
577 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
578 let next_token = body["nextToken"].as_str();
579 let status_filter = body["statusFilter"].as_str();
580
581 let accounts = self.state.read();
582 let empty = StepFunctionsState::new(&req.account_id, &req.region);
583 let state = accounts.get(&req.account_id).unwrap_or(&empty);
584
585 if !state.state_machines.contains_key(sm_arn) {
587 return Err(state_machine_not_found(sm_arn));
588 }
589
590 let mut executions: Vec<&Execution> = state
591 .executions
592 .values()
593 .filter(|e| e.state_machine_arn == sm_arn)
594 .filter(|e| {
595 status_filter
596 .map(|sf| e.status.as_str() == sf)
597 .unwrap_or(true)
598 })
599 .collect();
600
601 executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
603
604 let items: Vec<Value> = executions
605 .iter()
606 .map(|e| {
607 let mut item = json!({
608 "executionArn": e.execution_arn,
609 "stateMachineArn": e.state_machine_arn,
610 "name": e.name,
611 "status": e.status.as_str(),
612 "startDate": e.start_date.timestamp() as f64,
613 });
614 if let Some(stop) = e.stop_date {
615 item["stopDate"] = json!(stop.timestamp() as f64);
616 }
617 item
618 })
619 .collect();
620
621 let (page, token) = paginate(&items, next_token, max_results);
622
623 let mut resp = json!({ "executions": page });
624 if let Some(t) = token {
625 resp["nextToken"] = json!(t);
626 }
627 Ok(AwsResponse::ok_json(resp))
628 }
629
630 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
631 let body = req.json_body();
632 validate_required("executionArn", &body["executionArn"])?;
633 let exec_arn = body["executionArn"]
634 .as_str()
635 .ok_or_else(|| missing("executionArn"))?;
636
637 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
638 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
639 let next_token = body["nextToken"].as_str();
640 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
641
642 let accounts = self.state.read();
643 let empty = StepFunctionsState::new(&req.account_id, &req.region);
644 let state = accounts.get(&req.account_id).unwrap_or(&empty);
645 let exec = state
646 .executions
647 .get(exec_arn)
648 .ok_or_else(|| execution_not_found(exec_arn))?;
649
650 let mut events: Vec<Value> = exec
651 .history_events
652 .iter()
653 .map(|e| {
654 json!({
655 "id": e.id,
656 "type": e.event_type,
657 "timestamp": e.timestamp.timestamp() as f64,
658 "previousEventId": e.previous_event_id,
659 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
660 })
661 })
662 .collect();
663
664 if reverse_order {
665 events.reverse();
666 }
667
668 let (page, token) = paginate(&events, next_token, max_results);
669
670 let mut resp = json!({ "events": page });
671 if let Some(t) = token {
672 resp["nextToken"] = json!(t);
673 }
674 Ok(AwsResponse::ok_json(resp))
675 }
676
677 fn describe_state_machine_for_execution(
678 &self,
679 req: &AwsRequest,
680 ) -> Result<AwsResponse, AwsServiceError> {
681 let body = req.json_body();
682 validate_required("executionArn", &body["executionArn"])?;
683 let exec_arn = body["executionArn"]
684 .as_str()
685 .ok_or_else(|| missing("executionArn"))?;
686
687 let accounts = self.state.read();
688 let empty = StepFunctionsState::new(&req.account_id, &req.region);
689 let state = accounts.get(&req.account_id).unwrap_or(&empty);
690 let exec = state
691 .executions
692 .get(exec_arn)
693 .ok_or_else(|| execution_not_found(exec_arn))?;
694
695 let sm = state
696 .state_machines
697 .get(&exec.state_machine_arn)
698 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
699
700 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
701 }
702
703 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
706 let body = req.json_body();
707 validate_required("resourceArn", &body["resourceArn"])?;
708 let arn = body["resourceArn"]
709 .as_str()
710 .ok_or_else(|| missing("resourceArn"))?;
711 validate_arn(arn)?;
712 validate_required("tags", &body["tags"])?;
713
714 let mut accounts = self.state.write();
715 let state = accounts.get_or_create(&req.account_id);
716 let sm = state
717 .state_machines
718 .get_mut(arn)
719 .ok_or_else(|| resource_not_found(arn))?;
720
721 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
722 |f| {
723 AwsServiceError::aws_error(
724 StatusCode::BAD_REQUEST,
725 "ValidationException",
726 format!("{f} must be a list"),
727 )
728 },
729 )?;
730
731 Ok(AwsResponse::ok_json(json!({})))
732 }
733
734 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735 let body = req.json_body();
736 validate_required("resourceArn", &body["resourceArn"])?;
737 let arn = body["resourceArn"]
738 .as_str()
739 .ok_or_else(|| missing("resourceArn"))?;
740 validate_arn(arn)?;
741 validate_required("tagKeys", &body["tagKeys"])?;
742
743 let mut accounts = self.state.write();
744 let state = accounts.get_or_create(&req.account_id);
745 let sm = state
746 .state_machines
747 .get_mut(arn)
748 .ok_or_else(|| resource_not_found(arn))?;
749
750 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
751 AwsServiceError::aws_error(
752 StatusCode::BAD_REQUEST,
753 "ValidationException",
754 format!("{f} must be a list"),
755 )
756 })?;
757
758 Ok(AwsResponse::ok_json(json!({})))
759 }
760
761 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762 let body = req.json_body();
763 validate_required("resourceArn", &body["resourceArn"])?;
764 let arn = body["resourceArn"]
765 .as_str()
766 .ok_or_else(|| missing("resourceArn"))?;
767 validate_arn(arn)?;
768
769 let accounts = self.state.read();
770 let empty = StepFunctionsState::new(&req.account_id, &req.region);
771 let state = accounts.get(&req.account_id).unwrap_or(&empty);
772 let sm = state
773 .state_machines
774 .get(arn)
775 .ok_or_else(|| resource_not_found(arn))?;
776
777 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
778
779 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
780 }
781
782 fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785 let body = req.json_body();
786 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
787 validate_name(name)?;
788 let mut accounts = self.state.write();
789 let state = accounts.get_or_create(&req.account_id);
790 let arn = format!(
791 "arn:aws:states:{}:{}:activity:{}",
792 state.region, state.account_id, name
793 );
794 if state.activities.contains_key(&arn) {
795 return Err(AwsServiceError::aws_error(
796 StatusCode::BAD_REQUEST,
797 "ActivityAlreadyExists",
798 format!("Activity already exists: {arn}"),
799 ));
800 }
801 let activity = crate::state::Activity {
802 name: name.to_string(),
803 arn: arn.clone(),
804 creation_date: chrono::Utc::now(),
805 tags: HashMap::new(),
806 };
807 state.activities.insert(arn.clone(), activity.clone());
808 Ok(AwsResponse::ok_json(json!({
809 "activityArn": arn,
810 "creationDate": activity.creation_date.timestamp(),
811 })))
812 }
813
814 fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815 let body = req.json_body();
816 let arn = body["activityArn"]
817 .as_str()
818 .ok_or_else(|| missing("activityArn"))?
819 .to_string();
820 let mut accounts = self.state.write();
821 let state = accounts.get_or_create(&req.account_id);
822 state.activities.remove(&arn);
823 Ok(AwsResponse::ok_json(json!({})))
824 }
825
826 fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827 let body = req.json_body();
828 let arn = body["activityArn"]
829 .as_str()
830 .ok_or_else(|| missing("activityArn"))?
831 .to_string();
832 let accounts = self.state.read();
833 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
834 let state = accounts.get(&req.account_id).unwrap_or(&empty);
835 let a = state.activities.get(&arn).ok_or_else(|| {
836 AwsServiceError::aws_error(
837 StatusCode::BAD_REQUEST,
838 "ActivityDoesNotExist",
839 format!("Activity does not exist: {arn}"),
840 )
841 })?;
842 Ok(AwsResponse::ok_json(json!({
843 "activityArn": a.arn,
844 "name": a.name,
845 "creationDate": a.creation_date.timestamp(),
846 })))
847 }
848
849 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850 let accounts = self.state.read();
851 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
852 let state = accounts.get(&req.account_id).unwrap_or(&empty);
853 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
854 activities.sort_by(|a, b| a.name.cmp(&b.name));
855 let body = json!({
856 "activities": activities.iter().map(|a| json!({
857 "activityArn": a.arn,
858 "name": a.name,
859 "creationDate": a.creation_date.timestamp(),
860 })).collect::<Vec<_>>(),
861 });
862 Ok(AwsResponse::ok_json(body))
863 }
864
865 async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
866 let body = req.json_body();
867 let arn = body["activityArn"]
868 .as_str()
869 .ok_or_else(|| missing("activityArn"))?
870 .to_string();
871 {
873 let accounts = self.state.read();
874 let state = accounts
875 .get(&req.account_id)
876 .ok_or_else(|| activity_not_found(&arn))?;
877 if !state.activities.contains_key(&arn) {
878 return Err(activity_not_found(&arn));
879 }
880 }
881
882 let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
885 .ok()
886 .and_then(|s| s.parse().ok())
887 .unwrap_or(5);
888 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
889
890 loop {
891 {
893 let mut accounts = self.state.write();
894 let state = accounts.get_or_create(&req.account_id);
895 let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
896 .task_tokens
897 .iter()
898 .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
899 .map(|(k, t)| (k.clone(), t.created_at))
900 .collect();
901 candidates.sort_by_key(|c| c.1);
902 if let Some((token, _)) = candidates.into_iter().next() {
903 let now = chrono::Utc::now();
904 let entry = state.task_tokens.get_mut(&token).expect("just looked up");
905 entry.status = "IN_PROGRESS".to_string();
906 entry.last_heartbeat_at = Some(now);
907 let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
908 return Ok(AwsResponse::ok_json(json!({
909 "taskToken": token,
910 "input": input,
911 })));
912 }
913 }
914 if std::time::Instant::now() >= deadline {
915 return Ok(AwsResponse::ok_json(json!({
918 "taskToken": "",
919 "input": "",
920 })));
921 }
922 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
923 }
924 }
925
926 fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
927 self.update_task_token(req, "SUCCEEDED")
928 }
929
930 fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
931 self.update_task_token(req, "FAILED")
932 }
933
934 fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
935 let body = req.json_body();
940 let token = body["taskToken"]
941 .as_str()
942 .ok_or_else(|| missing("taskToken"))?
943 .to_string();
944 let mut accounts = self.state.write();
945 let state = accounts.get_or_create(&req.account_id);
946 let entry = state
947 .task_tokens
948 .get_mut(&token)
949 .ok_or_else(|| task_does_not_exist(&token))?;
950 entry.last_heartbeat_at = Some(chrono::Utc::now());
951 Ok(AwsResponse::ok_json(json!({})))
952 }
953
954 fn update_task_token(
955 &self,
956 req: &AwsRequest,
957 new_status: &str,
958 ) -> Result<AwsResponse, AwsServiceError> {
959 let body = req.json_body();
960 let token = body["taskToken"]
961 .as_str()
962 .ok_or_else(|| missing("taskToken"))?
963 .to_string();
964 let mut accounts = self.state.write();
965 let state = accounts.get_or_create(&req.account_id);
966 let entry = state
967 .task_tokens
968 .get_mut(&token)
969 .ok_or_else(|| task_does_not_exist(&token))?;
970 entry.status = new_status.to_string();
971 if new_status == "SUCCEEDED" {
972 entry.output = body["output"].as_str().map(String::from);
973 } else if new_status == "FAILED" {
974 entry.error = body["error"].as_str().map(String::from);
975 entry.cause = body["cause"].as_str().map(String::from);
976 }
977 Ok(AwsResponse::ok_json(json!({})))
978 }
979
980 fn publish_state_machine_version(
983 &self,
984 req: &AwsRequest,
985 ) -> Result<AwsResponse, AwsServiceError> {
986 let body = req.json_body();
987 let arn = body["stateMachineArn"]
988 .as_str()
989 .ok_or_else(|| missing("stateMachineArn"))?
990 .to_string();
991 let description = body["description"].as_str().unwrap_or("").to_string();
992 let mut accounts = self.state.write();
993 let state = accounts.get_or_create(&req.account_id);
994 if !state.state_machines.contains_key(&arn) {
995 return Err(state_machine_not_found(&arn));
996 }
997 let version = state
998 .state_machine_versions
999 .values()
1000 .filter(|v| v.state_machine_arn == arn)
1001 .map(|v| v.version)
1002 .max()
1003 .unwrap_or(0)
1004 + 1;
1005 let version_arn = format!("{arn}:{version}");
1006 let v = crate::state::StateMachineVersion {
1007 state_machine_arn: arn,
1008 version,
1009 revision_id: format!("rev-{version}"),
1010 description,
1011 creation_date: chrono::Utc::now(),
1012 };
1013 state
1014 .state_machine_versions
1015 .insert(version_arn.clone(), v.clone());
1016 Ok(AwsResponse::ok_json(json!({
1017 "stateMachineVersionArn": version_arn,
1018 "creationDate": v.creation_date.timestamp(),
1019 })))
1020 }
1021
1022 fn delete_state_machine_version(
1023 &self,
1024 req: &AwsRequest,
1025 ) -> Result<AwsResponse, AwsServiceError> {
1026 let body = req.json_body();
1027 let arn = body["stateMachineVersionArn"]
1028 .as_str()
1029 .ok_or_else(|| missing("stateMachineVersionArn"))?
1030 .to_string();
1031 let mut accounts = self.state.write();
1032 let state = accounts.get_or_create(&req.account_id);
1033 state.state_machine_versions.remove(&arn);
1034 Ok(AwsResponse::ok_json(json!({})))
1035 }
1036
1037 fn list_state_machine_versions(
1038 &self,
1039 req: &AwsRequest,
1040 ) -> Result<AwsResponse, AwsServiceError> {
1041 let body = req.json_body();
1042 let arn = body["stateMachineArn"]
1043 .as_str()
1044 .ok_or_else(|| missing("stateMachineArn"))?
1045 .to_string();
1046 let accounts = self.state.read();
1047 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1048 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1049 let mut versions: Vec<&crate::state::StateMachineVersion> = state
1050 .state_machine_versions
1051 .values()
1052 .filter(|v| v.state_machine_arn == arn)
1053 .collect();
1054 versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1055 let resp = json!({
1056 "stateMachineVersions": versions.iter().map(|v| json!({
1057 "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1058 "creationDate": v.creation_date.timestamp(),
1059 })).collect::<Vec<_>>(),
1060 });
1061 Ok(AwsResponse::ok_json(resp))
1062 }
1063
1064 fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1065 let body = req.json_body();
1066 let name = body["name"]
1067 .as_str()
1068 .ok_or_else(|| missing("name"))?
1069 .to_string();
1070 validate_name(&name)?;
1071 let routing_cfg = body["routingConfiguration"]
1072 .as_array()
1073 .ok_or_else(|| missing("routingConfiguration"))?;
1074 let routes = parse_routing_configuration(routing_cfg)?;
1075 let parent_arn = routes[0]
1076 .state_machine_version_arn
1077 .rsplit_once(':')
1078 .map(|(parent, _)| parent.to_string())
1079 .unwrap_or_default();
1080 let alias_arn = format!("{parent_arn}:{name}");
1081 let now = chrono::Utc::now();
1082 let alias = crate::state::StateMachineAlias {
1083 name,
1084 arn: alias_arn.clone(),
1085 description: body["description"].as_str().unwrap_or("").to_string(),
1086 routing_configuration: routes,
1087 creation_date: now,
1088 update_date: now,
1089 };
1090 let mut accounts = self.state.write();
1091 let state = accounts.get_or_create(&req.account_id);
1092 state.state_machine_aliases.insert(alias_arn.clone(), alias);
1093 Ok(AwsResponse::ok_json(json!({
1094 "stateMachineAliasArn": alias_arn,
1095 "creationDate": now.timestamp(),
1096 })))
1097 }
1098
1099 fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1100 let body = req.json_body();
1101 let arn = body["stateMachineAliasArn"]
1102 .as_str()
1103 .ok_or_else(|| missing("stateMachineAliasArn"))?
1104 .to_string();
1105 let mut accounts = self.state.write();
1106 let state = accounts.get_or_create(&req.account_id);
1107 state.state_machine_aliases.remove(&arn);
1108 Ok(AwsResponse::ok_json(json!({})))
1109 }
1110
1111 fn describe_state_machine_alias(
1112 &self,
1113 req: &AwsRequest,
1114 ) -> Result<AwsResponse, AwsServiceError> {
1115 let body = req.json_body();
1116 let arn = body["stateMachineAliasArn"]
1117 .as_str()
1118 .ok_or_else(|| missing("stateMachineAliasArn"))?
1119 .to_string();
1120 let accounts = self.state.read();
1121 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1122 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1123 let alias = state
1124 .state_machine_aliases
1125 .get(&arn)
1126 .ok_or_else(|| resource_not_found(&arn))?;
1127 Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1128 }
1129
1130 fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1131 let body = req.json_body();
1132 let parent = body["stateMachineArn"]
1133 .as_str()
1134 .ok_or_else(|| missing("stateMachineArn"))?
1135 .to_string();
1136 let accounts = self.state.read();
1137 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1138 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1139 let parent_prefix = format!("{parent}:");
1142 let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1143 .state_machine_aliases
1144 .values()
1145 .filter(|a| a.arn.starts_with(&parent_prefix))
1146 .collect();
1147 aliases.sort_by(|a, b| a.name.cmp(&b.name));
1148 Ok(AwsResponse::ok_json(json!({
1149 "stateMachineAliases": aliases.iter().map(|a| json!({
1150 "stateMachineAliasArn": a.arn,
1151 "creationDate": a.creation_date.timestamp(),
1152 })).collect::<Vec<_>>(),
1153 })))
1154 }
1155
1156 fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1157 let body = req.json_body();
1158 let arn = body["stateMachineAliasArn"]
1159 .as_str()
1160 .ok_or_else(|| missing("stateMachineAliasArn"))?
1161 .to_string();
1162 let mut accounts = self.state.write();
1163 let state = accounts.get_or_create(&req.account_id);
1164 let alias = state
1165 .state_machine_aliases
1166 .get_mut(&arn)
1167 .ok_or_else(|| resource_not_found(&arn))?;
1168 if let Some(d) = body["description"].as_str() {
1169 alias.description = d.to_string();
1170 }
1171 if let Some(routes) = body["routingConfiguration"].as_array() {
1172 alias.routing_configuration = parse_routing_configuration(routes)?;
1173 }
1174 alias.update_date = chrono::Utc::now();
1175 Ok(AwsResponse::ok_json(json!({
1176 "updateDate": alias.update_date.timestamp(),
1177 })))
1178 }
1179
1180 fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1183 let body = req.json_body();
1184 let arn = body["mapRunArn"]
1185 .as_str()
1186 .ok_or_else(|| missing("mapRunArn"))?
1187 .to_string();
1188 let accounts = self.state.read();
1189 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1190 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1191 let mr = state
1192 .map_runs
1193 .get(&arn)
1194 .ok_or_else(|| resource_not_found(&arn))?;
1195 Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1196 }
1197
1198 fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1199 let body = req.json_body();
1200 let exec_arn = body["executionArn"].as_str().map(String::from);
1201 let accounts = self.state.read();
1202 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1203 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1204 let runs: Vec<&crate::state::MapRun> = state
1205 .map_runs
1206 .values()
1207 .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1208 .collect();
1209 Ok(AwsResponse::ok_json(json!({
1210 "mapRuns": runs.iter().map(|r| json!({
1211 "mapRunArn": r.map_run_arn,
1212 "executionArn": r.execution_arn,
1213 "stateMachineArn": "",
1214 "startDate": r.start_date.timestamp(),
1215 "stopDate": r.stop_date.map(|d| d.timestamp()),
1216 })).collect::<Vec<_>>(),
1217 })))
1218 }
1219
1220 fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1221 let body = req.json_body();
1222 let arn = body["mapRunArn"]
1223 .as_str()
1224 .ok_or_else(|| missing("mapRunArn"))?
1225 .to_string();
1226 let mut accounts = self.state.write();
1227 let state = accounts.get_or_create(&req.account_id);
1228 let mr = state
1229 .map_runs
1230 .get_mut(&arn)
1231 .ok_or_else(|| resource_not_found(&arn))?;
1232 if let Some(c) = body["maxConcurrency"].as_i64() {
1233 mr.max_concurrency = c as i32;
1234 }
1235 if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1236 mr.tolerated_failure_percentage = p;
1237 }
1238 if let Some(c) = body["toleratedFailureCount"].as_i64() {
1239 mr.tolerated_failure_count = c;
1240 }
1241 Ok(AwsResponse::ok_json(json!({})))
1242 }
1243
1244 fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1247 let body = req.json_body();
1248 let arn = body["executionArn"]
1249 .as_str()
1250 .ok_or_else(|| missing("executionArn"))?
1251 .to_string();
1252 let mut accounts = self.state.write();
1253 let state = accounts.get_or_create(&req.account_id);
1254 let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1255 AwsServiceError::aws_error(
1256 StatusCode::BAD_REQUEST,
1257 "ExecutionDoesNotExist",
1258 format!("Execution does not exist: {arn}"),
1259 )
1260 })?;
1261 exec.status = crate::state::ExecutionStatus::Running;
1262 exec.stop_date = None;
1263 Ok(AwsResponse::ok_json(json!({
1264 "redriveDate": chrono::Utc::now().timestamp(),
1265 })))
1266 }
1267
1268 fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1269 let body = req.json_body();
1270 let sm_arn = body["stateMachineArn"]
1271 .as_str()
1272 .ok_or_else(|| missing("stateMachineArn"))?
1273 .to_string();
1274 let input = body["input"].as_str().unwrap_or("{}").to_string();
1275 if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1276 return Err(AwsServiceError::aws_error(
1277 StatusCode::BAD_REQUEST,
1278 "InvalidExecutionInput",
1279 "Execution input is not valid JSON.",
1280 ));
1281 }
1282 let mut accounts = self.state.write();
1283 let state = accounts.get_or_create(&req.account_id);
1284 let sm = state
1285 .state_machines
1286 .get(&sm_arn)
1287 .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1288 if sm.machine_type != crate::state::StateMachineType::Express {
1289 return Err(AwsServiceError::aws_error(
1290 StatusCode::BAD_REQUEST,
1291 "StateMachineTypeNotSupported",
1292 "StartSyncExecution is only supported for EXPRESS state machines.",
1293 ));
1294 }
1295 let now = chrono::Utc::now();
1296 let exec_arn = format!(
1297 "arn:aws:states:{}:{}:express:{}:sync-{}",
1298 state.region,
1299 state.account_id,
1300 sm.name,
1301 now.timestamp_millis()
1302 );
1303 Ok(AwsResponse::ok_json(json!({
1304 "executionArn": exec_arn,
1305 "stateMachineArn": sm_arn,
1306 "name": "sync",
1307 "startDate": now.timestamp(),
1308 "stopDate": now.timestamp(),
1309 "status": "SUCCEEDED",
1310 "input": input,
1311 "output": "{}",
1312 "billingDetails": {
1313 "billedMemoryUsedInMB": 64,
1314 "billedDurationInMilliseconds": 1,
1315 },
1316 })))
1317 }
1318
1319 fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1320 let body = req.json_body();
1321 let definition = body["definition"]
1322 .as_str()
1323 .ok_or_else(|| missing("definition"))?;
1324 validate_definition(definition)?;
1325 let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1326 let input = body["input"].as_str().unwrap_or("{}").to_string();
1327 Ok(AwsResponse::ok_json(json!({
1331 "output": input,
1332 "status": "SUCCEEDED",
1333 "nextState": "End",
1334 })))
1335 }
1336
1337 fn validate_state_machine_definition(
1338 &self,
1339 req: &AwsRequest,
1340 ) -> Result<AwsResponse, AwsServiceError> {
1341 let body = req.json_body();
1342 let definition = body["definition"]
1343 .as_str()
1344 .ok_or_else(|| missing("definition"))?;
1345 match validate_definition(definition) {
1346 Ok(()) => Ok(AwsResponse::ok_json(json!({
1347 "result": "OK",
1348 "diagnostics": [],
1349 }))),
1350 Err(e) => Ok(AwsResponse::ok_json(json!({
1351 "result": "FAIL",
1352 "diagnostics": [{
1353 "severity": "ERROR",
1354 "code": "INVALID_DEFINITION",
1355 "message": e.to_string(),
1356 }],
1357 }))),
1358 }
1359 }
1360}
1361
1362fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1363 json!({
1364 "stateMachineAliasArn": alias.arn,
1365 "name": alias.name,
1366 "description": alias.description,
1367 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1368 "stateMachineVersionArn": r.state_machine_version_arn,
1369 "weight": r.weight,
1370 })).collect::<Vec<_>>(),
1371 "creationDate": alias.creation_date.timestamp(),
1372 "updateDate": alias.update_date.timestamp(),
1373 })
1374}
1375
1376fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1377 json!({
1378 "mapRunArn": mr.map_run_arn,
1379 "executionArn": mr.execution_arn,
1380 "maxConcurrency": mr.max_concurrency,
1381 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1382 "toleratedFailureCount": mr.tolerated_failure_count,
1383 "status": mr.status,
1384 "startDate": mr.start_date.timestamp(),
1385 "stopDate": mr.stop_date.map(|d| d.timestamp()),
1386 })
1387}
1388
1389fn state_machine_to_json(sm: &StateMachine) -> Value {
1392 let mut resp = json!({
1393 "name": sm.name,
1394 "stateMachineArn": sm.arn,
1395 "definition": sm.definition,
1396 "roleArn": sm.role_arn,
1397 "type": sm.machine_type.as_str(),
1398 "status": sm.status.as_str(),
1399 "creationDate": sm.creation_date.timestamp() as f64,
1400 "updateDate": sm.update_date.timestamp() as f64,
1401 "revisionId": sm.revision_id,
1402 "label": sm.name,
1403 });
1404
1405 if !sm.description.is_empty() {
1406 resp["description"] = json!(sm.description);
1407 }
1408
1409 if let Some(ref logging) = sm.logging_configuration {
1410 resp["loggingConfiguration"] = logging.clone();
1411 } else {
1412 resp["loggingConfiguration"] = json!({
1413 "level": "OFF",
1414 "includeExecutionData": false,
1415 "destinations": [],
1416 });
1417 }
1418
1419 if let Some(ref tracing) = sm.tracing_configuration {
1420 resp["tracingConfiguration"] = tracing.clone();
1421 } else {
1422 resp["tracingConfiguration"] = json!({
1423 "enabled": false,
1424 });
1425 }
1426
1427 resp
1428}
1429
1430fn missing(name: &str) -> AwsServiceError {
1431 AwsServiceError::aws_error(
1432 StatusCode::BAD_REQUEST,
1433 "ValidationException",
1434 format!("The request must contain the parameter {name}."),
1435 )
1436}
1437
1438fn state_machine_not_found(arn: &str) -> AwsServiceError {
1439 AwsServiceError::aws_error(
1440 StatusCode::BAD_REQUEST,
1441 "StateMachineDoesNotExist",
1442 format!("State Machine Does Not Exist: '{arn}'"),
1443 )
1444}
1445
1446fn activity_not_found(arn: &str) -> AwsServiceError {
1447 AwsServiceError::aws_error(
1448 StatusCode::BAD_REQUEST,
1449 "ActivityDoesNotExist",
1450 format!("Activity does not exist: {arn}"),
1451 )
1452}
1453
1454fn task_does_not_exist(token: &str) -> AwsServiceError {
1455 AwsServiceError::aws_error(
1456 StatusCode::BAD_REQUEST,
1457 "TaskDoesNotExist",
1458 format!("Task does not exist: {token}"),
1459 )
1460}
1461
1462fn resource_not_found(arn: &str) -> AwsServiceError {
1463 AwsServiceError::aws_error(
1464 StatusCode::BAD_REQUEST,
1465 "ResourceNotFound",
1466 format!("Resource not found: '{arn}'"),
1467 )
1468}
1469
1470fn parse_routing_configuration(
1475 routes: &[serde_json::Value],
1476) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1477 if routes.is_empty() || routes.len() > 2 {
1478 return Err(AwsServiceError::aws_error(
1479 StatusCode::BAD_REQUEST,
1480 "ValidationException",
1481 "routingConfiguration must contain 1 or 2 routes.",
1482 ));
1483 }
1484 let parsed: Vec<crate::state::AliasRoute> = routes
1485 .iter()
1486 .map(|r| {
1487 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1488 AwsServiceError::aws_error(
1489 StatusCode::BAD_REQUEST,
1490 "ValidationException",
1491 "routingConfiguration entries must contain stateMachineVersionArn.",
1492 )
1493 })?;
1494 let weight = r["weight"].as_i64().ok_or_else(|| {
1495 AwsServiceError::aws_error(
1496 StatusCode::BAD_REQUEST,
1497 "ValidationException",
1498 "routingConfiguration entries must contain a numeric weight.",
1499 )
1500 })?;
1501 if !(0..=100).contains(&weight) {
1502 return Err(AwsServiceError::aws_error(
1503 StatusCode::BAD_REQUEST,
1504 "ValidationException",
1505 format!("Invalid routing weight {weight}; must be 0-100."),
1506 ));
1507 }
1508 Ok(crate::state::AliasRoute {
1509 state_machine_version_arn: arn.to_string(),
1510 weight: weight as i32,
1511 })
1512 })
1513 .collect::<Result<_, _>>()?;
1514 let total: i32 = parsed.iter().map(|r| r.weight).sum();
1515 if total != 100 {
1516 return Err(AwsServiceError::aws_error(
1517 StatusCode::BAD_REQUEST,
1518 "ValidationException",
1519 format!("routingConfiguration weights must sum to 100, got {total}."),
1520 ));
1521 }
1522 Ok(parsed)
1523}
1524
1525fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1526 if name.is_empty() || name.len() > 80 {
1527 return Err(AwsServiceError::aws_error(
1528 StatusCode::BAD_REQUEST,
1529 "InvalidName",
1530 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1531 ));
1532 }
1533 if !name
1535 .chars()
1536 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1537 {
1538 return Err(AwsServiceError::aws_error(
1539 StatusCode::BAD_REQUEST,
1540 "InvalidName",
1541 format!(
1542 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1543 ),
1544 ));
1545 }
1546 Ok(())
1547}
1548
1549fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1550 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1551 AwsServiceError::aws_error(
1552 StatusCode::BAD_REQUEST,
1553 "InvalidDefinition",
1554 format!("Invalid State Machine Definition: '{e}'"),
1555 )
1556 })?;
1557
1558 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1559 return Err(AwsServiceError::aws_error(
1560 StatusCode::BAD_REQUEST,
1561 "InvalidDefinition",
1562 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1563 .to_string(),
1564 ));
1565 }
1566
1567 let states_obj = parsed
1568 .get("States")
1569 .and_then(|v| v.as_object())
1570 .ok_or_else(|| {
1571 AwsServiceError::aws_error(
1572 StatusCode::BAD_REQUEST,
1573 "InvalidDefinition",
1574 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1575 .to_string(),
1576 )
1577 })?;
1578
1579 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1580 AwsServiceError::aws_error(
1581 StatusCode::BAD_REQUEST,
1582 "InvalidDefinition",
1583 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1584 .to_string(),
1585 )
1586 })?;
1587 if !states_obj.contains_key(start_at) {
1588 return Err(AwsServiceError::aws_error(
1589 StatusCode::BAD_REQUEST,
1590 "InvalidDefinition",
1591 format!(
1592 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1593 (StartAt '{start_at}' does not reference a valid state)"
1594 ),
1595 ));
1596 }
1597
1598 Ok(())
1599}
1600
1601fn execution_not_found(arn: &str) -> AwsServiceError {
1602 AwsServiceError::aws_error(
1603 StatusCode::BAD_REQUEST,
1604 "ExecutionDoesNotExist",
1605 format!("Execution Does Not Exist: '{arn}'"),
1606 )
1607}
1608
1609fn execution_to_json(exec: &Execution) -> Value {
1610 let mut resp = json!({
1611 "executionArn": exec.execution_arn,
1612 "stateMachineArn": exec.state_machine_arn,
1613 "name": exec.name,
1614 "status": exec.status.as_str(),
1615 "startDate": exec.start_date.timestamp() as f64,
1616 });
1617
1618 if let Some(ref input) = exec.input {
1619 resp["input"] = json!(input);
1620 }
1621 if let Some(ref output) = exec.output {
1622 resp["output"] = json!(output);
1623 }
1624 if let Some(stop) = exec.stop_date {
1625 resp["stopDate"] = json!(stop.timestamp() as f64);
1626 }
1627 if let Some(ref error) = exec.error {
1628 resp["error"] = json!(error);
1629 }
1630 if let Some(ref cause) = exec.cause {
1631 resp["cause"] = json!(cause);
1632 }
1633
1634 resp
1635}
1636
1637fn camel_to_details_key(event_type: &str) -> String {
1639 let mut chars = event_type.chars();
1640 match chars.next() {
1641 None => String::new(),
1642 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
1643 }
1644}
1645
1646fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1647 if !arn.starts_with("arn:") {
1648 return Err(AwsServiceError::aws_error(
1649 StatusCode::BAD_REQUEST,
1650 "InvalidArn",
1651 format!("Invalid Arn: '{arn}'"),
1652 ));
1653 }
1654 Ok(())
1655}
1656
1657pub fn start_execution_from_delivery(
1666 state: &SharedStepFunctionsState,
1667 delivery: &Option<Arc<DeliveryBus>>,
1668 dynamodb_state: &Option<SharedDynamoDbState>,
1669 state_machine_arn: &str,
1670 input: &str,
1671) {
1672 if serde_json::from_str::<serde_json::Value>(input).is_err() {
1674 tracing::warn!(
1675 state_machine_arn,
1676 "Step Functions delivery: invalid JSON input, skipping execution"
1677 );
1678 return;
1679 }
1680
1681 let execution_name = uuid::Uuid::new_v4().to_string();
1682
1683 let account_id = state_machine_arn
1685 .split(':')
1686 .nth(4)
1687 .unwrap_or("000000000000")
1688 .to_string();
1689
1690 let mut accounts = state.write();
1691 let st = accounts.get_or_create(&account_id);
1692 let sm = match st.state_machines.get(state_machine_arn) {
1693 Some(sm) => sm,
1694 None => {
1695 tracing::warn!(
1696 state_machine_arn,
1697 "Step Functions delivery: state machine not found"
1698 );
1699 return;
1700 }
1701 };
1702
1703 let sm_name = sm.name.clone();
1704 let definition = sm.definition.clone();
1705 let exec_arn = st.execution_arn(&sm_name, &execution_name);
1706
1707 let now = Utc::now();
1708 let execution = Execution {
1709 execution_arn: exec_arn.clone(),
1710 state_machine_arn: state_machine_arn.to_string(),
1711 state_machine_name: sm_name,
1712 name: execution_name,
1713 status: ExecutionStatus::Running,
1714 input: Some(input.to_string()),
1715 output: None,
1716 start_date: now,
1717 stop_date: None,
1718 error: None,
1719 cause: None,
1720 history_events: vec![],
1721 };
1722
1723 st.executions.insert(exec_arn.clone(), execution);
1724 drop(accounts);
1725
1726 let shared_state = state.clone();
1727 let delivery = delivery.clone();
1728 let dynamodb_state = dynamodb_state.clone();
1729 let input = Some(input.to_string());
1730 tokio::spawn(async move {
1731 interpreter::execute_state_machine(
1732 shared_state,
1733 exec_arn,
1734 definition,
1735 input,
1736 delivery,
1737 dynamodb_state,
1738 )
1739 .await;
1740 });
1741}
1742
1743#[cfg(test)]
1744mod tests {
1745 use super::*;
1746 use http::{HeaderMap, Method};
1747 use parking_lot::RwLock;
1748 use serde_json::Value;
1749 use std::sync::Arc;
1750
1751 fn make_state() -> SharedStepFunctionsState {
1752 Arc::new(RwLock::new(
1753 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1754 ))
1755 }
1756
1757 fn make_request(action: &str, body: &str) -> AwsRequest {
1758 AwsRequest {
1759 service: "states".to_string(),
1760 action: action.to_string(),
1761 region: "us-east-1".to_string(),
1762 account_id: "123456789012".to_string(),
1763 request_id: "test-id".to_string(),
1764 headers: HeaderMap::new(),
1765 query_params: HashMap::new(),
1766 body: body.as_bytes().to_vec().into(),
1767 body_stream: parking_lot::Mutex::new(None),
1768 path_segments: vec![],
1769 raw_path: "/".to_string(),
1770 raw_query: String::new(),
1771 method: Method::POST,
1772 is_query_protocol: false,
1773 access_key_id: None,
1774 principal: None,
1775 }
1776 }
1777
1778 fn body_json(resp: &AwsResponse) -> Value {
1779 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1780 }
1781
1782 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1783 match result {
1784 Err(e) => e,
1785 Ok(_) => panic!("expected error, got Ok"),
1786 }
1787 }
1788
1789 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1790
1791 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1792 let body = json!({
1793 "name": name,
1794 "definition": VALID_DEF,
1795 "roleArn": "arn:aws:iam::123456789012:role/test",
1796 });
1797 let req = make_request("CreateStateMachine", &body.to_string());
1798 let resp = svc.create_state_machine(&req).unwrap();
1799 let b = body_json(&resp);
1800 b["stateMachineArn"].as_str().unwrap().to_string()
1801 }
1802
1803 #[test]
1806 fn create_state_machine_basic() {
1807 let svc = StepFunctionsService::new(make_state());
1808 let arn = create_sm(&svc, "test-sm");
1809 assert!(arn.contains("test-sm"));
1810 }
1811
1812 #[test]
1813 fn create_state_machine_with_express_type() {
1814 let svc = StepFunctionsService::new(make_state());
1815 let body = json!({
1816 "name": "express-sm",
1817 "definition": VALID_DEF,
1818 "roleArn": "arn:aws:iam::123456789012:role/r",
1819 "type": "EXPRESS",
1820 });
1821 let req = make_request("CreateStateMachine", &body.to_string());
1822 let resp = svc.create_state_machine(&req).unwrap();
1823 let b = body_json(&resp);
1824 assert!(b["stateMachineArn"].as_str().is_some());
1825 }
1826
1827 #[test]
1828 fn create_state_machine_duplicate_fails() {
1829 let svc = StepFunctionsService::new(make_state());
1830 create_sm(&svc, "dup-sm");
1831 let body = json!({
1832 "name": "dup-sm",
1833 "definition": VALID_DEF,
1834 "roleArn": "arn:aws:iam::123456789012:role/r",
1835 });
1836 let req = make_request("CreateStateMachine", &body.to_string());
1837 let err = expect_err(svc.create_state_machine(&req));
1838 assert!(err.to_string().contains("StateMachineAlreadyExists"));
1839 }
1840
1841 #[test]
1842 fn create_state_machine_missing_name() {
1843 let svc = StepFunctionsService::new(make_state());
1844 let body = json!({
1845 "definition": VALID_DEF,
1846 "roleArn": "arn:aws:iam::123456789012:role/r",
1847 });
1848 let req = make_request("CreateStateMachine", &body.to_string());
1849 assert!(svc.create_state_machine(&req).is_err());
1850 }
1851
1852 #[test]
1853 fn create_state_machine_invalid_definition() {
1854 let svc = StepFunctionsService::new(make_state());
1855 let body = json!({
1856 "name": "bad-def",
1857 "definition": "not json",
1858 "roleArn": "arn:aws:iam::123456789012:role/r",
1859 });
1860 let req = make_request("CreateStateMachine", &body.to_string());
1861 let err = expect_err(svc.create_state_machine(&req));
1862 assert!(err.to_string().contains("InvalidDefinition"));
1863 }
1864
1865 #[test]
1866 fn create_state_machine_definition_missing_start_at() {
1867 let svc = StepFunctionsService::new(make_state());
1868 let body = json!({
1869 "name": "no-start",
1870 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1871 "roleArn": "arn:aws:iam::123456789012:role/r",
1872 });
1873 let req = make_request("CreateStateMachine", &body.to_string());
1874 let err = expect_err(svc.create_state_machine(&req));
1875 assert!(err.to_string().contains("InvalidDefinition"));
1876 }
1877
1878 #[test]
1879 fn create_state_machine_definition_missing_states() {
1880 let svc = StepFunctionsService::new(make_state());
1881 let body = json!({
1882 "name": "no-states",
1883 "definition": r#"{"StartAt":"S"}"#,
1884 "roleArn": "arn:aws:iam::123456789012:role/r",
1885 });
1886 let req = make_request("CreateStateMachine", &body.to_string());
1887 let err = expect_err(svc.create_state_machine(&req));
1888 assert!(err.to_string().contains("InvalidDefinition"));
1889 }
1890
1891 #[test]
1892 fn create_state_machine_definition_start_at_not_in_states() {
1893 let svc = StepFunctionsService::new(make_state());
1894 let body = json!({
1895 "name": "bad-start",
1896 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1897 "roleArn": "arn:aws:iam::123456789012:role/r",
1898 });
1899 let req = make_request("CreateStateMachine", &body.to_string());
1900 let err = expect_err(svc.create_state_machine(&req));
1901 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1902 }
1903
1904 #[test]
1905 fn create_state_machine_invalid_type() {
1906 let svc = StepFunctionsService::new(make_state());
1907 let body = json!({
1908 "name": "bad-type",
1909 "definition": VALID_DEF,
1910 "roleArn": "arn:aws:iam::123456789012:role/r",
1911 "type": "INVALID",
1912 });
1913 let req = make_request("CreateStateMachine", &body.to_string());
1914 assert!(svc.create_state_machine(&req).is_err());
1915 }
1916
1917 #[test]
1918 fn create_state_machine_invalid_arn() {
1919 let svc = StepFunctionsService::new(make_state());
1920 let body = json!({
1921 "name": "bad-arn",
1922 "definition": VALID_DEF,
1923 "roleArn": "not-an-arn",
1924 });
1925 let req = make_request("CreateStateMachine", &body.to_string());
1926 let err = expect_err(svc.create_state_machine(&req));
1927 assert!(err.to_string().contains("InvalidArn"));
1928 }
1929
1930 #[test]
1931 fn create_state_machine_invalid_name() {
1932 let svc = StepFunctionsService::new(make_state());
1933 let body = json!({
1934 "name": "has spaces!",
1935 "definition": VALID_DEF,
1936 "roleArn": "arn:aws:iam::123456789012:role/r",
1937 });
1938 let req = make_request("CreateStateMachine", &body.to_string());
1939 let err = expect_err(svc.create_state_machine(&req));
1940 assert!(err.to_string().contains("InvalidName"));
1941 }
1942
1943 #[test]
1944 fn create_state_machine_name_too_long() {
1945 let svc = StepFunctionsService::new(make_state());
1946 let long_name = "a".repeat(81);
1947 let body = json!({
1948 "name": long_name,
1949 "definition": VALID_DEF,
1950 "roleArn": "arn:aws:iam::123456789012:role/r",
1951 });
1952 let req = make_request("CreateStateMachine", &body.to_string());
1953 let err = expect_err(svc.create_state_machine(&req));
1954 assert!(err.to_string().contains("InvalidName"));
1955 }
1956
1957 #[test]
1960 fn describe_state_machine_found() {
1961 let svc = StepFunctionsService::new(make_state());
1962 let arn = create_sm(&svc, "desc-sm");
1963
1964 let req = make_request(
1965 "DescribeStateMachine",
1966 &json!({"stateMachineArn": arn}).to_string(),
1967 );
1968 let resp = svc.describe_state_machine(&req).unwrap();
1969 let b = body_json(&resp);
1970 assert_eq!(b["name"], "desc-sm");
1971 assert_eq!(b["status"], "ACTIVE");
1972 assert!(b["definition"].as_str().is_some());
1973 }
1974
1975 #[test]
1976 fn describe_state_machine_not_found() {
1977 let svc = StepFunctionsService::new(make_state());
1978 let req = make_request(
1979 "DescribeStateMachine",
1980 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1981 .to_string(),
1982 );
1983 let err = expect_err(svc.describe_state_machine(&req));
1984 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1985 }
1986
1987 #[test]
1990 fn list_state_machines_empty() {
1991 let svc = StepFunctionsService::new(make_state());
1992 let req = make_request("ListStateMachines", "{}");
1993 let resp = svc.list_state_machines(&req).unwrap();
1994 let b = body_json(&resp);
1995 assert!(b["stateMachines"].as_array().unwrap().is_empty());
1996 }
1997
1998 #[test]
1999 fn list_state_machines_returns_created() {
2000 let svc = StepFunctionsService::new(make_state());
2001 create_sm(&svc, "sm-1");
2002 create_sm(&svc, "sm-2");
2003
2004 let req = make_request("ListStateMachines", "{}");
2005 let resp = svc.list_state_machines(&req).unwrap();
2006 let b = body_json(&resp);
2007 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
2008 }
2009
2010 #[test]
2013 fn delete_state_machine() {
2014 let svc = StepFunctionsService::new(make_state());
2015 let arn = create_sm(&svc, "del-sm");
2016
2017 let req = make_request(
2018 "DeleteStateMachine",
2019 &json!({"stateMachineArn": arn}).to_string(),
2020 );
2021 svc.delete_state_machine(&req).unwrap();
2022
2023 let req = make_request(
2025 "DescribeStateMachine",
2026 &json!({"stateMachineArn": arn}).to_string(),
2027 );
2028 assert!(svc.describe_state_machine(&req).is_err());
2029 }
2030
2031 #[test]
2032 fn delete_state_machine_nonexistent_succeeds() {
2033 let svc = StepFunctionsService::new(make_state());
2034 let req = make_request(
2035 "DeleteStateMachine",
2036 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2037 .to_string(),
2038 );
2039 svc.delete_state_machine(&req).unwrap();
2041 }
2042
2043 #[test]
2046 fn update_state_machine() {
2047 let svc = StepFunctionsService::new(make_state());
2048 let arn = create_sm(&svc, "upd-sm");
2049
2050 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
2051 let body = json!({
2052 "stateMachineArn": arn,
2053 "definition": new_def,
2054 "description": "updated",
2055 });
2056 let req = make_request("UpdateStateMachine", &body.to_string());
2057 let resp = svc.update_state_machine(&req).unwrap();
2058 let b = body_json(&resp);
2059 assert!(b["updateDate"].as_f64().is_some());
2060
2061 let req = make_request(
2063 "DescribeStateMachine",
2064 &json!({"stateMachineArn": arn}).to_string(),
2065 );
2066 let resp = svc.describe_state_machine(&req).unwrap();
2067 let b = body_json(&resp);
2068 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2069 assert_eq!(b["description"], "updated");
2070 }
2071
2072 #[test]
2073 fn update_state_machine_not_found() {
2074 let svc = StepFunctionsService::new(make_state());
2075 let body = json!({
2076 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2077 "definition": VALID_DEF,
2078 });
2079 let req = make_request("UpdateStateMachine", &body.to_string());
2080 let err = expect_err(svc.update_state_machine(&req));
2081 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2082 }
2083
2084 #[tokio::test]
2087 async fn start_execution_basic() {
2088 let svc = StepFunctionsService::new(make_state());
2089 let arn = create_sm(&svc, "exec-sm");
2090
2091 let body = json!({
2092 "stateMachineArn": arn,
2093 "input": r#"{"key":"value"}"#,
2094 });
2095 let req = make_request("StartExecution", &body.to_string());
2096 let resp = svc.start_execution(&req).unwrap();
2097 let b = body_json(&resp);
2098 assert!(b["executionArn"].as_str().is_some());
2099 assert!(b["startDate"].as_f64().is_some());
2100 }
2101
2102 #[tokio::test]
2103 async fn start_execution_with_name() {
2104 let svc = StepFunctionsService::new(make_state());
2105 let arn = create_sm(&svc, "named-exec");
2106
2107 let body = json!({
2108 "stateMachineArn": arn,
2109 "name": "my-execution",
2110 });
2111 let req = make_request("StartExecution", &body.to_string());
2112 let resp = svc.start_execution(&req).unwrap();
2113 let b = body_json(&resp);
2114 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2115 }
2116
2117 #[tokio::test]
2118 async fn start_execution_sm_not_found() {
2119 let svc = StepFunctionsService::new(make_state());
2120 let body = json!({
2121 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2122 });
2123 let req = make_request("StartExecution", &body.to_string());
2124 let err = expect_err(svc.start_execution(&req));
2125 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2126 }
2127
2128 #[tokio::test]
2129 async fn start_execution_invalid_input() {
2130 let svc = StepFunctionsService::new(make_state());
2131 let arn = create_sm(&svc, "bad-input");
2132
2133 let body = json!({
2134 "stateMachineArn": arn,
2135 "input": "not json",
2136 });
2137 let req = make_request("StartExecution", &body.to_string());
2138 let err = expect_err(svc.start_execution(&req));
2139 assert!(err.to_string().contains("InvalidExecutionInput"));
2140 }
2141
2142 #[tokio::test]
2143 async fn start_execution_duplicate_name() {
2144 let svc = StepFunctionsService::new(make_state());
2145 let arn = create_sm(&svc, "dup-exec");
2146
2147 let body = json!({
2148 "stateMachineArn": arn,
2149 "name": "same-name",
2150 });
2151 let req = make_request("StartExecution", &body.to_string());
2152 svc.start_execution(&req).unwrap();
2153
2154 let req = make_request("StartExecution", &body.to_string());
2155 let err = expect_err(svc.start_execution(&req));
2156 assert!(err.to_string().contains("ExecutionAlreadyExists"));
2157 }
2158
2159 #[tokio::test]
2162 async fn describe_execution_found() {
2163 let svc = StepFunctionsService::new(make_state());
2164 let sm_arn = create_sm(&svc, "desc-exec");
2165
2166 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2167 let req = make_request("StartExecution", &body.to_string());
2168 let resp = svc.start_execution(&req).unwrap();
2169 let exec_arn = body_json(&resp)["executionArn"]
2170 .as_str()
2171 .unwrap()
2172 .to_string();
2173
2174 let req = make_request(
2175 "DescribeExecution",
2176 &json!({"executionArn": exec_arn}).to_string(),
2177 );
2178 let resp = svc.describe_execution(&req).unwrap();
2179 let b = body_json(&resp);
2180 assert_eq!(b["name"], "e1");
2181 assert_eq!(b["status"], "RUNNING");
2182 }
2183
2184 #[tokio::test]
2185 async fn describe_execution_not_found() {
2186 let svc = StepFunctionsService::new(make_state());
2187 let req = make_request(
2188 "DescribeExecution",
2189 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2190 .to_string(),
2191 );
2192 let err = expect_err(svc.describe_execution(&req));
2193 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2194 }
2195
2196 #[tokio::test]
2199 async fn stop_execution() {
2200 let svc = StepFunctionsService::new(make_state());
2201 let sm_arn = create_sm(&svc, "stop-sm");
2202
2203 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2204 let req = make_request("StartExecution", &body.to_string());
2205 let resp = svc.start_execution(&req).unwrap();
2206 let exec_arn = body_json(&resp)["executionArn"]
2207 .as_str()
2208 .unwrap()
2209 .to_string();
2210
2211 let body = json!({
2212 "executionArn": exec_arn,
2213 "error": "UserAborted",
2214 "cause": "test stop",
2215 });
2216 let req = make_request("StopExecution", &body.to_string());
2217 let resp = svc.stop_execution(&req).unwrap();
2218 let b = body_json(&resp);
2219 assert!(b["stopDate"].as_f64().is_some());
2220
2221 let req = make_request(
2223 "DescribeExecution",
2224 &json!({"executionArn": exec_arn}).to_string(),
2225 );
2226 let resp = svc.describe_execution(&req).unwrap();
2227 let b = body_json(&resp);
2228 assert_eq!(b["status"], "ABORTED");
2229 assert_eq!(b["error"], "UserAborted");
2230 }
2231
2232 #[tokio::test]
2233 async fn stop_execution_not_found() {
2234 let svc = StepFunctionsService::new(make_state());
2235 let req = make_request(
2236 "StopExecution",
2237 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2238 .to_string(),
2239 );
2240 let err = expect_err(svc.stop_execution(&req));
2241 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2242 }
2243
2244 #[tokio::test]
2247 async fn list_executions() {
2248 let svc = StepFunctionsService::new(make_state());
2249 let sm_arn = create_sm(&svc, "list-exec");
2250
2251 for i in 0..3 {
2252 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2253 let req = make_request("StartExecution", &body.to_string());
2254 svc.start_execution(&req).unwrap();
2255 }
2256
2257 let req = make_request(
2258 "ListExecutions",
2259 &json!({"stateMachineArn": sm_arn}).to_string(),
2260 );
2261 let resp = svc.list_executions(&req).unwrap();
2262 let b = body_json(&resp);
2263 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2264 }
2265
2266 #[tokio::test]
2267 async fn list_executions_sm_not_found() {
2268 let svc = StepFunctionsService::new(make_state());
2269 let req = make_request(
2270 "ListExecutions",
2271 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2272 .to_string(),
2273 );
2274 let err = expect_err(svc.list_executions(&req));
2275 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2276 }
2277
2278 #[tokio::test]
2281 async fn get_execution_history_not_found() {
2282 let svc = StepFunctionsService::new(make_state());
2283 let req = make_request(
2284 "GetExecutionHistory",
2285 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2286 .to_string(),
2287 );
2288 let err = expect_err(svc.get_execution_history(&req));
2289 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2290 }
2291
2292 #[tokio::test]
2295 async fn describe_sm_for_execution() {
2296 let svc = StepFunctionsService::new(make_state());
2297 let sm_arn = create_sm(&svc, "sm-for-exec");
2298
2299 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2300 let req = make_request("StartExecution", &body.to_string());
2301 let resp = svc.start_execution(&req).unwrap();
2302 let exec_arn = body_json(&resp)["executionArn"]
2303 .as_str()
2304 .unwrap()
2305 .to_string();
2306
2307 let req = make_request(
2308 "DescribeStateMachineForExecution",
2309 &json!({"executionArn": exec_arn}).to_string(),
2310 );
2311 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2312 let b = body_json(&resp);
2313 assert_eq!(b["name"], "sm-for-exec");
2314 }
2315
2316 #[test]
2319 fn tag_untag_list_tags() {
2320 let svc = StepFunctionsService::new(make_state());
2321 let arn = create_sm(&svc, "tagged-sm");
2322
2323 let body = json!({
2325 "resourceArn": arn,
2326 "tags": [{"key": "env", "value": "prod"}],
2327 });
2328 let req = make_request("TagResource", &body.to_string());
2329 svc.tag_resource(&req).unwrap();
2330
2331 let req = make_request(
2333 "ListTagsForResource",
2334 &json!({"resourceArn": arn}).to_string(),
2335 );
2336 let resp = svc.list_tags_for_resource(&req).unwrap();
2337 let b = body_json(&resp);
2338 let tags = b["tags"].as_array().unwrap();
2339 assert_eq!(tags.len(), 1);
2340 assert_eq!(tags[0]["key"], "env");
2341
2342 let body = json!({
2344 "resourceArn": arn,
2345 "tagKeys": ["env"],
2346 });
2347 let req = make_request("UntagResource", &body.to_string());
2348 svc.untag_resource(&req).unwrap();
2349
2350 let req = make_request(
2352 "ListTagsForResource",
2353 &json!({"resourceArn": arn}).to_string(),
2354 );
2355 let resp = svc.list_tags_for_resource(&req).unwrap();
2356 let b = body_json(&resp);
2357 assert!(b["tags"].as_array().unwrap().is_empty());
2358 }
2359
2360 #[test]
2361 fn tag_resource_not_found() {
2362 let svc = StepFunctionsService::new(make_state());
2363 let body = json!({
2364 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2365 "tags": [{"key": "k", "value": "v"}],
2366 });
2367 let req = make_request("TagResource", &body.to_string());
2368 let err = expect_err(svc.tag_resource(&req));
2369 assert!(err.to_string().contains("ResourceNotFound"));
2370 }
2371
2372 #[test]
2375 fn test_validate_name() {
2376 assert!(validate_name("valid-name").is_ok());
2377 assert!(validate_name("under_score").is_ok());
2378 assert!(validate_name("").is_err());
2379 assert!(validate_name("has spaces").is_err());
2380 assert!(validate_name(&"a".repeat(81)).is_err());
2381 }
2382
2383 #[test]
2384 fn test_validate_definition() {
2385 assert!(validate_definition(VALID_DEF).is_ok());
2386 assert!(validate_definition("not json").is_err());
2387 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
2390
2391 #[test]
2392 fn test_validate_arn() {
2393 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2394 assert!(validate_arn("not-an-arn").is_err());
2395 }
2396
2397 #[test]
2398 fn test_camel_to_details_key() {
2399 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2400 assert_eq!(camel_to_details_key(""), "");
2401 }
2402
2403 #[test]
2404 fn test_is_mutating_action() {
2405 assert!(is_mutating_action("CreateStateMachine"));
2406 assert!(is_mutating_action("StartExecution"));
2407 assert!(!is_mutating_action("DescribeStateMachine"));
2408 assert!(!is_mutating_action("ListStateMachines"));
2409 }
2410}