1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Names of every Step Functions API action this service accepts.
///
/// Returned as-is by `supported_actions`; each entry has a matching arm in the
/// dispatch table inside `handle`.
const SUPPORTED: &[&str] = &[
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
    "CreateActivity",
    "DeleteActivity",
    "DescribeActivity",
    "ListActivities",
    "GetActivityTask",
    "SendTaskFailure",
    "SendTaskHeartbeat",
    "SendTaskSuccess",
    "PublishStateMachineVersion",
    "DeleteStateMachineVersion",
    "ListStateMachineVersions",
    "CreateStateMachineAlias",
    "DeleteStateMachineAlias",
    "DescribeStateMachineAlias",
    "ListStateMachineAliases",
    "UpdateStateMachineAlias",
    "DescribeMapRun",
    "ListMapRuns",
    "UpdateMapRun",
    "RedriveExecution",
    "StartSyncExecution",
    "TestState",
    "ValidateStateMachineDefinition",
];
64
/// Shared, lazily populated handle to the service registry.
///
/// The inner `OnceLock` can be set at most once, so a clone of this handle can be
/// handed out before the registry itself exists.
/// NOTE(review): presumably filled in once during server start-up — confirm.
pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
/// Step Functions (`states`) service front end.
///
/// Holds the shared service state plus optional integrations that are attached
/// through the `with_*` builder methods.
pub struct StepFunctionsService {
    /// Shared Step Functions state, looked up per account id by the handlers.
    state: SharedStepFunctionsState,
    /// Optional delivery bus.
    /// NOTE(review): presumably forwarded to the interpreter when executions run — confirm.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional shared DynamoDB state.
    /// NOTE(review): presumably used by DynamoDB task integrations — confirm.
    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional handle to the registry of sibling services.
    registry: Option<SharedServiceRegistry>,
    /// Where snapshots are written; `None` disables persistence in `save_snapshot`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the whole duration of `save_snapshot`, so snapshot
    /// writes never overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
80
// Submodules of this service.
// NOTE(review): the handler methods dispatched from `handle` that are not defined
// in this file presumably live in these modules, grouped by resource family — confirm.
mod activities;
mod executions;
mod map_runs;
mod state_machines;
mod tags;
mod tasks;
mod validation;
88
89impl StepFunctionsService {
90 pub fn new(state: SharedStepFunctionsState) -> Self {
91 Self {
92 state,
93 delivery: None,
94 dynamodb_state: None,
95 registry: None,
96 snapshot_store: None,
97 snapshot_lock: Arc::new(AsyncMutex::new(())),
98 }
99 }
100
101 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102 self.delivery = Some(delivery);
103 self
104 }
105
106 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107 self.dynamodb_state = Some(dynamodb_state);
108 self
109 }
110
111 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116 self.registry = Some(registry);
117 self
118 }
119
120 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121 self.snapshot_store = Some(store);
122 self
123 }
124
125 async fn save_snapshot(&self) {
126 let Some(store) = self.snapshot_store.clone() else {
127 return;
128 };
129 let _guard = self.snapshot_lock.lock().await;
130 let snapshot = StepFunctionsSnapshot {
131 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
132 state: None,
133 accounts: Some(self.state.read().clone()),
134 };
135 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
136 let bytes = serde_json::to_vec(&snapshot)
137 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
138 store.save(&bytes)
139 })
140 .await;
141 match join {
142 Ok(Ok(())) => {}
143 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
144 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
145 }
146 }
147}
148
/// Returns `true` when `action` is one of the operations after which `handle`
/// persists a snapshot (provided the response was successful).
///
/// The comparison is exact and case-sensitive; unknown names yield `false`.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];
    MUTATING_ACTIONS.contains(&action)
}
175
176#[async_trait]
177impl AwsService for StepFunctionsService {
178 fn service_name(&self) -> &str {
179 "states"
180 }
181
182 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
183 let mutates = is_mutating_action(req.action.as_str());
184 let result = match req.action.as_str() {
185 "CreateStateMachine" => self.create_state_machine(&req),
186 "DescribeStateMachine" => self.describe_state_machine(&req),
187 "ListStateMachines" => self.list_state_machines(&req),
188 "DeleteStateMachine" => self.delete_state_machine(&req),
189 "UpdateStateMachine" => self.update_state_machine(&req),
190 "TagResource" => self.tag_resource(&req),
191 "UntagResource" => self.untag_resource(&req),
192 "ListTagsForResource" => self.list_tags_for_resource(&req),
193 "StartExecution" => self.start_execution(&req),
194 "StopExecution" => self.stop_execution(&req),
195 "DescribeExecution" => self.describe_execution(&req),
196 "ListExecutions" => self.list_executions(&req),
197 "GetExecutionHistory" => self.get_execution_history(&req),
198 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
199 "CreateActivity" => self.create_activity(&req),
200 "DeleteActivity" => self.delete_activity(&req),
201 "DescribeActivity" => self.describe_activity(&req),
202 "ListActivities" => self.list_activities(&req),
203 "GetActivityTask" => self.get_activity_task(&req).await,
204 "SendTaskFailure" => self.send_task_failure(&req),
205 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
206 "SendTaskSuccess" => self.send_task_success(&req),
207 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
208 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
209 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
210 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
211 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
212 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
213 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
214 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
215 "DescribeMapRun" => self.describe_map_run(&req),
216 "ListMapRuns" => self.list_map_runs(&req),
217 "UpdateMapRun" => self.update_map_run(&req),
218 "RedriveExecution" => self.redrive_execution(&req),
219 "StartSyncExecution" => self.start_sync_execution(&req).await,
220 "TestState" => self.test_state(&req),
221 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
222 _ => Err(AwsServiceError::action_not_implemented(
223 "states",
224 &req.action,
225 )),
226 };
227 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
228 self.save_snapshot().await;
229 }
230 result
231 }
232
233 fn supported_actions(&self) -> &[&str] {
234 SUPPORTED
235 }
236}
237
238impl StepFunctionsService {
239 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
240 let body = req.json_body();
241 let raw_max_results = body["maxResults"].as_i64();
242 if let Some(mr) = raw_max_results {
243 validate_max_results(mr)?;
244 }
245 let next_token = body["nextToken"].as_str();
246 if let Some(t) = next_token {
247 validate_page_token(t)?;
248 }
249 let max_results = match raw_max_results.unwrap_or(0) {
253 0 => 100,
254 n => n as usize,
255 };
256 let accounts = self.state.read();
257 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
258 let state = accounts.get(&req.account_id).unwrap_or(&empty);
259 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
260 activities.sort_by(|a, b| a.name.cmp(&b.name));
261 let items: Vec<Value> = activities
262 .iter()
263 .map(|a| {
264 json!({
265 "activityArn": a.arn,
266 "name": a.name,
267 "creationDate": a.creation_date.timestamp(),
268 })
269 })
270 .collect();
271 let (page, token) =
272 paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
273 let mut resp = json!({ "activities": page });
274 if let Some(t) = token {
275 resp["nextToken"] = json!(t);
276 }
277 Ok(AwsResponse::ok_json(resp))
278 }
279}
280
281fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
282 json!({
283 "stateMachineAliasArn": alias.arn,
284 "name": alias.name,
285 "description": alias.description,
286 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
287 "stateMachineVersionArn": r.state_machine_version_arn,
288 "weight": r.weight,
289 })).collect::<Vec<_>>(),
290 "creationDate": alias.creation_date.timestamp(),
291 "updateDate": alias.update_date.timestamp(),
292 })
293}
294
295fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
296 json!({
297 "mapRunArn": mr.map_run_arn,
298 "executionArn": mr.execution_arn,
299 "maxConcurrency": mr.max_concurrency,
300 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
301 "toleratedFailureCount": mr.tolerated_failure_count,
302 "status": mr.status,
303 "startDate": mr.start_date.timestamp(),
304 "stopDate": mr.stop_date.map(|d| d.timestamp()),
305 })
306}
307
308fn state_machine_to_json(sm: &StateMachine) -> Value {
311 let mut resp = json!({
312 "name": sm.name,
313 "stateMachineArn": sm.arn,
314 "definition": sm.definition,
315 "roleArn": sm.role_arn,
316 "type": sm.machine_type.as_str(),
317 "status": sm.status.as_str(),
318 "creationDate": sm.creation_date.timestamp() as f64,
319 "updateDate": sm.update_date.timestamp() as f64,
320 "revisionId": sm.revision_id,
321 "label": sm.name,
322 });
323
324 if !sm.description.is_empty() {
325 resp["description"] = json!(sm.description);
326 }
327
328 if let Some(ref logging) = sm.logging_configuration {
329 resp["loggingConfiguration"] = logging.clone();
330 } else {
331 resp["loggingConfiguration"] = json!({
332 "level": "OFF",
333 "includeExecutionData": false,
334 "destinations": [],
335 });
336 }
337
338 if let Some(ref tracing) = sm.tracing_configuration {
339 resp["tracingConfiguration"] = tracing.clone();
340 } else {
341 resp["tracingConfiguration"] = json!({
342 "enabled": false,
343 });
344 }
345
346 resp
347}
348
349fn missing(name: &str) -> AwsServiceError {
350 AwsServiceError::aws_error(
351 StatusCode::BAD_REQUEST,
352 "ValidationException",
353 format!("The request must contain the parameter {name}."),
354 )
355}
356
357fn state_machine_not_found(arn: &str) -> AwsServiceError {
358 AwsServiceError::aws_error(
359 StatusCode::BAD_REQUEST,
360 "StateMachineDoesNotExist",
361 format!("State Machine Does Not Exist: '{arn}'"),
362 )
363}
364
365fn activity_not_found(arn: &str) -> AwsServiceError {
366 AwsServiceError::aws_error(
367 StatusCode::BAD_REQUEST,
368 "ActivityDoesNotExist",
369 format!("Activity does not exist: {arn}"),
370 )
371}
372
373fn task_does_not_exist(token: &str) -> AwsServiceError {
374 AwsServiceError::aws_error(
375 StatusCode::BAD_REQUEST,
376 "TaskDoesNotExist",
377 format!("Task does not exist: {token}"),
378 )
379}
380
381fn resource_not_found(arn: &str) -> AwsServiceError {
382 AwsServiceError::aws_error(
383 StatusCode::BAD_REQUEST,
384 "ResourceNotFound",
385 format!("Resource not found: '{arn}'"),
386 )
387}
388
389fn parse_routing_configuration(
394 routes: &[serde_json::Value],
395) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
396 if routes.is_empty() || routes.len() > 2 {
397 return Err(AwsServiceError::aws_error(
398 StatusCode::BAD_REQUEST,
399 "ValidationException",
400 "routingConfiguration must contain 1 or 2 routes.",
401 ));
402 }
403 let parsed: Vec<crate::state::AliasRoute> = routes
404 .iter()
405 .map(|r| {
406 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
407 AwsServiceError::aws_error(
408 StatusCode::BAD_REQUEST,
409 "ValidationException",
410 "routingConfiguration entries must contain stateMachineVersionArn.",
411 )
412 })?;
413 let weight = r["weight"].as_i64().ok_or_else(|| {
414 AwsServiceError::aws_error(
415 StatusCode::BAD_REQUEST,
416 "ValidationException",
417 "routingConfiguration entries must contain a numeric weight.",
418 )
419 })?;
420 if !(0..=100).contains(&weight) {
421 return Err(AwsServiceError::aws_error(
422 StatusCode::BAD_REQUEST,
423 "ValidationException",
424 format!("Invalid routing weight {weight}; must be 0-100."),
425 ));
426 }
427 Ok(crate::state::AliasRoute {
428 state_machine_version_arn: arn.to_string(),
429 weight: weight as i32,
430 })
431 })
432 .collect::<Result<_, _>>()?;
433 let total: i32 = parsed.iter().map(|r| r.weight).sum();
434 if total != 100 {
435 return Err(AwsServiceError::aws_error(
436 StatusCode::BAD_REQUEST,
437 "ValidationException",
438 format!("routingConfiguration weights must sum to 100, got {total}."),
439 ));
440 }
441 Ok(parsed)
442}
443
444fn validate_name(name: &str) -> Result<(), AwsServiceError> {
445 if name.is_empty() || name.len() > 80 {
446 return Err(AwsServiceError::aws_error(
447 StatusCode::BAD_REQUEST,
448 "InvalidName",
449 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
450 ));
451 }
452 if !name
454 .chars()
455 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
456 {
457 return Err(AwsServiceError::aws_error(
458 StatusCode::BAD_REQUEST,
459 "InvalidName",
460 format!(
461 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
462 ),
463 ));
464 }
465 Ok(())
466}
467
468fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
469 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
470 AwsServiceError::aws_error(
471 StatusCode::BAD_REQUEST,
472 "InvalidDefinition",
473 format!("Invalid State Machine Definition: '{e}'"),
474 )
475 })?;
476
477 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
478 return Err(AwsServiceError::aws_error(
479 StatusCode::BAD_REQUEST,
480 "InvalidDefinition",
481 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
482 .to_string(),
483 ));
484 }
485
486 let states_obj = parsed
487 .get("States")
488 .and_then(|v| v.as_object())
489 .ok_or_else(|| {
490 AwsServiceError::aws_error(
491 StatusCode::BAD_REQUEST,
492 "InvalidDefinition",
493 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
494 .to_string(),
495 )
496 })?;
497
498 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
499 AwsServiceError::aws_error(
500 StatusCode::BAD_REQUEST,
501 "InvalidDefinition",
502 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
503 .to_string(),
504 )
505 })?;
506 if !states_obj.contains_key(start_at) {
507 return Err(AwsServiceError::aws_error(
508 StatusCode::BAD_REQUEST,
509 "InvalidDefinition",
510 format!(
511 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
512 (StartAt '{start_at}' does not reference a valid state)"
513 ),
514 ));
515 }
516
517 Ok(())
518}
519
520fn execution_not_found(arn: &str) -> AwsServiceError {
521 AwsServiceError::aws_error(
522 StatusCode::BAD_REQUEST,
523 "ExecutionDoesNotExist",
524 format!("Execution Does Not Exist: '{arn}'"),
525 )
526}
527
528fn execution_to_json(exec: &Execution) -> Value {
529 let mut resp = json!({
530 "executionArn": exec.execution_arn,
531 "stateMachineArn": exec.state_machine_arn,
532 "name": exec.name,
533 "status": exec.status.as_str(),
534 "startDate": exec.start_date.timestamp() as f64,
535 });
536
537 if let Some(ref input) = exec.input {
538 resp["input"] = json!(input);
539 }
540 if let Some(ref output) = exec.output {
541 resp["output"] = json!(output);
542 }
543 if let Some(stop) = exec.stop_date {
544 resp["stopDate"] = json!(stop.timestamp() as f64);
545 }
546 if let Some(ref error) = exec.error {
547 resp["error"] = json!(error);
548 }
549 if let Some(ref cause) = exec.cause {
550 resp["cause"] = json!(cause);
551 }
552
553 resp
554}
555
/// Lower-cases the first character of `event_type` and leaves the rest untouched
/// (for example `ExecutionStarted` becomes `executionStarted`).
///
/// An empty input yields an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    let mut rest = event_type.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };

    let mut key = String::with_capacity(event_type.len());
    // Lower-casing a single character can produce more than one character.
    key.extend(first.to_lowercase());
    key.push_str(rest.as_str());
    key
}
564
565fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
566 if !arn.starts_with("arn:") {
567 return Err(AwsServiceError::aws_error(
568 StatusCode::BAD_REQUEST,
569 "InvalidArn",
570 format!("Invalid Arn: '{arn}'"),
571 ));
572 }
573 Ok(())
574}
575
576fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
580 if value.is_empty() || value.len() > max {
581 return Err(AwsServiceError::aws_error(
582 StatusCode::BAD_REQUEST,
583 "InvalidArn",
584 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
585 ));
586 }
587 Ok(())
588}
589
590pub(super) fn invalid_token() -> AwsServiceError {
594 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
595}
596
597fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
601 if value.is_empty() || value.len() > 1024 {
602 return Err(AwsServiceError::aws_error(
603 StatusCode::BAD_REQUEST,
604 "InvalidToken",
605 "nextToken must be 1..=1024 characters",
606 ));
607 }
608 Ok(())
609}
610
611fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
617 if !(0..=1000).contains(&value) {
618 return Err(AwsServiceError::aws_error(
619 StatusCode::BAD_REQUEST,
620 "InvalidToken",
621 format!("maxResults '{value}' is outside 0..=1000"),
622 ));
623 }
624 Ok(())
625}
626
627pub fn start_execution_from_delivery(
636 state: &SharedStepFunctionsState,
637 delivery: &Option<Arc<DeliveryBus>>,
638 dynamodb_state: &Option<SharedDynamoDbState>,
639 registry: &Option<SharedServiceRegistry>,
640 state_machine_arn: &str,
641 input: &str,
642) {
643 if serde_json::from_str::<serde_json::Value>(input).is_err() {
645 tracing::warn!(
646 state_machine_arn,
647 "Step Functions delivery: invalid JSON input, skipping execution"
648 );
649 return;
650 }
651
652 let execution_name = uuid::Uuid::new_v4().to_string();
653
654 let account_id = state_machine_arn
656 .split(':')
657 .nth(4)
658 .unwrap_or("000000000000")
659 .to_string();
660
661 let mut accounts = state.write();
662 let st = accounts.get_or_create(&account_id);
663 let sm = match st.state_machines.get(state_machine_arn) {
664 Some(sm) => sm,
665 None => {
666 tracing::warn!(
667 state_machine_arn,
668 "Step Functions delivery: state machine not found"
669 );
670 return;
671 }
672 };
673
674 let sm_name = sm.name.clone();
675 let definition = sm.definition.clone();
676 let exec_arn = st.execution_arn(&sm_name, &execution_name);
677
678 let now = Utc::now();
679 let execution = Execution {
680 execution_arn: exec_arn.clone(),
681 state_machine_arn: state_machine_arn.to_string(),
682 state_machine_name: sm_name,
683 name: execution_name,
684 status: ExecutionStatus::Running,
685 input: Some(input.to_string()),
686 output: None,
687 start_date: now,
688 stop_date: None,
689 error: None,
690 cause: None,
691 history_events: vec![],
692 parent_execution_arn: None,
693 is_sync: false,
694 billed_duration_ms: None,
695 billed_memory_mb: None,
696 };
697
698 st.executions.insert(exec_arn.clone(), execution);
699 let logging_config = sm.logging_configuration.clone();
700 drop(accounts);
701
702 let shared_state = state.clone();
703 let delivery = delivery.clone();
704 let dynamodb_state = dynamodb_state.clone();
705 let registry = registry.clone();
706 let input = Some(input.to_string());
707 tokio::spawn(async move {
708 interpreter::execute_state_machine(
709 shared_state,
710 exec_arn,
711 definition,
712 input,
713 delivery,
714 dynamodb_state,
715 registry,
716 logging_config,
717 )
718 .await;
719 });
720}
721
722#[cfg(test)]
723mod tests {
724 use super::*;
725 use http::{HeaderMap, Method};
726 use parking_lot::RwLock;
727 use serde_json::Value;
728 use std::collections::HashMap;
729 use std::sync::Arc;
730
731 fn make_state() -> SharedStepFunctionsState {
732 Arc::new(RwLock::new(
733 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
734 ))
735 }
736
737 fn make_request(action: &str, body: &str) -> AwsRequest {
738 AwsRequest {
739 service: "states".to_string(),
740 action: action.to_string(),
741 region: "us-east-1".to_string(),
742 account_id: "123456789012".to_string(),
743 request_id: "test-id".to_string(),
744 headers: HeaderMap::new(),
745 query_params: HashMap::new(),
746 body: body.as_bytes().to_vec().into(),
747 body_stream: parking_lot::Mutex::new(None),
748 path_segments: vec![],
749 raw_path: "/".to_string(),
750 raw_query: String::new(),
751 method: Method::POST,
752 is_query_protocol: false,
753 access_key_id: None,
754 principal: None,
755 }
756 }
757
758 fn body_json(resp: &AwsResponse) -> Value {
759 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
760 }
761
762 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
763 match result {
764 Err(e) => e,
765 Ok(_) => panic!("expected error, got Ok"),
766 }
767 }
768
    /// Minimal valid definition: a single terminal `Pass` state that `StartAt` points to.
    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
