1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Names of the Step Functions API actions this service handles.
///
/// Returned as-is by `supported_actions`; each entry has a matching arm in the
/// `handle` dispatch.
const SUPPORTED: &[&str] = &[
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
    "CreateActivity",
    "DeleteActivity",
    "DescribeActivity",
    "ListActivities",
    "GetActivityTask",
    "SendTaskFailure",
    "SendTaskHeartbeat",
    "SendTaskSuccess",
    "PublishStateMachineVersion",
    "DeleteStateMachineVersion",
    "ListStateMachineVersions",
    "CreateStateMachineAlias",
    "DeleteStateMachineAlias",
    "DescribeStateMachineAlias",
    "ListStateMachineAliases",
    "UpdateStateMachineAlias",
    "DescribeMapRun",
    "ListMapRuns",
    "UpdateMapRun",
    "RedriveExecution",
    "StartSyncExecution",
    "TestState",
    "ValidateStateMachineDefinition",
];
64
/// Shared, set-once slot for the [`ServiceRegistry`]: an `Arc` around a
/// `OnceLock` that holds an `Arc<ServiceRegistry>` once it has been initialised.
pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
/// Step Functions (`states`) service implementation.
///
/// Only `state` is mandatory; the other collaborators are attached through the
/// `with_*` builder methods.
pub struct StepFunctionsService {
    /// Shared state, looked up per account by every handler.
    state: SharedStepFunctionsState,
    /// Optional delivery bus, handed to the interpreter when an execution starts.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional DynamoDB state, handed to the interpreter when an execution starts.
    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional service registry slot, handed to the interpreter when an execution starts.
    registry: Option<SharedServiceRegistry>,
    /// Destination for state snapshots; `None` makes `save_snapshot` a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot` so snapshot writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
80
81impl StepFunctionsService {
82 pub fn new(state: SharedStepFunctionsState) -> Self {
83 Self {
84 state,
85 delivery: None,
86 dynamodb_state: None,
87 registry: None,
88 snapshot_store: None,
89 snapshot_lock: Arc::new(AsyncMutex::new(())),
90 }
91 }
92
93 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
94 self.delivery = Some(delivery);
95 self
96 }
97
98 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
99 self.dynamodb_state = Some(dynamodb_state);
100 self
101 }
102
103 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
108 self.registry = Some(registry);
109 self
110 }
111
112 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
113 self.snapshot_store = Some(store);
114 self
115 }
116
117 async fn save_snapshot(&self) {
118 let Some(store) = self.snapshot_store.clone() else {
119 return;
120 };
121 let _guard = self.snapshot_lock.lock().await;
122 let snapshot = StepFunctionsSnapshot {
123 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
124 state: None,
125 accounts: Some(self.state.read().clone()),
126 };
127 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
128 let bytes = serde_json::to_vec(&snapshot)
129 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
130 store.save(&bytes)
131 })
132 .await;
133 match join {
134 Ok(Ok(())) => {}
135 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
136 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
137 }
138 }
139}
140
/// Reports whether `action` is one of the API actions that modify service state.
///
/// The comparison is an exact, case-sensitive match against a fixed list; any
/// other string (including the empty string) yields `false`.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];
    MUTATING_ACTIONS.contains(&action)
}
167
168#[async_trait]
169impl AwsService for StepFunctionsService {
170 fn service_name(&self) -> &str {
171 "states"
172 }
173
174 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175 let mutates = is_mutating_action(req.action.as_str());
176 let result = match req.action.as_str() {
177 "CreateStateMachine" => self.create_state_machine(&req),
178 "DescribeStateMachine" => self.describe_state_machine(&req),
179 "ListStateMachines" => self.list_state_machines(&req),
180 "DeleteStateMachine" => self.delete_state_machine(&req),
181 "UpdateStateMachine" => self.update_state_machine(&req),
182 "TagResource" => self.tag_resource(&req),
183 "UntagResource" => self.untag_resource(&req),
184 "ListTagsForResource" => self.list_tags_for_resource(&req),
185 "StartExecution" => self.start_execution(&req),
186 "StopExecution" => self.stop_execution(&req),
187 "DescribeExecution" => self.describe_execution(&req),
188 "ListExecutions" => self.list_executions(&req),
189 "GetExecutionHistory" => self.get_execution_history(&req),
190 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
191 "CreateActivity" => self.create_activity(&req),
192 "DeleteActivity" => self.delete_activity(&req),
193 "DescribeActivity" => self.describe_activity(&req),
194 "ListActivities" => self.list_activities(&req),
195 "GetActivityTask" => self.get_activity_task(&req).await,
196 "SendTaskFailure" => self.send_task_failure(&req),
197 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
198 "SendTaskSuccess" => self.send_task_success(&req),
199 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
200 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
201 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
202 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
203 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
204 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
205 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
206 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
207 "DescribeMapRun" => self.describe_map_run(&req),
208 "ListMapRuns" => self.list_map_runs(&req),
209 "UpdateMapRun" => self.update_map_run(&req),
210 "RedriveExecution" => self.redrive_execution(&req),
211 "StartSyncExecution" => self.start_sync_execution(&req).await,
212 "TestState" => self.test_state(&req),
213 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
214 _ => Err(AwsServiceError::action_not_implemented(
215 "states",
216 &req.action,
217 )),
218 };
219 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220 self.save_snapshot().await;
221 }
222 result
223 }
224
225 fn supported_actions(&self) -> &[&str] {
226 SUPPORTED
227 }
228}
229
230impl StepFunctionsService {
231 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
234 let body = req.json_body();
235
236 validate_required("name", &body["name"])?;
237 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
238 validate_name(name)?;
239
240 validate_required("definition", &body["definition"])?;
241 let definition = body["definition"]
242 .as_str()
243 .ok_or_else(|| missing("definition"))?;
244 validate_definition(definition)?;
245
246 validate_required("roleArn", &body["roleArn"])?;
247 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
248 validate_arn(role_arn)?;
249
250 let machine_type = if let Some(t) = body["type"].as_str() {
251 StateMachineType::parse(t).ok_or_else(|| {
252 AwsServiceError::aws_error(
253 StatusCode::BAD_REQUEST,
254 "ValidationException",
255 format!(
256 "Value '{t}' at 'type' failed to satisfy constraint: \
257 Member must satisfy enum value set: [STANDARD, EXPRESS]"
258 ),
259 )
260 })?
261 } else {
262 StateMachineType::Standard
263 };
264
265 let mut accounts = self.state.write();
266 let state = accounts.get_or_create(&req.account_id);
267 let arn = state.state_machine_arn(name);
268
269 if state.state_machines.values().any(|sm| sm.name == name) {
271 return Err(AwsServiceError::aws_error(
272 StatusCode::CONFLICT,
273 "StateMachineAlreadyExists",
274 format!("State Machine Already Exists: '{arn}'"),
275 ));
276 }
277
278 let now = Utc::now();
279 let revision_id = uuid::Uuid::new_v4().to_string();
280
281 let mut tags = BTreeMap::new();
282 if !body["tags"].is_null() {
283 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
284 |f| {
285 AwsServiceError::aws_error(
286 StatusCode::BAD_REQUEST,
287 "ValidationException",
288 format!("{f} must be a list"),
289 )
290 },
291 )?;
292 }
293
294 let sm = StateMachine {
295 name: name.to_string(),
296 arn: arn.clone(),
297 definition: definition.to_string(),
298 role_arn: role_arn.to_string(),
299 machine_type,
300 status: StateMachineStatus::Active,
301 creation_date: now,
302 update_date: now,
303 tags,
304 revision_id: revision_id.clone(),
305 logging_configuration: body.get("loggingConfiguration").cloned(),
306 tracing_configuration: body.get("tracingConfiguration").cloned(),
307 description: body["description"].as_str().unwrap_or("").to_string(),
308 };
309
310 state.state_machines.insert(arn.clone(), sm);
311
312 Ok(AwsResponse::ok_json(json!({
313 "stateMachineArn": arn,
314 "creationDate": now.timestamp() as f64,
315 "stateMachineVersionArn": arn,
316 })))
317 }
318
319 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 validate_required("stateMachineArn", &body["stateMachineArn"])?;
322 let arn = body["stateMachineArn"]
323 .as_str()
324 .ok_or_else(|| missing("stateMachineArn"))?;
325 validate_arn(arn)?;
326
327 let accounts = self.state.read();
328 let empty = StepFunctionsState::new(&req.account_id, &req.region);
329 let state = accounts.get(&req.account_id).unwrap_or(&empty);
330 let sm = state
331 .state_machines
332 .get(arn)
333 .ok_or_else(|| state_machine_not_found(arn))?;
334
335 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
336 }
337
338 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339 let body = req.json_body();
340 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
341 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
342 let next_token = body["nextToken"].as_str();
343
344 let accounts = self.state.read();
345 let empty = StepFunctionsState::new(&req.account_id, &req.region);
346 let state = accounts.get(&req.account_id).unwrap_or(&empty);
347 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
348 machines.sort_by(|a, b| a.name.cmp(&b.name));
349
350 let items: Vec<Value> = machines
351 .iter()
352 .map(|sm| {
353 json!({
354 "name": sm.name,
355 "stateMachineArn": sm.arn,
356 "type": sm.machine_type.as_str(),
357 "creationDate": sm.creation_date.timestamp() as f64,
358 })
359 })
360 .collect();
361
362 let (page, token) = paginate(&items, next_token, max_results);
363
364 let mut resp = json!({ "stateMachines": page });
365 if let Some(t) = token {
366 resp["nextToken"] = json!(t);
367 }
368 Ok(AwsResponse::ok_json(resp))
369 }
370
371 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372 let body = req.json_body();
373 validate_required("stateMachineArn", &body["stateMachineArn"])?;
374 let arn = body["stateMachineArn"]
375 .as_str()
376 .ok_or_else(|| missing("stateMachineArn"))?;
377 validate_arn(arn)?;
378
379 let mut accounts = self.state.write();
380 let state = accounts.get_or_create(&req.account_id);
381 state.state_machines.remove(arn);
383
384 Ok(AwsResponse::ok_json(json!({})))
385 }
386
387 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
388 let body = req.json_body();
389 validate_required("stateMachineArn", &body["stateMachineArn"])?;
390 let arn = body["stateMachineArn"]
391 .as_str()
392 .ok_or_else(|| missing("stateMachineArn"))?;
393 validate_arn(arn)?;
394
395 let mut accounts = self.state.write();
396 let state = accounts.get_or_create(&req.account_id);
397 let sm = state
398 .state_machines
399 .get_mut(arn)
400 .ok_or_else(|| state_machine_not_found(arn))?;
401
402 if let Some(definition) = body["definition"].as_str() {
403 validate_definition(definition)?;
404 sm.definition = definition.to_string();
405 }
406
407 if let Some(role_arn) = body["roleArn"].as_str() {
408 validate_arn(role_arn)?;
409 sm.role_arn = role_arn.to_string();
410 }
411
412 if let Some(logging) = body.get("loggingConfiguration") {
413 sm.logging_configuration = Some(logging.clone());
414 }
415
416 if let Some(tracing) = body.get("tracingConfiguration") {
417 sm.tracing_configuration = Some(tracing.clone());
418 }
419
420 if let Some(description) = body["description"].as_str() {
421 sm.description = description.to_string();
422 }
423
424 let now = Utc::now();
425 sm.update_date = now;
426 sm.revision_id = uuid::Uuid::new_v4().to_string();
427
428 let revision_id = sm.revision_id.clone();
429 let sm_arn = sm.arn.clone();
430
431 Ok(AwsResponse::ok_json(json!({
432 "updateDate": now.timestamp() as f64,
433 "revisionId": revision_id,
434 "stateMachineVersionArn": sm_arn,
435 })))
436 }
437
    /// Handles `StartExecution`: records a new `Running` execution and starts
    /// the interpreter for it on a spawned Tokio task.
    ///
    /// `input`, when given, must be a string containing valid JSON. `name` is
    /// optional; a random UUID is used when it is absent. The response carries
    /// `executionArn` and `startDate` and is returned without waiting for the
    /// interpreter task.
    ///
    /// # Errors
    ///
    /// * `InvalidExecutionInput` (400) when `input` does not parse as JSON,
    /// * the state-machine-not-found error for an unknown `stateMachineArn`,
    /// * `ExecutionAlreadyExists` (409) when an execution with the derived ARN
    ///   is already stored.
    fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("stateMachineArn", &body["stateMachineArn"])?;
        let sm_arn = body["stateMachineArn"]
            .as_str()
            .ok_or_else(|| missing("stateMachineArn"))?;
        validate_arn(sm_arn)?;

        let input = body["input"].as_str().map(|s| s.to_string());

        // Parse only to check well-formedness; the parsed value is discarded.
        if let Some(ref input_str) = input {
            let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidExecutionInput",
                    "Invalid execution input: must be valid JSON".to_string(),
                )
            })?;
        }

        // Fall back to a random UUID when the caller did not name the execution.
        let execution_name = body["name"]
            .as_str()
            .map(|s| s.to_string())
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

        // Only caller-supplied names are validated.
        if let Some(name) = body["name"].as_str() {
            validate_name(name)?;
        }

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);
        let sm = state
            .state_machines
            .get(sm_arn)
            .ok_or_else(|| state_machine_not_found(sm_arn))?;

        let sm_name = sm.name.clone();
        let definition = sm.definition.clone();
        let exec_arn = state.execution_arn(&sm_name, &execution_name);

        // Reject a second execution that maps to the same ARN.
        if state.executions.contains_key(&exec_arn) {
            return Err(AwsServiceError::aws_error(
                StatusCode::CONFLICT,
                "ExecutionAlreadyExists",
                format!("Execution Already Exists: '{exec_arn}'"),
            ));
        }

        let now = Utc::now();
        let execution = Execution {
            execution_arn: exec_arn.clone(),
            state_machine_arn: sm_arn.to_string(),
            state_machine_name: sm_name,
            name: execution_name,
            status: ExecutionStatus::Running,
            input: input.clone(),
            output: None,
            start_date: now,
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
            parent_execution_arn: None,
            is_sync: false,
            billed_duration_ms: None,
            billed_memory_mb: None,
        };

        // The execution is stored before the interpreter task is spawned, and
        // the write lock is released before spawning.
        state.executions.insert(exec_arn.clone(), execution);
        let logging_config = sm.logging_configuration.clone();
        drop(accounts);

        // Owned copies of everything the spawned task needs.
        let shared_state = self.state.clone();
        let exec_arn_clone = exec_arn.clone();
        let input_clone = input;
        let delivery = self.delivery.clone();
        let dynamodb_state = self.dynamodb_state.clone();
        let registry = self.registry.clone();
        tokio::spawn(async move {
            interpreter::execute_state_machine(
                shared_state,
                exec_arn_clone,
                definition,
                input_clone,
                delivery,
                dynamodb_state,
                registry,
                logging_config,
            )
            .await;
        });

        Ok(AwsResponse::ok_json(json!({
            "executionArn": exec_arn,
            "startDate": now.timestamp() as f64,
        })))
    }
