1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Names of the Step Functions API actions this service advertises.
///
/// `supported_actions` returns this slice as-is, and every entry has a
/// matching arm in the `handle` dispatcher.
const SUPPORTED: &[&str] = &[
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
    "CreateActivity",
    "DeleteActivity",
    "DescribeActivity",
    "ListActivities",
    "GetActivityTask",
    "SendTaskFailure",
    "SendTaskHeartbeat",
    "SendTaskSuccess",
    "PublishStateMachineVersion",
    "DeleteStateMachineVersion",
    "ListStateMachineVersions",
    "CreateStateMachineAlias",
    "DeleteStateMachineAlias",
    "DescribeStateMachineAlias",
    "ListStateMachineAliases",
    "UpdateStateMachineAlias",
    "DescribeMapRun",
    "ListMapRuns",
    "UpdateMapRun",
    "RedriveExecution",
    "StartSyncExecution",
    "TestState",
    "ValidateStateMachineDefinition",
];
64
/// Shared handle to the service registry: an `Arc` around a `OnceLock` that
/// holds the `Arc<ServiceRegistry>` once one has been set.
pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
/// Step Functions (`"states"`) service implementation backed by shared
/// in-memory state.
pub struct StepFunctionsService {
    /// Shared per-account Step Functions state read and written by the handlers.
    state: SharedStepFunctionsState,
    /// Optional delivery bus; a clone is passed to the interpreter when an
    /// execution is started.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional DynamoDB state; a clone is passed to the interpreter when an
    /// execution is started.
    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional service registry handle; a clone is passed to the interpreter
    /// when an execution is started.
    registry: Option<SharedServiceRegistry>,
    /// Optional store that `save_snapshot` writes the serialized state to.
    /// When `None`, `save_snapshot` returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the whole of `save_snapshot`, so snapshot writes
    /// issued by this service run one at a time.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
80
81impl StepFunctionsService {
82 pub fn new(state: SharedStepFunctionsState) -> Self {
83 Self {
84 state,
85 delivery: None,
86 dynamodb_state: None,
87 registry: None,
88 snapshot_store: None,
89 snapshot_lock: Arc::new(AsyncMutex::new(())),
90 }
91 }
92
93 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
94 self.delivery = Some(delivery);
95 self
96 }
97
98 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
99 self.dynamodb_state = Some(dynamodb_state);
100 self
101 }
102
103 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
108 self.registry = Some(registry);
109 self
110 }
111
112 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
113 self.snapshot_store = Some(store);
114 self
115 }
116
117 async fn save_snapshot(&self) {
118 let Some(store) = self.snapshot_store.clone() else {
119 return;
120 };
121 let _guard = self.snapshot_lock.lock().await;
122 let snapshot = StepFunctionsSnapshot {
123 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
124 state: None,
125 accounts: Some(self.state.read().clone()),
126 };
127 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
128 let bytes = serde_json::to_vec(&snapshot)
129 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
130 store.save(&bytes)
131 })
132 .await;
133 match join {
134 Ok(Ok(())) => {}
135 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
136 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
137 }
138 }
139}
140
/// Returns `true` when `action` is one of the API actions listed as
/// state-changing, i.e. one after which `handle` persists a snapshot on
/// success. The comparison is exact and case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];

    MUTATING_ACTIONS.iter().any(|candidate| *candidate == action)
}
167
168#[async_trait]
169impl AwsService for StepFunctionsService {
170 fn service_name(&self) -> &str {
171 "states"
172 }
173
174 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175 let mutates = is_mutating_action(req.action.as_str());
176 let result = match req.action.as_str() {
177 "CreateStateMachine" => self.create_state_machine(&req),
178 "DescribeStateMachine" => self.describe_state_machine(&req),
179 "ListStateMachines" => self.list_state_machines(&req),
180 "DeleteStateMachine" => self.delete_state_machine(&req),
181 "UpdateStateMachine" => self.update_state_machine(&req),
182 "TagResource" => self.tag_resource(&req),
183 "UntagResource" => self.untag_resource(&req),
184 "ListTagsForResource" => self.list_tags_for_resource(&req),
185 "StartExecution" => self.start_execution(&req),
186 "StopExecution" => self.stop_execution(&req),
187 "DescribeExecution" => self.describe_execution(&req),
188 "ListExecutions" => self.list_executions(&req),
189 "GetExecutionHistory" => self.get_execution_history(&req),
190 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
191 "CreateActivity" => self.create_activity(&req),
192 "DeleteActivity" => self.delete_activity(&req),
193 "DescribeActivity" => self.describe_activity(&req),
194 "ListActivities" => self.list_activities(&req),
195 "GetActivityTask" => self.get_activity_task(&req).await,
196 "SendTaskFailure" => self.send_task_failure(&req),
197 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
198 "SendTaskSuccess" => self.send_task_success(&req),
199 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
200 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
201 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
202 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
203 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
204 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
205 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
206 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
207 "DescribeMapRun" => self.describe_map_run(&req),
208 "ListMapRuns" => self.list_map_runs(&req),
209 "UpdateMapRun" => self.update_map_run(&req),
210 "RedriveExecution" => self.redrive_execution(&req),
211 "StartSyncExecution" => self.start_sync_execution(&req).await,
212 "TestState" => self.test_state(&req),
213 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
214 _ => Err(AwsServiceError::action_not_implemented(
215 "states",
216 &req.action,
217 )),
218 };
219 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220 self.save_snapshot().await;
221 }
222 result
223 }
224
225 fn supported_actions(&self) -> &[&str] {
226 SUPPORTED
227 }
228}
229
230impl StepFunctionsService {
231 fn create_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
234 let body = req.json_body();
235
236 validate_required("name", &body["name"])?;
237 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
238 validate_name(name)?;
239
240 validate_required("definition", &body["definition"])?;
241 let definition = body["definition"]
242 .as_str()
243 .ok_or_else(|| missing("definition"))?;
244 validate_definition(definition)?;
245
246 validate_required("roleArn", &body["roleArn"])?;
247 let role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
248 validate_arn(role_arn)?;
249
250 let machine_type = if let Some(t) = body["type"].as_str() {
251 StateMachineType::parse(t).ok_or_else(|| {
252 AwsServiceError::aws_error(
253 StatusCode::BAD_REQUEST,
254 "ValidationException",
255 format!(
256 "Value '{t}' at 'type' failed to satisfy constraint: \
257 Member must satisfy enum value set: [STANDARD, EXPRESS]"
258 ),
259 )
260 })?
261 } else {
262 StateMachineType::Standard
263 };
264
265 let mut accounts = self.state.write();
266 let state = accounts.get_or_create(&req.account_id);
267 let arn = state.state_machine_arn(name);
268
269 if state.state_machines.values().any(|sm| sm.name == name) {
271 return Err(AwsServiceError::aws_error(
272 StatusCode::CONFLICT,
273 "StateMachineAlreadyExists",
274 format!("State Machine Already Exists: '{arn}'"),
275 ));
276 }
277
278 let now = Utc::now();
279 let revision_id = uuid::Uuid::new_v4().to_string();
280
281 let mut tags = BTreeMap::new();
282 if !body["tags"].is_null() {
283 fakecloud_core::tags::apply_tags(&mut tags, &body, "tags", "key", "value").map_err(
284 |f| {
285 AwsServiceError::aws_error(
286 StatusCode::BAD_REQUEST,
287 "ValidationException",
288 format!("{f} must be a list"),
289 )
290 },
291 )?;
292 }
293
294 let sm = StateMachine {
295 name: name.to_string(),
296 arn: arn.clone(),
297 definition: definition.to_string(),
298 role_arn: role_arn.to_string(),
299 machine_type,
300 status: StateMachineStatus::Active,
301 creation_date: now,
302 update_date: now,
303 tags,
304 revision_id: revision_id.clone(),
305 logging_configuration: body.get("loggingConfiguration").cloned(),
306 tracing_configuration: body.get("tracingConfiguration").cloned(),
307 description: body["description"].as_str().unwrap_or("").to_string(),
308 };
309
310 state.state_machines.insert(arn.clone(), sm);
311
312 Ok(AwsResponse::ok_json(json!({
313 "stateMachineArn": arn,
314 "creationDate": now.timestamp() as f64,
315 "stateMachineVersionArn": arn,
316 })))
317 }
318
319 fn describe_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 validate_required("stateMachineArn", &body["stateMachineArn"])?;
322 let arn = body["stateMachineArn"]
323 .as_str()
324 .ok_or_else(|| missing("stateMachineArn"))?;
325 validate_arn(arn)?;
326
327 let accounts = self.state.read();
328 let empty = StepFunctionsState::new(&req.account_id, &req.region);
329 let state = accounts.get(&req.account_id).unwrap_or(&empty);
330 let sm = state
331 .state_machines
332 .get(arn)
333 .ok_or_else(|| state_machine_not_found(arn))?;
334
335 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
336 }
337
338 fn list_state_machines(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339 let body = req.json_body();
340 let raw_max_results = body["maxResults"].as_i64();
341 if let Some(mr) = raw_max_results {
342 validate_max_results(mr)?;
343 }
344 let max_results = match raw_max_results.unwrap_or(0) {
346 0 => 100,
347 n => n as usize,
348 };
349 let next_token = body["nextToken"].as_str();
350 if let Some(t) = next_token {
351 validate_page_token(t)?;
352 }
353
354 let accounts = self.state.read();
355 let empty = StepFunctionsState::new(&req.account_id, &req.region);
356 let state = accounts.get(&req.account_id).unwrap_or(&empty);
357 let mut machines: Vec<&StateMachine> = state.state_machines.values().collect();
358 machines.sort_by(|a, b| a.name.cmp(&b.name));
359
360 let items: Vec<Value> = machines
361 .iter()
362 .map(|sm| {
363 json!({
364 "name": sm.name,
365 "stateMachineArn": sm.arn,
366 "type": sm.machine_type.as_str(),
367 "creationDate": sm.creation_date.timestamp() as f64,
368 })
369 })
370 .collect();
371
372 let (page, token) = paginate(&items, next_token, max_results);
373
374 let mut resp = json!({ "stateMachines": page });
375 if let Some(t) = token {
376 resp["nextToken"] = json!(t);
377 }
378 Ok(AwsResponse::ok_json(resp))
379 }
380
381 fn delete_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
382 let body = req.json_body();
383 validate_required("stateMachineArn", &body["stateMachineArn"])?;
384 let arn = body["stateMachineArn"]
385 .as_str()
386 .ok_or_else(|| missing("stateMachineArn"))?;
387 validate_arn(arn)?;
388
389 let mut accounts = self.state.write();
390 let state = accounts.get_or_create(&req.account_id);
391 state.state_machines.remove(arn);
393
394 Ok(AwsResponse::ok_json(json!({})))
395 }
396
397 fn update_state_machine(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
398 let body = req.json_body();
399 validate_required("stateMachineArn", &body["stateMachineArn"])?;
400 let arn = body["stateMachineArn"]
401 .as_str()
402 .ok_or_else(|| missing("stateMachineArn"))?;
403 validate_arn(arn)?;
404
405 let mut accounts = self.state.write();
406 let state = accounts.get_or_create(&req.account_id);
407 let sm = state
408 .state_machines
409 .get_mut(arn)
410 .ok_or_else(|| state_machine_not_found(arn))?;
411
412 if let Some(definition) = body["definition"].as_str() {
413 validate_definition(definition)?;
414 sm.definition = definition.to_string();
415 }
416
417 if let Some(role_arn) = body["roleArn"].as_str() {
418 validate_arn(role_arn)?;
419 sm.role_arn = role_arn.to_string();
420 }
421
422 if let Some(logging) = body.get("loggingConfiguration") {
423 sm.logging_configuration = Some(logging.clone());
424 }
425
426 if let Some(tracing) = body.get("tracingConfiguration") {
427 sm.tracing_configuration = Some(tracing.clone());
428 }
429
430 if let Some(description) = body["description"].as_str() {
431 sm.description = description.to_string();
432 }
433
434 let now = Utc::now();
435 sm.update_date = now;
436 sm.revision_id = uuid::Uuid::new_v4().to_string();
437
438 let revision_id = sm.revision_id.clone();
439 let sm_arn = sm.arn.clone();
440
441 Ok(AwsResponse::ok_json(json!({
442 "updateDate": now.timestamp() as f64,
443 "revisionId": revision_id,
444 "stateMachineVersionArn": sm_arn,
445 })))
446 }
447
448 fn start_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
451 let body = req.json_body();
452 validate_required("stateMachineArn", &body["stateMachineArn"])?;
453 let sm_arn = body["stateMachineArn"]
454 .as_str()
455 .ok_or_else(|| missing("stateMachineArn"))?;
456 validate_arn(sm_arn)?;
457
458 let input = body["input"].as_str().map(|s| s.to_string());
459
460 if let Some(ref input_str) = input {
462 let _: serde_json::Value = serde_json::from_str(input_str).map_err(|_| {
463 AwsServiceError::aws_error(
464 StatusCode::BAD_REQUEST,
465 "InvalidExecutionInput",
466 "Invalid execution input: must be valid JSON".to_string(),
467 )
468 })?;
469 }
470
471 let execution_name = body["name"]
472 .as_str()
473 .map(|s| s.to_string())
474 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
475
476 if let Some(name) = body["name"].as_str() {
477 validate_name(name)?;
478 }
479
480 let mut accounts = self.state.write();
481 let state = accounts.get_or_create(&req.account_id);
482 let sm = state
483 .state_machines
484 .get(sm_arn)
485 .ok_or_else(|| state_machine_not_found(sm_arn))?;
486
487 let sm_name = sm.name.clone();
488 let definition = sm.definition.clone();
489 let exec_arn = state.execution_arn(&sm_name, &execution_name);
490
491 if state.executions.contains_key(&exec_arn) {
493 return Err(AwsServiceError::aws_error(
494 StatusCode::CONFLICT,
495 "ExecutionAlreadyExists",
496 format!("Execution Already Exists: '{exec_arn}'"),
497 ));
498 }
499
500 let now = Utc::now();
501 let execution = Execution {
502 execution_arn: exec_arn.clone(),
503 state_machine_arn: sm_arn.to_string(),
504 state_machine_name: sm_name,
505 name: execution_name,
506 status: ExecutionStatus::Running,
507 input: input.clone(),
508 output: None,
509 start_date: now,
510 stop_date: None,
511 error: None,
512 cause: None,
513 history_events: vec![],
514 parent_execution_arn: None,
515 is_sync: false,
516 billed_duration_ms: None,
517 billed_memory_mb: None,
518 };
519
520 state.executions.insert(exec_arn.clone(), execution);
521 let logging_config = sm.logging_configuration.clone();
522 drop(accounts);
523
524 let shared_state = self.state.clone();
526 let exec_arn_clone = exec_arn.clone();
527 let input_clone = input;
528 let delivery = self.delivery.clone();
529 let dynamodb_state = self.dynamodb_state.clone();
530 let registry = self.registry.clone();
531 tokio::spawn(async move {
532 interpreter::execute_state_machine(
533 shared_state,
534 exec_arn_clone,
535 definition,
536 input_clone,
537 delivery,
538 dynamodb_state,
539 registry,
540 logging_config,
541 )
542 .await;
543 });
544
545 Ok(AwsResponse::ok_json(json!({
546 "executionArn": exec_arn,
547 "startDate": now.timestamp() as f64,
548 })))
549 }
550
551 fn stop_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
552 let body = req.json_body();
553 validate_required("executionArn", &body["executionArn"])?;
554 let exec_arn = body["executionArn"]
555 .as_str()
556 .ok_or_else(|| missing("executionArn"))?;
557
558 let error = body["error"].as_str().map(|s| s.to_string());
559 let cause = body["cause"].as_str().map(|s| s.to_string());
560
561 let mut accounts = self.state.write();
562 let state = accounts.get_or_create(&req.account_id);
563 let exec = state
564 .executions
565 .get_mut(exec_arn)
566 .ok_or_else(|| execution_not_found(exec_arn))?;
567
568 if exec.status != ExecutionStatus::Running {
569 return Err(AwsServiceError::aws_error(
570 StatusCode::BAD_REQUEST,
571 "ExecutionNotRunning",
572 format!("Execution is not running: '{exec_arn}'"),
573 ));
574 }
575
576 let now = Utc::now();
577 exec.status = ExecutionStatus::Aborted;
578 exec.stop_date = Some(now);
579 exec.error = error;
580 exec.cause = cause;
581
582 Ok(AwsResponse::ok_json(json!({
583 "stopDate": now.timestamp() as f64,
584 })))
585 }
586
587 fn describe_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
588 let body = req.json_body();
589 validate_required("executionArn", &body["executionArn"])?;
590 let exec_arn = body["executionArn"]
591 .as_str()
592 .ok_or_else(|| missing("executionArn"))?;
593
594 let accounts = self.state.read();
595 let empty = StepFunctionsState::new(&req.account_id, &req.region);
596 let state = accounts.get(&req.account_id).unwrap_or(&empty);
597 let exec = state
598 .executions
599 .get(exec_arn)
600 .ok_or_else(|| execution_not_found(exec_arn))?;
601
602 Ok(AwsResponse::ok_json(execution_to_json(exec)))
603 }
604
605 fn list_executions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
606 let body = req.json_body();
607 validate_required("stateMachineArn", &body["stateMachineArn"])?;
608 let sm_arn = body["stateMachineArn"]
609 .as_str()
610 .ok_or_else(|| missing("stateMachineArn"))?;
611 validate_arn(sm_arn)?;
612
613 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
614 validate_range_i64("maxResults", max_results as i64, 1, 1000)?;
615 let next_token = body["nextToken"].as_str();
616 let status_filter = body["statusFilter"].as_str();
617
618 let accounts = self.state.read();
619 let empty = StepFunctionsState::new(&req.account_id, &req.region);
620 let state = accounts.get(&req.account_id).unwrap_or(&empty);
621
622 if !state.state_machines.contains_key(sm_arn) {
624 return Err(state_machine_not_found(sm_arn));
625 }
626
627 let mut executions: Vec<&Execution> = state
628 .executions
629 .values()
630 .filter(|e| e.state_machine_arn == sm_arn)
631 .filter(|e| {
632 status_filter
633 .map(|sf| e.status.as_str() == sf)
634 .unwrap_or(true)
635 })
636 .collect();
637
638 executions.sort_by_key(|e| std::cmp::Reverse(e.start_date));
640
641 let items: Vec<Value> = executions
642 .iter()
643 .map(|e| {
644 let mut item = json!({
645 "executionArn": e.execution_arn,
646 "stateMachineArn": e.state_machine_arn,
647 "name": e.name,
648 "status": e.status.as_str(),
649 "startDate": e.start_date.timestamp() as f64,
650 });
651 if let Some(stop) = e.stop_date {
652 item["stopDate"] = json!(stop.timestamp() as f64);
653 }
654 item
655 })
656 .collect();
657
658 let (page, token) = paginate(&items, next_token, max_results);
659
660 let mut resp = json!({ "executions": page });
661 if let Some(t) = token {
662 resp["nextToken"] = json!(t);
663 }
664 Ok(AwsResponse::ok_json(resp))
665 }
666
667 fn get_execution_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
668 let body = req.json_body();
669 validate_required("executionArn", &body["executionArn"])?;
670 let exec_arn = body["executionArn"]
671 .as_str()
672 .ok_or_else(|| missing("executionArn"))?;
673 validate_arn_length("executionArn", exec_arn, 256)?;
674
675 if let Some(mr) = body["maxResults"].as_i64() {
676 validate_max_results(mr)?;
677 }
678 let max_results = body["maxResults"].as_i64().unwrap_or(100) as usize;
679 let next_token = body["nextToken"].as_str();
680 if let Some(t) = next_token {
681 validate_page_token(t)?;
682 }
683 let reverse_order = body["reverseOrder"].as_bool().unwrap_or(false);
684
685 let accounts = self.state.read();
686 let empty = StepFunctionsState::new(&req.account_id, &req.region);
687 let state = accounts.get(&req.account_id).unwrap_or(&empty);
688 let exec = state
689 .executions
690 .get(exec_arn)
691 .ok_or_else(|| execution_not_found(exec_arn))?;
692
693 let mut events: Vec<Value> = exec
694 .history_events
695 .iter()
696 .map(|e| {
697 json!({
698 "id": e.id,
699 "type": e.event_type,
700 "timestamp": e.timestamp.timestamp() as f64,
701 "previousEventId": e.previous_event_id,
702 format!("{}EventDetails", camel_to_details_key(&e.event_type)): e.details,
703 })
704 })
705 .collect();
706
707 if reverse_order {
708 events.reverse();
709 }
710
711 let (page, token) = paginate(&events, next_token, max_results);
712
713 let mut resp = json!({ "events": page });
714 if let Some(t) = token {
715 resp["nextToken"] = json!(t);
716 }
717 Ok(AwsResponse::ok_json(resp))
718 }
719
720 fn describe_state_machine_for_execution(
721 &self,
722 req: &AwsRequest,
723 ) -> Result<AwsResponse, AwsServiceError> {
724 let body = req.json_body();
725 validate_required("executionArn", &body["executionArn"])?;
726 let exec_arn = body["executionArn"]
727 .as_str()
728 .ok_or_else(|| missing("executionArn"))?;
729
730 let accounts = self.state.read();
731 let empty = StepFunctionsState::new(&req.account_id, &req.region);
732 let state = accounts.get(&req.account_id).unwrap_or(&empty);
733 let exec = state
734 .executions
735 .get(exec_arn)
736 .ok_or_else(|| execution_not_found(exec_arn))?;
737
738 let sm = state
739 .state_machines
740 .get(&exec.state_machine_arn)
741 .ok_or_else(|| state_machine_not_found(&exec.state_machine_arn))?;
742
743 Ok(AwsResponse::ok_json(state_machine_to_json(sm)))
744 }
745
746 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
749 let body = req.json_body();
750 validate_required("resourceArn", &body["resourceArn"])?;
751 let arn = body["resourceArn"]
752 .as_str()
753 .ok_or_else(|| missing("resourceArn"))?;
754 validate_arn(arn)?;
755 validate_required("tags", &body["tags"])?;
756
757 let mut accounts = self.state.write();
758 let state = accounts.get_or_create(&req.account_id);
759 let sm = state
760 .state_machines
761 .get_mut(arn)
762 .ok_or_else(|| resource_not_found(arn))?;
763
764 fakecloud_core::tags::apply_tags(&mut sm.tags, &body, "tags", "key", "value").map_err(
765 |f| {
766 AwsServiceError::aws_error(
767 StatusCode::BAD_REQUEST,
768 "ValidationException",
769 format!("{f} must be a list"),
770 )
771 },
772 )?;
773
774 Ok(AwsResponse::ok_json(json!({})))
775 }
776
777 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
778 let body = req.json_body();
779 validate_required("resourceArn", &body["resourceArn"])?;
780 let arn = body["resourceArn"]
781 .as_str()
782 .ok_or_else(|| missing("resourceArn"))?;
783 validate_arn(arn)?;
784 validate_required("tagKeys", &body["tagKeys"])?;
785
786 let mut accounts = self.state.write();
787 let state = accounts.get_or_create(&req.account_id);
788 let sm = state
789 .state_machines
790 .get_mut(arn)
791 .ok_or_else(|| resource_not_found(arn))?;
792
793 fakecloud_core::tags::remove_tags(&mut sm.tags, &body, "tagKeys").map_err(|f| {
794 AwsServiceError::aws_error(
795 StatusCode::BAD_REQUEST,
796 "ValidationException",
797 format!("{f} must be a list"),
798 )
799 })?;
800
801 Ok(AwsResponse::ok_json(json!({})))
802 }
803
804 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
805 let body = req.json_body();
806 validate_required("resourceArn", &body["resourceArn"])?;
807 let arn = body["resourceArn"]
808 .as_str()
809 .ok_or_else(|| missing("resourceArn"))?;
810 validate_arn(arn)?;
811
812 let accounts = self.state.read();
813 let empty = StepFunctionsState::new(&req.account_id, &req.region);
814 let state = accounts.get(&req.account_id).unwrap_or(&empty);
815 let sm = state
816 .state_machines
817 .get(arn)
818 .ok_or_else(|| resource_not_found(arn))?;
819
820 let tags = fakecloud_core::tags::tags_to_json(&sm.tags, "key", "value");
821
822 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
823 }
824
825 fn create_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
828 let body = req.json_body();
829 let name = body["name"].as_str().ok_or_else(|| missing("name"))?;
830 validate_name(name)?;
831 let mut accounts = self.state.write();
832 let state = accounts.get_or_create(&req.account_id);
833 let arn = format!(
834 "arn:aws:states:{}:{}:activity:{}",
835 state.region, state.account_id, name
836 );
837 if state.activities.contains_key(&arn) {
838 return Err(AwsServiceError::aws_error(
839 StatusCode::BAD_REQUEST,
840 "ActivityAlreadyExists",
841 format!("Activity already exists: {arn}"),
842 ));
843 }
844 let activity = crate::state::Activity {
845 name: name.to_string(),
846 arn: arn.clone(),
847 creation_date: chrono::Utc::now(),
848 tags: BTreeMap::new(),
849 };
850 state.activities.insert(arn.clone(), activity.clone());
851 Ok(AwsResponse::ok_json(json!({
852 "activityArn": arn,
853 "creationDate": activity.creation_date.timestamp(),
854 })))
855 }
856
857 fn delete_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
858 let body = req.json_body();
859 let arn = body["activityArn"]
860 .as_str()
861 .ok_or_else(|| missing("activityArn"))?
862 .to_string();
863 validate_arn_length("activityArn", &arn, 256)?;
864 let mut accounts = self.state.write();
865 let state = accounts.get_or_create(&req.account_id);
866 state.activities.remove(&arn);
867 Ok(AwsResponse::ok_json(json!({})))
868 }
869
870 fn describe_activity(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
871 let body = req.json_body();
872 let arn = body["activityArn"]
873 .as_str()
874 .ok_or_else(|| missing("activityArn"))?
875 .to_string();
876 let accounts = self.state.read();
877 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
878 let state = accounts.get(&req.account_id).unwrap_or(&empty);
879 let a = state.activities.get(&arn).ok_or_else(|| {
880 AwsServiceError::aws_error(
881 StatusCode::BAD_REQUEST,
882 "ActivityDoesNotExist",
883 format!("Activity does not exist: {arn}"),
884 )
885 })?;
886 Ok(AwsResponse::ok_json(json!({
887 "activityArn": a.arn,
888 "name": a.name,
889 "creationDate": a.creation_date.timestamp(),
890 })))
891 }
892
893 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
894 let body = req.json_body();
895 let raw_max_results = body["maxResults"].as_i64();
896 if let Some(mr) = raw_max_results {
897 validate_max_results(mr)?;
898 }
899 let next_token = body["nextToken"].as_str();
900 if let Some(t) = next_token {
901 validate_page_token(t)?;
902 }
903 let max_results = match raw_max_results.unwrap_or(0) {
907 0 => 100,
908 n => n as usize,
909 };
910 let accounts = self.state.read();
911 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
912 let state = accounts.get(&req.account_id).unwrap_or(&empty);
913 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
914 activities.sort_by(|a, b| a.name.cmp(&b.name));
915 let items: Vec<Value> = activities
916 .iter()
917 .map(|a| {
918 json!({
919 "activityArn": a.arn,
920 "name": a.name,
921 "creationDate": a.creation_date.timestamp(),
922 })
923 })
924 .collect();
925 let (page, token) = paginate(&items, next_token, max_results);
926 let mut resp = json!({ "activities": page });
927 if let Some(t) = token {
928 resp["nextToken"] = json!(t);
929 }
930 Ok(AwsResponse::ok_json(resp))
931 }
932
933 async fn get_activity_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
934 let body = req.json_body();
935 let arn = body["activityArn"]
936 .as_str()
937 .ok_or_else(|| missing("activityArn"))?
938 .to_string();
939 {
941 let accounts = self.state.read();
942 let state = accounts
943 .get(&req.account_id)
944 .ok_or_else(|| activity_not_found(&arn))?;
945 if !state.activities.contains_key(&arn) {
946 return Err(activity_not_found(&arn));
947 }
948 }
949
950 let max_wait_secs: u64 = std::env::var("FAKECLOUD_SFN_GET_ACTIVITY_TIMEOUT_SECS")
953 .ok()
954 .and_then(|s| s.parse().ok())
955 .unwrap_or(5);
956 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(max_wait_secs);
957
958 loop {
959 {
961 let mut accounts = self.state.write();
962 let state = accounts.get_or_create(&req.account_id);
963 let mut candidates: Vec<(String, chrono::DateTime<chrono::Utc>)> = state
964 .task_tokens
965 .iter()
966 .filter(|(_, t)| t.activity_arn == arn && t.status == "PENDING")
967 .map(|(k, t)| (k.clone(), t.created_at))
968 .collect();
969 candidates.sort_by_key(|c| c.1);
970 if let Some((token, _)) = candidates.into_iter().next() {
971 let now = chrono::Utc::now();
972 let entry = state.task_tokens.get_mut(&token).expect("just looked up");
973 entry.status = "IN_PROGRESS".to_string();
974 entry.last_heartbeat_at = Some(now);
975 let input = entry.input.clone().unwrap_or_else(|| "{}".to_string());
976 return Ok(AwsResponse::ok_json(json!({
977 "taskToken": token,
978 "input": input,
979 })));
980 }
981 }
982 if std::time::Instant::now() >= deadline {
983 return Ok(AwsResponse::ok_json(json!({
986 "taskToken": "",
987 "input": "",
988 })));
989 }
990 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
991 }
992 }
993
994 fn send_task_success(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
995 self.update_task_token(req, "SUCCEEDED")
996 }
997
998 fn send_task_failure(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999 self.update_task_token(req, "FAILED")
1000 }
1001
1002 fn send_task_heartbeat(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1003 let body = req.json_body();
1008 let token = body["taskToken"]
1009 .as_str()
1010 .ok_or_else(|| missing("taskToken"))?
1011 .to_string();
1012 let mut accounts = self.state.write();
1013 let state = accounts.get_or_create(&req.account_id);
1014 let entry = state
1015 .task_tokens
1016 .get_mut(&token)
1017 .ok_or_else(|| task_does_not_exist(&token))?;
1018 entry.last_heartbeat_at = Some(chrono::Utc::now());
1019 Ok(AwsResponse::ok_json(json!({})))
1020 }
1021
1022 fn update_task_token(
1023 &self,
1024 req: &AwsRequest,
1025 new_status: &str,
1026 ) -> Result<AwsResponse, AwsServiceError> {
1027 let body = req.json_body();
1028 let token = body["taskToken"]
1029 .as_str()
1030 .ok_or_else(|| missing("taskToken"))?
1031 .to_string();
1032 let mut accounts = self.state.write();
1033 let state = accounts.get_or_create(&req.account_id);
1034 let entry = state
1035 .task_tokens
1036 .get_mut(&token)
1037 .ok_or_else(|| task_does_not_exist(&token))?;
1038 entry.status = new_status.to_string();
1039 if new_status == "SUCCEEDED" {
1040 entry.output = body["output"].as_str().map(String::from);
1041 } else if new_status == "FAILED" {
1042 entry.error = body["error"].as_str().map(String::from);
1043 entry.cause = body["cause"].as_str().map(String::from);
1044 }
1045 Ok(AwsResponse::ok_json(json!({})))
1046 }
1047
1048 fn publish_state_machine_version(
1051 &self,
1052 req: &AwsRequest,
1053 ) -> Result<AwsResponse, AwsServiceError> {
1054 let body = req.json_body();
1055 let arn = body["stateMachineArn"]
1056 .as_str()
1057 .ok_or_else(|| missing("stateMachineArn"))?
1058 .to_string();
1059 let description = body["description"].as_str().unwrap_or("").to_string();
1060 let mut accounts = self.state.write();
1061 let state = accounts.get_or_create(&req.account_id);
1062 if !state.state_machines.contains_key(&arn) {
1063 return Err(state_machine_not_found(&arn));
1064 }
1065 let version = state
1066 .state_machine_versions
1067 .values()
1068 .filter(|v| v.state_machine_arn == arn)
1069 .map(|v| v.version)
1070 .max()
1071 .unwrap_or(0)
1072 + 1;
1073 let version_arn = format!("{arn}:{version}");
1074 let v = crate::state::StateMachineVersion {
1075 state_machine_arn: arn,
1076 version,
1077 revision_id: format!("rev-{version}"),
1078 description,
1079 creation_date: chrono::Utc::now(),
1080 };
1081 state
1082 .state_machine_versions
1083 .insert(version_arn.clone(), v.clone());
1084 Ok(AwsResponse::ok_json(json!({
1085 "stateMachineVersionArn": version_arn,
1086 "creationDate": v.creation_date.timestamp(),
1087 })))
1088 }
1089
1090 fn delete_state_machine_version(
1091 &self,
1092 req: &AwsRequest,
1093 ) -> Result<AwsResponse, AwsServiceError> {
1094 let body = req.json_body();
1095 let arn = body["stateMachineVersionArn"]
1096 .as_str()
1097 .ok_or_else(|| missing("stateMachineVersionArn"))?
1098 .to_string();
1099 validate_arn_length("stateMachineVersionArn", &arn, 2000)?;
1100 let mut accounts = self.state.write();
1101 let state = accounts.get_or_create(&req.account_id);
1102 state.state_machine_versions.remove(&arn);
1103 Ok(AwsResponse::ok_json(json!({})))
1104 }
1105
1106 fn list_state_machine_versions(
1107 &self,
1108 req: &AwsRequest,
1109 ) -> Result<AwsResponse, AwsServiceError> {
1110 let body = req.json_body();
1111 let arn = body["stateMachineArn"]
1112 .as_str()
1113 .ok_or_else(|| missing("stateMachineArn"))?
1114 .to_string();
1115 validate_arn_length("stateMachineArn", &arn, 256)?;
1116 let raw_max_results = body["maxResults"].as_i64();
1117 if let Some(mr) = raw_max_results {
1118 validate_max_results(mr)?;
1119 }
1120 let next_token = body["nextToken"].as_str();
1121 if let Some(t) = next_token {
1122 validate_page_token(t)?;
1123 }
1124 let max_results = match raw_max_results.unwrap_or(0) {
1125 0 => 100,
1126 n => n as usize,
1127 };
1128 let accounts = self.state.read();
1129 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1130 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1131 let mut versions: Vec<&crate::state::StateMachineVersion> = state
1132 .state_machine_versions
1133 .values()
1134 .filter(|v| v.state_machine_arn == arn)
1135 .collect();
1136 versions.sort_by_key(|v| std::cmp::Reverse(v.version));
1137 let items: Vec<Value> = versions
1138 .iter()
1139 .map(|v| {
1140 json!({
1141 "stateMachineVersionArn": format!("{}:{}", v.state_machine_arn, v.version),
1142 "creationDate": v.creation_date.timestamp(),
1143 })
1144 })
1145 .collect();
1146 let (page, token) = paginate(&items, next_token, max_results);
1147 let mut resp = json!({ "stateMachineVersions": page });
1148 if let Some(t) = token {
1149 resp["nextToken"] = json!(t);
1150 }
1151 Ok(AwsResponse::ok_json(resp))
1152 }
1153
1154 fn create_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1155 let body = req.json_body();
1156 let name = body["name"]
1157 .as_str()
1158 .ok_or_else(|| missing("name"))?
1159 .to_string();
1160 validate_name(&name)?;
1161 let routing_cfg = body["routingConfiguration"]
1162 .as_array()
1163 .ok_or_else(|| missing("routingConfiguration"))?;
1164 let routes = parse_routing_configuration(routing_cfg)?;
1165 let parent_arn = routes[0]
1166 .state_machine_version_arn
1167 .rsplit_once(':')
1168 .map(|(parent, _)| parent.to_string())
1169 .unwrap_or_default();
1170 let alias_arn = format!("{parent_arn}:{name}");
1171 let now = chrono::Utc::now();
1172 let alias = crate::state::StateMachineAlias {
1173 name,
1174 arn: alias_arn.clone(),
1175 description: body["description"].as_str().unwrap_or("").to_string(),
1176 routing_configuration: routes,
1177 creation_date: now,
1178 update_date: now,
1179 };
1180 let mut accounts = self.state.write();
1181 let state = accounts.get_or_create(&req.account_id);
1182 state.state_machine_aliases.insert(alias_arn.clone(), alias);
1183 Ok(AwsResponse::ok_json(json!({
1184 "stateMachineAliasArn": alias_arn,
1185 "creationDate": now.timestamp(),
1186 })))
1187 }
1188
1189 fn delete_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1190 let body = req.json_body();
1191 let arn = body["stateMachineAliasArn"]
1192 .as_str()
1193 .ok_or_else(|| missing("stateMachineAliasArn"))?
1194 .to_string();
1195 validate_arn_length("stateMachineAliasArn", &arn, 256)?;
1196 let mut accounts = self.state.write();
1197 let state = accounts.get_or_create(&req.account_id);
1198 state.state_machine_aliases.remove(&arn);
1199 Ok(AwsResponse::ok_json(json!({})))
1200 }
1201
1202 fn describe_state_machine_alias(
1203 &self,
1204 req: &AwsRequest,
1205 ) -> Result<AwsResponse, AwsServiceError> {
1206 let body = req.json_body();
1207 let arn = body["stateMachineAliasArn"]
1208 .as_str()
1209 .ok_or_else(|| missing("stateMachineAliasArn"))?
1210 .to_string();
1211 let accounts = self.state.read();
1212 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1213 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1214 let alias = state
1215 .state_machine_aliases
1216 .get(&arn)
1217 .ok_or_else(|| resource_not_found(&arn))?;
1218 Ok(AwsResponse::ok_json(state_machine_alias_to_json(alias)))
1219 }
1220
1221 fn list_state_machine_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1222 let body = req.json_body();
1223 let parent = body["stateMachineArn"]
1224 .as_str()
1225 .ok_or_else(|| missing("stateMachineArn"))?
1226 .to_string();
1227 validate_arn_length("stateMachineArn", &parent, 256)?;
1228 let raw_max_results = body["maxResults"].as_i64();
1229 if let Some(mr) = raw_max_results {
1230 validate_max_results(mr)?;
1231 }
1232 let next_token = body["nextToken"].as_str();
1233 if let Some(t) = next_token {
1234 validate_page_token(t)?;
1235 }
1236 let max_results = match raw_max_results.unwrap_or(0) {
1237 0 => 100,
1238 n => n as usize,
1239 };
1240 let accounts = self.state.read();
1241 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1242 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1243 let parent_prefix = format!("{parent}:");
1246 let mut aliases: Vec<&crate::state::StateMachineAlias> = state
1247 .state_machine_aliases
1248 .values()
1249 .filter(|a| a.arn.starts_with(&parent_prefix))
1250 .collect();
1251 aliases.sort_by(|a, b| a.name.cmp(&b.name));
1252 let items: Vec<Value> = aliases
1253 .iter()
1254 .map(|a| {
1255 json!({
1256 "stateMachineAliasArn": a.arn,
1257 "creationDate": a.creation_date.timestamp(),
1258 })
1259 })
1260 .collect();
1261 let (page, token) = paginate(&items, next_token, max_results);
1262 let mut resp = json!({ "stateMachineAliases": page });
1263 if let Some(t) = token {
1264 resp["nextToken"] = json!(t);
1265 }
1266 Ok(AwsResponse::ok_json(resp))
1267 }
1268
1269 fn update_state_machine_alias(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1270 let body = req.json_body();
1271 let arn = body["stateMachineAliasArn"]
1272 .as_str()
1273 .ok_or_else(|| missing("stateMachineAliasArn"))?
1274 .to_string();
1275 let mut accounts = self.state.write();
1276 let state = accounts.get_or_create(&req.account_id);
1277 let alias = state
1278 .state_machine_aliases
1279 .get_mut(&arn)
1280 .ok_or_else(|| resource_not_found(&arn))?;
1281 if let Some(d) = body["description"].as_str() {
1282 alias.description = d.to_string();
1283 }
1284 if let Some(routes) = body["routingConfiguration"].as_array() {
1285 alias.routing_configuration = parse_routing_configuration(routes)?;
1286 }
1287 alias.update_date = chrono::Utc::now();
1288 Ok(AwsResponse::ok_json(json!({
1289 "updateDate": alias.update_date.timestamp(),
1290 })))
1291 }
1292
1293 fn describe_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1296 let body = req.json_body();
1297 let arn = body["mapRunArn"]
1298 .as_str()
1299 .ok_or_else(|| missing("mapRunArn"))?
1300 .to_string();
1301 let accounts = self.state.read();
1302 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1303 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1304 let mr = state
1305 .map_runs
1306 .get(&arn)
1307 .ok_or_else(|| resource_not_found(&arn))?;
1308 Ok(AwsResponse::ok_json(map_run_to_json(mr)))
1309 }
1310
1311 fn list_map_runs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1312 let body = req.json_body();
1313 let exec_arn = body["executionArn"]
1315 .as_str()
1316 .ok_or_else(|| missing("executionArn"))?
1317 .to_string();
1318 validate_arn_length("executionArn", &exec_arn, 256)?;
1319 let raw_max_results = body["maxResults"].as_i64();
1320 if let Some(mr) = raw_max_results {
1321 validate_max_results(mr)?;
1322 }
1323 let next_token = body["nextToken"].as_str();
1324 if let Some(t) = next_token {
1325 validate_page_token(t)?;
1326 }
1327 let max_results = match raw_max_results.unwrap_or(0) {
1328 0 => 100,
1329 n => n as usize,
1330 };
1331 let accounts = self.state.read();
1332 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
1333 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1334 let mut runs: Vec<&crate::state::MapRun> = state
1335 .map_runs
1336 .values()
1337 .filter(|r| r.execution_arn == exec_arn)
1338 .collect();
1339 runs.sort_by_key(|r| r.start_date);
1341 let items: Vec<Value> = runs
1342 .iter()
1343 .map(|r| {
1344 json!({
1345 "mapRunArn": r.map_run_arn,
1346 "executionArn": r.execution_arn,
1347 "stateMachineArn": "",
1348 "startDate": r.start_date.timestamp(),
1349 "stopDate": r.stop_date.map(|d| d.timestamp()),
1350 })
1351 })
1352 .collect();
1353 let (page, token) = paginate(&items, next_token, max_results);
1354 let mut resp = json!({ "mapRuns": page });
1355 if let Some(t) = token {
1356 resp["nextToken"] = json!(t);
1357 }
1358 Ok(AwsResponse::ok_json(resp))
1359 }
1360
1361 fn update_map_run(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1362 let body = req.json_body();
1363 let arn = body["mapRunArn"]
1364 .as_str()
1365 .ok_or_else(|| missing("mapRunArn"))?
1366 .to_string();
1367 let mut accounts = self.state.write();
1368 let state = accounts.get_or_create(&req.account_id);
1369 let mr = state
1370 .map_runs
1371 .get_mut(&arn)
1372 .ok_or_else(|| resource_not_found(&arn))?;
1373 if let Some(c) = body["maxConcurrency"].as_i64() {
1374 mr.max_concurrency = c as i32;
1375 }
1376 if let Some(p) = body["toleratedFailurePercentage"].as_f64() {
1377 mr.tolerated_failure_percentage = p;
1378 }
1379 if let Some(c) = body["toleratedFailureCount"].as_i64() {
1380 mr.tolerated_failure_count = c;
1381 }
1382 Ok(AwsResponse::ok_json(json!({})))
1383 }
1384
1385 fn redrive_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1388 let body = req.json_body();
1389 let arn = body["executionArn"]
1390 .as_str()
1391 .ok_or_else(|| missing("executionArn"))?
1392 .to_string();
1393 let mut accounts = self.state.write();
1394 let state = accounts.get_or_create(&req.account_id);
1395 let exec = state.executions.get_mut(&arn).ok_or_else(|| {
1396 AwsServiceError::aws_error(
1397 StatusCode::BAD_REQUEST,
1398 "ExecutionDoesNotExist",
1399 format!("Execution does not exist: {arn}"),
1400 )
1401 })?;
1402 exec.status = crate::state::ExecutionStatus::Running;
1403 exec.stop_date = None;
1404 Ok(AwsResponse::ok_json(json!({
1405 "redriveDate": chrono::Utc::now().timestamp(),
1406 })))
1407 }
1408
1409 async fn start_sync_execution(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1410 let body = req.json_body();
1411 let sm_arn = body["stateMachineArn"]
1412 .as_str()
1413 .ok_or_else(|| missing("stateMachineArn"))?
1414 .to_string();
1415 let input = body["input"].as_str().unwrap_or("{}").to_string();
1416 if serde_json::from_str::<serde_json::Value>(&input).is_err() {
1417 return Err(AwsServiceError::aws_error(
1418 StatusCode::BAD_REQUEST,
1419 "InvalidExecutionInput",
1420 "Execution input is not valid JSON.",
1421 ));
1422 }
1423 let (exec_arn, definition, logging_config) = {
1424 let mut accounts = self.state.write();
1425 let state = accounts.get_or_create(&req.account_id);
1426 let sm = state
1427 .state_machines
1428 .get(&sm_arn)
1429 .ok_or_else(|| state_machine_not_found(&sm_arn))?;
1430 if sm.machine_type != crate::state::StateMachineType::Express {
1431 return Err(AwsServiceError::aws_error(
1432 StatusCode::BAD_REQUEST,
1433 "StateMachineTypeNotSupported",
1434 "StartSyncExecution is only supported for EXPRESS state machines.",
1435 ));
1436 }
1437 let now = chrono::Utc::now();
1438 let exec_name = format!("sync-{}", now.timestamp_millis());
1439 let exec_arn = format!(
1440 "arn:aws:states:{}:{}:express:{}:{}",
1441 state.region, state.account_id, sm.name, exec_name
1442 );
1443 let execution = Execution {
1444 execution_arn: exec_arn.clone(),
1445 state_machine_arn: sm_arn.clone(),
1446 state_machine_name: sm.name.clone(),
1447 name: exec_name.clone(),
1448 status: ExecutionStatus::Running,
1449 input: Some(input.clone()),
1450 output: None,
1451 start_date: now,
1452 stop_date: None,
1453 error: None,
1454 cause: None,
1455 history_events: vec![],
1456 parent_execution_arn: None,
1457 is_sync: true,
1458 billed_duration_ms: None,
1459 billed_memory_mb: None,
1460 };
1461 state.executions.insert(exec_arn.clone(), execution);
1462 (
1463 exec_arn,
1464 sm.definition.clone(),
1465 sm.logging_configuration.clone(),
1466 )
1467 };
1468
1469 interpreter::execute_state_machine(
1470 self.state.clone(),
1471 exec_arn.clone(),
1472 definition,
1473 Some(input),
1474 self.delivery.clone(),
1475 self.dynamodb_state.clone(),
1476 self.registry.clone(),
1477 logging_config,
1478 )
1479 .await;
1480
1481 {
1484 let mut accounts = self.state.write();
1485 if let Some(state) = accounts.get_mut(&req.account_id) {
1486 if let Some(exec) = state.executions.get_mut(&exec_arn) {
1487 let duration_ms = exec
1488 .stop_date
1489 .map_or(0, |stop| (stop - exec.start_date).num_milliseconds())
1490 .max(0);
1491 exec.billed_duration_ms = Some(duration_ms);
1492 exec.billed_memory_mb = Some(64);
1493 }
1494 }
1495 }
1496
1497 let accounts = self.state.read();
1498 let state = accounts.get(&req.account_id).unwrap();
1499 let exec = state
1500 .executions
1501 .get(&exec_arn)
1502 .ok_or_else(|| execution_not_found(&exec_arn))?;
1503
1504 let mut resp = json!({
1505 "executionArn": exec.execution_arn,
1506 "stateMachineArn": exec.state_machine_arn,
1507 "name": exec.name,
1508 "startDate": exec.start_date.timestamp(),
1509 "stopDate": exec.stop_date.map(|d| d.timestamp()),
1510 "status": exec.status.as_str(),
1511 "input": exec.input.as_deref().unwrap_or("{}"),
1512 });
1513
1514 if let Some(ref output) = exec.output {
1515 resp["output"] = json!(output);
1516 }
1517 if let Some(ref error) = exec.error {
1518 resp["error"] = json!(error);
1519 }
1520 if let Some(ref cause) = exec.cause {
1521 resp["cause"] = json!(cause);
1522 }
1523
1524 let duration_ms = exec
1525 .stop_date
1526 .map_or(0, |stop| (stop - exec.start_date).num_milliseconds());
1527 resp["billingDetails"] = json!({
1528 "billedMemoryUsedInMB": 64,
1529 "billedDurationInMilliseconds": duration_ms.max(0),
1530 });
1531
1532 Ok(AwsResponse::ok_json(resp))
1533 }
1534
1535 fn test_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1536 let body = req.json_body();
1537 let definition = body["definition"]
1538 .as_str()
1539 .ok_or_else(|| missing("definition"))?;
1540 validate_definition(definition)?;
1541 let _role_arn = body["roleArn"].as_str().ok_or_else(|| missing("roleArn"))?;
1542 let input = body["input"].as_str().unwrap_or("{}").to_string();
1543 Ok(AwsResponse::ok_json(json!({
1547 "output": input,
1548 "status": "SUCCEEDED",
1549 "nextState": "End",
1550 })))
1551 }
1552
1553 fn validate_state_machine_definition(
1554 &self,
1555 req: &AwsRequest,
1556 ) -> Result<AwsResponse, AwsServiceError> {
1557 let body = req.json_body();
1558 let definition = body["definition"]
1559 .as_str()
1560 .ok_or_else(|| missing("definition"))?;
1561 if definition.is_empty() {
1562 return Err(AwsServiceError::aws_error(
1563 StatusCode::BAD_REQUEST,
1564 "ValidationException",
1565 "definition must be 1..=1048576 characters",
1566 ));
1567 }
1568 if let Some(mr) = body["maxResults"].as_i64() {
1569 if !(0..=100).contains(&mr) {
1570 return Err(AwsServiceError::aws_error(
1571 StatusCode::BAD_REQUEST,
1572 "ValidationException",
1573 format!("maxResults '{mr}' is outside 0..=100"),
1574 ));
1575 }
1576 }
1577 if let Some(sev) = body["severity"].as_str() {
1578 if !matches!(sev, "ERROR" | "WARNING") {
1579 return Err(AwsServiceError::aws_error(
1580 StatusCode::BAD_REQUEST,
1581 "ValidationException",
1582 format!("severity '{sev}' must be ERROR or WARNING"),
1583 ));
1584 }
1585 }
1586 if let Some(ty) = body["type"].as_str() {
1587 if !matches!(ty, "STANDARD" | "EXPRESS") {
1588 return Err(AwsServiceError::aws_error(
1589 StatusCode::BAD_REQUEST,
1590 "ValidationException",
1591 format!("type '{ty}' must be STANDARD or EXPRESS"),
1592 ));
1593 }
1594 }
1595 match validate_definition(definition) {
1596 Ok(()) => Ok(AwsResponse::ok_json(json!({
1597 "result": "OK",
1598 "diagnostics": [],
1599 }))),
1600 Err(e) => Ok(AwsResponse::ok_json(json!({
1601 "result": "FAIL",
1602 "diagnostics": [{
1603 "severity": "ERROR",
1604 "code": "INVALID_DEFINITION",
1605 "message": e.to_string(),
1606 }],
1607 }))),
1608 }
1609 }
1610}
1611
1612fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
1613 json!({
1614 "stateMachineAliasArn": alias.arn,
1615 "name": alias.name,
1616 "description": alias.description,
1617 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
1618 "stateMachineVersionArn": r.state_machine_version_arn,
1619 "weight": r.weight,
1620 })).collect::<Vec<_>>(),
1621 "creationDate": alias.creation_date.timestamp(),
1622 "updateDate": alias.update_date.timestamp(),
1623 })
1624}
1625
1626fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
1627 json!({
1628 "mapRunArn": mr.map_run_arn,
1629 "executionArn": mr.execution_arn,
1630 "maxConcurrency": mr.max_concurrency,
1631 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
1632 "toleratedFailureCount": mr.tolerated_failure_count,
1633 "status": mr.status,
1634 "startDate": mr.start_date.timestamp(),
1635 "stopDate": mr.stop_date.map(|d| d.timestamp()),
1636 })
1637}
1638
1639fn state_machine_to_json(sm: &StateMachine) -> Value {
1642 let mut resp = json!({
1643 "name": sm.name,
1644 "stateMachineArn": sm.arn,
1645 "definition": sm.definition,
1646 "roleArn": sm.role_arn,
1647 "type": sm.machine_type.as_str(),
1648 "status": sm.status.as_str(),
1649 "creationDate": sm.creation_date.timestamp() as f64,
1650 "updateDate": sm.update_date.timestamp() as f64,
1651 "revisionId": sm.revision_id,
1652 "label": sm.name,
1653 });
1654
1655 if !sm.description.is_empty() {
1656 resp["description"] = json!(sm.description);
1657 }
1658
1659 if let Some(ref logging) = sm.logging_configuration {
1660 resp["loggingConfiguration"] = logging.clone();
1661 } else {
1662 resp["loggingConfiguration"] = json!({
1663 "level": "OFF",
1664 "includeExecutionData": false,
1665 "destinations": [],
1666 });
1667 }
1668
1669 if let Some(ref tracing) = sm.tracing_configuration {
1670 resp["tracingConfiguration"] = tracing.clone();
1671 } else {
1672 resp["tracingConfiguration"] = json!({
1673 "enabled": false,
1674 });
1675 }
1676
1677 resp
1678}
1679
1680fn missing(name: &str) -> AwsServiceError {
1681 AwsServiceError::aws_error(
1682 StatusCode::BAD_REQUEST,
1683 "ValidationException",
1684 format!("The request must contain the parameter {name}."),
1685 )
1686}
1687
1688fn state_machine_not_found(arn: &str) -> AwsServiceError {
1689 AwsServiceError::aws_error(
1690 StatusCode::BAD_REQUEST,
1691 "StateMachineDoesNotExist",
1692 format!("State Machine Does Not Exist: '{arn}'"),
1693 )
1694}
1695
1696fn activity_not_found(arn: &str) -> AwsServiceError {
1697 AwsServiceError::aws_error(
1698 StatusCode::BAD_REQUEST,
1699 "ActivityDoesNotExist",
1700 format!("Activity does not exist: {arn}"),
1701 )
1702}
1703
1704fn task_does_not_exist(token: &str) -> AwsServiceError {
1705 AwsServiceError::aws_error(
1706 StatusCode::BAD_REQUEST,
1707 "TaskDoesNotExist",
1708 format!("Task does not exist: {token}"),
1709 )
1710}
1711
1712fn resource_not_found(arn: &str) -> AwsServiceError {
1713 AwsServiceError::aws_error(
1714 StatusCode::BAD_REQUEST,
1715 "ResourceNotFound",
1716 format!("Resource not found: '{arn}'"),
1717 )
1718}
1719
1720fn parse_routing_configuration(
1725 routes: &[serde_json::Value],
1726) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
1727 if routes.is_empty() || routes.len() > 2 {
1728 return Err(AwsServiceError::aws_error(
1729 StatusCode::BAD_REQUEST,
1730 "ValidationException",
1731 "routingConfiguration must contain 1 or 2 routes.",
1732 ));
1733 }
1734 let parsed: Vec<crate::state::AliasRoute> = routes
1735 .iter()
1736 .map(|r| {
1737 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
1738 AwsServiceError::aws_error(
1739 StatusCode::BAD_REQUEST,
1740 "ValidationException",
1741 "routingConfiguration entries must contain stateMachineVersionArn.",
1742 )
1743 })?;
1744 let weight = r["weight"].as_i64().ok_or_else(|| {
1745 AwsServiceError::aws_error(
1746 StatusCode::BAD_REQUEST,
1747 "ValidationException",
1748 "routingConfiguration entries must contain a numeric weight.",
1749 )
1750 })?;
1751 if !(0..=100).contains(&weight) {
1752 return Err(AwsServiceError::aws_error(
1753 StatusCode::BAD_REQUEST,
1754 "ValidationException",
1755 format!("Invalid routing weight {weight}; must be 0-100."),
1756 ));
1757 }
1758 Ok(crate::state::AliasRoute {
1759 state_machine_version_arn: arn.to_string(),
1760 weight: weight as i32,
1761 })
1762 })
1763 .collect::<Result<_, _>>()?;
1764 let total: i32 = parsed.iter().map(|r| r.weight).sum();
1765 if total != 100 {
1766 return Err(AwsServiceError::aws_error(
1767 StatusCode::BAD_REQUEST,
1768 "ValidationException",
1769 format!("routingConfiguration weights must sum to 100, got {total}."),
1770 ));
1771 }
1772 Ok(parsed)
1773}
1774
1775fn validate_name(name: &str) -> Result<(), AwsServiceError> {
1776 if name.is_empty() || name.len() > 80 {
1777 return Err(AwsServiceError::aws_error(
1778 StatusCode::BAD_REQUEST,
1779 "InvalidName",
1780 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
1781 ));
1782 }
1783 if !name
1785 .chars()
1786 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1787 {
1788 return Err(AwsServiceError::aws_error(
1789 StatusCode::BAD_REQUEST,
1790 "InvalidName",
1791 format!(
1792 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
1793 ),
1794 ));
1795 }
1796 Ok(())
1797}
1798
1799fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
1800 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
1801 AwsServiceError::aws_error(
1802 StatusCode::BAD_REQUEST,
1803 "InvalidDefinition",
1804 format!("Invalid State Machine Definition: '{e}'"),
1805 )
1806 })?;
1807
1808 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
1809 return Err(AwsServiceError::aws_error(
1810 StatusCode::BAD_REQUEST,
1811 "InvalidDefinition",
1812 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1813 .to_string(),
1814 ));
1815 }
1816
1817 let states_obj = parsed
1818 .get("States")
1819 .and_then(|v| v.as_object())
1820 .ok_or_else(|| {
1821 AwsServiceError::aws_error(
1822 StatusCode::BAD_REQUEST,
1823 "InvalidDefinition",
1824 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
1825 .to_string(),
1826 )
1827 })?;
1828
1829 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
1830 AwsServiceError::aws_error(
1831 StatusCode::BAD_REQUEST,
1832 "InvalidDefinition",
1833 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
1834 .to_string(),
1835 )
1836 })?;
1837 if !states_obj.contains_key(start_at) {
1838 return Err(AwsServiceError::aws_error(
1839 StatusCode::BAD_REQUEST,
1840 "InvalidDefinition",
1841 format!(
1842 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
1843 (StartAt '{start_at}' does not reference a valid state)"
1844 ),
1845 ));
1846 }
1847
1848 Ok(())
1849}
1850
1851fn execution_not_found(arn: &str) -> AwsServiceError {
1852 AwsServiceError::aws_error(
1853 StatusCode::BAD_REQUEST,
1854 "ExecutionDoesNotExist",
1855 format!("Execution Does Not Exist: '{arn}'"),
1856 )
1857}
1858
1859fn execution_to_json(exec: &Execution) -> Value {
1860 let mut resp = json!({
1861 "executionArn": exec.execution_arn,
1862 "stateMachineArn": exec.state_machine_arn,
1863 "name": exec.name,
1864 "status": exec.status.as_str(),
1865 "startDate": exec.start_date.timestamp() as f64,
1866 });
1867
1868 if let Some(ref input) = exec.input {
1869 resp["input"] = json!(input);
1870 }
1871 if let Some(ref output) = exec.output {
1872 resp["output"] = json!(output);
1873 }
1874 if let Some(stop) = exec.stop_date {
1875 resp["stopDate"] = json!(stop.timestamp() as f64);
1876 }
1877 if let Some(ref error) = exec.error {
1878 resp["error"] = json!(error);
1879 }
1880 if let Some(ref cause) = exec.cause {
1881 resp["cause"] = json!(cause);
1882 }
1883
1884 resp
1885}
1886
/// Lower-cases the first character of `event_type` and leaves the rest
/// untouched (e.g. `ExecutionStarted` becomes `executionStarted`). An empty
/// input yields an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    // Byte length of the first character; zero for an empty string.
    let first_len = event_type.chars().next().map_or(0, char::len_utf8);
    let (head, tail) = event_type.split_at(first_len);

    let mut key = String::with_capacity(event_type.len());
    key.extend(head.chars().flat_map(char::to_lowercase));
    key.push_str(tail);
    key
}
1895
1896fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
1897 if !arn.starts_with("arn:") {
1898 return Err(AwsServiceError::aws_error(
1899 StatusCode::BAD_REQUEST,
1900 "InvalidArn",
1901 format!("Invalid Arn: '{arn}'"),
1902 ));
1903 }
1904 Ok(())
1905}
1906
1907fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
1911 if value.is_empty() || value.len() > max {
1912 return Err(AwsServiceError::aws_error(
1913 StatusCode::BAD_REQUEST,
1914 "InvalidArn",
1915 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
1916 ));
1917 }
1918 Ok(())
1919}
1920
1921fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
1925 if value.is_empty() || value.len() > 1024 {
1926 return Err(AwsServiceError::aws_error(
1927 StatusCode::BAD_REQUEST,
1928 "InvalidToken",
1929 "nextToken must be 1..=1024 characters",
1930 ));
1931 }
1932 Ok(())
1933}
1934
1935fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
1941 if !(0..=1000).contains(&value) {
1942 return Err(AwsServiceError::aws_error(
1943 StatusCode::BAD_REQUEST,
1944 "InvalidToken",
1945 format!("maxResults '{value}' is outside 0..=1000"),
1946 ));
1947 }
1948 Ok(())
1949}
1950
1951pub fn start_execution_from_delivery(
1960 state: &SharedStepFunctionsState,
1961 delivery: &Option<Arc<DeliveryBus>>,
1962 dynamodb_state: &Option<SharedDynamoDbState>,
1963 registry: &Option<SharedServiceRegistry>,
1964 state_machine_arn: &str,
1965 input: &str,
1966) {
1967 if serde_json::from_str::<serde_json::Value>(input).is_err() {
1969 tracing::warn!(
1970 state_machine_arn,
1971 "Step Functions delivery: invalid JSON input, skipping execution"
1972 );
1973 return;
1974 }
1975
1976 let execution_name = uuid::Uuid::new_v4().to_string();
1977
1978 let account_id = state_machine_arn
1980 .split(':')
1981 .nth(4)
1982 .unwrap_or("000000000000")
1983 .to_string();
1984
1985 let mut accounts = state.write();
1986 let st = accounts.get_or_create(&account_id);
1987 let sm = match st.state_machines.get(state_machine_arn) {
1988 Some(sm) => sm,
1989 None => {
1990 tracing::warn!(
1991 state_machine_arn,
1992 "Step Functions delivery: state machine not found"
1993 );
1994 return;
1995 }
1996 };
1997
1998 let sm_name = sm.name.clone();
1999 let definition = sm.definition.clone();
2000 let exec_arn = st.execution_arn(&sm_name, &execution_name);
2001
2002 let now = Utc::now();
2003 let execution = Execution {
2004 execution_arn: exec_arn.clone(),
2005 state_machine_arn: state_machine_arn.to_string(),
2006 state_machine_name: sm_name,
2007 name: execution_name,
2008 status: ExecutionStatus::Running,
2009 input: Some(input.to_string()),
2010 output: None,
2011 start_date: now,
2012 stop_date: None,
2013 error: None,
2014 cause: None,
2015 history_events: vec![],
2016 parent_execution_arn: None,
2017 is_sync: false,
2018 billed_duration_ms: None,
2019 billed_memory_mb: None,
2020 };
2021
2022 st.executions.insert(exec_arn.clone(), execution);
2023 let logging_config = sm.logging_configuration.clone();
2024 drop(accounts);
2025
2026 let shared_state = state.clone();
2027 let delivery = delivery.clone();
2028 let dynamodb_state = dynamodb_state.clone();
2029 let registry = registry.clone();
2030 let input = Some(input.to_string());
2031 tokio::spawn(async move {
2032 interpreter::execute_state_machine(
2033 shared_state,
2034 exec_arn,
2035 definition,
2036 input,
2037 delivery,
2038 dynamodb_state,
2039 registry,
2040 logging_config,
2041 )
2042 .await;
2043 });
2044}
2045
2046#[cfg(test)]
2047mod tests {
2048 use super::*;
2049 use http::{HeaderMap, Method};
2050 use parking_lot::RwLock;
2051 use serde_json::Value;
2052 use std::collections::HashMap;
2053 use std::sync::Arc;
2054
2055 fn make_state() -> SharedStepFunctionsState {
2056 Arc::new(RwLock::new(
2057 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2058 ))
2059 }
2060
2061 fn make_request(action: &str, body: &str) -> AwsRequest {
2062 AwsRequest {
2063 service: "states".to_string(),
2064 action: action.to_string(),
2065 region: "us-east-1".to_string(),
2066 account_id: "123456789012".to_string(),
2067 request_id: "test-id".to_string(),
2068 headers: HeaderMap::new(),
2069 query_params: HashMap::new(),
2070 body: body.as_bytes().to_vec().into(),
2071 body_stream: parking_lot::Mutex::new(None),
2072 path_segments: vec![],
2073 raw_path: "/".to_string(),
2074 raw_query: String::new(),
2075 method: Method::POST,
2076 is_query_protocol: false,
2077 access_key_id: None,
2078 principal: None,
2079 }
2080 }
2081
2082 fn body_json(resp: &AwsResponse) -> Value {
2083 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
2084 }
2085
2086 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
2087 match result {
2088 Err(e) => e,
2089 Ok(_) => panic!("expected error, got Ok"),
2090 }
2091 }
2092
    /// Minimal definition accepted by `validate_definition`: a single `Pass`
    /// state that is both the start state and an end state.
    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