770
771 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
772 let body = json!({
773 "name": name,
774 "definition": VALID_DEF,
775 "roleArn": "arn:aws:iam::123456789012:role/test",
776 });
777 let req = make_request("CreateStateMachine", &body.to_string());
778 let resp = svc.create_state_machine(&req).unwrap();
779 let b = body_json(&resp);
780 b["stateMachineArn"].as_str().unwrap().to_string()
781 }
782
783 #[test]
786 fn create_state_machine_basic() {
787 let svc = StepFunctionsService::new(make_state());
788 let arn = create_sm(&svc, "test-sm");
789 assert!(arn.contains("test-sm"));
790 }
791
792 #[test]
793 fn create_state_machine_with_express_type() {
794 let svc = StepFunctionsService::new(make_state());
795 let body = json!({
796 "name": "express-sm",
797 "definition": VALID_DEF,
798 "roleArn": "arn:aws:iam::123456789012:role/r",
799 "type": "EXPRESS",
800 });
801 let req = make_request("CreateStateMachine", &body.to_string());
802 let resp = svc.create_state_machine(&req).unwrap();
803 let b = body_json(&resp);
804 assert!(b["stateMachineArn"].as_str().is_some());
805 }
806
807 #[test]
808 fn create_state_machine_duplicate_fails() {
809 let svc = StepFunctionsService::new(make_state());
810 create_sm(&svc, "dup-sm");
811 let body = json!({
812 "name": "dup-sm",
813 "definition": VALID_DEF,
814 "roleArn": "arn:aws:iam::123456789012:role/r",
815 });
816 let req = make_request("CreateStateMachine", &body.to_string());
817 let err = expect_err(svc.create_state_machine(&req));
818 assert!(err.to_string().contains("StateMachineAlreadyExists"));
819 }
820
821 #[test]
822 fn create_state_machine_missing_name() {
823 let svc = StepFunctionsService::new(make_state());
824 let body = json!({
825 "definition": VALID_DEF,
826 "roleArn": "arn:aws:iam::123456789012:role/r",
827 });
828 let req = make_request("CreateStateMachine", &body.to_string());
829 assert!(svc.create_state_machine(&req).is_err());
830 }
831
832 #[test]
833 fn create_state_machine_invalid_definition() {
834 let svc = StepFunctionsService::new(make_state());
835 let body = json!({
836 "name": "bad-def",
837 "definition": "not json",
838 "roleArn": "arn:aws:iam::123456789012:role/r",
839 });
840 let req = make_request("CreateStateMachine", &body.to_string());
841 let err = expect_err(svc.create_state_machine(&req));
842 assert!(err.to_string().contains("InvalidDefinition"));
843 }
844
845 #[test]
846 fn create_state_machine_definition_missing_start_at() {
847 let svc = StepFunctionsService::new(make_state());
848 let body = json!({
849 "name": "no-start",
850 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
851 "roleArn": "arn:aws:iam::123456789012:role/r",
852 });
853 let req = make_request("CreateStateMachine", &body.to_string());
854 let err = expect_err(svc.create_state_machine(&req));
855 assert!(err.to_string().contains("InvalidDefinition"));
856 }
857
858 #[test]
859 fn create_state_machine_definition_missing_states() {
860 let svc = StepFunctionsService::new(make_state());
861 let body = json!({
862 "name": "no-states",
863 "definition": r#"{"StartAt":"S"}"#,
864 "roleArn": "arn:aws:iam::123456789012:role/r",
865 });
866 let req = make_request("CreateStateMachine", &body.to_string());
867 let err = expect_err(svc.create_state_machine(&req));
868 assert!(err.to_string().contains("InvalidDefinition"));
869 }
870
871 #[test]
872 fn create_state_machine_definition_start_at_not_in_states() {
873 let svc = StepFunctionsService::new(make_state());
874 let body = json!({
875 "name": "bad-start",
876 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
877 "roleArn": "arn:aws:iam::123456789012:role/r",
878 });
879 let req = make_request("CreateStateMachine", &body.to_string());
880 let err = expect_err(svc.create_state_machine(&req));
881 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
882 }
883
884 #[test]
885 fn create_state_machine_invalid_type() {
886 let svc = StepFunctionsService::new(make_state());
887 let body = json!({
888 "name": "bad-type",
889 "definition": VALID_DEF,
890 "roleArn": "arn:aws:iam::123456789012:role/r",
891 "type": "INVALID",
892 });
893 let req = make_request("CreateStateMachine", &body.to_string());
894 assert!(svc.create_state_machine(&req).is_err());
895 }
896
897 #[test]
898 fn create_state_machine_invalid_arn() {
899 let svc = StepFunctionsService::new(make_state());
900 let body = json!({
901 "name": "bad-arn",
902 "definition": VALID_DEF,
903 "roleArn": "not-an-arn",
904 });
905 let req = make_request("CreateStateMachine", &body.to_string());
906 let err = expect_err(svc.create_state_machine(&req));
907 assert!(err.to_string().contains("InvalidArn"));
908 }
909
910 #[test]
911 fn create_state_machine_invalid_name() {
912 let svc = StepFunctionsService::new(make_state());
913 let body = json!({
914 "name": "has spaces!",
915 "definition": VALID_DEF,
916 "roleArn": "arn:aws:iam::123456789012:role/r",
917 });
918 let req = make_request("CreateStateMachine", &body.to_string());
919 let err = expect_err(svc.create_state_machine(&req));
920 assert!(err.to_string().contains("InvalidName"));
921 }
922
923 #[test]
924 fn create_state_machine_name_too_long() {
925 let svc = StepFunctionsService::new(make_state());
926 let long_name = "a".repeat(81);
927 let body = json!({
928 "name": long_name,
929 "definition": VALID_DEF,
930 "roleArn": "arn:aws:iam::123456789012:role/r",
931 });
932 let req = make_request("CreateStateMachine", &body.to_string());
933 let err = expect_err(svc.create_state_machine(&req));
934 assert!(err.to_string().contains("InvalidName"));
935 }
936
937 #[test]
940 fn describe_state_machine_found() {
941 let svc = StepFunctionsService::new(make_state());
942 let arn = create_sm(&svc, "desc-sm");
943
944 let req = make_request(
945 "DescribeStateMachine",
946 &json!({"stateMachineArn": arn}).to_string(),
947 );
948 let resp = svc.describe_state_machine(&req).unwrap();
949 let b = body_json(&resp);
950 assert_eq!(b["name"], "desc-sm");
951 assert_eq!(b["status"], "ACTIVE");
952 assert!(b["definition"].as_str().is_some());
953 }
954
955 #[test]
956 fn describe_state_machine_not_found() {
957 let svc = StepFunctionsService::new(make_state());
958 let req = make_request(
959 "DescribeStateMachine",
960 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
961 .to_string(),
962 );
963 let err = expect_err(svc.describe_state_machine(&req));
964 assert!(err.to_string().contains("StateMachineDoesNotExist"));
965 }
966
967 #[test]
970 fn list_state_machines_empty() {
971 let svc = StepFunctionsService::new(make_state());
972 let req = make_request("ListStateMachines", "{}");
973 let resp = svc.list_state_machines(&req).unwrap();
974 let b = body_json(&resp);
975 assert!(b["stateMachines"].as_array().unwrap().is_empty());
976 }
977
978 #[test]
979 fn list_state_machines_returns_created() {
980 let svc = StepFunctionsService::new(make_state());
981 create_sm(&svc, "sm-1");
982 create_sm(&svc, "sm-2");
983
984 let req = make_request("ListStateMachines", "{}");
985 let resp = svc.list_state_machines(&req).unwrap();
986 let b = body_json(&resp);
987 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
988 }
989
990 #[test]
993 fn delete_state_machine() {
994 let svc = StepFunctionsService::new(make_state());
995 let arn = create_sm(&svc, "del-sm");
996
997 let req = make_request(
998 "DeleteStateMachine",
999 &json!({"stateMachineArn": arn}).to_string(),
1000 );
1001 svc.delete_state_machine(&req).unwrap();
1002
1003 let req = make_request(
1005 "DescribeStateMachine",
1006 &json!({"stateMachineArn": arn}).to_string(),
1007 );
1008 assert!(svc.describe_state_machine(&req).is_err());
1009 }
1010
1011 #[test]
1012 fn delete_state_machine_nonexistent_succeeds() {
1013 let svc = StepFunctionsService::new(make_state());
1014 let req = make_request(
1015 "DeleteStateMachine",
1016 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1017 .to_string(),
1018 );
1019 svc.delete_state_machine(&req).unwrap();
1021 }
1022
1023 #[test]
1026 fn update_state_machine() {
1027 let svc = StepFunctionsService::new(make_state());
1028 let arn = create_sm(&svc, "upd-sm");
1029
1030 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1031 let body = json!({
1032 "stateMachineArn": arn,
1033 "definition": new_def,
1034 "description": "updated",
1035 });
1036 let req = make_request("UpdateStateMachine", &body.to_string());
1037 let resp = svc.update_state_machine(&req).unwrap();
1038 let b = body_json(&resp);
1039 assert!(b["updateDate"].as_f64().is_some());
1040
1041 let req = make_request(
1043 "DescribeStateMachine",
1044 &json!({"stateMachineArn": arn}).to_string(),
1045 );
1046 let resp = svc.describe_state_machine(&req).unwrap();
1047 let b = body_json(&resp);
1048 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1049 assert_eq!(b["description"], "updated");
1050 }
1051
1052 #[test]
1053 fn update_state_machine_not_found() {
1054 let svc = StepFunctionsService::new(make_state());
1055 let body = json!({
1056 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1057 "definition": VALID_DEF,
1058 });
1059 let req = make_request("UpdateStateMachine", &body.to_string());
1060 let err = expect_err(svc.update_state_machine(&req));
1061 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1062 }
1063
1064 #[tokio::test]
1067 async fn start_execution_basic() {
1068 let svc = StepFunctionsService::new(make_state());
1069 let arn = create_sm(&svc, "exec-sm");
1070
1071 let body = json!({
1072 "stateMachineArn": arn,
1073 "input": r#"{"key":"value"}"#,
1074 });
1075 let req = make_request("StartExecution", &body.to_string());
1076 let resp = svc.start_execution(&req).unwrap();
1077 let b = body_json(&resp);
1078 assert!(b["executionArn"].as_str().is_some());
1079 assert!(b["startDate"].as_f64().is_some());
1080 }
1081
1082 #[tokio::test]
1083 async fn start_execution_with_name() {
1084 let svc = StepFunctionsService::new(make_state());
1085 let arn = create_sm(&svc, "named-exec");
1086
1087 let body = json!({
1088 "stateMachineArn": arn,
1089 "name": "my-execution",
1090 });
1091 let req = make_request("StartExecution", &body.to_string());
1092 let resp = svc.start_execution(&req).unwrap();
1093 let b = body_json(&resp);
1094 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1095 }
1096
1097 #[tokio::test]
1098 async fn start_execution_sm_not_found() {
1099 let svc = StepFunctionsService::new(make_state());
1100 let body = json!({
1101 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1102 });
1103 let req = make_request("StartExecution", &body.to_string());
1104 let err = expect_err(svc.start_execution(&req));
1105 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1106 }
1107
1108 #[tokio::test]
1109 async fn start_execution_invalid_input() {
1110 let svc = StepFunctionsService::new(make_state());
1111 let arn = create_sm(&svc, "bad-input");
1112
1113 let body = json!({
1114 "stateMachineArn": arn,
1115 "input": "not json",
1116 });
1117 let req = make_request("StartExecution", &body.to_string());
1118 let err = expect_err(svc.start_execution(&req));
1119 assert!(err.to_string().contains("InvalidExecutionInput"));
1120 }
1121
1122 #[tokio::test]
1123 async fn start_execution_duplicate_name() {
1124 let svc = StepFunctionsService::new(make_state());
1125 let arn = create_sm(&svc, "dup-exec");
1126
1127 let body = json!({
1128 "stateMachineArn": arn,
1129 "name": "same-name",
1130 });
1131 let req = make_request("StartExecution", &body.to_string());
1132 svc.start_execution(&req).unwrap();
1133
1134 let req = make_request("StartExecution", &body.to_string());
1135 let err = expect_err(svc.start_execution(&req));
1136 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1137 }
1138
1139 #[tokio::test]
1142 async fn describe_execution_found() {
1143 let svc = StepFunctionsService::new(make_state());
1144 let sm_arn = create_sm(&svc, "desc-exec");
1145
1146 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1147 let req = make_request("StartExecution", &body.to_string());
1148 let resp = svc.start_execution(&req).unwrap();
1149 let exec_arn = body_json(&resp)["executionArn"]
1150 .as_str()
1151 .unwrap()
1152 .to_string();
1153
1154 let req = make_request(
1155 "DescribeExecution",
1156 &json!({"executionArn": exec_arn}).to_string(),
1157 );
1158 let resp = svc.describe_execution(&req).unwrap();
1159 let b = body_json(&resp);
1160 assert_eq!(b["name"], "e1");
1161 assert_eq!(b["status"], "RUNNING");
1162 }
1163
1164 #[tokio::test]
1165 async fn describe_execution_not_found() {
1166 let svc = StepFunctionsService::new(make_state());
1167 let req = make_request(
1168 "DescribeExecution",
1169 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1170 .to_string(),
1171 );
1172 let err = expect_err(svc.describe_execution(&req));
1173 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1174 }
1175
1176 #[tokio::test]
1179 async fn stop_execution() {
1180 let svc = StepFunctionsService::new(make_state());
1181 let sm_arn = create_sm(&svc, "stop-sm");
1182
1183 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1184 let req = make_request("StartExecution", &body.to_string());
1185 let resp = svc.start_execution(&req).unwrap();
1186 let exec_arn = body_json(&resp)["executionArn"]
1187 .as_str()
1188 .unwrap()
1189 .to_string();
1190
1191 let body = json!({
1192 "executionArn": exec_arn,
1193 "error": "UserAborted",
1194 "cause": "test stop",
1195 });
1196 let req = make_request("StopExecution", &body.to_string());
1197 let resp = svc.stop_execution(&req).unwrap();
1198 let b = body_json(&resp);
1199 assert!(b["stopDate"].as_f64().is_some());
1200
1201 let req = make_request(
1203 "DescribeExecution",
1204 &json!({"executionArn": exec_arn}).to_string(),
1205 );
1206 let resp = svc.describe_execution(&req).unwrap();
1207 let b = body_json(&resp);
1208 assert_eq!(b["status"], "ABORTED");
1209 assert_eq!(b["error"], "UserAborted");
1210 }
1211
1212 #[tokio::test]
1213 async fn stop_execution_not_found() {
1214 let svc = StepFunctionsService::new(make_state());
1215 let req = make_request(
1216 "StopExecution",
1217 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1218 .to_string(),
1219 );
1220 let err = expect_err(svc.stop_execution(&req));
1221 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1222 }
1223
1224 #[tokio::test]
1227 async fn list_executions() {
1228 let svc = StepFunctionsService::new(make_state());
1229 let sm_arn = create_sm(&svc, "list-exec");
1230
1231 for i in 0..3 {
1232 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1233 let req = make_request("StartExecution", &body.to_string());
1234 svc.start_execution(&req).unwrap();
1235 }
1236
1237 let req = make_request(
1238 "ListExecutions",
1239 &json!({"stateMachineArn": sm_arn}).to_string(),
1240 );
1241 let resp = svc.list_executions(&req).unwrap();
1242 let b = body_json(&resp);
1243 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1244 }
1245
1246 #[tokio::test]
1247 async fn list_executions_sm_not_found() {
1248 let svc = StepFunctionsService::new(make_state());
1249 let req = make_request(
1250 "ListExecutions",
1251 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1252 .to_string(),
1253 );
1254 let err = expect_err(svc.list_executions(&req));
1255 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1256 }
1257
1258 #[tokio::test]
1261 async fn get_execution_history_not_found() {
1262 let svc = StepFunctionsService::new(make_state());
1263 let req = make_request(
1264 "GetExecutionHistory",
1265 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1266 .to_string(),
1267 );
1268 let err = expect_err(svc.get_execution_history(&req));
1269 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1270 }
1271
1272 #[tokio::test]
1275 async fn describe_sm_for_execution() {
1276 let svc = StepFunctionsService::new(make_state());
1277 let sm_arn = create_sm(&svc, "sm-for-exec");
1278
1279 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1280 let req = make_request("StartExecution", &body.to_string());
1281 let resp = svc.start_execution(&req).unwrap();
1282 let exec_arn = body_json(&resp)["executionArn"]
1283 .as_str()
1284 .unwrap()
1285 .to_string();
1286
1287 let req = make_request(
1288 "DescribeStateMachineForExecution",
1289 &json!({"executionArn": exec_arn}).to_string(),
1290 );
1291 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1292 let b = body_json(&resp);
1293 assert_eq!(b["name"], "sm-for-exec");
1294 }
1295
1296 #[test]
1299 fn tag_untag_list_tags() {
1300 let svc = StepFunctionsService::new(make_state());
1301 let arn = create_sm(&svc, "tagged-sm");
1302
1303 let body = json!({
1305 "resourceArn": arn,
1306 "tags": [{"key": "env", "value": "prod"}],
1307 });
1308 let req = make_request("TagResource", &body.to_string());
1309 svc.tag_resource(&req).unwrap();
1310
1311 let req = make_request(
1313 "ListTagsForResource",
1314 &json!({"resourceArn": arn}).to_string(),
1315 );
1316 let resp = svc.list_tags_for_resource(&req).unwrap();
1317 let b = body_json(&resp);
1318 let tags = b["tags"].as_array().unwrap();
1319 assert_eq!(tags.len(), 1);
1320 assert_eq!(tags[0]["key"], "env");
1321
1322 let body = json!({
1324 "resourceArn": arn,
1325 "tagKeys": ["env"],
1326 });
1327 let req = make_request("UntagResource", &body.to_string());
1328 svc.untag_resource(&req).unwrap();
1329
1330 let req = make_request(
1332 "ListTagsForResource",
1333 &json!({"resourceArn": arn}).to_string(),
1334 );
1335 let resp = svc.list_tags_for_resource(&req).unwrap();
1336 let b = body_json(&resp);
1337 assert!(b["tags"].as_array().unwrap().is_empty());
1338 }
1339
1340 #[test]
1341 fn tag_resource_not_found() {
1342 let svc = StepFunctionsService::new(make_state());
1343 let body = json!({
1344 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1345 "tags": [{"key": "k", "value": "v"}],
1346 });
1347 let req = make_request("TagResource", &body.to_string());
1348 let err = expect_err(svc.tag_resource(&req));
1349 assert!(err.to_string().contains("ResourceNotFound"));
1350 }
1351
1352 #[test]
1355 fn test_validate_name() {
1356 assert!(validate_name("valid-name").is_ok());
1357 assert!(validate_name("under_score").is_ok());
1358 assert!(validate_name("").is_err());
1359 assert!(validate_name("has spaces").is_err());
1360 assert!(validate_name(&"a".repeat(81)).is_err());
1361 }
1362
1363 #[test]
1364 fn test_validate_definition() {
1365 assert!(validate_definition(VALID_DEF).is_ok());
1366 assert!(validate_definition("not json").is_err());
1367 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
1370
1371 #[test]
1372 fn test_validate_arn() {
1373 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1374 assert!(validate_arn("not-an-arn").is_err());
1375 }
1376
1377 #[test]
1378 fn test_camel_to_details_key() {
1379 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1380 assert_eq!(camel_to_details_key(""), "");
1381 }
1382
1383 #[test]
1384 fn test_is_mutating_action() {
1385 assert!(is_mutating_action("CreateStateMachine"));
1386 assert!(is_mutating_action("StartExecution"));
1387 assert!(!is_mutating_action("DescribeStateMachine"));
1388 assert!(!is_mutating_action("ListStateMachines"));
1389 }
1390
1391 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1394 let body = json!({
1395 "name": name,
1396 "definition": VALID_DEF,
1397 "roleArn": "arn:aws:iam::123456789012:role/test",
1398 "type": "EXPRESS",
1399 });
1400 let req = make_request("CreateStateMachine", &body.to_string());
1401 let resp = svc.create_state_machine(&req).unwrap();
1402 let b = body_json(&resp);
1403 b["stateMachineArn"].as_str().unwrap().to_string()
1404 }
1405
1406 #[tokio::test]
1407 async fn start_sync_execution_basic() {
1408 let svc = StepFunctionsService::new(make_state());
1409 let arn = create_express_sm(&svc, "sync-sm");
1410
1411 let body = json!({
1412 "stateMachineArn": arn,
1413 "input": r#"{"key":"value"}"#,
1414 });
1415 let req = make_request("StartSyncExecution", &body.to_string());
1416 let resp = svc.start_sync_execution(&req).await.unwrap();
1417 let b = body_json(&resp);
1418 assert!(b["executionArn"]
1419 .as_str()
1420 .unwrap()
1421 .contains("express:sync-sm"));
1422 assert_eq!(b["stateMachineArn"], arn);
1423 assert_eq!(b["status"], "SUCCEEDED");
1424 assert!(b["startDate"].as_i64().is_some());
1425 assert!(b["stopDate"].as_i64().is_some());
1426 assert!(b["output"].as_str().is_some());
1427 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1428 .as_i64()
1429 .is_some());
1430 }
1431
1432 #[tokio::test]
1433 async fn start_sync_execution_not_express() {
1434 let svc = StepFunctionsService::new(make_state());
1435 let arn = create_sm(&svc, "std-sm");
1436
1437 let body = json!({"stateMachineArn": arn});
1438 let req = make_request("StartSyncExecution", &body.to_string());
1439 let err = expect_err(svc.start_sync_execution(&req).await);
1440 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1441 }
1442
1443 #[tokio::test]
1444 async fn start_sync_execution_sm_not_found() {
1445 let svc = StepFunctionsService::new(make_state());
1446 let body = json!({
1447 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1448 });
1449 let req = make_request("StartSyncExecution", &body.to_string());
1450 let err = expect_err(svc.start_sync_execution(&req).await);
1451 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1452 }
1453
1454 #[tokio::test]
1455 async fn start_sync_execution_records_introspection_fields() {
1456 let svc = StepFunctionsService::new(make_state());
1457 let arn = create_express_sm(&svc, "sync-introspect");
1458
1459 let body = json!({"stateMachineArn": arn, "input": "{}"});
1460 let req = make_request("StartSyncExecution", &body.to_string());
1461 let resp = svc.start_sync_execution(&req).await.unwrap();
1462 let b = body_json(&resp);
1463 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1464
1465 let accounts = svc.state.read();
1466 let state = accounts.get("123456789012").unwrap();
1467 let stored = state
1468 .executions
1469 .get(&exec_arn)
1470 .expect("sync execution should be persisted for introspection");
1471 assert!(stored.is_sync, "sync executions must be marked is_sync");
1472 assert_eq!(stored.billed_memory_mb, Some(64));
1473 assert!(
1474 stored.billed_duration_ms.is_some(),
1475 "billed_duration_ms must be populated after sync run"
1476 );
1477 assert!(
1478 stored.parent_execution_arn.is_none(),
1479 "top-level sync execution has no parent"
1480 );
1481 }
1482
1483 #[tokio::test]
1484 async fn start_sync_execution_invalid_input() {
1485 let svc = StepFunctionsService::new(make_state());
1486 let arn = create_express_sm(&svc, "bad-input-sync");
1487
1488 let body = json!({
1489 "stateMachineArn": arn,
1490 "input": "not json",
1491 });
1492 let req = make_request("StartSyncExecution", &body.to_string());
1493 let err = expect_err(svc.start_sync_execution(&req).await);
1494 assert!(err.to_string().contains("InvalidExecutionInput"));
1495 }
1496}
1497
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` over five items with a page size of 3 returns an
    /// error for the token `"bad"` and succeeds for the token `"2"`.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items = (0..5).collect::<Vec<i32>>();

        let rejected = paginate_checked(&items, Some("bad"), 3);
        assert!(rejected.is_err());

        let accepted = paginate_checked(&items, Some("2"), 3);
        assert!(accepted.is_ok());
    }
}