540
541 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
542 let body = req.json_body();
543 validate_required("executionArn", &body["executionArn"])?;
544 let exec_arn = body["executionArn"]
545 .as_str()
546 .ok_or_else(|| missing("executionArn"))?;
547
548 let error = body["error"].as_str().map(|s| s.to_string());
549 let cause = body["cause"].as_str().map(|s| s.to_string());
550
551 let mut accounts = self.state.write();
552 let state = accounts.get_or_create(&req.account_id);
553 let exec = state
554 .executions
555 .get_mut(exec_arn)
556 .ok_or_else(|| execution_not_found(exec_arn))?;
557
558 if exec.status != ExecutionStatus::Running {
559 return Err(AwsServiceError::aws_error(
560 StatusCode::BAD_REQUEST,
561 "ExecutionNotRunning",
562 format!("Execution is not running: '{exec_arn}'"),
563 ));
564 }
565
566 let now = Utc::now();
567 exec.status = ExecutionStatus::Aborted;
568 exec.stop_date = Some(now);
569 exec.error = error;
570 exec.cause = cause;
571
572 Ok(AwsResponse::ok_json(json!({
573 "stopDate": now.timestamp() as f64,
574 })))
575 }
576
577 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
578 let body = req.json_body();
579 validate_required("executionArn", &body["executionArn"])?;
580 let exec_arn = body["executionArn"]
581 .as_str()
582 .ok_or_else(|| missing("executionArn"))?;
583
584 let accounts = self.state.read();
585 let empty = StepFunctionsState::new(&req.account_id, &req.region);
586 let state = accounts.get(&req.account_id).unwrap_or(&empty);
587 let exec = state
588 .executions
589 .get(exec_arn)
590 .ok_or_else(|| execution_not_found(exec_arn))?;
591
592 Ok(AwsResponse::ok_json(execution_to_json(exec)))
593 }
594
595 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
596 let body = req.json_body();
597 validate_required("stateMachineArn", &body["stateMachineArn"])?;
598 let sm_arn = body["stateMachineArn"]
599 .as_str()
600 .ok_or_else(|| missing("stateMachineArn"))?;
601 validate_arn(sm_arn)?;
602
603 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
604 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
605 let next_token = body["nextToken"].as_str();
606 let status_filter = body["statusFilter"].as_str();
607
608 let accounts = self.state.read();
609 let empty = StepFunctionsState::new(&req.account_id, &req.region);
610 let state = accounts.get(&req.account_id).unwrap_or(&empty);
611
612 if !state.state_machines.contains_key(sm_arn) {
614 return Err(state_machine_not_found(sm_arn));
615 }
616
617 let mut executions: Vec<&Execution> = state
618 .executions
619 .values()
620 .filter(|e| e.state_machine_arn == sm_arn)
621 .filter(|e| {
622 status_filter
623 .map(|sf| e.status.as_str() == sf)
624 .unwrap_or(true)
625 })
626 .collect();
627
628 executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
630
631 let items: Vec<Value> = executions
632 .iter()
633 .map(|e| {
634 let mut item = json!({
635 "executionArn": e.execution_arn,
636 "stateMachineArn": e.state_machine_arn,
637 "name": e.name,
638 "status": e.status.as_str(),
639 "startDate": e.start_date.timestamp() as f64,
640 });
641 if let Some(stop) = e.stop_date {
642 item["stopDate"] = json!(stop.timestamp() as f64);
643 }
644 item
645 })
646 .collect();
647
648 let (page, token) = paginate(&items, next_token, max_results);
649
650 let mut resp = json!({ "executions": page });
651 if let Some(t) = token {
652 resp["nextToken"] = json!(t);
653 }
654 Ok(AwsResponse::ok_json(resp))
655 }
656
657 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
658 let body = req.json_body();
659 validate_required("executionArn", &body["executionArn"])?;
660 let exec_arn = body["executionArn"]
661 .as_str()
662 .ok_or_else(|| missing("executionArn"))?;
663
664 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
665 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
666 let next_token = body["nextToken"].as_str();
667 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
668
669 let accounts = self.state.read();
670 let empty = StepFunctionsState::new(&req.account_id, &req.region);
671 let state = accounts.get(&req.account_id).unwrap_or(&empty);
672 let exec = state
673 .executions
674 .get(exec_arn)
675 .ok_or_else(|| execution_not_found(exec_arn))?;
676
677 let mut events: Vec<Value> = exec
678 .history_events
679 .iter()
680 .map(|e| {
681 json!({
682 "id": e.id,
683 "type": e.event_type,
684 "timestamp": e.timestamp.timestamp() as f64,
685 "previousEventId": e.previous_event_id,
686 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
687 })
688 })
689 .collect();
690
691 if reverse_order {
692 events.reverse();
693 }
694
695 let (page, token) = paginate(&events, next_token, max_results);
696
697 let mut resp = json!({ "events": page });
698 if let Some(t) = token {
699 resp["nextToken"] = json!(t);
700 }
701 Ok(AwsResponse::ok_json(resp))
702 }
703
704 fn describe_state_machine_for_execution(
705 &self,
706 req: &AwsRequest,
707 ) -> Result<AwsResponse, AwsServiceError> {
708 let body = req.json_body();
709 validate_required("executionArn", &body["executionArn"])?;
710 let exec_arn = body["executionArn"]
711 .as_str()
712 .ok_or_else(|| missing("executionArn"))?;
713
714 let accounts = self.state.read();
715 let empty = StepFunctionsState::new(&req.account_id, &req.region);
716 let state = accounts.get(&req.account_id).unwrap_or(&empty);
717 let exec = state
718 .executions
719 .get(exec_arn)
720 .ok_or_else(|| execution_not_found(exec_arn))?;
721
722 let sm = state
723 .state_machines
724 .get(&exec.state_machine_arn)
725 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
726
727 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
728 }
729
730 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
733 let body = req.json_body();
734 validate_required("resourceArn", &body["resourceArn"])?;
735 let arn = body["resourceArn"]
736 .as_str()
737 .ok_or_else(|| missing("resourceArn"))?;
738 validate_arn(arn)?;
739 validate_required("tags", &body["tags"])?;
740
741 let mut accounts = self.state.write();
742 let state = accounts.get_or_create(&req.account_id);
743 let sm = state
744 .state_machines
745 .get_mut(arn)
746 .ok_or_else(|| resource_not_found(arn))?;
747
748 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
749 |f| {
750 AwsServiceError::aws_error(
751 StatusCode::BAD_REQUEST,
752 "ValidationException",
753 format!("{f} must be a list"),
754 )
755 },
756 )?;
757
758 Ok(AwsResponse::ok_json(json!({})))
759 }
760
761 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
762 let body = req.json_body();
763 validate_required("resourceArn", &body["resourceArn"])?;
764 let arn = body["resourceArn"]
765 .as_str()
766 .ok_or_else(|| missing("resourceArn"))?;
767 validate_arn(arn)?;
768 validate_required("tagKeys", &body["tagKeys"])?;
769
770 let mut accounts = self.state.write();
771 let state = accounts.get_or_create(&req.account_id);
772 let sm = state
773 .state_machines
774 .get_mut(arn)
775 .ok_or_else(|| resource_not_found(arn))?;
776
777 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
778 AwsServiceError::aws_error(
779 StatusCode::BAD_REQUEST,
780 "ValidationException",
781 format!("{f} must be a list"),
782 )
783 })?;
784
785 Ok(AwsResponse::ok_json(json!({})))
786 }
787
788 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
789 let body = req.json_body();
790 validate_required("resourceArn", &body["resourceArn"])?;
791 let arn = body["resourceArn"]
792 .as_str()
793 .ok_or_else(|| missing("resourceArn"))?;
794 validate_arn(arn)?;
795
796 let accounts = self.state.read();
797 let empty = StepFunctionsState::new(&req.account_id, &req.region);
798 let state = accounts.get(&req.account_id).unwrap_or(&empty);
799 let sm = state
800 .state_machines
801 .get(arn)
802 .ok_or_else(|| resource_not_found(arn))?;
803
804 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
805
806 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
807 }
808
809 fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
812 let body = req.json_body();
813 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
814 validate_name(name)?;
815 let mut accounts = self.state.write();
816 let state = accounts.get_or_create(&req.account_id);
817 let arn = format!(
818 "arn:aws:states:{}:{}:activity:{}",
819 state.region, state.account_id, name
820 );
821 if state.activities.contains_key(&arn) {
822 return Err(AwsServiceError::aws_error(
823 StatusCode::BAD_REQUEST,
824 "ActivityAlreadyExists",
825 format!("Activity already exists: {arn}"),
826 ));
827 }
828 let activity = crate::state::Activity {
829 name: name.to_string(),
830 arn: arn.clone(),
831 creation_date: chrono::Utc::now(),
832 tags: BTreeMap::new(),
833 };
834 state.activities.insert(arn.clone(), activity.clone());
835 Ok(AwsResponse::ok_json(json!({
836 "activityArn": arn,
837 "creationDate": activity.creation_date.timestamp(),
838 })))
839 }
840
841 fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
842 let body = req.json_body();
843 let arn = body["activityArn"]
844 .as_str()
845 .ok_or_else(|| missing("activityArn"))?
846 .to_string();
847 let mut accounts = self.state.write();
848 let state = accounts.get_or_create(&req.account_id);
849 state.activities.remove(&arn);
850 Ok(AwsResponse::ok_json(json!({})))
851 }
852
853 fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
854 let body = req.json_body();
855 let arn = body["activityArn"]
856 .as_str()
857 .ok_or_else(|| missing("activityArn"))?
858 .to_string();
859 let accounts = self.state.read();
860 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
861 let state = accounts.get(&req.account_id).unwrap_or(&empty);
862 let a = state.activities.get(&arn).ok_or_else(|| {
863 AwsServiceError::aws_error(
864 StatusCode::BAD_REQUEST,
865 "ActivityDoesNotExist",
866 format!("Activity does not exist: {arn}"),
867 )
868 })?;
869 Ok(AwsResponse::ok_json(json!({
870 "activityArn": a.arn,
871 "name": a.name,
872 "creationDate": a.creation_date.timestamp(),
873 })))
874 }
875
876 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
877 let accounts = self.state.read();
878 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
879 let state = accounts.get(&req.account_id).unwrap_or(&empty);
880 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
881 activities.sort_by(|a, b| a.name.cmp(&b.name));
882 let body = json!({
883 "activities": activities.iter().map(|a| json!({
884 "activityArn": a.arn,
885 "name": a.name,
886 "creationDate": a.creation_date.timestamp(),
887 })).collect::<Vec<_>>(),
888 });
889 Ok(AwsResponse::ok_json(body))
890 }
891
892 async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
893 let body = req.json_body();
894 let arn = body["activityArn"]
895 .as_str()
896 .ok_or_else(|| missing("activityArn"))?
897 .to_string();
898 {
900 let accounts = self.state.read();
901 let state = accounts
902 .get(&req.account_id)
903 .ok_or_else(|| activity_not_found(&arn))?;
904 if !state.activities.contains_key(&arn) {
905 return Err(activity_not_found(&arn));
906 }
907 }
908
909 let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
912 .ok()
913 .and_then(|s| s.parse().ok())
914 .unwrap_or(5);
915 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
916
917 loop {
918 {
920 let mut accounts = self.state.write();
921 let state = accounts.get_or_create(&req.account_id);
922 let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
923 .task_tokens
924 .iter()
925 .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
926 .map(|(k, t)| (k.clone(), t.created_at))
927 .collect();
928 candidates.sort_by_key(|c| c.1);
929 if let Some((token, _)) = candidates.into_iter().next() {
930 let now = chrono::Utc::now();
931 let entry = state.task_tokens.get_mut(&token).expect("just looked up");
932 entry.status = "IN_PROGRESS".to_string();
933 entry.last_heartbeat_at = Some(now);
934 let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
935 return Ok(AwsResponse::ok_json(json!({
936 "taskToken": token,
937 "input": input,
938 })));
939 }
940 }
941 if std::time::Instant::now() >= deadline {
942 return Ok(AwsResponse::ok_json(json!({
945 "taskToken": "",
946 "input": "",
947 })));
948 }
949 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
950 }
951 }
952
    /// Handles `SendTaskSuccess` by delegating to `update_task_token` with the
    /// `"SUCCEEDED"` status.
    fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        self.update_task_token(req, "SUCCEEDED")
    }