2094
2095 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
2096 let body = json!({
2097 "name": name,
2098 "definition": VALID_DEF,
2099 "roleArn": "arn:aws:iam::123456789012:role/test",
2100 });
2101 let req = make_request("CreateStateMachine", &body.to_string());
2102 let resp = svc.create_state_machine(&req).unwrap();
2103 let b = body_json(&resp);
2104 b["stateMachineArn"].as_str().unwrap().to_string()
2105 }
2106
/// The returned ARN embeds the requested state machine name.
#[test]
fn create_state_machine_basic() {
    let service = StepFunctionsService::new(make_state());
    let created_arn = create_sm(&service, "test-sm");
    assert!(created_arn.contains("test-sm"));
}
2115
/// Creating a machine with `type: EXPRESS` succeeds and yields an ARN string.
#[test]
fn create_state_machine_with_express_type() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "express-sm",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
        "type": "EXPRESS",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let created = service.create_state_machine(&request).unwrap();
    let parsed = body_json(&created);
    assert!(parsed["stateMachineArn"].is_string());
}
2130
/// A second create with an already-used name reports `StateMachineAlreadyExists`.
#[test]
fn create_state_machine_duplicate_fails() {
    let service = StepFunctionsService::new(make_state());
    create_sm(&service, "dup-sm");

    let payload = json!({
        "name": "dup-sm",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineAlreadyExists"));
}
2144
/// A create request without a `name` field is rejected.
#[test]
fn create_state_machine_missing_name() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let outcome = service.create_state_machine(&request);
    assert!(outcome.is_err());
}
2155
/// A definition that is not JSON is reported as `InvalidDefinition`.
#[test]
fn create_state_machine_invalid_definition() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-def",
        "definition": "not json",
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
2168
/// A definition without `StartAt` is reported as `InvalidDefinition`.
#[test]
fn create_state_machine_definition_missing_start_at() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "no-start",
        "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
2181
/// A definition without `States` is reported as `InvalidDefinition`.
#[test]
fn create_state_machine_definition_missing_states() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "no-states",
        "definition": r#"{"StartAt":"S"}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidDefinition"));
}
2194
/// A `StartAt` naming a state absent from `States` yields an error
/// mentioning `MISSING_TRANSITION_TARGET`.
#[test]
fn create_state_machine_definition_start_at_not_in_states() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-start",
        "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("MISSING_TRANSITION_TARGET"));
}
2207
/// An unrecognised `type` value is rejected.
#[test]
fn create_state_machine_invalid_type() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-type",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
        "type": "INVALID",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let outcome = service.create_state_machine(&request);
    assert!(outcome.is_err());
}
2220
/// A `roleArn` that is not ARN-shaped is reported as `InvalidArn`.
#[test]
fn create_state_machine_invalid_arn() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-arn",
        "definition": VALID_DEF,
        "roleArn": "not-an-arn",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidArn"));
}
2233
/// The name `has spaces!` is reported as `InvalidName`.
#[test]
fn create_state_machine_invalid_name() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "has spaces!",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidName"));
}
2246
/// An 81-character name is reported as `InvalidName`.
#[test]
fn create_state_machine_name_too_long() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "a".repeat(81),
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);
    let message = expect_err(service.create_state_machine(&request)).to_string();
    assert!(message.contains("InvalidName"));
}
2260
/// Describing a freshly created machine returns its name, an `ACTIVE`
/// status and a string definition.
#[test]
fn describe_state_machine_found() {
    let service = StepFunctionsService::new(make_state());
    let sm_arn = create_sm(&service, "desc-sm");

    let payload = json!({"stateMachineArn": sm_arn}).to_string();
    let request = make_request("DescribeStateMachine", &payload);
    let described = body_json(&service.describe_state_machine(&request).unwrap());

    assert_eq!(described["name"], "desc-sm");
    assert_eq!(described["status"], "ACTIVE");
    assert!(described["definition"].is_string());
}
2278
/// Describing an unknown ARN reports `StateMachineDoesNotExist`.
#[test]
fn describe_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DescribeStateMachine", &payload);
    let message = expect_err(service.describe_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
2290
/// Listing on a fresh state returns an empty `stateMachines` array.
#[test]
fn list_state_machines_empty() {
    let service = StepFunctionsService::new(make_state());
    let request = make_request("ListStateMachines", "{}");
    let listing = body_json(&service.list_state_machines(&request).unwrap());
    let machines = listing["stateMachines"].as_array().unwrap();
    assert!(machines.is_empty());
}
2301
/// Two created machines both show up in the listing.
#[test]
fn list_state_machines_returns_created() {
    let service = StepFunctionsService::new(make_state());
    for name in ["sm-1", "sm-2"] {
        create_sm(&service, name);
    }

    let request = make_request("ListStateMachines", "{}");
    let listing = body_json(&service.list_state_machines(&request).unwrap());
    let machines = listing["stateMachines"].as_array().unwrap();
    assert_eq!(machines.len(), 2);
}
2313
/// After deletion, describing the same ARN fails.
#[test]
fn delete_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let sm_arn = create_sm(&service, "del-sm");
    // The delete and describe requests carry the same body.
    let payload = json!({"stateMachineArn": sm_arn}).to_string();

    let delete_req = make_request("DeleteStateMachine", &payload);
    service.delete_state_machine(&delete_req).unwrap();

    let describe_req = make_request("DescribeStateMachine", &payload);
    let outcome = service.describe_state_machine(&describe_req);
    assert!(outcome.is_err());
}
2334
/// Deleting an ARN that was never created still returns `Ok`.
#[test]
fn delete_state_machine_nonexistent_succeeds() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DeleteStateMachine", &payload);
    service.delete_state_machine(&request).unwrap();
}
2346
/// Updating definition and description returns an `updateDate`, and a
/// follow-up describe reflects both changes.
#[test]
fn update_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let sm_arn = create_sm(&service, "upd-sm");

    let replacement = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
    let update_payload = json!({
        "stateMachineArn": sm_arn,
        "definition": replacement,
        "description": "updated",
    })
    .to_string();
    let update_req = make_request("UpdateStateMachine", &update_payload);
    let updated = body_json(&service.update_state_machine(&update_req).unwrap());
    assert!(updated["updateDate"].as_f64().is_some());

    let describe_payload = json!({"stateMachineArn": sm_arn}).to_string();
    let describe_req = make_request("DescribeStateMachine", &describe_payload);
    let described = body_json(&service.describe_state_machine(&describe_req).unwrap());
    let stored_definition = described["definition"].as_str().unwrap();
    assert!(stored_definition.contains("NewPass"));
    assert_eq!(described["description"], "updated");
}
2375
/// Updating an unknown ARN reports `StateMachineDoesNotExist`.
#[test]
fn update_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "definition": VALID_DEF,
    })
    .to_string();
    let request = make_request("UpdateStateMachine", &payload);
    let message = expect_err(service.update_state_machine(&request)).to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
