1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26 "CreateStateMachine",
27 "DescribeStateMachine",
28 "ListStateMachines",
29 "DeleteStateMachine",
30 "UpdateStateMachine",
31 "TagResource",
32 "UntagResource",
33 "ListTagsForResource",
34 "StartExecution",
35 "StopExecution",
36 "DescribeExecution",
37 "ListExecutions",
38 "GetExecutionHistory",
39 "DescribeStateMachineForExecution",
40 "CreateActivity",
41 "DeleteActivity",
42 "DescribeActivity",
43 "ListActivities",
44 "GetActivityTask",
45 "SendTaskFailure",
46 "SendTaskHeartbeat",
47 "SendTaskSuccess",
48 "PublishStateMachineVersion",
49 "DeleteStateMachineVersion",
50 "ListStateMachineVersions",
51 "CreateStateMachineAlias",
52 "DeleteStateMachineAlias",
53 "DescribeStateMachineAlias",
54 "ListStateMachineAliases",
55 "UpdateStateMachineAlias",
56 "DescribeMapRun",
57 "ListMapRuns",
58 "UpdateMapRun",
59 "RedriveExecution",
60 "StartSyncExecution",
61 "TestState",
62 "ValidateStateMachineDefinition",
63];
64
65pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73 state: SharedStepFunctionsState,
74 delivery: Option<Arc<DeliveryBus>>,
75 dynamodb_state: Option<SharedDynamoDbState>,
76 registry: Option<SharedServiceRegistry>,
77 snapshot_store: Option<Arc<dyn SnapshotStore>>,
78 snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90 pub fn new(state: SharedStepFunctionsState) -> Self {
91 Self {
92 state,
93 delivery: None,
94 dynamodb_state: None,
95 registry: None,
96 snapshot_store: None,
97 snapshot_lock: Arc::new(AsyncMutex::new(())),
98 }
99 }
100
101 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102 self.delivery = Some(delivery);
103 self
104 }
105
106 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107 self.dynamodb_state = Some(dynamodb_state);
108 self
109 }
110
111 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116 self.registry = Some(registry);
117 self
118 }
119
120 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121 self.snapshot_store = Some(store);
122 self
123 }
124
125 async fn save_snapshot(&self) {
126 let Some(store) = self.snapshot_store.clone() else {
127 return;
128 };
129 let _guard = self.snapshot_lock.lock().await;
130 let snapshot = StepFunctionsSnapshot {
131 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
132 state: None,
133 accounts: Some(self.state.read().clone()),
134 };
135 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
136 let bytes = serde_json::to_vec(&snapshot)
137 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
138 store.save(&bytes)
139 })
140 .await;
141 match join {
142 Ok(Ok(())) => {}
143 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
144 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
145 }
146 }
147}
148
149fn is_mutating_action(action: &str) -> bool {
150 matches!(
151 action,
152 "CreateStateMachine"
153 | "DeleteStateMachine"
154 | "UpdateStateMachine"
155 | "TagResource"
156 | "UntagResource"
157 | "StartExecution"
158 | "StopExecution"
159 | "CreateActivity"
160 | "DeleteActivity"
161 | "GetActivityTask"
162 | "SendTaskFailure"
163 | "SendTaskHeartbeat"
164 | "SendTaskSuccess"
165 | "PublishStateMachineVersion"
166 | "DeleteStateMachineVersion"
167 | "CreateStateMachineAlias"
168 | "DeleteStateMachineAlias"
169 | "UpdateStateMachineAlias"
170 | "UpdateMapRun"
171 | "RedriveExecution"
172 | "StartSyncExecution"
173 )
174}
175
176#[async_trait]
177impl AwsService for StepFunctionsService {
178 fn service_name(&self) -> &str {
179 "states"
180 }
181
182 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
183 let mutates = is_mutating_action(req.action.as_str());
184 let result = match req.action.as_str() {
185 "CreateStateMachine" => self.create_state_machine(&req),
186 "DescribeStateMachine" => self.describe_state_machine(&req),
187 "ListStateMachines" => self.list_state_machines(&req),
188 "DeleteStateMachine" => self.delete_state_machine(&req),
189 "UpdateStateMachine" => self.update_state_machine(&req),
190 "TagResource" => self.tag_resource(&req),
191 "UntagResource" => self.untag_resource(&req),
192 "ListTagsForResource" => self.list_tags_for_resource(&req),
193 "StartExecution" => self.start_execution(&req),
194 "StopExecution" => self.stop_execution(&req),
195 "DescribeExecution" => self.describe_execution(&req),
196 "ListExecutions" => self.list_executions(&req),
197 "GetExecutionHistory" => self.get_execution_history(&req),
198 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
199 "CreateActivity" => self.create_activity(&req),
200 "DeleteActivity" => self.delete_activity(&req),
201 "DescribeActivity" => self.describe_activity(&req),
202 "ListActivities" => self.list_activities(&req),
203 "GetActivityTask" => self.get_activity_task(&req).await,
204 "SendTaskFailure" => self.send_task_failure(&req),
205 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
206 "SendTaskSuccess" => self.send_task_success(&req),
207 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
208 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
209 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
210 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
211 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
212 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
213 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
214 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
215 "DescribeMapRun" => self.describe_map_run(&req),
216 "ListMapRuns" => self.list_map_runs(&req),
217 "UpdateMapRun" => self.update_map_run(&req),
218 "RedriveExecution" => self.redrive_execution(&req),
219 "StartSyncExecution" => self.start_sync_execution(&req).await,
220 "TestState" => self.test_state(&req),
221 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
222 _ => Err(AwsServiceError::action_not_implemented(
223 "states",
224 &req.action,
225 )),
226 };
227 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
228 self.save_snapshot().await;
229 }
230 result
231 }
232
233 fn supported_actions(&self) -> &[&str] {
234 SUPPORTED
235 }
236}
237
238impl StepFunctionsService {
239 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
240 let body = req.json_body();
241 let raw_max_results = body["maxResults"].as_i64();
242 if let Some(mr) = raw_max_results {
243 validate_max_results(mr)?;
244 }
245 let next_token = body["nextToken"].as_str();
246 if let Some(t) = next_token {
247 validate_page_token(t)?;
248 }
249 let max_results = match raw_max_results.unwrap_or(0) {
253 0 => 100,
254 n => n as usize,
255 };
256 let accounts = self.state.read();
257 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
258 let state = accounts.get(&req.account_id).unwrap_or(&empty);
259 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
260 activities.sort_by(|a, b| a.name.cmp(&b.name));
261 let items: Vec<Value> = activities
262 .iter()
263 .map(|a| {
264 json!({
265 "activityArn": a.arn,
266 "name": a.name,
267 "creationDate": a.creation_date.timestamp(),
268 })
269 })
270 .collect();
271 let (page, token) = paginate(&items, next_token, max_results);
272 let mut resp = json!({ "activities": page });
273 if let Some(t) = token {
274 resp["nextToken"] = json!(t);
275 }
276 Ok(AwsResponse::ok_json(resp))
277 }
278}
279
280fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
281 json!({
282 "stateMachineAliasArn": alias.arn,
283 "name": alias.name,
284 "description": alias.description,
285 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
286 "stateMachineVersionArn": r.state_machine_version_arn,
287 "weight": r.weight,
288 })).collect::<Vec<_>>(),
289 "creationDate": alias.creation_date.timestamp(),
290 "updateDate": alias.update_date.timestamp(),
291 })
292}
293
294fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
295 json!({
296 "mapRunArn": mr.map_run_arn,
297 "executionArn": mr.execution_arn,
298 "maxConcurrency": mr.max_concurrency,
299 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
300 "toleratedFailureCount": mr.tolerated_failure_count,
301 "status": mr.status,
302 "startDate": mr.start_date.timestamp(),
303 "stopDate": mr.stop_date.map(|d| d.timestamp()),
304 })
305}
306
307fn state_machine_to_json(sm: &StateMachine) -> Value {
310 let mut resp = json!({
311 "name": sm.name,
312 "stateMachineArn": sm.arn,
313 "definition": sm.definition,
314 "roleArn": sm.role_arn,
315 "type": sm.machine_type.as_str(),
316 "status": sm.status.as_str(),
317 "creationDate": sm.creation_date.timestamp() as f64,
318 "updateDate": sm.update_date.timestamp() as f64,
319 "revisionId": sm.revision_id,
320 "label": sm.name,
321 });
322
323 if !sm.description.is_empty() {
324 resp["description"] = json!(sm.description);
325 }
326
327 if let Some(ref logging) = sm.logging_configuration {
328 resp["loggingConfiguration"] = logging.clone();
329 } else {
330 resp["loggingConfiguration"] = json!({
331 "level": "OFF",
332 "includeExecutionData": false,
333 "destinations": [],
334 });
335 }
336
337 if let Some(ref tracing) = sm.tracing_configuration {
338 resp["tracingConfiguration"] = tracing.clone();
339 } else {
340 resp["tracingConfiguration"] = json!({
341 "enabled": false,
342 });
343 }
344
345 resp
346}
347
348fn missing(name: &str) -> AwsServiceError {
349 AwsServiceError::aws_error(
350 StatusCode::BAD_REQUEST,
351 "ValidationException",
352 format!("The request must contain the parameter {name}."),
353 )
354}
355
356fn state_machine_not_found(arn: &str) -> AwsServiceError {
357 AwsServiceError::aws_error(
358 StatusCode::BAD_REQUEST,
359 "StateMachineDoesNotExist",
360 format!("State Machine Does Not Exist: '{arn}'"),
361 )
362}
363
364fn activity_not_found(arn: &str) -> AwsServiceError {
365 AwsServiceError::aws_error(
366 StatusCode::BAD_REQUEST,
367 "ActivityDoesNotExist",
368 format!("Activity does not exist: {arn}"),
369 )
370}
371
372fn task_does_not_exist(token: &str) -> AwsServiceError {
373 AwsServiceError::aws_error(
374 StatusCode::BAD_REQUEST,
375 "TaskDoesNotExist",
376 format!("Task does not exist: {token}"),
377 )
378}
379
380fn resource_not_found(arn: &str) -> AwsServiceError {
381 AwsServiceError::aws_error(
382 StatusCode::BAD_REQUEST,
383 "ResourceNotFound",
384 format!("Resource not found: '{arn}'"),
385 )
386}
387
388fn parse_routing_configuration(
393 routes: &[serde_json::Value],
394) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
395 if routes.is_empty() || routes.len() > 2 {
396 return Err(AwsServiceError::aws_error(
397 StatusCode::BAD_REQUEST,
398 "ValidationException",
399 "routingConfiguration must contain 1 or 2 routes.",
400 ));
401 }
402 let parsed: Vec<crate::state::AliasRoute> = routes
403 .iter()
404 .map(|r| {
405 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
406 AwsServiceError::aws_error(
407 StatusCode::BAD_REQUEST,
408 "ValidationException",
409 "routingConfiguration entries must contain stateMachineVersionArn.",
410 )
411 })?;
412 let weight = r["weight"].as_i64().ok_or_else(|| {
413 AwsServiceError::aws_error(
414 StatusCode::BAD_REQUEST,
415 "ValidationException",
416 "routingConfiguration entries must contain a numeric weight.",
417 )
418 })?;
419 if !(0..=100).contains(&weight) {
420 return Err(AwsServiceError::aws_error(
421 StatusCode::BAD_REQUEST,
422 "ValidationException",
423 format!("Invalid routing weight {weight}; must be 0-100."),
424 ));
425 }
426 Ok(crate::state::AliasRoute {
427 state_machine_version_arn: arn.to_string(),
428 weight: weight as i32,
429 })
430 })
431 .collect::<Result<_, _>>()?;
432 let total: i32 = parsed.iter().map(|r| r.weight).sum();
433 if total != 100 {
434 return Err(AwsServiceError::aws_error(
435 StatusCode::BAD_REQUEST,
436 "ValidationException",
437 format!("routingConfiguration weights must sum to 100, got {total}."),
438 ));
439 }
440 Ok(parsed)
441}
442
443fn validate_name(name: &str) -> Result<(), AwsServiceError> {
444 if name.is_empty() || name.len() > 80 {
445 return Err(AwsServiceError::aws_error(
446 StatusCode::BAD_REQUEST,
447 "InvalidName",
448 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
449 ));
450 }
451 if !name
453 .chars()
454 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
455 {
456 return Err(AwsServiceError::aws_error(
457 StatusCode::BAD_REQUEST,
458 "InvalidName",
459 format!(
460 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
461 ),
462 ));
463 }
464 Ok(())
465}
466
467fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
468 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
469 AwsServiceError::aws_error(
470 StatusCode::BAD_REQUEST,
471 "InvalidDefinition",
472 format!("Invalid State Machine Definition: '{e}'"),
473 )
474 })?;
475
476 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
477 return Err(AwsServiceError::aws_error(
478 StatusCode::BAD_REQUEST,
479 "InvalidDefinition",
480 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
481 .to_string(),
482 ));
483 }
484
485 let states_obj = parsed
486 .get("States")
487 .and_then(|v| v.as_object())
488 .ok_or_else(|| {
489 AwsServiceError::aws_error(
490 StatusCode::BAD_REQUEST,
491 "InvalidDefinition",
492 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
493 .to_string(),
494 )
495 })?;
496
497 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
498 AwsServiceError::aws_error(
499 StatusCode::BAD_REQUEST,
500 "InvalidDefinition",
501 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
502 .to_string(),
503 )
504 })?;
505 if !states_obj.contains_key(start_at) {
506 return Err(AwsServiceError::aws_error(
507 StatusCode::BAD_REQUEST,
508 "InvalidDefinition",
509 format!(
510 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
511 (StartAt '{start_at}' does not reference a valid state)"
512 ),
513 ));
514 }
515
516 Ok(())
517}
518
519fn execution_not_found(arn: &str) -> AwsServiceError {
520 AwsServiceError::aws_error(
521 StatusCode::BAD_REQUEST,
522 "ExecutionDoesNotExist",
523 format!("Execution Does Not Exist: '{arn}'"),
524 )
525}
526
527fn execution_to_json(exec: &Execution) -> Value {
528 let mut resp = json!({
529 "executionArn": exec.execution_arn,
530 "stateMachineArn": exec.state_machine_arn,
531 "name": exec.name,
532 "status": exec.status.as_str(),
533 "startDate": exec.start_date.timestamp() as f64,
534 });
535
536 if let Some(ref input) = exec.input {
537 resp["input"] = json!(input);
538 }
539 if let Some(ref output) = exec.output {
540 resp["output"] = json!(output);
541 }
542 if let Some(stop) = exec.stop_date {
543 resp["stopDate"] = json!(stop.timestamp() as f64);
544 }
545 if let Some(ref error) = exec.error {
546 resp["error"] = json!(error);
547 }
548 if let Some(ref cause) = exec.cause {
549 resp["cause"] = json!(cause);
550 }
551
552 resp
553}
554
555fn camel_to_details_key(event_type: &str) -> String {
557 let mut chars = event_type.chars();
558 match chars.next() {
559 None => String::new(),
560 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
561 }
562}
563
564fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
565 if !arn.starts_with("arn:") {
566 return Err(AwsServiceError::aws_error(
567 StatusCode::BAD_REQUEST,
568 "InvalidArn",
569 format!("Invalid Arn: '{arn}'"),
570 ));
571 }
572 Ok(())
573}
574
575fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
579 if value.is_empty() || value.len() > max {
580 return Err(AwsServiceError::aws_error(
581 StatusCode::BAD_REQUEST,
582 "InvalidArn",
583 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
584 ));
585 }
586 Ok(())
587}
588
589fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
593 if value.is_empty() || value.len() > 1024 {
594 return Err(AwsServiceError::aws_error(
595 StatusCode::BAD_REQUEST,
596 "InvalidToken",
597 "nextToken must be 1..=1024 characters",
598 ));
599 }
600 Ok(())
601}
602
603fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
609 if !(0..=1000).contains(&value) {
610 return Err(AwsServiceError::aws_error(
611 StatusCode::BAD_REQUEST,
612 "InvalidToken",
613 format!("maxResults '{value}' is outside 0..=1000"),
614 ));
615 }
616 Ok(())
617}
618
619pub fn start_execution_from_delivery(
628 state: &SharedStepFunctionsState,
629 delivery: &Option<Arc<DeliveryBus>>,
630 dynamodb_state: &Option<SharedDynamoDbState>,
631 registry: &Option<SharedServiceRegistry>,
632 state_machine_arn: &str,
633 input: &str,
634) {
635 if serde_json::from_str::<serde_json::Value>(input).is_err() {
637 tracing::warn!(
638 state_machine_arn,
639 "Step Functions delivery: invalid JSON input, skipping execution"
640 );
641 return;
642 }
643
644 let execution_name = uuid::Uuid::new_v4().to_string();
645
646 let account_id = state_machine_arn
648 .split(':')
649 .nth(4)
650 .unwrap_or("000000000000")
651 .to_string();
652
653 let mut accounts = state.write();
654 let st = accounts.get_or_create(&account_id);
655 let sm = match st.state_machines.get(state_machine_arn) {
656 Some(sm) => sm,
657 None => {
658 tracing::warn!(
659 state_machine_arn,
660 "Step Functions delivery: state machine not found"
661 );
662 return;
663 }
664 };
665
666 let sm_name = sm.name.clone();
667 let definition = sm.definition.clone();
668 let exec_arn = st.execution_arn(&sm_name, &execution_name);
669
670 let now = Utc::now();
671 let execution = Execution {
672 execution_arn: exec_arn.clone(),
673 state_machine_arn: state_machine_arn.to_string(),
674 state_machine_name: sm_name,
675 name: execution_name,
676 status: ExecutionStatus::Running,
677 input: Some(input.to_string()),
678 output: None,
679 start_date: now,
680 stop_date: None,
681 error: None,
682 cause: None,
683 history_events: vec![],
684 parent_execution_arn: None,
685 is_sync: false,
686 billed_duration_ms: None,
687 billed_memory_mb: None,
688 };
689
690 st.executions.insert(exec_arn.clone(), execution);
691 let logging_config = sm.logging_configuration.clone();
692 drop(accounts);
693
694 let shared_state = state.clone();
695 let delivery = delivery.clone();
696 let dynamodb_state = dynamodb_state.clone();
697 let registry = registry.clone();
698 let input = Some(input.to_string());
699 tokio::spawn(async move {
700 interpreter::execute_state_machine(
701 shared_state,
702 exec_arn,
703 definition,
704 input,
705 delivery,
706 dynamodb_state,
707 registry,
708 logging_config,
709 )
710 .await;
711 });
712}
713
714#[cfg(test)]
715mod tests {
716 use super::*;
717 use http::{HeaderMap, Method};
718 use parking_lot::RwLock;
719 use serde_json::Value;
720 use std::collections::HashMap;
721 use std::sync::Arc;
722
723 fn make_state() -> SharedStepFunctionsState {
724 Arc::new(RwLock::new(
725 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
726 ))
727 }
728
729 fn make_request(action: &str, body: &str) -> AwsRequest {
730 AwsRequest {
731 service: "states".to_string(),
732 action: action.to_string(),
733 region: "us-east-1".to_string(),
734 account_id: "123456789012".to_string(),
735 request_id: "test-id".to_string(),
736 headers: HeaderMap::new(),
737 query_params: HashMap::new(),
738 body: body.as_bytes().to_vec().into(),
739 body_stream: parking_lot::Mutex::new(None),
740 path_segments: vec![],
741 raw_path: "/".to_string(),
742 raw_query: String::new(),
743 method: Method::POST,
744 is_query_protocol: false,
745 access_key_id: None,
746 principal: None,
747 }
748 }
749
750 fn body_json(resp: &AwsResponse) -> Value {
751 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
752 }
753
754 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
755 match result {
756 Err(e) => e,
757 Ok(_) => panic!("expected error, got Ok"),
758 }
759 }
760
761 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
762
763 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
764 let body = json!({
765 "name": name,
766 "definition": VALID_DEF,
767 "roleArn": "arn:aws:iam::123456789012:role/test",
768 });
769 let req = make_request("CreateStateMachine", &body.to_string());
770 let resp = svc.create_state_machine(&req).unwrap();
771 let b = body_json(&resp);
772 b["stateMachineArn"].as_str().unwrap().to_string()
773 }
774
775 #[test]
778 fn create_state_machine_basic() {
779 let svc = StepFunctionsService::new(make_state());
780 let arn = create_sm(&svc, "test-sm");
781 assert!(arn.contains("test-sm"));
782 }
783
784 #[test]
785 fn create_state_machine_with_express_type() {
786 let svc = StepFunctionsService::new(make_state());
787 let body = json!({
788 "name": "express-sm",
789 "definition": VALID_DEF,
790 "roleArn": "arn:aws:iam::123456789012:role/r",
791 "type": "EXPRESS",
792 });
793 let req = make_request("CreateStateMachine", &body.to_string());
794 let resp = svc.create_state_machine(&req).unwrap();
795 let b = body_json(&resp);
796 assert!(b["stateMachineArn"].as_str().is_some());
797 }
798
799 #[test]
800 fn create_state_machine_duplicate_fails() {
801 let svc = StepFunctionsService::new(make_state());
802 create_sm(&svc, "dup-sm");
803 let body = json!({
804 "name": "dup-sm",
805 "definition": VALID_DEF,
806 "roleArn": "arn:aws:iam::123456789012:role/r",
807 });
808 let req = make_request("CreateStateMachine", &body.to_string());
809 let err = expect_err(svc.create_state_machine(&req));
810 assert!(err.to_string().contains("StateMachineAlreadyExists"));
811 }
812
813 #[test]
814 fn create_state_machine_missing_name() {
815 let svc = StepFunctionsService::new(make_state());
816 let body = json!({
817 "definition": VALID_DEF,
818 "roleArn": "arn:aws:iam::123456789012:role/r",
819 });
820 let req = make_request("CreateStateMachine", &body.to_string());
821 assert!(svc.create_state_machine(&req).is_err());
822 }
823
824 #[test]
825 fn create_state_machine_invalid_definition() {
826 let svc = StepFunctionsService::new(make_state());
827 let body = json!({
828 "name": "bad-def",
829 "definition": "not json",
830 "roleArn": "arn:aws:iam::123456789012:role/r",
831 });
832 let req = make_request("CreateStateMachine", &body.to_string());
833 let err = expect_err(svc.create_state_machine(&req));
834 assert!(err.to_string().contains("InvalidDefinition"));
835 }
836
837 #[test]
838 fn create_state_machine_definition_missing_start_at() {
839 let svc = StepFunctionsService::new(make_state());
840 let body = json!({
841 "name": "no-start",
842 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
843 "roleArn": "arn:aws:iam::123456789012:role/r",
844 });
845 let req = make_request("CreateStateMachine", &body.to_string());
846 let err = expect_err(svc.create_state_machine(&req));
847 assert!(err.to_string().contains("InvalidDefinition"));
848 }
849
850 #[test]
851 fn create_state_machine_definition_missing_states() {
852 let svc = StepFunctionsService::new(make_state());
853 let body = json!({
854 "name": "no-states",
855 "definition": r#"{"StartAt":"S"}"#,
856 "roleArn": "arn:aws:iam::123456789012:role/r",
857 });
858 let req = make_request("CreateStateMachine", &body.to_string());
859 let err = expect_err(svc.create_state_machine(&req));
860 assert!(err.to_string().contains("InvalidDefinition"));
861 }
862
863 #[test]
864 fn create_state_machine_definition_start_at_not_in_states() {
865 let svc = StepFunctionsService::new(make_state());
866 let body = json!({
867 "name": "bad-start",
868 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
869 "roleArn": "arn:aws:iam::123456789012:role/r",
870 });
871 let req = make_request("CreateStateMachine", &body.to_string());
872 let err = expect_err(svc.create_state_machine(&req));
873 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
874 }
875
876 #[test]
877 fn create_state_machine_invalid_type() {
878 let svc = StepFunctionsService::new(make_state());
879 let body = json!({
880 "name": "bad-type",
881 "definition": VALID_DEF,
882 "roleArn": "arn:aws:iam::123456789012:role/r",
883 "type": "INVALID",
884 });
885 let req = make_request("CreateStateMachine", &body.to_string());
886 assert!(svc.create_state_machine(&req).is_err());
887 }
888
889 #[test]
890 fn create_state_machine_invalid_arn() {
891 let svc = StepFunctionsService::new(make_state());
892 let body = json!({
893 "name": "bad-arn",
894 "definition": VALID_DEF,
895 "roleArn": "not-an-arn",
896 });
897 let req = make_request("CreateStateMachine", &body.to_string());
898 let err = expect_err(svc.create_state_machine(&req));
899 assert!(err.to_string().contains("InvalidArn"));
900 }
901
902 #[test]
903 fn create_state_machine_invalid_name() {
904 let svc = StepFunctionsService::new(make_state());
905 let body = json!({
906 "name": "has spaces!",
907 "definition": VALID_DEF,
908 "roleArn": "arn:aws:iam::123456789012:role/r",
909 });
910 let req = make_request("CreateStateMachine", &body.to_string());
911 let err = expect_err(svc.create_state_machine(&req));
912 assert!(err.to_string().contains("InvalidName"));
913 }
914
915 #[test]
916 fn create_state_machine_name_too_long() {
917 let svc = StepFunctionsService::new(make_state());
918 let long_name = "a".repeat(81);
919 let body = json!({
920 "name": long_name,
921 "definition": VALID_DEF,
922 "roleArn": "arn:aws:iam::123456789012:role/r",
923 });
924 let req = make_request("CreateStateMachine", &body.to_string());
925 let err = expect_err(svc.create_state_machine(&req));
926 assert!(err.to_string().contains("InvalidName"));
927 }
928
929 #[test]
932 fn describe_state_machine_found() {
933 let svc = StepFunctionsService::new(make_state());
934 let arn = create_sm(&svc, "desc-sm");
935
936 let req = make_request(
937 "DescribeStateMachine",
938 &json!({"stateMachineArn": arn}).to_string(),
939 );
940 let resp = svc.describe_state_machine(&req).unwrap();
941 let b = body_json(&resp);
942 assert_eq!(b["name"], "desc-sm");
943 assert_eq!(b["status"], "ACTIVE");
944 assert!(b["definition"].as_str().is_some());
945 }
946
947 #[test]
948 fn describe_state_machine_not_found() {
949 let svc = StepFunctionsService::new(make_state());
950 let req = make_request(
951 "DescribeStateMachine",
952 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
953 .to_string(),
954 );
955 let err = expect_err(svc.describe_state_machine(&req));
956 assert!(err.to_string().contains("StateMachineDoesNotExist"));
957 }
958
959 #[test]
962 fn list_state_machines_empty() {
963 let svc = StepFunctionsService::new(make_state());
964 let req = make_request("ListStateMachines", "{}");
965 let resp = svc.list_state_machines(&req).unwrap();
966 let b = body_json(&resp);
967 assert!(b["stateMachines"].as_array().unwrap().is_empty());
968 }
969
970 #[test]
971 fn list_state_machines_returns_created() {
972 let svc = StepFunctionsService::new(make_state());
973 create_sm(&svc, "sm-1");
974 create_sm(&svc, "sm-2");
975
976 let req = make_request("ListStateMachines", "{}");
977 let resp = svc.list_state_machines(&req).unwrap();
978 let b = body_json(&resp);
979 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
980 }
981
982 #[test]
985 fn delete_state_machine() {
986 let svc = StepFunctionsService::new(make_state());
987 let arn = create_sm(&svc, "del-sm");
988
989 let req = make_request(
990 "DeleteStateMachine",
991 &json!({"stateMachineArn": arn}).to_string(),
992 );
993 svc.delete_state_machine(&req).unwrap();
994
995 let req = make_request(
997 "DescribeStateMachine",
998 &json!({"stateMachineArn": arn}).to_string(),
999 );
1000 assert!(svc.describe_state_machine(&req).is_err());
1001 }
1002
1003 #[test]
1004 fn delete_state_machine_nonexistent_succeeds() {
1005 let svc = StepFunctionsService::new(make_state());
1006 let req = make_request(
1007 "DeleteStateMachine",
1008 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1009 .to_string(),
1010 );
1011 svc.delete_state_machine(&req).unwrap();
1013 }
1014
1015 #[test]
1018 fn update_state_machine() {
1019 let svc = StepFunctionsService::new(make_state());
1020 let arn = create_sm(&svc, "upd-sm");
1021
1022 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1023 let body = json!({
1024 "stateMachineArn": arn,
1025 "definition": new_def,
1026 "description": "updated",
1027 });
1028 let req = make_request("UpdateStateMachine", &body.to_string());
1029 let resp = svc.update_state_machine(&req).unwrap();
1030 let b = body_json(&resp);
1031 assert!(b["updateDate"].as_f64().is_some());
1032
1033 let req = make_request(
1035 "DescribeStateMachine",
1036 &json!({"stateMachineArn": arn}).to_string(),
1037 );
1038 let resp = svc.describe_state_machine(&req).unwrap();
1039 let b = body_json(&resp);
1040 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1041 assert_eq!(b["description"], "updated");
1042 }
1043
1044 #[test]
1045 fn update_state_machine_not_found() {
1046 let svc = StepFunctionsService::new(make_state());
1047 let body = json!({
1048 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1049 "definition": VALID_DEF,
1050 });
1051 let req = make_request("UpdateStateMachine", &body.to_string());
1052 let err = expect_err(svc.update_state_machine(&req));
1053 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1054 }
1055
1056 #[tokio::test]
1059 async fn start_execution_basic() {
1060 let svc = StepFunctionsService::new(make_state());
1061 let arn = create_sm(&svc, "exec-sm");
1062
1063 let body = json!({
1064 "stateMachineArn": arn,
1065 "input": r#"{"key":"value"}"#,
1066 });
1067 let req = make_request("StartExecution", &body.to_string());
1068 let resp = svc.start_execution(&req).unwrap();
1069 let b = body_json(&resp);
1070 assert!(b["executionArn"].as_str().is_some());
1071 assert!(b["startDate"].as_f64().is_some());
1072 }
1073
1074 #[tokio::test]
1075 async fn start_execution_with_name() {
1076 let svc = StepFunctionsService::new(make_state());
1077 let arn = create_sm(&svc, "named-exec");
1078
1079 let body = json!({
1080 "stateMachineArn": arn,
1081 "name": "my-execution",
1082 });
1083 let req = make_request("StartExecution", &body.to_string());
1084 let resp = svc.start_execution(&req).unwrap();
1085 let b = body_json(&resp);
1086 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1087 }
1088
1089 #[tokio::test]
1090 async fn start_execution_sm_not_found() {
1091 let svc = StepFunctionsService::new(make_state());
1092 let body = json!({
1093 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1094 });
1095 let req = make_request("StartExecution", &body.to_string());
1096 let err = expect_err(svc.start_execution(&req));
1097 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1098 }
1099
1100 #[tokio::test]
1101 async fn start_execution_invalid_input() {
1102 let svc = StepFunctionsService::new(make_state());
1103 let arn = create_sm(&svc, "bad-input");
1104
1105 let body = json!({
1106 "stateMachineArn": arn,
1107 "input": "not json",
1108 });
1109 let req = make_request("StartExecution", &body.to_string());
1110 let err = expect_err(svc.start_execution(&req));
1111 assert!(err.to_string().contains("InvalidExecutionInput"));
1112 }
1113
1114 #[tokio::test]
1115 async fn start_execution_duplicate_name() {
1116 let svc = StepFunctionsService::new(make_state());
1117 let arn = create_sm(&svc, "dup-exec");
1118
1119 let body = json!({
1120 "stateMachineArn": arn,
1121 "name": "same-name",
1122 });
1123 let req = make_request("StartExecution", &body.to_string());
1124 svc.start_execution(&req).unwrap();
1125
1126 let req = make_request("StartExecution", &body.to_string());
1127 let err = expect_err(svc.start_execution(&req));
1128 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1129 }
1130
1131 #[tokio::test]
1134 async fn describe_execution_found() {
1135 let svc = StepFunctionsService::new(make_state());
1136 let sm_arn = create_sm(&svc, "desc-exec");
1137
1138 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1139 let req = make_request("StartExecution", &body.to_string());
1140 let resp = svc.start_execution(&req).unwrap();
1141 let exec_arn = body_json(&resp)["executionArn"]
1142 .as_str()
1143 .unwrap()
1144 .to_string();
1145
1146 let req = make_request(
1147 "DescribeExecution",
1148 &json!({"executionArn": exec_arn}).to_string(),
1149 );
1150 let resp = svc.describe_execution(&req).unwrap();
1151 let b = body_json(&resp);
1152 assert_eq!(b["name"], "e1");
1153 assert_eq!(b["status"], "RUNNING");
1154 }
1155
1156 #[tokio::test]
1157 async fn describe_execution_not_found() {
1158 let svc = StepFunctionsService::new(make_state());
1159 let req = make_request(
1160 "DescribeExecution",
1161 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1162 .to_string(),
1163 );
1164 let err = expect_err(svc.describe_execution(&req));
1165 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1166 }
1167
1168 #[tokio::test]
1171 async fn stop_execution() {
1172 let svc = StepFunctionsService::new(make_state());
1173 let sm_arn = create_sm(&svc, "stop-sm");
1174
1175 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1176 let req = make_request("StartExecution", &body.to_string());
1177 let resp = svc.start_execution(&req).unwrap();
1178 let exec_arn = body_json(&resp)["executionArn"]
1179 .as_str()
1180 .unwrap()
1181 .to_string();
1182
1183 let body = json!({
1184 "executionArn": exec_arn,
1185 "error": "UserAborted",
1186 "cause": "test stop",
1187 });
1188 let req = make_request("StopExecution", &body.to_string());
1189 let resp = svc.stop_execution(&req).unwrap();
1190 let b = body_json(&resp);
1191 assert!(b["stopDate"].as_f64().is_some());
1192
1193 let req = make_request(
1195 "DescribeExecution",
1196 &json!({"executionArn": exec_arn}).to_string(),
1197 );
1198 let resp = svc.describe_execution(&req).unwrap();
1199 let b = body_json(&resp);
1200 assert_eq!(b["status"], "ABORTED");
1201 assert_eq!(b["error"], "UserAborted");
1202 }
1203
1204 #[tokio::test]
1205 async fn stop_execution_not_found() {
1206 let svc = StepFunctionsService::new(make_state());
1207 let req = make_request(
1208 "StopExecution",
1209 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1210 .to_string(),
1211 );
1212 let err = expect_err(svc.stop_execution(&req));
1213 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1214 }
1215
1216 #[tokio::test]
1219 async fn list_executions() {
1220 let svc = StepFunctionsService::new(make_state());
1221 let sm_arn = create_sm(&svc, "list-exec");
1222
1223 for i in 0..3 {
1224 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1225 let req = make_request("StartExecution", &body.to_string());
1226 svc.start_execution(&req).unwrap();
1227 }
1228
1229 let req = make_request(
1230 "ListExecutions",
1231 &json!({"stateMachineArn": sm_arn}).to_string(),
1232 );
1233 let resp = svc.list_executions(&req).unwrap();
1234 let b = body_json(&resp);
1235 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1236 }
1237
1238 #[tokio::test]
1239 async fn list_executions_sm_not_found() {
1240 let svc = StepFunctionsService::new(make_state());
1241 let req = make_request(
1242 "ListExecutions",
1243 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1244 .to_string(),
1245 );
1246 let err = expect_err(svc.list_executions(&req));
1247 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1248 }
1249
1250 #[tokio::test]
1253 async fn get_execution_history_not_found() {
1254 let svc = StepFunctionsService::new(make_state());
1255 let req = make_request(
1256 "GetExecutionHistory",
1257 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1258 .to_string(),
1259 );
1260 let err = expect_err(svc.get_execution_history(&req));
1261 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1262 }
1263
1264 #[tokio::test]
1267 async fn describe_sm_for_execution() {
1268 let svc = StepFunctionsService::new(make_state());
1269 let sm_arn = create_sm(&svc, "sm-for-exec");
1270
1271 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1272 let req = make_request("StartExecution", &body.to_string());
1273 let resp = svc.start_execution(&req).unwrap();
1274 let exec_arn = body_json(&resp)["executionArn"]
1275 .as_str()
1276 .unwrap()
1277 .to_string();
1278
1279 let req = make_request(
1280 "DescribeStateMachineForExecution",
1281 &json!({"executionArn": exec_arn}).to_string(),
1282 );
1283 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1284 let b = body_json(&resp);
1285 assert_eq!(b["name"], "sm-for-exec");
1286 }
1287
1288 #[test]
1291 fn tag_untag_list_tags() {
1292 let svc = StepFunctionsService::new(make_state());
1293 let arn = create_sm(&svc, "tagged-sm");
1294
1295 let body = json!({
1297 "resourceArn": arn,
1298 "tags": [{"key": "env", "value": "prod"}],
1299 });
1300 let req = make_request("TagResource", &body.to_string());
1301 svc.tag_resource(&req).unwrap();
1302
1303 let req = make_request(
1305 "ListTagsForResource",
1306 &json!({"resourceArn": arn}).to_string(),
1307 );
1308 let resp = svc.list_tags_for_resource(&req).unwrap();
1309 let b = body_json(&resp);
1310 let tags = b["tags"].as_array().unwrap();
1311 assert_eq!(tags.len(), 1);
1312 assert_eq!(tags[0]["key"], "env");
1313
1314 let body = json!({
1316 "resourceArn": arn,
1317 "tagKeys": ["env"],
1318 });
1319 let req = make_request("UntagResource", &body.to_string());
1320 svc.untag_resource(&req).unwrap();
1321
1322 let req = make_request(
1324 "ListTagsForResource",
1325 &json!({"resourceArn": arn}).to_string(),
1326 );
1327 let resp = svc.list_tags_for_resource(&req).unwrap();
1328 let b = body_json(&resp);
1329 assert!(b["tags"].as_array().unwrap().is_empty());
1330 }
1331
1332 #[test]
1333 fn tag_resource_not_found() {
1334 let svc = StepFunctionsService::new(make_state());
1335 let body = json!({
1336 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1337 "tags": [{"key": "k", "value": "v"}],
1338 });
1339 let req = make_request("TagResource", &body.to_string());
1340 let err = expect_err(svc.tag_resource(&req));
1341 assert!(err.to_string().contains("ResourceNotFound"));
1342 }
1343
1344 #[test]
1347 fn test_validate_name() {
1348 assert!(validate_name("valid-name").is_ok());
1349 assert!(validate_name("under_score").is_ok());
1350 assert!(validate_name("").is_err());
1351 assert!(validate_name("has spaces").is_err());
1352 assert!(validate_name(&"a".repeat(81)).is_err());
1353 }
1354
1355 #[test]
1356 fn test_validate_definition() {
1357 assert!(validate_definition(VALID_DEF).is_ok());
1358 assert!(validate_definition("not json").is_err());
1359 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
1362
1363 #[test]
1364 fn test_validate_arn() {
1365 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1366 assert!(validate_arn("not-an-arn").is_err());
1367 }
1368
1369 #[test]
1370 fn test_camel_to_details_key() {
1371 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1372 assert_eq!(camel_to_details_key(""), "");
1373 }
1374
1375 #[test]
1376 fn test_is_mutating_action() {
1377 assert!(is_mutating_action("CreateStateMachine"));
1378 assert!(is_mutating_action("StartExecution"));
1379 assert!(!is_mutating_action("DescribeStateMachine"));
1380 assert!(!is_mutating_action("ListStateMachines"));
1381 }
1382
1383 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1386 let body = json!({
1387 "name": name,
1388 "definition": VALID_DEF,
1389 "roleArn": "arn:aws:iam::123456789012:role/test",
1390 "type": "EXPRESS",
1391 });
1392 let req = make_request("CreateStateMachine", &body.to_string());
1393 let resp = svc.create_state_machine(&req).unwrap();
1394 let b = body_json(&resp);
1395 b["stateMachineArn"].as_str().unwrap().to_string()
1396 }
1397
1398 #[tokio::test]
1399 async fn start_sync_execution_basic() {
1400 let svc = StepFunctionsService::new(make_state());
1401 let arn = create_express_sm(&svc, "sync-sm");
1402
1403 let body = json!({
1404 "stateMachineArn": arn,
1405 "input": r#"{"key":"value"}"#,
1406 });
1407 let req = make_request("StartSyncExecution", &body.to_string());
1408 let resp = svc.start_sync_execution(&req).await.unwrap();
1409 let b = body_json(&resp);
1410 assert!(b["executionArn"]
1411 .as_str()
1412 .unwrap()
1413 .contains("express:sync-sm"));
1414 assert_eq!(b["stateMachineArn"], arn);
1415 assert_eq!(b["status"], "SUCCEEDED");
1416 assert!(b["startDate"].as_i64().is_some());
1417 assert!(b["stopDate"].as_i64().is_some());
1418 assert!(b["output"].as_str().is_some());
1419 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1420 .as_i64()
1421 .is_some());
1422 }
1423
1424 #[tokio::test]
1425 async fn start_sync_execution_not_express() {
1426 let svc = StepFunctionsService::new(make_state());
1427 let arn = create_sm(&svc, "std-sm");
1428
1429 let body = json!({"stateMachineArn": arn});
1430 let req = make_request("StartSyncExecution", &body.to_string());
1431 let err = expect_err(svc.start_sync_execution(&req).await);
1432 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1433 }
1434
1435 #[tokio::test]
1436 async fn start_sync_execution_sm_not_found() {
1437 let svc = StepFunctionsService::new(make_state());
1438 let body = json!({
1439 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1440 });
1441 let req = make_request("StartSyncExecution", &body.to_string());
1442 let err = expect_err(svc.start_sync_execution(&req).await);
1443 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1444 }
1445
1446 #[tokio::test]
1447 async fn start_sync_execution_records_introspection_fields() {
1448 let svc = StepFunctionsService::new(make_state());
1449 let arn = create_express_sm(&svc, "sync-introspect");
1450
1451 let body = json!({"stateMachineArn": arn, "input": "{}"});
1452 let req = make_request("StartSyncExecution", &body.to_string());
1453 let resp = svc.start_sync_execution(&req).await.unwrap();
1454 let b = body_json(&resp);
1455 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1456
1457 let accounts = svc.state.read();
1458 let state = accounts.get("123456789012").unwrap();
1459 let stored = state
1460 .executions
1461 .get(&exec_arn)
1462 .expect("sync execution should be persisted for introspection");
1463 assert!(stored.is_sync, "sync executions must be marked is_sync");
1464 assert_eq!(stored.billed_memory_mb, Some(64));
1465 assert!(
1466 stored.billed_duration_ms.is_some(),
1467 "billed_duration_ms must be populated after sync run"
1468 );
1469 assert!(
1470 stored.parent_execution_arn.is_none(),
1471 "top-level sync execution has no parent"
1472 );
1473 }
1474
1475 #[tokio::test]
1476 async fn start_sync_execution_invalid_input() {
1477 let svc = StepFunctionsService::new(make_state());
1478 let arn = create_express_sm(&svc, "bad-input-sync");
1479
1480 let body = json!({
1481 "stateMachineArn": arn,
1482 "input": "not json",
1483 });
1484 let req = make_request("StartSyncExecution", &body.to_string());
1485 let err = expect_err(svc.start_sync_execution(&req).await);
1486 assert!(err.to_string().contains("InvalidExecutionInput"));
1487 }
1488}