956
    /// Handles `SendTaskFailure` by delegating to `update_task_token` with the
    /// `"FAILED"` status.
    fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        self.update_task_token(req, "FAILED")
    }
960
961 fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
962 let body = req.json_body();
967 let token = body["taskToken"]
968 .as_str()
969 .ok_or_else(|| missing("taskToken"))?
970 .to_string();
971 let mut accounts = self.state.write();
972 let state = accounts.get_or_create(&req.account_id);
973 let entry = state
974 .task_tokens
975 .get_mut(&token)
976 .ok_or_else(|| task_does_not_exist(&token))?;
977 entry.last_heartbeat_at = Some(chrono::Utc::now());
978 Ok(AwsResponse::ok_json(json!({})))
979 }
980
981 fn update_task_token(
982 &self,
983 req: &AwsRequest,
984 new_status: &str,
985 ) -> Result<AwsResponse, AwsServiceError> {
986 let body = req.json_body();
987 let token = body["taskToken"]
988 .as_str()
989 .ok_or_else(|| missing("taskToken"))?
990 .to_string();
991 let mut accounts = self.state.write();
992 let state = accounts.get_or_create(&req.account_id);
993 let entry = state
994 .task_tokens
995 .get_mut(&token)
996 .ok_or_else(|| task_does_not_exist(&token))?;
997 entry.status = new_status.to_string();
998 if new_status == "SUCCEEDED" {
999 entry.output = body["output"].as_str().map(String::from);
1000 } else if new_status == "FAILED" {
1001 entry.error = body["error"].as_str().map(String::from);
1002 entry.cause = body["cause"].as_str().map(String::from);
1003 }
1004 Ok(AwsResponse::ok_json(json!({})))
1005 }
1006
1007 fn publish_state_machine_version(
1010 &self,
1011 req: &AwsRequest,
1012 ) -> Result<AwsResponse, AwsServiceError> {
1013 let body = req.json_body();
1014 let arn = body["stateMachineArn"]
1015 .as_str()
1016 .ok_or_else(|| missing("stateMachineArn"))?
1017 .to_string();
1018 let description = body["description"].as_str().unwrap_or("").to_string();
1019 let mut accounts = self.state.write();
1020 let state = accounts.get_or_create(&req.account_id);
1021 if !state.state_machines.contains_key(&arn) {
1022 return Err(state_machine_not_found(&arn));
1023 }
1024 let version = state
1025 .state_machine_versions
1026 .values()
1027 .filter(|v| v.state_machine_arn == arn)
1028 .map(|v| v.version)
1029 .max()
1030 .unwrap_or(0)
1031 + 1;
1032 let version_arn = format!("{arn}:{version}");
1033 let v = crate::state::StateMachineVersion {
1034 state_machine_arn: arn,
1035 version,
1036 revision_id: format!("rev-{version}"),
1037 description,
1038 creation_date: chrono::Utc::now(),
1039 };
1040 state
1041 .state_machine_versions
1042 .insert(version_arn.clone(), v.clone());
1043 Ok(AwsResponse::ok_json(json!({
1044 "stateMachineVersionArn": version_arn,
1045 "creationDate": v.creation_date.timestamp(),
1046 })))
1047 }
1048
1049 fn delete_state_machine_version(
1050 &self,
1051 req: &AwsRequest,
1052 ) -> Result<AwsResponse, AwsServiceError> {
1053 let body = req.json_body();
1054 let arn = body["stateMachineVersionArn"]
1055 .as_str()
1056 .ok_or_else(|| missing("stateMachineVersionArn"))?
1057 .to_string();
1058 let mut accounts = self.state.write();
1059 let state = accounts.get_or_create(&req.account_id);
1060 state.state_machine_versions.remove(&arn);
1061 Ok(AwsResponse::ok_json(json!({})))
1062 }
1063
1064 fn list_state_machine_versions(
1065 &self,
1066 req: &AwsRequest,
1067 ) -> Result<AwsResponse, AwsServiceError> {
1068 let body = req.json_body();
1069 let arn = body["stateMachineArn"]
1070 .as_str()
1071 .ok_or_else(|| missing("stateMachineArn"))?
1072 .to_string();
1073 let accounts = self.state.read();
1074 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1075 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1076 let mut versions: Vec<&crate::state::StateMachineVersion> = state
1077 .state_machine_versions
1078 .values()
1079 .filter(|v| v.state_machine_arn == arn)
1080 .collect();
1081 versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1082 let resp = json!({
1083 "stateMachineVersions": versions.iter().map(|v| json!({
1084 "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1085 "creationDate": v.creation_date.timestamp(),
1086 })).collect::<Vec<_>>(),
1087 });
1088 Ok(AwsResponse::ok_json(resp))
1089 }
1090
1091 fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1092 let body = req.json_body();
1093 let name = body["name"]
1094 .as_str()
1095 .ok_or_else(|| missing("name"))?
1096 .to_string();
1097 validate_name(&name)?;
1098 let routing_cfg = body["routingConfiguration"]
1099 .as_array()
1100 .ok_or_else(|| missing("routingConfiguration"))?;
1101 let routes = parse_routing_configuration(routing_cfg)?;
1102 let parent_arn = routes[0]
1103 .state_machine_version_arn
1104 .rsplit_once(':')
1105 .map(|(parent, _)| parent.to_string())
1106 .unwrap_or_default();
1107 let alias_arn = format!("{parent_arn}:{name}");
1108 let now = chrono::Utc::now();
1109 let alias = crate::state::StateMachineAlias {
1110 name,
1111 arn: alias_arn.clone(),
1112 description: body["description"].as_str().unwrap_or("").to_string(),
1113 routing_configuration: routes,
1114 creation_date: now,
1115 update_date: now,
1116 };
1117 let mut accounts = self.state.write();
1118 let state = accounts.get_or_create(&req.account_id);
1119 state.state_machine_aliases.insert(alias_arn.clone(), alias);
1120 Ok(AwsResponse::ok_json(json!({
1121 "stateMachineAliasArn": alias_arn,
1122 "creationDate": now.timestamp(),
1123 })))
1124 }
1125
1126 fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1127 let body = req.json_body();
1128 let arn = body["stateMachineAliasArn"]
1129 .as_str()
1130 .ok_or_else(|| missing("stateMachineAliasArn"))?
1131 .to_string();
1132 let mut accounts = self.state.write();
1133 let state = accounts.get_or_create(&req.account_id);
1134 state.state_machine_aliases.remove(&arn);
1135 Ok(AwsResponse::ok_json(json!({})))
1136 }
1137
1138 fn describe_state_machine_alias(
1139 &self,
1140 req: &AwsRequest,
1141 ) -> Result<AwsResponse, AwsServiceError> {
1142 let body = req.json_body();
1143 let arn = body["stateMachineAliasArn"]
1144 .as_str()
1145 .ok_or_else(|| missing("stateMachineAliasArn"))?
1146 .to_string();
1147 let accounts = self.state.read();
1148 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1149 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1150 let alias = state
1151 .state_machine_aliases
1152 .get(&arn)
1153 .ok_or_else(|| resource_not_found(&arn))?;
1154 Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1155 }
1156
1157 fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158 let body = req.json_body();
1159 let parent = body["stateMachineArn"]
1160 .as_str()
1161 .ok_or_else(|| missing("stateMachineArn"))?
1162 .to_string();
1163 let accounts = self.state.read();
1164 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1165 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1166 let parent_prefix = format!("{parent}:");
1169 let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1170 .state_machine_aliases
1171 .values()
1172 .filter(|a| a.arn.starts_with(&parent_prefix))
1173 .collect();
1174 aliases.sort_by(|a, b| a.name.cmp(&b.name));
1175 Ok(AwsResponse::ok_json(json!({
1176 "stateMachineAliases": aliases.iter().map(|a| json!({
1177 "stateMachineAliasArn": a.arn,
1178 "creationDate": a.creation_date.timestamp(),
1179 })).collect::<Vec<_>>(),
1180 })))
1181 }
1182
1183 fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1184 let body = req.json_body();
1185 let arn = body["stateMachineAliasArn"]
1186 .as_str()
1187 .ok_or_else(|| missing("stateMachineAliasArn"))?
1188 .to_string();
1189 let mut accounts = self.state.write();
1190 let state = accounts.get_or_create(&req.account_id);
1191 let alias = state
1192 .state_machine_aliases
1193 .get_mut(&arn)
1194 .ok_or_else(|| resource_not_found(&arn))?;
1195 if let Some(d) = body["description"].as_str() {
1196 alias.description = d.to_string();
1197 }
1198 if let Some(routes) = body["routingConfiguration"].as_array() {
1199 alias.routing_configuration = parse_routing_configuration(routes)?;
1200 }
1201 alias.update_date = chrono::Utc::now();
1202 Ok(AwsResponse::ok_json(json!({
1203 "updateDate": alias.update_date.timestamp(),
1204 })))
1205 }
1206
1207 fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1210 let body = req.json_body();
1211 let arn = body["mapRunArn"]
1212 .as_str()
1213 .ok_or_else(|| missing("mapRunArn"))?
1214 .to_string();
1215 let accounts = self.state.read();
1216 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1217 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1218 let mr = state
1219 .map_runs
1220 .get(&arn)
1221 .ok_or_else(|| resource_not_found(&arn))?;
1222 Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1223 }
1224
1225 fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1226 let body = req.json_body();
1227 let exec_arn = body["executionArn"].as_str().map(String::from);
1228 let accounts = self.state.read();
1229 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1230 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1231 let runs: Vec<&crate::state::MapRun> = state
1232 .map_runs
1233 .values()
1234 .filter(|r| exec_arn.as_deref().is_none_or(|e| r.execution_arn == e))
1235 .collect();
1236 Ok(AwsResponse::ok_json(json!({
1237 "mapRuns": runs.iter().map(|r| json!({
1238 "mapRunArn": r.map_run_arn,
1239 "executionArn": r.execution_arn,
1240 "stateMachineArn": "",
1241 "startDate": r.start_date.timestamp(),
1242 "stopDate": r.stop_date.map(|d| d.timestamp()),
1243 })).collect::<Vec<_>>(),
1244 })))
1245 }
1246
1247 fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1248 let body = req.json_body();
1249 let arn = body["mapRunArn"]
1250 .as_str()
1251 .ok_or_else(|| missing("mapRunArn"))?
1252 .to_string();
1253 let mut accounts = self.state.write();
1254 let state = accounts.get_or_create(&req.account_id);
1255 let mr = state
1256 .map_runs
1257 .get_mut(&arn)
1258 .ok_or_else(|| resource_not_found(&arn))?;
1259 if let Some(c) = body["maxConcurrency"].as_i64() {
1260 mr.max_concurrency = c as i32;
1261 }
1262 if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1263 mr.tolerated_failure_percentage = p;
1264 }
1265 if let Some(c) = body["toleratedFailureCount"].as_i64() {
1266 mr.tolerated_failure_count = c;
1267 }
1268 Ok(AwsResponse::ok_json(json!({})))
1269 }
1270
1271 fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274 let body = req.json_body();
1275 let arn = body["executionArn"]
1276 .as_str()
1277 .ok_or_else(|| missing("executionArn"))?
1278 .to_string();
1279 let mut accounts = self.state.write();
1280 let state = accounts.get_or_create(&req.account_id);
1281 let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1282 AwsServiceError::aws_error(
1283 StatusCode::BAD_REQUEST,
1284 "ExecutionDoesNotExist",
1285 format!("Execution does not exist: {arn}"),
1286 )
1287 })?;
1288 exec.status = crate::state::ExecutionStatus::Running;
1289 exec.stop_date = None;
1290 Ok(AwsResponse::ok_json(json!({
1291 "redriveDate": chrono::Utc::now().timestamp(),
1292 })))
1293 }
1294
1295 async fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1296 let body = req.json_body();
1297 let sm_arn = body["stateMachineArn"]
1298 .as_str()
1299 .ok_or_else(|| missing("stateMachineArn"))?
1300 .to_string();
1301 let input = body["input"].as_str().unwrap_or("{}").to_string();
1302 if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1303 return Err(AwsServiceError::aws_error(
1304 StatusCode::BAD_REQUEST,
1305 "InvalidExecutionInput",
1306 "Execution input is not valid JSON.",
1307 ));
1308 }
1309 let (exec_arn, definition, logging_config) = {
1310 let mut accounts = self.state.write();
1311 let state = accounts.get_or_create(&req.account_id);
1312 let sm = state
1313 .state_machines
1314 .get(&sm_arn)
1315 .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1316 if sm.machine_type != crate::state::StateMachineType::Express {
1317 return Err(AwsServiceError::aws_error(
1318 StatusCode::BAD_REQUEST,
1319 "StateMachineTypeNotSupported",
1320 "StartSyncExecution is only supported for EXPRESS state machines.",
1321 ));
1322 }
1323 let now = chrono::Utc::now();
1324 let exec_name = format!("sync-{}", now.timestamp_millis());
1325 let exec_arn = format!(
1326 "arn:aws:states:{}:{}:express:{}:{}",
1327 state.region, state.account_id, sm.name, exec_name
1328 );
1329 let execution = Execution {
1330 execution_arn: exec_arn.clone(),
1331 state_machine_arn: sm_arn.clone(),
1332 state_machine_name: sm.name.clone(),
1333 name: exec_name.clone(),
1334 status: ExecutionStatus::Running,
1335 input: Some(input.clone()),
1336 output: None,
1337 start_date: now,
1338 stop_date: None,
1339 error: None,
1340 cause: None,
1341 history_events: vec![],
1342 parent_execution_arn: None,
1343 is_sync: true,
1344 billed_duration_ms: None,
1345 billed_memory_mb: None,
1346 };
1347 state.executions.insert(exec_arn.clone(), execution);
1348 (
1349 exec_arn,
1350 sm.definition.clone(),
1351 sm.logging_configuration.clone(),
1352 )
1353 };
1354
1355 interpreter::execute_state_machine(
1356 self.state.clone(),
1357 exec_arn.clone(),
1358 definition,
1359 Some(input),
1360 self.delivery.clone(),
1361 self.dynamodb_state.clone(),
1362 self.registry.clone(),
1363 logging_config,
1364 )
1365 .await;
1366
1367 {
1370 let mut accounts = self.state.write();
1371 if let Some(state) = accounts.get_mut(&req.account_id) {
1372 if let Some(exec) = state.executions.get_mut(&exec_arn) {
1373 let duration_ms = exec
1374 .stop_date
1375 .map_or(0, |stop| (stop - exec.start_date).num_milliseconds())
1376 .max(0);
1377 exec.billed_duration_ms = Some(duration_ms);
1378 exec.billed_memory_mb = Some(64);
1379 }
1380 }
1381 }
1382
1383 let accounts = self.state.read();
1384 let state = accounts.get(&req.account_id).unwrap();
1385 let exec = state
1386 .executions
1387 .get(&exec_arn)
1388 .ok_or_else(|| execution_not_found(&exec_arn))?;
1389
1390 let mut resp = json!({
1391 "executionArn": exec.execution_arn,
1392 "stateMachineArn": exec.state_machine_arn,
1393 "name": exec.name,
1394 "startDate": exec.start_date.timestamp(),
1395 "stopDate": exec.stop_date.map(|d| d.timestamp()),
1396 "status": exec.status.as_str(),
1397 "input": exec.input.as_deref().unwrap_or("{}"),
1398 });
1399
1400 if let Some(ref output) = exec.output {
1401 resp["output"] = json!(output);
1402 }
1403 if let Some(ref error) = exec.error {
1404 resp["error"] = json!(error);
1405 }
1406 if let Some(ref cause) = exec.cause {
1407 resp["cause"] = json!(cause);
1408 }
1409
1410 let duration_ms = exec
1411 .stop_date
1412 .map_or(0, |stop| (stop - exec.start_date).num_milliseconds());
1413 resp["billingDetails"] = json!({
1414 "billedMemoryUsedInMB": 64,
1415 "billedDurationInMilliseconds": duration_ms.max(0),
1416 });
1417
1418 Ok(AwsResponse::ok_json(resp))
1419 }
1420
1421 fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1422 let body = req.json_body();
1423 let definition = body["definition"]
1424 .as_str()
1425 .ok_or_else(|| missing("definition"))?;
1426 validate_definition(definition)?;
1427 let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1428 let input = body["input"].as_str().unwrap_or("{}").to_string();
1429 Ok(AwsResponse::ok_json(json!({
1433 "output": input,
1434 "status": "SUCCEEDED",
1435 "nextState": "End",
1436 })))
1437 }
1438
1439 fn validate_state_machine_definition(
1440 &self,
1441 req: &AwsRequest,
1442 ) -> Result<AwsResponse, AwsServiceError> {
1443 let body = req.json_body();
1444 let definition = body["definition"]
1445 .as_str()
1446 .ok_or_else(|| missing("definition"))?;
1447 match validate_definition(definition) {
1448 Ok(()) => Ok(AwsResponse::ok_json(json!({
1449 "result": "OK",
1450 "diagnostics": [],
1451 }))),
1452 Err(e) => Ok(AwsResponse::ok_json(json!({
1453 "result": "FAIL",
1454 "diagnostics": [{
1455 "severity": "ERROR",
1456 "code": "INVALID_DEFINITION",
1457 "message": e.to_string(),
1458 }],
1459 }))),
1460 }
1461 }
1462}
1463
1464fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1465 json!({
1466 "stateMachineAliasArn": alias.arn,
1467 "name": alias.name,
1468 "description": alias.description,
1469 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1470 "stateMachineVersionArn": r.state_machine_version_arn,
1471 "weight": r.weight,
1472 })).collect::<Vec<_>>(),
1473 "creationDate": alias.creation_date.timestamp(),
1474 "updateDate": alias.update_date.timestamp(),
1475 })
1476}
1477
1478fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1479 json!({
1480 "mapRunArn": mr.map_run_arn,
1481 "executionArn": mr.execution_arn,
1482 "maxConcurrency": mr.max_concurrency,
1483 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1484 "toleratedFailureCount": mr.tolerated_failure_count,
1485 "status": mr.status,
1486 "startDate": mr.start_date.timestamp(),
1487 "stopDate": mr.stop_date.map(|d| d.timestamp()),
1488 })
1489}
1490
1491fn state_machine_to_json(sm: &StateMachine) -> Value {
1494 let mut resp = json!({
1495 "name": sm.name,
1496 "stateMachineArn": sm.arn,
1497 "definition": sm.definition,
1498 "roleArn": sm.role_arn,
1499 "type": sm.machine_type.as_str(),
1500 "status": sm.status.as_str(),
1501 "creationDate": sm.creation_date.timestamp() as f64,
1502 "updateDate": sm.update_date.timestamp() as f64,
1503 "revisionId": sm.revision_id,
1504 "label": sm.name,
1505 });
1506
1507 if !sm.description.is_empty() {
1508 resp["description"] = json!(sm.description);
1509 }
1510
1511 if let Some(ref logging) = sm.logging_configuration {
1512 resp["loggingConfiguration"] = logging.clone();
1513 } else {
1514 resp["loggingConfiguration"] = json!({
1515 "level": "OFF",
1516 "includeExecutionData": false,
1517 "destinations": [],
1518 });
1519 }
1520
1521 if let Some(ref tracing) = sm.tracing_configuration {
1522 resp["tracingConfiguration"] = tracing.clone();
1523 } else {
1524 resp["tracingConfiguration"] = json!({
1525 "enabled": false,
1526 });
1527 }
1528
1529 resp
1530}
1531
1532fn missing(name: &str) -> AwsServiceError {
1533 AwsServiceError::aws_error(
1534 StatusCode::BAD_REQUEST,
1535 "ValidationException",
1536 format!("The request must contain the parameter {name}."),
1537 )
1538}
1539
1540fn state_machine_not_found(arn: &str) -> AwsServiceError {
1541 AwsServiceError::aws_error(
1542 StatusCode::BAD_REQUEST,
1543 "StateMachineDoesNotExist",
1544 format!("State Machine Does Not Exist: '{arn}'"),
1545 )
1546}
1547
1548fn activity_not_found(arn: &str) -> AwsServiceError {
1549 AwsServiceError::aws_error(
1550 StatusCode::BAD_REQUEST,
1551 "ActivityDoesNotExist",
1552 format!("Activity does not exist: {arn}"),
1553 )
1554}
1555
1556fn task_does_not_exist(token: &str) -> AwsServiceError {
1557 AwsServiceError::aws_error(
1558 StatusCode::BAD_REQUEST,
1559 "TaskDoesNotExist",
1560 format!("Task does not exist: {token}"),
1561 )
1562}
1563
1564fn resource_not_found(arn: &str) -> AwsServiceError {
1565 AwsServiceError::aws_error(
1566 StatusCode::BAD_REQUEST,
1567 "ResourceNotFound",
1568 format!("Resource not found: '{arn}'"),
1569 )
1570}
1571
1572fn parse_routing_configuration(
1577 routes: &[serde_json::Value],
1578) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1579 if routes.is_empty() || routes.len() > 2 {
1580 return Err(AwsServiceError::aws_error(
1581 StatusCode::BAD_REQUEST,
1582 "ValidationException",
1583 "routingConfiguration must contain 1 or 2 routes.",
1584 ));
1585 }
1586 let parsed: Vec<crate::state::AliasRoute> = routes
1587 .iter()
1588 .map(|r| {
1589 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1590 AwsServiceError::aws_error(
1591 StatusCode::BAD_REQUEST,
1592 "ValidationException",
1593 "routingConfiguration entries must contain stateMachineVersionArn.",
1594 )
1595 })?;
1596 let weight = r["weight"].as_i64().ok_or_else(|| {
1597 AwsServiceError::aws_error(
1598 StatusCode::BAD_REQUEST,
1599 "ValidationException",
1600 "routingConfiguration entries must contain a numeric weight.",
1601 )
1602 })?;
1603 if !(0..=100).contains(&weight) {
1604 return Err(AwsServiceError::aws_error(
1605 StatusCode::BAD_REQUEST,
1606 "ValidationException",
1607 format!("Invalid routing weight {weight}; must be 0-100."),
1608 ));
1609 }
1610 Ok(crate::state::AliasRoute {
1611 state_machine_version_arn: arn.to_string(),
1612 weight: weight as i32,
1613 })
1614 })
1615 .collect::<Result<_, _>>()?;
1616 let total: i32 = parsed.iter().map(|r| r.weight).sum();
1617 if total != 100 {
1618 return Err(AwsServiceError::aws_error(
1619 StatusCode::BAD_REQUEST,
1620 "ValidationException",
1621 format!("routingConfiguration weights must sum to 100, got {total}."),
1622 ));
1623 }
1624 Ok(parsed)
1625}
1626
1627fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1628 if name.is_empty() || name.len() > 80 {
1629 return Err(AwsServiceError::aws_error(
1630 StatusCode::BAD_REQUEST,
1631 "InvalidName",
1632 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1633 ));
1634 }
1635 if !name
1637 .chars()
1638 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1639 {
1640 return Err(AwsServiceError::aws_error(
1641 StatusCode::BAD_REQUEST,
1642 "InvalidName",
1643 format!(
1644 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1645 ),
1646 ));
1647 }
1648 Ok(())
1649}
1650
1651fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1652 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1653 AwsServiceError::aws_error(
1654 StatusCode::BAD_REQUEST,
1655 "InvalidDefinition",
1656 format!("Invalid State Machine Definition: '{e}'"),
1657 )
1658 })?;
1659
1660 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1661 return Err(AwsServiceError::aws_error(
1662 StatusCode::BAD_REQUEST,
1663 "InvalidDefinition",
1664 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1665 .to_string(),
1666 ));
1667 }
1668
1669 let states_obj = parsed
1670 .get("States")
1671 .and_then(|v| v.as_object())
1672 .ok_or_else(|| {
1673 AwsServiceError::aws_error(
1674 StatusCode::BAD_REQUEST,
1675 "InvalidDefinition",
1676 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1677 .to_string(),
1678 )
1679 })?;
1680
1681 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1682 AwsServiceError::aws_error(
1683 StatusCode::BAD_REQUEST,
1684 "InvalidDefinition",
1685 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1686 .to_string(),
1687 )
1688 })?;
1689 if !states_obj.contains_key(start_at) {
1690 return Err(AwsServiceError::aws_error(
1691 StatusCode::BAD_REQUEST,
1692 "InvalidDefinition",
1693 format!(
1694 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1695 (StartAt '{start_at}' does not reference a valid state)"
1696 ),
1697 ));
1698 }
1699
1700 Ok(())
1701}
1702
1703fn execution_not_found(arn: &str) -> AwsServiceError {
1704 AwsServiceError::aws_error(
1705 StatusCode::BAD_REQUEST,
1706 "ExecutionDoesNotExist",
1707 format!("Execution Does Not Exist: '{arn}'"),
1708 )
1709}
1710
1711fn execution_to_json(exec: &Execution) -> Value {
1712 let mut resp = json!({
1713 "executionArn": exec.execution_arn,
1714 "stateMachineArn": exec.state_machine_arn,
1715 "name": exec.name,
1716 "status": exec.status.as_str(),
1717 "startDate": exec.start_date.timestamp() as f64,
1718 });
1719
1720 if let Some(ref input) = exec.input {
1721 resp["input"] = json!(input);
1722 }
1723 if let Some(ref output) = exec.output {
1724 resp["output"] = json!(output);
1725 }
1726 if let Some(stop) = exec.stop_date {
1727 resp["stopDate"] = json!(stop.timestamp() as f64);
1728 }
1729 if let Some(ref error) = exec.error {
1730 resp["error"] = json!(error);
1731 }
1732 if let Some(ref cause) = exec.cause {
1733 resp["cause"] = json!(cause);
1734 }
1735
1736 resp
1737}
1738
/// Returns `event_type` with its first character lower-cased and the rest
/// left untouched (for example `ExecutionStarted` becomes
/// `executionStarted`). An empty input yields an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    let mut remainder = event_type.chars();
    let Some(first) = remainder.next() else {
        return String::new();
    };

    let mut key = String::with_capacity(event_type.len());
    // `to_lowercase` may expand to more than one char for some code points.
    key.extend(first.to_lowercase());
    key.push_str(remainder.as_str());
    key
}
1747
1748fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1749 if !arn.starts_with("arn:") {
1750 return Err(AwsServiceError::aws_error(
1751 StatusCode::BAD_REQUEST,
1752 "InvalidArn",
1753 format!("Invalid Arn: '{arn}'"),
1754 ));
1755 }
1756 Ok(())
1757}
1758
1759pub fn start_execution_from_delivery(
1768 state: &SharedStepFunctionsState,
1769 delivery: &Option<Arc<DeliveryBus>>,
1770 dynamodb_state: &Option<SharedDynamoDbState>,
1771 registry: &Option<SharedServiceRegistry>,
1772 state_machine_arn: &str,
1773 input: &str,
1774) {
1775 if serde_json::from_str::<serde_json::Value>(input).is_err() {
1777 tracing::warn!(
1778 state_machine_arn,
1779 "Step Functions delivery: invalid JSON input, skipping execution"
1780 );
1781 return;
1782 }
1783
1784 let execution_name = uuid::Uuid::new_v4().to_string();
1785
1786 let account_id = state_machine_arn
1788 .split(':')
1789 .nth(4)
1790 .unwrap_or("000000000000")
1791 .to_string();
1792
1793 let mut accounts = state.write();
1794 let st = accounts.get_or_create(&account_id);
1795 let sm = match st.state_machines.get(state_machine_arn) {
1796 Some(sm) => sm,
1797 None => {
1798 tracing::warn!(
1799 state_machine_arn,
1800 "Step Functions delivery: state machine not found"
1801 );
1802 return;
1803 }
1804 };
1805
1806 let sm_name = sm.name.clone();
1807 let definition = sm.definition.clone();
1808 let exec_arn = st.execution_arn(&sm_name, &execution_name);
1809
1810 let now = Utc::now();
1811 let execution = Execution {
1812 execution_arn: exec_arn.clone(),
1813 state_machine_arn: state_machine_arn.to_string(),
1814 state_machine_name: sm_name,
1815 name: execution_name,
1816 status: ExecutionStatus::Running,
1817 input: Some(input.to_string()),
1818 output: None,
1819 start_date: now,
1820 stop_date: None,
1821 error: None,
1822 cause: None,
1823 history_events: vec![],
1824 parent_execution_arn: None,
1825 is_sync: false,
1826 billed_duration_ms: None,
1827 billed_memory_mb: None,
1828 };
1829
1830 st.executions.insert(exec_arn.clone(), execution);
1831 let logging_config = sm.logging_configuration.clone();
1832 drop(accounts);
1833
1834 let shared_state = state.clone();
1835 let delivery = delivery.clone();
1836 let dynamodb_state = dynamodb_state.clone();
1837 let registry = registry.clone();
1838 let input = Some(input.to_string());
1839 tokio::spawn(async move {
1840 interpreter::execute_state_machine(
1841 shared_state,
1842 exec_arn,
1843 definition,
1844 input,
1845 delivery,
1846 dynamodb_state,
1847 registry,
1848 logging_config,
1849 )
1850 .await;
1851 });
1852}
1853
1854#[cfg(test)]
1855mod tests {
1856 use super::*;
1857 use http::{HeaderMap, Method};
1858 use parking_lot::RwLock;
1859 use serde_json::Value;
1860 use std::collections::HashMap;
1861 use std::sync::Arc;
1862
1863 fn make_state() -> SharedStepFunctionsState {
1864 Arc::new(RwLock::new(
1865 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1866 ))
1867 }
1868
1869 fn make_request(action: &str, body: &str) -> AwsRequest {
1870 AwsRequest {
1871 service: "states".to_string(),
1872 action: action.to_string(),
1873 region: "us-east-1".to_string(),
1874 account_id: "123456789012".to_string(),
1875 request_id: "test-id".to_string(),
1876 headers: HeaderMap::new(),
1877 query_params: HashMap::new(),
1878 body: body.as_bytes().to_vec().into(),
1879 body_stream: parking_lot::Mutex::new(None),
1880 path_segments: vec![],
1881 raw_path: "/".to_string(),
1882 raw_query: String::new(),
1883 method: Method::POST,
1884 is_query_protocol: false,
1885 access_key_id: None,
1886 principal: None,
1887 }
1888 }
1889
1890 fn body_json(resp: &AwsResponse) -> Value {
1891 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
1892 }
1893
1894 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
1895 match result {
1896 Err(e) => e,
1897 Ok(_) => panic!("expected error, got Ok"),
1898 }
1899 }
1900
    // Smallest definition that passes `validate_definition`: `StartAt` names
    // the single `Pass` state, which is present in `States`.
    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