2387
2388 #[tokio::test]
2391 async fn start_execution_basic() {
2392 let svc = StepFunctionsService::new(make_state());
2393 let arn = create_sm(&svc, "exec-sm");
2394
2395 let body = json!({
2396 "stateMachineArn": arn,
2397 "input": r#"{"key":"value"}"#,
2398 });
2399 let req = make_request("StartExecution", &body.to_string());
2400 let resp = svc.start_execution(&req).unwrap();
2401 let b = body_json(&resp);
2402 assert!(b["executionArn"].as_str().is_some());
2403 assert!(b["startDate"].as_f64().is_some());
2404 }
2405
2406 #[tokio::test]
2407 async fn start_execution_with_name() {
2408 let svc = StepFunctionsService::new(make_state());
2409 let arn = create_sm(&svc, "named-exec");
2410
2411 let body = json!({
2412 "stateMachineArn": arn,
2413 "name": "my-execution",
2414 });
2415 let req = make_request("StartExecution", &body.to_string());
2416 let resp = svc.start_execution(&req).unwrap();
2417 let b = body_json(&resp);
2418 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
2419 }
2420
2421 #[tokio::test]
2422 async fn start_execution_sm_not_found() {
2423 let svc = StepFunctionsService::new(make_state());
2424 let body = json!({
2425 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2426 });
2427 let req = make_request("StartExecution", &body.to_string());
2428 let err = expect_err(svc.start_execution(&req));
2429 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2430 }
2431
2432 #[tokio::test]
2433 async fn start_execution_invalid_input() {
2434 let svc = StepFunctionsService::new(make_state());
2435 let arn = create_sm(&svc, "bad-input");
2436
2437 let body = json!({
2438 "stateMachineArn": arn,
2439 "input": "not json",
2440 });
2441 let req = make_request("StartExecution", &body.to_string());
2442 let err = expect_err(svc.start_execution(&req));
2443 assert!(err.to_string().contains("InvalidExecutionInput"));
2444 }
2445
2446 #[tokio::test]
2447 async fn start_execution_duplicate_name() {
2448 let svc = StepFunctionsService::new(make_state());
2449 let arn = create_sm(&svc, "dup-exec");
2450
2451 let body = json!({
2452 "stateMachineArn": arn,
2453 "name": "same-name",
2454 });
2455 let req = make_request("StartExecution", &body.to_string());
2456 svc.start_execution(&req).unwrap();
2457
2458 let req = make_request("StartExecution", &body.to_string());
2459 let err = expect_err(svc.start_execution(&req));
2460 assert!(err.to_string().contains("ExecutionAlreadyExists"));
2461 }
2462
2463 #[tokio::test]
2466 async fn describe_execution_found() {
2467 let svc = StepFunctionsService::new(make_state());
2468 let sm_arn = create_sm(&svc, "desc-exec");
2469
2470 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2471 let req = make_request("StartExecution", &body.to_string());
2472 let resp = svc.start_execution(&req).unwrap();
2473 let exec_arn = body_json(&resp)["executionArn"]
2474 .as_str()
2475 .unwrap()
2476 .to_string();
2477
2478 let req = make_request(
2479 "DescribeExecution",
2480 &json!({"executionArn": exec_arn}).to_string(),
2481 );
2482 let resp = svc.describe_execution(&req).unwrap();
2483 let b = body_json(&resp);
2484 assert_eq!(b["name"], "e1");
2485 assert_eq!(b["status"], "RUNNING");
2486 }
2487
2488 #[tokio::test]
2489 async fn describe_execution_not_found() {
2490 let svc = StepFunctionsService::new(make_state());
2491 let req = make_request(
2492 "DescribeExecution",
2493 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2494 .to_string(),
2495 );
2496 let err = expect_err(svc.describe_execution(&req));
2497 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2498 }
2499
2500 #[tokio::test]
2503 async fn stop_execution() {
2504 let svc = StepFunctionsService::new(make_state());
2505 let sm_arn = create_sm(&svc, "stop-sm");
2506
2507 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
2508 let req = make_request("StartExecution", &body.to_string());
2509 let resp = svc.start_execution(&req).unwrap();
2510 let exec_arn = body_json(&resp)["executionArn"]
2511 .as_str()
2512 .unwrap()
2513 .to_string();
2514
2515 let body = json!({
2516 "executionArn": exec_arn,
2517 "error": "UserAborted",
2518 "cause": "test stop",
2519 });
2520 let req = make_request("StopExecution", &body.to_string());
2521 let resp = svc.stop_execution(&req).unwrap();
2522 let b = body_json(&resp);
2523 assert!(b["stopDate"].as_f64().is_some());
2524
2525 let req = make_request(
2527 "DescribeExecution",
2528 &json!({"executionArn": exec_arn}).to_string(),
2529 );
2530 let resp = svc.describe_execution(&req).unwrap();
2531 let b = body_json(&resp);
2532 assert_eq!(b["status"], "ABORTED");
2533 assert_eq!(b["error"], "UserAborted");
2534 }
2535
2536 #[tokio::test]
2537 async fn stop_execution_not_found() {
2538 let svc = StepFunctionsService::new(make_state());
2539 let req = make_request(
2540 "StopExecution",
2541 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2542 .to_string(),
2543 );
2544 let err = expect_err(svc.stop_execution(&req));
2545 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2546 }
2547
2548 #[tokio::test]
2551 async fn list_executions() {
2552 let svc = StepFunctionsService::new(make_state());
2553 let sm_arn = create_sm(&svc, "list-exec");
2554
2555 for i in 0..3 {
2556 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
2557 let req = make_request("StartExecution", &body.to_string());
2558 svc.start_execution(&req).unwrap();
2559 }
2560
2561 let req = make_request(
2562 "ListExecutions",
2563 &json!({"stateMachineArn": sm_arn}).to_string(),
2564 );
2565 let resp = svc.list_executions(&req).unwrap();
2566 let b = body_json(&resp);
2567 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
2568 }
2569
2570 #[tokio::test]
2571 async fn list_executions_sm_not_found() {
2572 let svc = StepFunctionsService::new(make_state());
2573 let req = make_request(
2574 "ListExecutions",
2575 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
2576 .to_string(),
2577 );
2578 let err = expect_err(svc.list_executions(&req));
2579 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2580 }
2581
2582 #[tokio::test]
2585 async fn get_execution_history_not_found() {
2586 let svc = StepFunctionsService::new(make_state());
2587 let req = make_request(
2588 "GetExecutionHistory",
2589 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
2590 .to_string(),
2591 );
2592 let err = expect_err(svc.get_execution_history(&req));
2593 assert!(err.to_string().contains("ExecutionDoesNotExist"));
2594 }
2595
2596 #[tokio::test]
2599 async fn describe_sm_for_execution() {
2600 let svc = StepFunctionsService::new(make_state());
2601 let sm_arn = create_sm(&svc, "sm-for-exec");
2602
2603 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
2604 let req = make_request("StartExecution", &body.to_string());
2605 let resp = svc.start_execution(&req).unwrap();
2606 let exec_arn = body_json(&resp)["executionArn"]
2607 .as_str()
2608 .unwrap()
2609 .to_string();
2610
2611 let req = make_request(
2612 "DescribeStateMachineForExecution",
2613 &json!({"executionArn": exec_arn}).to_string(),
2614 );
2615 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
2616 let b = body_json(&resp);
2617 assert_eq!(b["name"], "sm-for-exec");
2618 }
2619
/// Round-trips a tag: add it, see it listed, remove it, see an empty list.
#[test]
fn tag_untag_list_tags() {
    let service = StepFunctionsService::new(make_state());
    let sm_arn = create_sm(&service, "tagged-sm");

    // Fetches the current tag listing for the machine as parsed JSON.
    let list_tags = || {
        let payload = json!({"resourceArn": sm_arn}).to_string();
        let request = make_request("ListTagsForResource", &payload);
        let response = service.list_tags_for_resource(&request).unwrap();
        body_json(&response)
    };

    let tag_payload = json!({
        "resourceArn": sm_arn,
        "tags": [{"key": "env", "value": "prod"}],
    })
    .to_string();
    let tag_req = make_request("TagResource", &tag_payload);
    service.tag_resource(&tag_req).unwrap();

    let after_tag = list_tags();
    let tags = after_tag["tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0]["key"], "env");

    let untag_payload = json!({
        "resourceArn": sm_arn,
        "tagKeys": ["env"],
    })
    .to_string();
    let untag_req = make_request("UntagResource", &untag_payload);
    service.untag_resource(&untag_req).unwrap();

    let after_untag = list_tags();
    assert!(after_untag["tags"].as_array().unwrap().is_empty());
}
2663
/// Tagging an unknown resource ARN reports `ResourceNotFound`.
#[test]
fn tag_resource_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "tags": [{"key": "k", "value": "v"}],
    })
    .to_string();
    let request = make_request("TagResource", &payload);
    let message = expect_err(service.tag_resource(&request)).to_string();
    assert!(message.contains("ResourceNotFound"));
}
2675
/// `validate_name` accepts hyphen/underscore names and rejects empty,
/// space-containing and 81-character names.
#[test]
fn test_validate_name() {
    let accepted = |name: &str| validate_name(name).is_ok();
    let overlong = "a".repeat(81);

    assert!(accepted("valid-name"));
    assert!(accepted("under_score"));
    assert!(!accepted(""));
    assert!(!accepted("has spaces"));
    assert!(!accepted(&overlong));
}
2686
/// `validate_definition` accepts `VALID_DEF` and rejects non-JSON input as
/// well as definitions lacking `StartAt` or `States`.
#[test]
fn test_validate_definition() {
    let accepted = |definition: &str| validate_definition(definition).is_ok();

    assert!(accepted(VALID_DEF));
    assert!(!accepted("not json"));
    // No `StartAt` key.
    assert!(!accepted(r#"{"States":{}}"#));
    // No `States` key.
    assert!(!accepted(r#"{"StartAt":"S"}"#));
}
2694
/// `validate_arn` accepts an `arn:`-prefixed value and rejects a plain string.
#[test]
fn test_validate_arn() {
    let accepted = |arn: &str| validate_arn(arn).is_ok();

    assert!(accepted("arn:aws:states:us-east-1:123:sm:test"));
    assert!(!accepted("not-an-arn"));
}
2700
/// `camel_to_details_key` lower-cases the leading letter and maps the empty
/// string to itself.
#[test]
fn test_camel_to_details_key() {
    let cases = [("PassStateEntered", "passStateEntered"), ("", "")];
    for (input, expected) in cases {
        assert_eq!(camel_to_details_key(input), expected);
    }
}
2706
/// Create/Start actions count as mutating; Describe/List actions do not.
#[test]
fn test_is_mutating_action() {
    let cases = [
        ("CreateStateMachine", true),
        ("StartExecution", true),
        ("DescribeStateMachine", false),
        ("ListStateMachines", false),
    ];
    for (action, mutating) in cases {
        assert_eq!(is_mutating_action(action), mutating);
    }
}
2714
2715 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
2718 let body = json!({
2719 "name": name,
2720 "definition": VALID_DEF,
2721 "roleArn": "arn:aws:iam::123456789012:role/test",
2722 "type": "EXPRESS",
2723 });
2724 let req = make_request("CreateStateMachine", &body.to_string());
2725 let resp = svc.create_state_machine(&req).unwrap();
2726 let b = body_json(&resp);
2727 b["stateMachineArn"].as_str().unwrap().to_string()
2728 }
2729
2730 #[tokio::test]
2731 async fn start_sync_execution_basic() {
2732 let svc = StepFunctionsService::new(make_state());
2733 let arn = create_express_sm(&svc, "sync-sm");
2734
2735 let body = json!({
2736 "stateMachineArn": arn,
2737 "input": r#"{"key":"value"}"#,
2738 });
2739 let req = make_request("StartSyncExecution", &body.to_string());
2740 let resp = svc.start_sync_execution(&req).await.unwrap();
2741 let b = body_json(&resp);
2742 assert!(b["executionArn"]
2743 .as_str()
2744 .unwrap()
2745 .contains("express:sync-sm"));
2746 assert_eq!(b["stateMachineArn"], arn);
2747 assert_eq!(b["status"], "SUCCEEDED");
2748 assert!(b["startDate"].as_i64().is_some());
2749 assert!(b["stopDate"].as_i64().is_some());
2750 assert!(b["output"].as_str().is_some());
2751 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
2752 .as_i64()
2753 .is_some());
2754 }
2755
2756 #[tokio::test]
2757 async fn start_sync_execution_not_express() {
2758 let svc = StepFunctionsService::new(make_state());
2759 let arn = create_sm(&svc, "std-sm");
2760
2761 let body = json!({"stateMachineArn": arn});
2762 let req = make_request("StartSyncExecution", &body.to_string());
2763 let err = expect_err(svc.start_sync_execution(&req).await);
2764 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
2765 }
2766
2767 #[tokio::test]
2768 async fn start_sync_execution_sm_not_found() {
2769 let svc = StepFunctionsService::new(make_state());
2770 let body = json!({
2771 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
2772 });
2773 let req = make_request("StartSyncExecution", &body.to_string());
2774 let err = expect_err(svc.start_sync_execution(&req).await);
2775 assert!(err.to_string().contains("StateMachineDoesNotExist"));
2776 }
2777
2778 #[tokio::test]
2779 async fn start_sync_execution_records_introspection_fields() {
2780 let svc = StepFunctionsService::new(make_state());
2781 let arn = create_express_sm(&svc, "sync-introspect");
2782
2783 let body = json!({"stateMachineArn": arn, "input": "{}"});
2784 let req = make_request("StartSyncExecution", &body.to_string());
2785 let resp = svc.start_sync_execution(&req).await.unwrap();
2786 let b = body_json(&resp);
2787 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
2788
2789 let accounts = svc.state.read();
2790 let state = accounts.get("123456789012").unwrap();
2791 let stored = state
2792 .executions
2793 .get(&exec_arn)
2794 .expect("sync execution should be persisted for introspection");
2795 assert!(stored.is_sync, "sync executions must be marked is_sync");
2796 assert_eq!(stored.billed_memory_mb, Some(64));
2797 assert!(
2798 stored.billed_duration_ms.is_some(),
2799 "billed_duration_ms must be populated after sync run"
2800 );
2801 assert!(
2802 stored.parent_execution_arn.is_none(),
2803 "top-level sync execution has no parent"
2804 );
2805 }
2806
2807 #[tokio::test]
2808 async fn start_sync_execution_invalid_input() {
2809 let svc = StepFunctionsService::new(make_state());
2810 let arn = create_express_sm(&svc, "bad-input-sync");
2811
2812 let body = json!({
2813 "stateMachineArn": arn,
2814 "input": "not json",
2815 });
2816 let req = make_request("StartSyncExecution", &body.to_string());
2817 let err = expect_err(svc.start_sync_execution(&req).await);
2818 assert!(err.to_string().contains("InvalidExecutionInput"));
2819 }
2820}