1902
1903 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
1904 let body = json!({
1905 "name": name,
1906 "definition": VALID_DEF,
1907 "roleArn": "arn:aws:iam::123456789012:role/test",
1908 });
1909 let req = make_request("CreateStateMachine", &body.to_string());
1910 let resp = svc.create_state_machine(&req).unwrap();
1911 let b = body_json(&resp);
1912 b["stateMachineArn"].as_str().unwrap().to_string()
1913 }
1914
1915 #[test]
1918 fn create_state_machine_basic() {
1919 let svc = StepFunctionsService::new(make_state());
1920 let arn = create_sm(&svc, "test-sm");
1921 assert!(arn.contains("test-sm"));
1922 }
1923
1924 #[test]
1925 fn create_state_machine_with_express_type() {
1926 let svc = StepFunctionsService::new(make_state());
1927 let body = json!({
1928 "name": "express-sm",
1929 "definition": VALID_DEF,
1930 "roleArn": "arn:aws:iam::123456789012:role/r",
1931 "type": "EXPRESS",
1932 });
1933 let req = make_request("CreateStateMachine", &body.to_string());
1934 let resp = svc.create_state_machine(&req).unwrap();
1935 let b = body_json(&resp);
1936 assert!(b["stateMachineArn"].as_str().is_some());
1937 }
1938
1939 #[test]
1940 fn create_state_machine_duplicate_fails() {
1941 let svc = StepFunctionsService::new(make_state());
1942 create_sm(&svc, "dup-sm");
1943 let body = json!({
1944 "name": "dup-sm",
1945 "definition": VALID_DEF,
1946 "roleArn": "arn:aws:iam::123456789012:role/r",
1947 });
1948 let req = make_request("CreateStateMachine", &body.to_string());
1949 let err = expect_err(svc.create_state_machine(&req));
1950 assert!(err.to_string().contains("StateMachineAlreadyExists"));
1951 }
1952
1953 #[test]
1954 fn create_state_machine_missing_name() {
1955 let svc = StepFunctionsService::new(make_state());
1956 let body = json!({
1957 "definition": VALID_DEF,
1958 "roleArn": "arn:aws:iam::123456789012:role/r",
1959 });
1960 let req = make_request("CreateStateMachine", &body.to_string());
1961 assert!(svc.create_state_machine(&req).is_err());
1962 }
1963
1964 #[test]
1965 fn create_state_machine_invalid_definition() {
1966 let svc = StepFunctionsService::new(make_state());
1967 let body = json!({
1968 "name": "bad-def",
1969 "definition": "not json",
1970 "roleArn": "arn:aws:iam::123456789012:role/r",
1971 });
1972 let req = make_request("CreateStateMachine", &body.to_string());
1973 let err = expect_err(svc.create_state_machine(&req));
1974 assert!(err.to_string().contains("InvalidDefinition"));
1975 }
1976
1977 #[test]
1978 fn create_state_machine_definition_missing_start_at() {
1979 let svc = StepFunctionsService::new(make_state());
1980 let body = json!({
1981 "name": "no-start",
1982 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1983 "roleArn": "arn:aws:iam::123456789012:role/r",
1984 });
1985 let req = make_request("CreateStateMachine", &body.to_string());
1986 let err = expect_err(svc.create_state_machine(&req));
1987 assert!(err.to_string().contains("InvalidDefinition"));
1988 }
1989
1990 #[test]
1991 fn create_state_machine_definition_missing_states() {
1992 let svc = StepFunctionsService::new(make_state());
1993 let body = json!({
1994 "name": "no-states",
1995 "definition": r#"{"StartAt":"S"}"#,
1996 "roleArn": "arn:aws:iam::123456789012:role/r",
1997 });
1998 let req = make_request("CreateStateMachine", &body.to_string());
1999 let err = expect_err(svc.create_state_machine(&req));
2000 assert!(err.to_string().contains("InvalidDefinition"));
2001 }
2002
2003 #[test]
2004 fn create_state_machine_definition_start_at_not_in_states() {
2005 let svc = StepFunctionsService::new(make_state());
2006 let body = json!({
2007 "name": "bad-start",
2008 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
2009 "roleArn": "arn:aws:iam::123456789012:role/r",
2010 });
2011 let req = make_request("CreateStateMachine", &body.to_string());
2012 let err = expect_err(svc.create_state_machine(&req));
2013 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
2014 }
2015
2016 #[test]
2017 fn create_state_machine_invalid_type() {
2018 let svc = StepFunctionsService::new(make_state());
2019 let body = json!({
2020 "name": "bad-type",
2021 "definition": VALID_DEF,
2022 "roleArn": "arn:aws:iam::123456789012:role/r",
2023 "type": "INVALID",
2024 });
2025 let req = make_request("CreateStateMachine", &body.to_string());
2026 assert!(svc.create_state_machine(&req).is_err());
2027 }
2028
2029 #[test]
2030 fn create_state_machine_invalid_arn() {
2031 let svc = StepFunctionsService::new(make_state());
2032 let body = json!({
2033 "name": "bad-arn",
2034 "definition": VALID_DEF,
2035 "roleArn": "not-an-arn",
2036 });
2037 let req = make_request("CreateStateMachine", &body.to_string());
2038 let err = expect_err(svc.create_state_machine(&req));
2039 assert!(err.to_string().contains("InvalidArn"));
2040 }
2041
2042 #[test]
2043 fn create_state_machine_invalid_name() {
2044 let svc = StepFunctionsService::new(make_state());
2045 let body = json!({
2046 "name": "has spaces!",
2047 "definition": VALID_DEF,
2048 "roleArn": "arn:aws:iam::123456789012:role/r",
2049 });
2050 let req = make_request("CreateStateMachine", &body.to_string());
2051 let err = expect_err(svc.create_state_machine(&req));
2052 assert!(err.to_string().contains("InvalidName"));
2053 }
2054
2055 #[test]
2056 fn create_state_machine_name_too_long() {
2057 let svc = StepFunctionsService::new(make_state());
2058 let long_name = "a".repeat(81);
2059 let body = json!({
2060 "name": long_name,
2061 "definition": VALID_DEF,
2062 "roleArn": "arn:aws:iam::123456789012:role/r",
2063 });
2064 let req = make_request("CreateStateMachine", &body.to_string());
2065 let err = expect_err(svc.create_state_machine(&req));
2066 assert!(err.to_string().contains("InvalidName"));
2067 }
2068
2069 #[test]
2072 fn describe_state_machine_found() {
2073 let svc = StepFunctionsService::new(make_state());
2074 let arn = create_sm(&svc, "desc-sm");
2075
2076 let req = make_request(
2077 "DescribeStateMachine",
2078 &json!({"stateMachineArn": arn}).to_string(),
2079 );
2080 let resp = svc.describe_state_machine(&req).unwrap();
2081 let b = body_json(&resp);
2082 assert_eq!(b["name"], "desc-sm");
2083 assert_eq!(b["status"], "ACTIVE");
2084 assert!(b["definition"].as_str().is_some());
2085 }
2086
2087 #[test]
2088 fn describe_state_machine_not_found() {
2089 let svc = StepFunctionsService::new(make_state());
2090 let req = make_request(
2091 "DescribeStateMachine",
2092 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2093 .to_string(),
2094 );
2095 let err = expect_err(svc.describe_state_machine(&req));
2096 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2097 }
2098
2099 #[test]
2102 fn list_state_machines_empty() {
2103 let svc = StepFunctionsService::new(make_state());
2104 let req = make_request("ListStateMachines", "{}");
2105 let resp = svc.list_state_machines(&req).unwrap();
2106 let b = body_json(&resp);
2107 assert!(b["stateMachines"].as_array().unwrap().is_empty());
2108 }
2109
2110 #[test]
2111 fn list_state_machines_returns_created() {
2112 let svc = StepFunctionsService::new(make_state());
2113 create_sm(&svc, "sm-1");
2114 create_sm(&svc, "sm-2");
2115
2116 let req = make_request("ListStateMachines", "{}");
2117 let resp = svc.list_state_machines(&req).unwrap();
2118 let b = body_json(&resp);
2119 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
2120 }
2121
2122 #[test]
2125 fn delete_state_machine() {
2126 let svc = StepFunctionsService::new(make_state());
2127 let arn = create_sm(&svc, "del-sm");
2128
2129 let req = make_request(
2130 "DeleteStateMachine",
2131 &json!({"stateMachineArn": arn}).to_string(),
2132 );
2133 svc.delete_state_machine(&req).unwrap();
2134
2135 let req = make_request(
2137 "DescribeStateMachine",
2138 &json!({"stateMachineArn": arn}).to_string(),
2139 );
2140 assert!(svc.describe_state_machine(&req).is_err());
2141 }
2142
2143 #[test]
2144 fn delete_state_machine_nonexistent_succeeds() {
2145 let svc = StepFunctionsService::new(make_state());
2146 let req = make_request(
2147 "DeleteStateMachine",
2148 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2149 .to_string(),
2150 );
2151 svc.delete_state_machine(&req).unwrap();
2153 }
2154
2155 #[test]
2158 fn update_state_machine() {
2159 let svc = StepFunctionsService::new(make_state());
2160 let arn = create_sm(&svc, "upd-sm");
2161
2162 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
2163 let body = json!({
2164 "stateMachineArn": arn,
2165 "definition": new_def,
2166 "description": "updated",
2167 });
2168 let req = make_request("UpdateStateMachine", &body.to_string());
2169 let resp = svc.update_state_machine(&req).unwrap();
2170 let b = body_json(&resp);
2171 assert!(b["updateDate"].as_f64().is_some());
2172
2173 let req = make_request(
2175 "DescribeStateMachine",
2176 &json!({"stateMachineArn": arn}).to_string(),
2177 );
2178 let resp = svc.describe_state_machine(&req).unwrap();
2179 let b = body_json(&resp);
2180 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
2181 assert_eq!(b["description"], "updated");
2182 }
2183
2184 #[test]
2185 fn update_state_machine_not_found() {
2186 let svc = StepFunctionsService::new(make_state());
2187 let body = json!({
2188 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2189 "definition": VALID_DEF,
2190 });
2191 let req = make_request("UpdateStateMachine", &body.to_string());
2192 let err = expect_err(svc.update_state_machine(&req));
2193 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2194 }
2195
2196 #[tokio::test]
2199 async fn start_execution_basic() {
2200 let svc = StepFunctionsService::new(make_state());
2201 let arn = create_sm(&svc, "exec-sm");
2202
2203 let body = json!({
2204 "stateMachineArn": arn,
2205 "input": r#"{"key":"value"}"#,
2206 });
2207 let req = make_request("StartExecution", &body.to_string());
2208 let resp = svc.start_execution(&req).unwrap();
2209 let b = body_json(&resp);
2210 assert!(b["executionArn"].as_str().is_some());
2211 assert!(b["startDate"].as_f64().is_some());
2212 }
2213
2214 #[tokio::test]
2215 async fn start_execution_with_name() {
2216 let svc = StepFunctionsService::new(make_state());
2217 let arn = create_sm(&svc, "named-exec");
2218
2219 let body = json!({
2220 "stateMachineArn": arn,
2221 "name": "my-execution",
2222 });
2223 let req = make_request("StartExecution", &body.to_string());
2224 let resp = svc.start_execution(&req).unwrap();
2225 let b = body_json(&resp);
2226 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2227 }
2228
2229 #[tokio::test]
2230 async fn start_execution_sm_not_found() {
2231 let svc = StepFunctionsService::new(make_state());
2232 let body = json!({
2233 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2234 });
2235 let req = make_request("StartExecution", &body.to_string());
2236 let err = expect_err(svc.start_execution(&req));
2237 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2238 }
2239
2240 #[tokio::test]
2241 async fn start_execution_invalid_input() {
2242 let svc = StepFunctionsService::new(make_state());
2243 let arn = create_sm(&svc, "bad-input");
2244
2245 let body = json!({
2246 "stateMachineArn": arn,
2247 "input": "not json",
2248 });
2249 let req = make_request("StartExecution", &body.to_string());
2250 let err = expect_err(svc.start_execution(&req));
2251 assert!(err.to_string().contains("InvalidExecutionInput"));
2252 }
2253
2254 #[tokio::test]
2255 async fn start_execution_duplicate_name() {
2256 let svc = StepFunctionsService::new(make_state());
2257 let arn = create_sm(&svc, "dup-exec");
2258
2259 let body = json!({
2260 "stateMachineArn": arn,
2261 "name": "same-name",
2262 });
2263 let req = make_request("StartExecution", &body.to_string());
2264 svc.start_execution(&req).unwrap();
2265
2266 let req = make_request("StartExecution", &body.to_string());
2267 let err = expect_err(svc.start_execution(&req));
2268 assert!(err.to_string().contains("ExecutionAlreadyExists"));
2269 }
2270
2271 #[tokio::test]
2274 async fn describe_execution_found() {
2275 let svc = StepFunctionsService::new(make_state());
2276 let sm_arn = create_sm(&svc, "desc-exec");
2277
2278 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2279 let req = make_request("StartExecution", &body.to_string());
2280 let resp = svc.start_execution(&req).unwrap();
2281 let exec_arn = body_json(&resp)["executionArn"]
2282 .as_str()
2283 .unwrap()
2284 .to_string();
2285
2286 let req = make_request(
2287 "DescribeExecution",
2288 &json!({"executionArn": exec_arn}).to_string(),
2289 );
2290 let resp = svc.describe_execution(&req).unwrap();
2291 let b = body_json(&resp);
2292 assert_eq!(b["name"], "e1");
2293 assert_eq!(b["status"], "RUNNING");
2294 }
2295
2296 #[tokio::test]
2297 async fn describe_execution_not_found() {
2298 let svc = StepFunctionsService::new(make_state());
2299 let req = make_request(
2300 "DescribeExecution",
2301 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2302 .to_string(),
2303 );
2304 let err = expect_err(svc.describe_execution(&req));
2305 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2306 }
2307
2308 #[tokio::test]
2311 async fn stop_execution() {
2312 let svc = StepFunctionsService::new(make_state());
2313 let sm_arn = create_sm(&svc, "stop-sm");
2314
2315 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2316 let req = make_request("StartExecution", &body.to_string());
2317 let resp = svc.start_execution(&req).unwrap();
2318 let exec_arn = body_json(&resp)["executionArn"]
2319 .as_str()
2320 .unwrap()
2321 .to_string();
2322
2323 let body = json!({
2324 "executionArn": exec_arn,
2325 "error": "UserAborted",
2326 "cause": "test stop",
2327 });
2328 let req = make_request("StopExecution", &body.to_string());
2329 let resp = svc.stop_execution(&req).unwrap();
2330 let b = body_json(&resp);
2331 assert!(b["stopDate"].as_f64().is_some());
2332
2333 let req = make_request(
2335 "DescribeExecution",
2336 &json!({"executionArn": exec_arn}).to_string(),
2337 );
2338 let resp = svc.describe_execution(&req).unwrap();
2339 let b = body_json(&resp);
2340 assert_eq!(b["status"], "ABORTED");
2341 assert_eq!(b["error"], "UserAborted");
2342 }
2343
2344 #[tokio::test]
2345 async fn stop_execution_not_found() {
2346 let svc = StepFunctionsService::new(make_state());
2347 let req = make_request(
2348 "StopExecution",
2349 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2350 .to_string(),
2351 );
2352 let err = expect_err(svc.stop_execution(&req));
2353 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2354 }
2355
2356 #[tokio::test]
2359 async fn list_executions() {
2360 let svc = StepFunctionsService::new(make_state());
2361 let sm_arn = create_sm(&svc, "list-exec");
2362
2363 for i in 0..3 {
2364 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2365 let req = make_request("StartExecution", &body.to_string());
2366 svc.start_execution(&req).unwrap();
2367 }
2368
2369 let req = make_request(
2370 "ListExecutions",
2371 &json!({"stateMachineArn": sm_arn}).to_string(),
2372 );
2373 let resp = svc.list_executions(&req).unwrap();
2374 let b = body_json(&resp);
2375 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2376 }
2377
2378 #[tokio::test]
2379 async fn list_executions_sm_not_found() {
2380 let svc = StepFunctionsService::new(make_state());
2381 let req = make_request(
2382 "ListExecutions",
2383 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2384 .to_string(),
2385 );
2386 let err = expect_err(svc.list_executions(&req));
2387 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2388 }
2389
2390 #[tokio::test]
2393 async fn get_execution_history_not_found() {
2394 let svc = StepFunctionsService::new(make_state());
2395 let req = make_request(
2396 "GetExecutionHistory",
2397 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2398 .to_string(),
2399 );
2400 let err = expect_err(svc.get_execution_history(&req));
2401 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2402 }
2403
2404 #[tokio::test]
2407 async fn describe_sm_for_execution() {
2408 let svc = StepFunctionsService::new(make_state());
2409 let sm_arn = create_sm(&svc, "sm-for-exec");
2410
2411 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2412 let req = make_request("StartExecution", &body.to_string());
2413 let resp = svc.start_execution(&req).unwrap();
2414 let exec_arn = body_json(&resp)["executionArn"]
2415 .as_str()
2416 .unwrap()
2417 .to_string();
2418
2419 let req = make_request(
2420 "DescribeStateMachineForExecution",
2421 &json!({"executionArn": exec_arn}).to_string(),
2422 );
2423 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2424 let b = body_json(&resp);
2425 assert_eq!(b["name"], "sm-for-exec");
2426 }
2427
2428 #[test]
2431 fn tag_untag_list_tags() {
2432 let svc = StepFunctionsService::new(make_state());
2433 let arn = create_sm(&svc, "tagged-sm");
2434
2435 let body = json!({
2437 "resourceArn": arn,
2438 "tags": [{"key": "env", "value": "prod"}],
2439 });
2440 let req = make_request("TagResource", &body.to_string());
2441 svc.tag_resource(&req).unwrap();
2442
2443 let req = make_request(
2445 "ListTagsForResource",
2446 &json!({"resourceArn": arn}).to_string(),
2447 );
2448 let resp = svc.list_tags_for_resource(&req).unwrap();
2449 let b = body_json(&resp);
2450 let tags = b["tags"].as_array().unwrap();
2451 assert_eq!(tags.len(), 1);
2452 assert_eq!(tags[0]["key"], "env");
2453
2454 let body = json!({
2456 "resourceArn": arn,
2457 "tagKeys": ["env"],
2458 });
2459 let req = make_request("UntagResource", &body.to_string());
2460 svc.untag_resource(&req).unwrap();
2461
2462 let req = make_request(
2464 "ListTagsForResource",
2465 &json!({"resourceArn": arn}).to_string(),
2466 );
2467 let resp = svc.list_tags_for_resource(&req).unwrap();
2468 let b = body_json(&resp);
2469 assert!(b["tags"].as_array().unwrap().is_empty());
2470 }
2471
2472 #[test]
2473 fn tag_resource_not_found() {
2474 let svc = StepFunctionsService::new(make_state());
2475 let body = json!({
2476 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2477 "tags": [{"key": "k", "value": "v"}],
2478 });
2479 let req = make_request("TagResource", &body.to_string());
2480 let err = expect_err(svc.tag_resource(&req));
2481 assert!(err.to_string().contains("ResourceNotFound"));
2482 }
2483
2484 #[test]
2487 fn test_validate_name() {
2488 assert!(validate_name("valid-name").is_ok());
2489 assert!(validate_name("under_score").is_ok());
2490 assert!(validate_name("").is_err());
2491 assert!(validate_name("has spaces").is_err());
2492 assert!(validate_name(&"a".repeat(81)).is_err());
2493 }
2494
2495 #[test]
2496 fn test_validate_definition() {
2497 assert!(validate_definition(VALID_DEF).is_ok());
2498 assert!(validate_definition("not json").is_err());
2499 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
2502
2503 #[test]
2504 fn test_validate_arn() {
2505 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
2506 assert!(validate_arn("not-an-arn").is_err());
2507 }
2508
2509 #[test]
2510 fn test_camel_to_details_key() {
2511 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
2512 assert_eq!(camel_to_details_key(""), "");
2513 }
2514
2515 #[test]
2516 fn test_is_mutating_action() {
2517 assert!(is_mutating_action("CreateStateMachine"));
2518 assert!(is_mutating_action("StartExecution"));
2519 assert!(!is_mutating_action("DescribeStateMachine"));
2520 assert!(!is_mutating_action("ListStateMachines"));
2521 }
2522
2523 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
2526 let body = json!({
2527 "name": name,
2528 "definition": VALID_DEF,
2529 "roleArn": "arn:aws:iam::123456789012:role/test",
2530 "type": "EXPRESS",
2531 });
2532 let req = make_request("CreateStateMachine", &body.to_string());
2533 let resp = svc.create_state_machine(&req).unwrap();
2534 let b = body_json(&resp);
2535 b["stateMachineArn"].as_str().unwrap().to_string()
2536 }
2537
2538 #[tokio::test]
2539 async fn start_sync_execution_basic() {
2540 let svc = StepFunctionsService::new(make_state());
2541 let arn = create_express_sm(&svc, "sync-sm");
2542
2543 let body = json!({
2544 "stateMachineArn": arn,
2545 "input": r#"{"key":"value"}"#,
2546 });
2547 let req = make_request("StartSyncExecution", &body.to_string());
2548 let resp = svc.start_sync_execution(&req).await.unwrap();
2549 let b = body_json(&resp);
2550 assert!(b["executionArn"]
2551 .as_str()
2552 .unwrap()
2553 .contains("express:sync-sm"));
2554 assert_eq!(b["stateMachineArn"], arn);
2555 assert_eq!(b["status"], "SUCCEEDED");
2556 assert!(b["startDate"].as_i64().is_some());
2557 assert!(b["stopDate"].as_i64().is_some());
2558 assert!(b["output"].as_str().is_some());
2559 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
2560 .as_i64()
2561 .is_some());
2562 }
2563
2564 #[tokio::test]
2565 async fn start_sync_execution_not_express() {
2566 let svc = StepFunctionsService::new(make_state());
2567 let arn = create_sm(&svc, "std-sm");
2568
2569 let body = json!({"stateMachineArn": arn});
2570 let req = make_request("StartSyncExecution", &body.to_string());
2571 let err = expect_err(svc.start_sync_execution(&req).await);
2572 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
2573 }
2574
2575 #[tokio::test]
2576 async fn start_sync_execution_sm_not_found() {
2577 let svc = StepFunctionsService::new(make_state());
2578 let body = json!({
2579 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2580 });
2581 let req = make_request("StartSyncExecution", &body.to_string());
2582 let err = expect_err(svc.start_sync_execution(&req).await);
2583 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2584 }
2585
2586 #[tokio::test]
2587 async fn start_sync_execution_records_introspection_fields() {
2588 let svc = StepFunctionsService::new(make_state());
2589 let arn = create_express_sm(&svc, "sync-introspect");
2590
2591 let body = json!({"stateMachineArn": arn, "input": "{}"});
2592 let req = make_request("StartSyncExecution", &body.to_string());
2593 let resp = svc.start_sync_execution(&req).await.unwrap();
2594 let b = body_json(&resp);
2595 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
2596
2597 let accounts = svc.state.read();
2598 let state = accounts.get("123456789012").unwrap();
2599 let stored = state
2600 .executions
2601 .get(&exec_arn)
2602 .expect("sync execution should be persisted for introspection");
2603 assert!(stored.is_sync, "sync executions must be marked is_sync");
2604 assert_eq!(stored.billed_memory_mb, Some(64));
2605 assert!(
2606 stored.billed_duration_ms.is_some(),
2607 "billed_duration_ms must be populated after sync run"
2608 );
2609 assert!(
2610 stored.parent_execution_arn.is_none(),
2611 "top-level sync execution has no parent"
2612 );
2613 }
2614
2615 #[tokio::test]
2616 async fn start_sync_execution_invalid_input() {
2617 let svc = StepFunctionsService::new(make_state());
2618 let arn = create_express_sm(&svc, "bad-input-sync");
2619
2620 let body = json!({
2621 "stateMachineArn": arn,
2622 "input": "not json",
2623 });
2624 let req = make_request("StartSyncExecution", &body.to_string());
2625 let err = expect_err(svc.start_sync_execution(&req).await);
2626 assert!(err.to_string().contains("InvalidExecutionInput"));
2627 }
2628}