1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26 "CreateStateMachine",
27 "DescribeStateMachine",
28 "ListStateMachines",
29 "DeleteStateMachine",
30 "UpdateStateMachine",
31 "TagResource",
32 "UntagResource",
33 "ListTagsForResource",
34 "StartExecution",
35 "StopExecution",
36 "DescribeExecution",
37 "ListExecutions",
38 "GetExecutionHistory",
39 "DescribeStateMachineForExecution",
40 "CreateActivity",
41 "DeleteActivity",
42 "DescribeActivity",
43 "ListActivities",
44 "GetActivityTask",
45 "SendTaskFailure",
46 "SendTaskHeartbeat",
47 "SendTaskSuccess",
48 "PublishStateMachineVersion",
49 "DeleteStateMachineVersion",
50 "ListStateMachineVersions",
51 "CreateStateMachineAlias",
52 "DeleteStateMachineAlias",
53 "DescribeStateMachineAlias",
54 "ListStateMachineAliases",
55 "UpdateStateMachineAlias",
56 "DescribeMapRun",
57 "ListMapRuns",
58 "UpdateMapRun",
59 "RedriveExecution",
60 "StartSyncExecution",
61 "TestState",
62 "ValidateStateMachineDefinition",
63];
64
65pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73 state: SharedStepFunctionsState,
74 delivery: Option<Arc<DeliveryBus>>,
75 dynamodb_state: Option<SharedDynamoDbState>,
76 registry: Option<SharedServiceRegistry>,
77 snapshot_store: Option<Arc<dyn SnapshotStore>>,
78 snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90 pub fn new(state: SharedStepFunctionsState) -> Self {
91 Self {
92 state,
93 delivery: None,
94 dynamodb_state: None,
95 registry: None,
96 snapshot_store: None,
97 snapshot_lock: Arc::new(AsyncMutex::new(())),
98 }
99 }
100
101 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102 self.delivery = Some(delivery);
103 self
104 }
105
106 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107 self.dynamodb_state = Some(dynamodb_state);
108 self
109 }
110
111 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116 self.registry = Some(registry);
117 self
118 }
119
120 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121 self.snapshot_store = Some(store);
122 self
123 }
124
125 async fn save_snapshot(&self) {
126 save_stepfunctions_snapshot(
127 &self.state,
128 self.snapshot_store.clone(),
129 &self.snapshot_lock,
130 )
131 .await;
132 }
133
134 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140 let store = self.snapshot_store.clone()?;
141 let state = self.state.clone();
142 let lock = self.snapshot_lock.clone();
143 Some(Arc::new(move || {
144 let state = state.clone();
145 let store = store.clone();
146 let lock = lock.clone();
147 Box::pin(async move {
148 save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149 })
150 }))
151 }
152}
153
154pub async fn save_stepfunctions_snapshot(
160 state: &SharedStepFunctionsState,
161 store: Option<Arc<dyn SnapshotStore>>,
162 lock: &AsyncMutex<()>,
163) {
164 let Some(store) = store else {
165 return;
166 };
167 let _guard = lock.lock().await;
168 let snapshot = StepFunctionsSnapshot {
169 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170 state: None,
171 accounts: Some(state.read().clone()),
172 };
173 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174 let bytes = serde_json::to_vec(&snapshot)
175 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176 store.save(&bytes)
177 })
178 .await;
179 match join {
180 Ok(Ok(())) => {}
181 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183 }
184}
185
186pub fn reconcile_interrupted_executions(state: &SharedStepFunctionsState) -> usize {
196 let now = Utc::now();
197 let mut count = 0;
198 let mut accounts = state.write();
199 let account_ids: Vec<String> = accounts.iter().map(|(id, _)| id.to_string()).collect();
200 for account_id in account_ids {
201 let Some(s) = accounts.get_mut(&account_id) else {
202 continue;
203 };
204 for exec in s.executions.values_mut() {
205 if matches!(
206 exec.status,
207 ExecutionStatus::Running | ExecutionStatus::PendingRedrive
208 ) {
209 exec.status = ExecutionStatus::Aborted;
210 exec.stop_date = Some(now);
211 exec.error = Some("Fakecloud.Restart".to_string());
212 exec.cause = Some(
213 "Execution was interrupted by a fakecloud restart and cannot be resumed"
214 .to_string(),
215 );
216 count += 1;
217 }
218 }
219 }
220 count
221}
222
223fn is_mutating_action(action: &str) -> bool {
224 matches!(
225 action,
226 "CreateStateMachine"
227 | "DeleteStateMachine"
228 | "UpdateStateMachine"
229 | "TagResource"
230 | "UntagResource"
231 | "StartExecution"
232 | "StopExecution"
233 | "CreateActivity"
234 | "DeleteActivity"
235 | "GetActivityTask"
236 | "SendTaskFailure"
237 | "SendTaskHeartbeat"
238 | "SendTaskSuccess"
239 | "PublishStateMachineVersion"
240 | "DeleteStateMachineVersion"
241 | "CreateStateMachineAlias"
242 | "DeleteStateMachineAlias"
243 | "UpdateStateMachineAlias"
244 | "UpdateMapRun"
245 | "RedriveExecution"
246 | "StartSyncExecution"
247 )
248}
249
250#[async_trait]
251impl AwsService for StepFunctionsService {
252 fn service_name(&self) -> &str {
253 "states"
254 }
255
256 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
257 let mutates = is_mutating_action(req.action.as_str());
258 let result = match req.action.as_str() {
259 "CreateStateMachine" => self.create_state_machine(&req),
260 "DescribeStateMachine" => self.describe_state_machine(&req),
261 "ListStateMachines" => self.list_state_machines(&req),
262 "DeleteStateMachine" => self.delete_state_machine(&req),
263 "UpdateStateMachine" => self.update_state_machine(&req),
264 "TagResource" => self.tag_resource(&req),
265 "UntagResource" => self.untag_resource(&req),
266 "ListTagsForResource" => self.list_tags_for_resource(&req),
267 "StartExecution" => self.start_execution(&req),
268 "StopExecution" => self.stop_execution(&req),
269 "DescribeExecution" => self.describe_execution(&req),
270 "ListExecutions" => self.list_executions(&req),
271 "GetExecutionHistory" => self.get_execution_history(&req),
272 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
273 "CreateActivity" => self.create_activity(&req),
274 "DeleteActivity" => self.delete_activity(&req),
275 "DescribeActivity" => self.describe_activity(&req),
276 "ListActivities" => self.list_activities(&req),
277 "GetActivityTask" => self.get_activity_task(&req).await,
278 "SendTaskFailure" => self.send_task_failure(&req),
279 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
280 "SendTaskSuccess" => self.send_task_success(&req),
281 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
282 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
283 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
284 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
285 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
286 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
287 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
288 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
289 "DescribeMapRun" => self.describe_map_run(&req),
290 "ListMapRuns" => self.list_map_runs(&req),
291 "UpdateMapRun" => self.update_map_run(&req),
292 "RedriveExecution" => self.redrive_execution(&req),
293 "StartSyncExecution" => self.start_sync_execution(&req).await,
294 "TestState" => self.test_state(&req),
295 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
296 _ => Err(AwsServiceError::action_not_implemented(
297 "states",
298 &req.action,
299 )),
300 };
301 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302 self.save_snapshot().await;
303 }
304 result
305 }
306
307 fn supported_actions(&self) -> &[&str] {
308 SUPPORTED
309 }
310}
311
312impl StepFunctionsService {
313 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
314 let body = req.json_body();
315 let raw_max_results = body["maxResults"].as_i64();
316 if let Some(mr) = raw_max_results {
317 validate_max_results(mr)?;
318 }
319 let next_token = body["nextToken"].as_str();
320 if let Some(t) = next_token {
321 validate_page_token(t)?;
322 }
323 let max_results = match raw_max_results.unwrap_or(0) {
327 0 => 100,
328 n => n as usize,
329 };
330 let accounts = self.state.read();
331 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
332 let state = accounts.get(&req.account_id).unwrap_or(&empty);
333 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
334 activities.sort_by(|a, b| a.name.cmp(&b.name));
335 let items: Vec<Value> = activities
336 .iter()
337 .map(|a| {
338 json!({
339 "activityArn": a.arn,
340 "name": a.name,
341 "creationDate": a.creation_date.timestamp(),
342 })
343 })
344 .collect();
345 let (page, token) =
346 paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
347 let mut resp = json!({ "activities": page });
348 if let Some(t) = token {
349 resp["nextToken"] = json!(t);
350 }
351 Ok(AwsResponse::ok_json(resp))
352 }
353}
354
355fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
356 json!({
357 "stateMachineAliasArn": alias.arn,
358 "name": alias.name,
359 "description": alias.description,
360 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
361 "stateMachineVersionArn": r.state_machine_version_arn,
362 "weight": r.weight,
363 })).collect::<Vec<_>>(),
364 "creationDate": alias.creation_date.timestamp(),
365 "updateDate": alias.update_date.timestamp(),
366 })
367}
368
369fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
370 json!({
371 "mapRunArn": mr.map_run_arn,
372 "executionArn": mr.execution_arn,
373 "maxConcurrency": mr.max_concurrency,
374 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
375 "toleratedFailureCount": mr.tolerated_failure_count,
376 "status": mr.status,
377 "startDate": mr.start_date.timestamp(),
378 "stopDate": mr.stop_date.map(|d| d.timestamp()),
379 })
380}
381
382fn state_machine_to_json(sm: &StateMachine) -> Value {
385 let mut resp = json!({
386 "name": sm.name,
387 "stateMachineArn": sm.arn,
388 "definition": sm.definition,
389 "roleArn": sm.role_arn,
390 "type": sm.machine_type.as_str(),
391 "status": sm.status.as_str(),
392 "creationDate": sm.creation_date.timestamp() as f64,
393 "updateDate": sm.update_date.timestamp() as f64,
394 "revisionId": sm.revision_id,
395 "label": sm.name,
396 });
397
398 if !sm.description.is_empty() {
399 resp["description"] = json!(sm.description);
400 }
401
402 if let Some(ref logging) = sm.logging_configuration {
403 resp["loggingConfiguration"] = logging.clone();
404 } else {
405 resp["loggingConfiguration"] = json!({
406 "level": "OFF",
407 "includeExecutionData": false,
408 "destinations": [],
409 });
410 }
411
412 if let Some(ref tracing) = sm.tracing_configuration {
413 resp["tracingConfiguration"] = tracing.clone();
414 } else {
415 resp["tracingConfiguration"] = json!({
416 "enabled": false,
417 });
418 }
419
420 resp
421}
422
423fn missing(name: &str) -> AwsServiceError {
424 AwsServiceError::aws_error(
425 StatusCode::BAD_REQUEST,
426 "ValidationException",
427 format!("The request must contain the parameter {name}."),
428 )
429}
430
431fn state_machine_not_found(arn: &str) -> AwsServiceError {
432 AwsServiceError::aws_error(
433 StatusCode::BAD_REQUEST,
434 "StateMachineDoesNotExist",
435 format!("State Machine Does Not Exist: '{arn}'"),
436 )
437}
438
439fn activity_not_found(arn: &str) -> AwsServiceError {
440 AwsServiceError::aws_error(
441 StatusCode::BAD_REQUEST,
442 "ActivityDoesNotExist",
443 format!("Activity does not exist: {arn}"),
444 )
445}
446
447fn task_does_not_exist(token: &str) -> AwsServiceError {
448 AwsServiceError::aws_error(
449 StatusCode::BAD_REQUEST,
450 "TaskDoesNotExist",
451 format!("Task does not exist: {token}"),
452 )
453}
454
455fn resource_not_found(arn: &str) -> AwsServiceError {
456 AwsServiceError::aws_error(
457 StatusCode::BAD_REQUEST,
458 "ResourceNotFound",
459 format!("Resource not found: '{arn}'"),
460 )
461}
462
463fn parse_routing_configuration(
468 routes: &[serde_json::Value],
469) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
470 if routes.is_empty() || routes.len() > 2 {
471 return Err(AwsServiceError::aws_error(
472 StatusCode::BAD_REQUEST,
473 "ValidationException",
474 "routingConfiguration must contain 1 or 2 routes.",
475 ));
476 }
477 let parsed: Vec<crate::state::AliasRoute> = routes
478 .iter()
479 .map(|r| {
480 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
481 AwsServiceError::aws_error(
482 StatusCode::BAD_REQUEST,
483 "ValidationException",
484 "routingConfiguration entries must contain stateMachineVersionArn.",
485 )
486 })?;
487 let weight = r["weight"].as_i64().ok_or_else(|| {
488 AwsServiceError::aws_error(
489 StatusCode::BAD_REQUEST,
490 "ValidationException",
491 "routingConfiguration entries must contain a numeric weight.",
492 )
493 })?;
494 if !(0..=100).contains(&weight) {
495 return Err(AwsServiceError::aws_error(
496 StatusCode::BAD_REQUEST,
497 "ValidationException",
498 format!("Invalid routing weight {weight}; must be 0-100."),
499 ));
500 }
501 Ok(crate::state::AliasRoute {
502 state_machine_version_arn: arn.to_string(),
503 weight: weight as i32,
504 })
505 })
506 .collect::<Result<_, _>>()?;
507 let total: i32 = parsed.iter().map(|r| r.weight).sum();
508 if total != 100 {
509 return Err(AwsServiceError::aws_error(
510 StatusCode::BAD_REQUEST,
511 "ValidationException",
512 format!("routingConfiguration weights must sum to 100, got {total}."),
513 ));
514 }
515 Ok(parsed)
516}
517
518fn validate_name(name: &str) -> Result<(), AwsServiceError> {
519 if name.is_empty() || name.len() > 80 {
520 return Err(AwsServiceError::aws_error(
521 StatusCode::BAD_REQUEST,
522 "InvalidName",
523 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
524 ));
525 }
526 if !name
528 .chars()
529 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
530 {
531 return Err(AwsServiceError::aws_error(
532 StatusCode::BAD_REQUEST,
533 "InvalidName",
534 format!(
535 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
536 ),
537 ));
538 }
539 Ok(())
540}
541
542fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
543 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
544 AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "InvalidDefinition",
547 format!("Invalid State Machine Definition: '{e}'"),
548 )
549 })?;
550
551 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
552 return Err(AwsServiceError::aws_error(
553 StatusCode::BAD_REQUEST,
554 "InvalidDefinition",
555 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
556 .to_string(),
557 ));
558 }
559
560 let states_obj = parsed
561 .get("States")
562 .and_then(|v| v.as_object())
563 .ok_or_else(|| {
564 AwsServiceError::aws_error(
565 StatusCode::BAD_REQUEST,
566 "InvalidDefinition",
567 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
568 .to_string(),
569 )
570 })?;
571
572 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
573 AwsServiceError::aws_error(
574 StatusCode::BAD_REQUEST,
575 "InvalidDefinition",
576 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
577 .to_string(),
578 )
579 })?;
580 if !states_obj.contains_key(start_at) {
581 return Err(AwsServiceError::aws_error(
582 StatusCode::BAD_REQUEST,
583 "InvalidDefinition",
584 format!(
585 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
586 (StartAt '{start_at}' does not reference a valid state)"
587 ),
588 ));
589 }
590
591 for (state_name, state_val) in states_obj {
595 validate_state_paths(state_name, state_val)?;
596 }
597
598 Ok(())
599}
600
601fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
605 for field in ["InputPath", "OutputPath", "ResultPath"] {
606 if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
607 if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
611 continue;
612 }
613 return Err(invalid_reference_path(state_name, field, p));
614 }
615 }
616
617 if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
618 if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
619 for rule in choices {
620 validate_choice_variables(state_name, rule)?;
621 }
622 }
623 }
624
625 Ok(())
626}
627
628fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
631 if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
632 if !is_valid_reference_path(v) {
633 return Err(invalid_reference_path(state_name, "Variable", v));
634 }
635 }
636 for combinator in ["And", "Or"] {
637 if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
638 for n in nested {
639 validate_choice_variables(state_name, n)?;
640 }
641 }
642 }
643 if let Some(n) = rule.get("Not") {
644 validate_choice_variables(state_name, n)?;
645 }
646 Ok(())
647}
648
649fn is_valid_reference_path(path: &str) -> bool {
653 if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
654 return false;
655 }
656 let body = path
657 .strip_prefix("$.")
658 .or_else(|| path.strip_prefix('$'))
659 .unwrap_or(path);
660 for part in body.split('.') {
661 if !segment_is_valid(part) {
662 return false;
663 }
664 }
665 true
666}
667
668fn segment_is_valid(part: &str) -> bool {
669 match part.find('[') {
670 None => !part.contains(']'),
671 Some(open) => {
672 if !part.ends_with(']') {
673 return false;
674 }
675 let inner = &part[open + 1..part.len() - 1];
676 !inner.is_empty() && inner.parse::<usize>().is_ok()
678 }
679 }
680}
681
682fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
683 AwsServiceError::aws_error(
684 StatusCode::BAD_REQUEST,
685 "InvalidDefinition",
686 format!(
687 "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
688 (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
689 ),
690 )
691}
692
693fn execution_not_found(arn: &str) -> AwsServiceError {
694 AwsServiceError::aws_error(
695 StatusCode::BAD_REQUEST,
696 "ExecutionDoesNotExist",
697 format!("Execution Does Not Exist: '{arn}'"),
698 )
699}
700
701fn execution_to_json(exec: &Execution) -> Value {
702 let mut resp = json!({
703 "executionArn": exec.execution_arn,
704 "stateMachineArn": exec.state_machine_arn,
705 "name": exec.name,
706 "status": exec.status.as_str(),
707 "startDate": exec.start_date.timestamp() as f64,
708 });
709
710 if let Some(ref input) = exec.input {
711 resp["input"] = json!(input);
712 }
713 if let Some(ref output) = exec.output {
714 resp["output"] = json!(output);
715 }
716 if let Some(stop) = exec.stop_date {
717 resp["stopDate"] = json!(stop.timestamp() as f64);
718 }
719 if let Some(ref error) = exec.error {
720 resp["error"] = json!(error);
721 }
722 if let Some(ref cause) = exec.cause {
723 resp["cause"] = json!(cause);
724 }
725
726 resp
727}
728
729fn camel_to_details_key(event_type: &str) -> String {
739 if event_type.ends_with("StateEntered") {
740 return "stateEntered".to_string();
741 }
742 if event_type.ends_with("StateExited") {
743 return "stateExited".to_string();
744 }
745 let mut chars = event_type.chars();
746 match chars.next() {
747 None => String::new(),
748 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
749 }
750}
751
752fn execution_input_matches(stored: Option<&str>, incoming: Option<&str>) -> bool {
757 let stored = stored.unwrap_or("{}");
758 let incoming = incoming.unwrap_or("{}");
759 match (
760 serde_json::from_str::<Value>(stored),
761 serde_json::from_str::<Value>(incoming),
762 ) {
763 (Ok(a), Ok(b)) => a == b,
764 _ => stored == incoming,
765 }
766}
767
768fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
769 if !arn.starts_with("arn:") {
770 return Err(AwsServiceError::aws_error(
771 StatusCode::BAD_REQUEST,
772 "InvalidArn",
773 format!("Invalid Arn: '{arn}'"),
774 ));
775 }
776 Ok(())
777}
778
779fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
783 if value.is_empty() || value.len() > max {
784 return Err(AwsServiceError::aws_error(
785 StatusCode::BAD_REQUEST,
786 "InvalidArn",
787 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
788 ));
789 }
790 Ok(())
791}
792
793pub(super) fn invalid_token() -> AwsServiceError {
797 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
798}
799
800fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
804 if value.is_empty() || value.len() > 1024 {
805 return Err(AwsServiceError::aws_error(
806 StatusCode::BAD_REQUEST,
807 "InvalidToken",
808 "nextToken must be 1..=1024 characters",
809 ));
810 }
811 Ok(())
812}
813
814fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
820 if !(0..=1000).contains(&value) {
821 return Err(AwsServiceError::aws_error(
822 StatusCode::BAD_REQUEST,
823 "InvalidToken",
824 format!("maxResults '{value}' is outside 0..=1000"),
825 ));
826 }
827 Ok(())
828}
829
830pub fn start_execution_from_delivery(
839 state: &SharedStepFunctionsState,
840 delivery: &Option<Arc<DeliveryBus>>,
841 dynamodb_state: &Option<SharedDynamoDbState>,
842 registry: &Option<SharedServiceRegistry>,
843 state_machine_arn: &str,
844 input: &str,
845) {
846 if serde_json::from_str::<serde_json::Value>(input).is_err() {
848 tracing::warn!(
849 state_machine_arn,
850 "Step Functions delivery: invalid JSON input, skipping execution"
851 );
852 return;
853 }
854
855 let execution_name = uuid::Uuid::new_v4().to_string();
856
857 let account_id = state_machine_arn
859 .split(':')
860 .nth(4)
861 .unwrap_or("000000000000")
862 .to_string();
863
864 let mut accounts = state.write();
865 let st = accounts.get_or_create(&account_id);
866 let sm = match st.state_machines.get(state_machine_arn) {
867 Some(sm) => sm,
868 None => {
869 tracing::warn!(
870 state_machine_arn,
871 "Step Functions delivery: state machine not found"
872 );
873 return;
874 }
875 };
876
877 let sm_name = sm.name.clone();
878 let definition = sm.definition.clone();
879 let exec_arn = st.execution_arn(&sm_name, &execution_name);
880
881 let now = Utc::now();
882 let execution = Execution {
883 execution_arn: exec_arn.clone(),
884 state_machine_arn: state_machine_arn.to_string(),
885 state_machine_name: sm_name,
886 name: execution_name,
887 status: ExecutionStatus::Running,
888 input: Some(input.to_string()),
889 output: None,
890 start_date: now,
891 stop_date: None,
892 error: None,
893 cause: None,
894 history_events: vec![],
895 parent_execution_arn: None,
896 is_sync: false,
897 billed_duration_ms: None,
898 billed_memory_mb: None,
899 };
900
901 st.executions.insert(exec_arn.clone(), execution);
902 let logging_config = sm.logging_configuration.clone();
903 drop(accounts);
904
905 let shared_state = state.clone();
906 let delivery = delivery.clone();
907 let dynamodb_state = dynamodb_state.clone();
908 let registry = registry.clone();
909 let input = Some(input.to_string());
910 tokio::spawn(async move {
911 interpreter::execute_state_machine(
912 shared_state,
913 exec_arn,
914 definition,
915 input,
916 delivery,
917 dynamodb_state,
918 registry,
919 logging_config,
920 )
921 .await;
922 });
923}
924
925#[cfg(test)]
926mod tests {
927 use super::*;
928 use http::{HeaderMap, Method};
929 use parking_lot::RwLock;
930 use serde_json::Value;
931 use std::collections::HashMap;
932 use std::sync::Arc;
933
934 fn make_state() -> SharedStepFunctionsState {
935 Arc::new(RwLock::new(
936 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
937 ))
938 }
939
940 fn make_request(action: &str, body: &str) -> AwsRequest {
941 AwsRequest {
942 service: "states".to_string(),
943 action: action.to_string(),
944 region: "us-east-1".to_string(),
945 account_id: "123456789012".to_string(),
946 request_id: "test-id".to_string(),
947 headers: HeaderMap::new(),
948 query_params: HashMap::new(),
949 body: body.as_bytes().to_vec().into(),
950 body_stream: parking_lot::Mutex::new(None),
951 path_segments: vec![],
952 raw_path: "/".to_string(),
953 raw_query: String::new(),
954 method: Method::POST,
955 is_query_protocol: false,
956 access_key_id: None,
957 principal: None,
958 }
959 }
960
961 fn body_json(resp: &AwsResponse) -> Value {
962 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
963 }
964
965 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
966 match result {
967 Err(e) => e,
968 Ok(_) => panic!("expected error, got Ok"),
969 }
970 }
971
972 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
973
974 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
975 let body = json!({
976 "name": name,
977 "definition": VALID_DEF,
978 "roleArn": "arn:aws:iam::123456789012:role/test",
979 });
980 let req = make_request("CreateStateMachine", &body.to_string());
981 let resp = svc.create_state_machine(&req).unwrap();
982 let b = body_json(&resp);
983 b["stateMachineArn"].as_str().unwrap().to_string()
984 }
985
986 #[test]
989 fn create_state_machine_basic() {
990 let svc = StepFunctionsService::new(make_state());
991 let arn = create_sm(&svc, "test-sm");
992 assert!(arn.contains("test-sm"));
993 }
994
995 #[test]
996 fn create_state_machine_with_express_type() {
997 let svc = StepFunctionsService::new(make_state());
998 let body = json!({
999 "name": "express-sm",
1000 "definition": VALID_DEF,
1001 "roleArn": "arn:aws:iam::123456789012:role/r",
1002 "type": "EXPRESS",
1003 });
1004 let req = make_request("CreateStateMachine", &body.to_string());
1005 let resp = svc.create_state_machine(&req).unwrap();
1006 let b = body_json(&resp);
1007 assert!(b["stateMachineArn"].as_str().is_some());
1008 }
1009
1010 #[test]
1011 fn create_state_machine_duplicate_fails() {
1012 let svc = StepFunctionsService::new(make_state());
1013 create_sm(&svc, "dup-sm");
1014 let body = json!({
1015 "name": "dup-sm",
1016 "definition": VALID_DEF,
1017 "roleArn": "arn:aws:iam::123456789012:role/r",
1018 });
1019 let req = make_request("CreateStateMachine", &body.to_string());
1020 let err = expect_err(svc.create_state_machine(&req));
1021 assert!(err.to_string().contains("StateMachineAlreadyExists"));
1022 }
1023
1024 #[test]
1025 fn create_state_machine_missing_name() {
1026 let svc = StepFunctionsService::new(make_state());
1027 let body = json!({
1028 "definition": VALID_DEF,
1029 "roleArn": "arn:aws:iam::123456789012:role/r",
1030 });
1031 let req = make_request("CreateStateMachine", &body.to_string());
1032 assert!(svc.create_state_machine(&req).is_err());
1033 }
1034
1035 #[test]
1036 fn create_state_machine_invalid_definition() {
1037 let svc = StepFunctionsService::new(make_state());
1038 let body = json!({
1039 "name": "bad-def",
1040 "definition": "not json",
1041 "roleArn": "arn:aws:iam::123456789012:role/r",
1042 });
1043 let req = make_request("CreateStateMachine", &body.to_string());
1044 let err = expect_err(svc.create_state_machine(&req));
1045 assert!(err.to_string().contains("InvalidDefinition"));
1046 }
1047
1048 #[test]
1049 fn create_state_machine_definition_missing_start_at() {
1050 let svc = StepFunctionsService::new(make_state());
1051 let body = json!({
1052 "name": "no-start",
1053 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1054 "roleArn": "arn:aws:iam::123456789012:role/r",
1055 });
1056 let req = make_request("CreateStateMachine", &body.to_string());
1057 let err = expect_err(svc.create_state_machine(&req));
1058 assert!(err.to_string().contains("InvalidDefinition"));
1059 }
1060
1061 #[test]
1062 fn create_state_machine_definition_missing_states() {
1063 let svc = StepFunctionsService::new(make_state());
1064 let body = json!({
1065 "name": "no-states",
1066 "definition": r#"{"StartAt":"S"}"#,
1067 "roleArn": "arn:aws:iam::123456789012:role/r",
1068 });
1069 let req = make_request("CreateStateMachine", &body.to_string());
1070 let err = expect_err(svc.create_state_machine(&req));
1071 assert!(err.to_string().contains("InvalidDefinition"));
1072 }
1073
1074 #[test]
1075 fn create_state_machine_definition_start_at_not_in_states() {
1076 let svc = StepFunctionsService::new(make_state());
1077 let body = json!({
1078 "name": "bad-start",
1079 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1080 "roleArn": "arn:aws:iam::123456789012:role/r",
1081 });
1082 let req = make_request("CreateStateMachine", &body.to_string());
1083 let err = expect_err(svc.create_state_machine(&req));
1084 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1085 }
1086
1087 #[test]
1088 fn create_state_machine_invalid_type() {
1089 let svc = StepFunctionsService::new(make_state());
1090 let body = json!({
1091 "name": "bad-type",
1092 "definition": VALID_DEF,
1093 "roleArn": "arn:aws:iam::123456789012:role/r",
1094 "type": "INVALID",
1095 });
1096 let req = make_request("CreateStateMachine", &body.to_string());
1097 assert!(svc.create_state_machine(&req).is_err());
1098 }
1099
1100 #[test]
1101 fn create_state_machine_invalid_arn() {
1102 let svc = StepFunctionsService::new(make_state());
1103 let body = json!({
1104 "name": "bad-arn",
1105 "definition": VALID_DEF,
1106 "roleArn": "not-an-arn",
1107 });
1108 let req = make_request("CreateStateMachine", &body.to_string());
1109 let err = expect_err(svc.create_state_machine(&req));
1110 assert!(err.to_string().contains("InvalidArn"));
1111 }
1112
1113 #[test]
1114 fn create_state_machine_invalid_name() {
1115 let svc = StepFunctionsService::new(make_state());
1116 let body = json!({
1117 "name": "has spaces!",
1118 "definition": VALID_DEF,
1119 "roleArn": "arn:aws:iam::123456789012:role/r",
1120 });
1121 let req = make_request("CreateStateMachine", &body.to_string());
1122 let err = expect_err(svc.create_state_machine(&req));
1123 assert!(err.to_string().contains("InvalidName"));
1124 }
1125
1126 #[test]
1127 fn create_state_machine_name_too_long() {
1128 let svc = StepFunctionsService::new(make_state());
1129 let long_name = "a".repeat(81);
1130 let body = json!({
1131 "name": long_name,
1132 "definition": VALID_DEF,
1133 "roleArn": "arn:aws:iam::123456789012:role/r",
1134 });
1135 let req = make_request("CreateStateMachine", &body.to_string());
1136 let err = expect_err(svc.create_state_machine(&req));
1137 assert!(err.to_string().contains("InvalidName"));
1138 }
1139
1140 #[test]
1143 fn describe_state_machine_found() {
1144 let svc = StepFunctionsService::new(make_state());
1145 let arn = create_sm(&svc, "desc-sm");
1146
1147 let req = make_request(
1148 "DescribeStateMachine",
1149 &json!({"stateMachineArn": arn}).to_string(),
1150 );
1151 let resp = svc.describe_state_machine(&req).unwrap();
1152 let b = body_json(&resp);
1153 assert_eq!(b["name"], "desc-sm");
1154 assert_eq!(b["status"], "ACTIVE");
1155 assert!(b["definition"].as_str().is_some());
1156 }
1157
1158 #[test]
1159 fn describe_state_machine_not_found() {
1160 let svc = StepFunctionsService::new(make_state());
1161 let req = make_request(
1162 "DescribeStateMachine",
1163 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1164 .to_string(),
1165 );
1166 let err = expect_err(svc.describe_state_machine(&req));
1167 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1168 }
1169
1170 #[test]
1173 fn list_state_machines_empty() {
1174 let svc = StepFunctionsService::new(make_state());
1175 let req = make_request("ListStateMachines", "{}");
1176 let resp = svc.list_state_machines(&req).unwrap();
1177 let b = body_json(&resp);
1178 assert!(b["stateMachines"].as_array().unwrap().is_empty());
1179 }
1180
1181 #[test]
1182 fn list_state_machines_returns_created() {
1183 let svc = StepFunctionsService::new(make_state());
1184 create_sm(&svc, "sm-1");
1185 create_sm(&svc, "sm-2");
1186
1187 let req = make_request("ListStateMachines", "{}");
1188 let resp = svc.list_state_machines(&req).unwrap();
1189 let b = body_json(&resp);
1190 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1191 }
1192
1193 #[test]
1196 fn delete_state_machine() {
1197 let svc = StepFunctionsService::new(make_state());
1198 let arn = create_sm(&svc, "del-sm");
1199
1200 let req = make_request(
1201 "DeleteStateMachine",
1202 &json!({"stateMachineArn": arn}).to_string(),
1203 );
1204 svc.delete_state_machine(&req).unwrap();
1205
1206 let req = make_request(
1208 "DescribeStateMachine",
1209 &json!({"stateMachineArn": arn}).to_string(),
1210 );
1211 assert!(svc.describe_state_machine(&req).is_err());
1212 }
1213
1214 #[test]
1215 fn delete_state_machine_nonexistent_succeeds() {
1216 let svc = StepFunctionsService::new(make_state());
1217 let req = make_request(
1218 "DeleteStateMachine",
1219 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1220 .to_string(),
1221 );
1222 svc.delete_state_machine(&req).unwrap();
1224 }
1225
1226 #[test]
1229 fn update_state_machine() {
1230 let svc = StepFunctionsService::new(make_state());
1231 let arn = create_sm(&svc, "upd-sm");
1232
1233 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1234 let body = json!({
1235 "stateMachineArn": arn,
1236 "definition": new_def,
1237 "description": "updated",
1238 });
1239 let req = make_request("UpdateStateMachine", &body.to_string());
1240 let resp = svc.update_state_machine(&req).unwrap();
1241 let b = body_json(&resp);
1242 assert!(b["updateDate"].as_f64().is_some());
1243
1244 let req = make_request(
1246 "DescribeStateMachine",
1247 &json!({"stateMachineArn": arn}).to_string(),
1248 );
1249 let resp = svc.describe_state_machine(&req).unwrap();
1250 let b = body_json(&resp);
1251 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1252 assert_eq!(b["description"], "updated");
1253 }
1254
1255 #[test]
1256 fn update_state_machine_not_found() {
1257 let svc = StepFunctionsService::new(make_state());
1258 let body = json!({
1259 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1260 "definition": VALID_DEF,
1261 });
1262 let req = make_request("UpdateStateMachine", &body.to_string());
1263 let err = expect_err(svc.update_state_machine(&req));
1264 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1265 }
1266
1267 #[tokio::test]
1270 async fn start_execution_basic() {
1271 let svc = StepFunctionsService::new(make_state());
1272 let arn = create_sm(&svc, "exec-sm");
1273
1274 let body = json!({
1275 "stateMachineArn": arn,
1276 "input": r#"{"key":"value"}"#,
1277 });
1278 let req = make_request("StartExecution", &body.to_string());
1279 let resp = svc.start_execution(&req).unwrap();
1280 let b = body_json(&resp);
1281 assert!(b["executionArn"].as_str().is_some());
1282 assert!(b["startDate"].as_f64().is_some());
1283 }
1284
1285 #[tokio::test]
1286 async fn start_execution_with_name() {
1287 let svc = StepFunctionsService::new(make_state());
1288 let arn = create_sm(&svc, "named-exec");
1289
1290 let body = json!({
1291 "stateMachineArn": arn,
1292 "name": "my-execution",
1293 });
1294 let req = make_request("StartExecution", &body.to_string());
1295 let resp = svc.start_execution(&req).unwrap();
1296 let b = body_json(&resp);
1297 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1298 }
1299
1300 #[tokio::test]
1301 async fn start_execution_sm_not_found() {
1302 let svc = StepFunctionsService::new(make_state());
1303 let body = json!({
1304 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1305 });
1306 let req = make_request("StartExecution", &body.to_string());
1307 let err = expect_err(svc.start_execution(&req));
1308 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1309 }
1310
1311 #[tokio::test]
1312 async fn start_execution_invalid_input() {
1313 let svc = StepFunctionsService::new(make_state());
1314 let arn = create_sm(&svc, "bad-input");
1315
1316 let body = json!({
1317 "stateMachineArn": arn,
1318 "input": "not json",
1319 });
1320 let req = make_request("StartExecution", &body.to_string());
1321 let err = expect_err(svc.start_execution(&req));
1322 assert!(err.to_string().contains("InvalidExecutionInput"));
1323 }
1324
1325 #[tokio::test]
1326 async fn start_execution_same_name_same_input_is_idempotent() {
1327 let svc = StepFunctionsService::new(make_state());
1328 let arn = create_sm(&svc, "dup-exec");
1329
1330 let body = json!({
1331 "stateMachineArn": arn,
1332 "name": "same-name",
1333 "input": "{\"a\":1}",
1334 });
1335 let req = make_request("StartExecution", &body.to_string());
1336 let first = body_json(&svc.start_execution(&req).unwrap());
1337
1338 let req = make_request("StartExecution", &body.to_string());
1340 let second = body_json(&svc.start_execution(&req).unwrap());
1341 assert_eq!(first["executionArn"], second["executionArn"]);
1342 assert_eq!(first["startDate"], second["startDate"]);
1343 }
1344
1345 #[tokio::test]
1346 async fn start_execution_same_name_different_input_conflicts() {
1347 let svc = StepFunctionsService::new(make_state());
1348 let arn = create_sm(&svc, "dup-exec-diff");
1349
1350 let req = make_request(
1351 "StartExecution",
1352 &json!({
1353 "stateMachineArn": arn,
1354 "name": "same-name",
1355 "input": "{\"a\":1}",
1356 })
1357 .to_string(),
1358 );
1359 svc.start_execution(&req).unwrap();
1360
1361 let req = make_request(
1363 "StartExecution",
1364 &json!({
1365 "stateMachineArn": arn,
1366 "name": "same-name",
1367 "input": "{\"a\":2}",
1368 })
1369 .to_string(),
1370 );
1371 let err = expect_err(svc.start_execution(&req));
1372 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1373 }
1374
1375 #[tokio::test]
1376 async fn start_execution_express_name_collision_never_idempotent() {
1377 let svc = StepFunctionsService::new(make_state());
1378 let arn = create_express_sm(&svc, "dup-exec-express");
1379
1380 let body = json!({
1381 "stateMachineArn": arn,
1382 "name": "same-name",
1383 "input": "{\"a\":1}",
1384 });
1385 let req = make_request("StartExecution", &body.to_string());
1386 svc.start_execution(&req).unwrap();
1387
1388 let req = make_request("StartExecution", &body.to_string());
1390 let err = expect_err(svc.start_execution(&req));
1391 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1392 }
1393
1394 #[tokio::test]
1397 async fn describe_execution_found() {
1398 let svc = StepFunctionsService::new(make_state());
1399 let sm_arn = create_sm(&svc, "desc-exec");
1400
1401 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1402 let req = make_request("StartExecution", &body.to_string());
1403 let resp = svc.start_execution(&req).unwrap();
1404 let exec_arn = body_json(&resp)["executionArn"]
1405 .as_str()
1406 .unwrap()
1407 .to_string();
1408
1409 let req = make_request(
1410 "DescribeExecution",
1411 &json!({"executionArn": exec_arn}).to_string(),
1412 );
1413 let resp = svc.describe_execution(&req).unwrap();
1414 let b = body_json(&resp);
1415 assert_eq!(b["name"], "e1");
1416 assert_eq!(b["status"], "RUNNING");
1417 }
1418
1419 #[tokio::test]
1420 async fn describe_execution_not_found() {
1421 let svc = StepFunctionsService::new(make_state());
1422 let req = make_request(
1423 "DescribeExecution",
1424 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1425 .to_string(),
1426 );
1427 let err = expect_err(svc.describe_execution(&req));
1428 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1429 }
1430
1431 #[tokio::test]
1434 async fn stop_execution() {
1435 let svc = StepFunctionsService::new(make_state());
1436 let sm_arn = create_sm(&svc, "stop-sm");
1437
1438 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1439 let req = make_request("StartExecution", &body.to_string());
1440 let resp = svc.start_execution(&req).unwrap();
1441 let exec_arn = body_json(&resp)["executionArn"]
1442 .as_str()
1443 .unwrap()
1444 .to_string();
1445
1446 let body = json!({
1447 "executionArn": exec_arn,
1448 "error": "UserAborted",
1449 "cause": "test stop",
1450 });
1451 let req = make_request("StopExecution", &body.to_string());
1452 let resp = svc.stop_execution(&req).unwrap();
1453 let b = body_json(&resp);
1454 assert!(b["stopDate"].as_f64().is_some());
1455
1456 let req = make_request(
1458 "DescribeExecution",
1459 &json!({"executionArn": exec_arn}).to_string(),
1460 );
1461 let resp = svc.describe_execution(&req).unwrap();
1462 let b = body_json(&resp);
1463 assert_eq!(b["status"], "ABORTED");
1464 assert_eq!(b["error"], "UserAborted");
1465 }
1466
1467 #[tokio::test]
1468 async fn stop_execution_not_found() {
1469 let svc = StepFunctionsService::new(make_state());
1470 let req = make_request(
1471 "StopExecution",
1472 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1473 .to_string(),
1474 );
1475 let err = expect_err(svc.stop_execution(&req));
1476 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1477 }
1478
1479 #[tokio::test]
1482 async fn list_executions() {
1483 let svc = StepFunctionsService::new(make_state());
1484 let sm_arn = create_sm(&svc, "list-exec");
1485
1486 for i in 0..3 {
1487 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1488 let req = make_request("StartExecution", &body.to_string());
1489 svc.start_execution(&req).unwrap();
1490 }
1491
1492 let req = make_request(
1493 "ListExecutions",
1494 &json!({"stateMachineArn": sm_arn}).to_string(),
1495 );
1496 let resp = svc.list_executions(&req).unwrap();
1497 let b = body_json(&resp);
1498 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1499 }
1500
1501 #[tokio::test]
1502 async fn list_executions_sm_not_found() {
1503 let svc = StepFunctionsService::new(make_state());
1504 let req = make_request(
1505 "ListExecutions",
1506 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1507 .to_string(),
1508 );
1509 let err = expect_err(svc.list_executions(&req));
1510 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1511 }
1512
1513 #[tokio::test]
1516 async fn get_execution_history_not_found() {
1517 let svc = StepFunctionsService::new(make_state());
1518 let req = make_request(
1519 "GetExecutionHistory",
1520 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1521 .to_string(),
1522 );
1523 let err = expect_err(svc.get_execution_history(&req));
1524 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1525 }
1526
1527 #[tokio::test]
1530 async fn describe_sm_for_execution() {
1531 let svc = StepFunctionsService::new(make_state());
1532 let sm_arn = create_sm(&svc, "sm-for-exec");
1533
1534 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1535 let req = make_request("StartExecution", &body.to_string());
1536 let resp = svc.start_execution(&req).unwrap();
1537 let exec_arn = body_json(&resp)["executionArn"]
1538 .as_str()
1539 .unwrap()
1540 .to_string();
1541
1542 let req = make_request(
1543 "DescribeStateMachineForExecution",
1544 &json!({"executionArn": exec_arn}).to_string(),
1545 );
1546 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1547 let b = body_json(&resp);
1548 assert_eq!(b["name"], "sm-for-exec");
1549 }
1550
1551 #[test]
1554 fn tag_untag_list_tags() {
1555 let svc = StepFunctionsService::new(make_state());
1556 let arn = create_sm(&svc, "tagged-sm");
1557
1558 let body = json!({
1560 "resourceArn": arn,
1561 "tags": [{"key": "env", "value": "prod"}],
1562 });
1563 let req = make_request("TagResource", &body.to_string());
1564 svc.tag_resource(&req).unwrap();
1565
1566 let req = make_request(
1568 "ListTagsForResource",
1569 &json!({"resourceArn": arn}).to_string(),
1570 );
1571 let resp = svc.list_tags_for_resource(&req).unwrap();
1572 let b = body_json(&resp);
1573 let tags = b["tags"].as_array().unwrap();
1574 assert_eq!(tags.len(), 1);
1575 assert_eq!(tags[0]["key"], "env");
1576
1577 let body = json!({
1579 "resourceArn": arn,
1580 "tagKeys": ["env"],
1581 });
1582 let req = make_request("UntagResource", &body.to_string());
1583 svc.untag_resource(&req).unwrap();
1584
1585 let req = make_request(
1587 "ListTagsForResource",
1588 &json!({"resourceArn": arn}).to_string(),
1589 );
1590 let resp = svc.list_tags_for_resource(&req).unwrap();
1591 let b = body_json(&resp);
1592 assert!(b["tags"].as_array().unwrap().is_empty());
1593 }
1594
1595 #[test]
1596 fn tag_resource_not_found() {
1597 let svc = StepFunctionsService::new(make_state());
1598 let body = json!({
1599 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1600 "tags": [{"key": "k", "value": "v"}],
1601 });
1602 let req = make_request("TagResource", &body.to_string());
1603 let err = expect_err(svc.tag_resource(&req));
1604 assert!(err.to_string().contains("ResourceNotFound"));
1605 }
1606
1607 #[test]
1610 fn test_validate_name() {
1611 assert!(validate_name("valid-name").is_ok());
1612 assert!(validate_name("under_score").is_ok());
1613 assert!(validate_name("").is_err());
1614 assert!(validate_name("has spaces").is_err());
1615 assert!(validate_name(&"a".repeat(81)).is_err());
1616 }
1617
1618 #[test]
1619 fn test_validate_definition() {
1620 assert!(validate_definition(VALID_DEF).is_ok());
1621 assert!(validate_definition("not json").is_err());
1622 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
1625
1626 #[test]
1627 fn test_validate_definition_rejects_malformed_paths() {
1628 let def =
1630 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1631 assert!(validate_definition(def).is_err());
1632
1633 let def =
1635 "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1636 assert!(validate_definition(def).is_err());
1637
1638 let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1640 assert!(validate_definition(def).is_err());
1641
1642 let def =
1644 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1645 assert!(validate_definition(def).is_err());
1646
1647 let def =
1649 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1650 assert!(validate_definition(def).is_err());
1651 }
1652
1653 #[test]
1654 fn test_validate_definition_accepts_well_formed_paths() {
1655 let def = r#"{"StartAt":"P","States":{
1658 "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1659 "C":{"Type":"Choice","Choices":[
1660 {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1661 ],"Default":"S"},
1662 "S":{"Type":"Succeed","InputPath":"null"}
1663 }}"#;
1664 assert!(validate_definition(def).is_ok());
1665 }
1666
1667 #[test]
1668 fn test_is_valid_reference_path() {
1669 assert!(is_valid_reference_path("$"));
1670 assert!(is_valid_reference_path("$.foo"));
1671 assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1672 assert!(is_valid_reference_path("$[0]"));
1673 assert!(!is_valid_reference_path("$.arr["));
1674 assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1675 assert!(!is_valid_reference_path("$.x[]"));
1676 assert!(!is_valid_reference_path("$.x[abc]"));
1677 assert!(!is_valid_reference_path("foo.bar"));
1678 assert!(!is_valid_reference_path(""));
1679 }
1680
1681 #[test]
1682 fn test_validate_arn() {
1683 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1684 assert!(validate_arn("not-an-arn").is_err());
1685 }
1686
1687 #[test]
1688 fn test_camel_to_details_key() {
1689 assert_eq!(camel_to_details_key("PassStateEntered"), "stateEntered");
1691 assert_eq!(camel_to_details_key("TaskStateEntered"), "stateEntered");
1692 assert_eq!(camel_to_details_key("ChoiceStateExited"), "stateExited");
1693 assert_eq!(camel_to_details_key("MapStateExited"), "stateExited");
1694 assert_eq!(camel_to_details_key("TaskScheduled"), "taskScheduled");
1696 assert_eq!(camel_to_details_key("ExecutionStarted"), "executionStarted");
1697 assert_eq!(camel_to_details_key(""), "");
1698 }
1699
1700 #[test]
1701 fn test_is_mutating_action() {
1702 assert!(is_mutating_action("CreateStateMachine"));
1703 assert!(is_mutating_action("StartExecution"));
1704 assert!(!is_mutating_action("DescribeStateMachine"));
1705 assert!(!is_mutating_action("ListStateMachines"));
1706 }
1707
1708 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1711 let body = json!({
1712 "name": name,
1713 "definition": VALID_DEF,
1714 "roleArn": "arn:aws:iam::123456789012:role/test",
1715 "type": "EXPRESS",
1716 });
1717 let req = make_request("CreateStateMachine", &body.to_string());
1718 let resp = svc.create_state_machine(&req).unwrap();
1719 let b = body_json(&resp);
1720 b["stateMachineArn"].as_str().unwrap().to_string()
1721 }
1722
1723 #[tokio::test]
1724 async fn start_sync_execution_basic() {
1725 let svc = StepFunctionsService::new(make_state());
1726 let arn = create_express_sm(&svc, "sync-sm");
1727
1728 let body = json!({
1729 "stateMachineArn": arn,
1730 "input": r#"{"key":"value"}"#,
1731 });
1732 let req = make_request("StartSyncExecution", &body.to_string());
1733 let resp = svc.start_sync_execution(&req).await.unwrap();
1734 let b = body_json(&resp);
1735 assert!(b["executionArn"]
1736 .as_str()
1737 .unwrap()
1738 .contains("express:sync-sm"));
1739 assert_eq!(b["stateMachineArn"], arn);
1740 assert_eq!(b["status"], "SUCCEEDED");
1741 assert!(b["startDate"].as_i64().is_some());
1742 assert!(b["stopDate"].as_i64().is_some());
1743 assert!(b["output"].as_str().is_some());
1744 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1745 .as_i64()
1746 .is_some());
1747 }
1748
1749 #[tokio::test]
1750 async fn start_sync_execution_not_express() {
1751 let svc = StepFunctionsService::new(make_state());
1752 let arn = create_sm(&svc, "std-sm");
1753
1754 let body = json!({"stateMachineArn": arn});
1755 let req = make_request("StartSyncExecution", &body.to_string());
1756 let err = expect_err(svc.start_sync_execution(&req).await);
1757 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1758 }
1759
1760 #[tokio::test]
1761 async fn start_sync_execution_sm_not_found() {
1762 let svc = StepFunctionsService::new(make_state());
1763 let body = json!({
1764 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1765 });
1766 let req = make_request("StartSyncExecution", &body.to_string());
1767 let err = expect_err(svc.start_sync_execution(&req).await);
1768 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1769 }
1770
1771 #[tokio::test]
1772 async fn start_sync_execution_records_introspection_fields() {
1773 let svc = StepFunctionsService::new(make_state());
1774 let arn = create_express_sm(&svc, "sync-introspect");
1775
1776 let body = json!({"stateMachineArn": arn, "input": "{}"});
1777 let req = make_request("StartSyncExecution", &body.to_string());
1778 let resp = svc.start_sync_execution(&req).await.unwrap();
1779 let b = body_json(&resp);
1780 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1781
1782 let accounts = svc.state.read();
1783 let state = accounts.get("123456789012").unwrap();
1784 let stored = state
1785 .executions
1786 .get(&exec_arn)
1787 .expect("sync execution should be persisted for introspection");
1788 assert!(stored.is_sync, "sync executions must be marked is_sync");
1789 assert_eq!(stored.billed_memory_mb, Some(64));
1790 assert!(
1791 stored.billed_duration_ms.is_some(),
1792 "billed_duration_ms must be populated after sync run"
1793 );
1794 assert!(
1795 stored.parent_execution_arn.is_none(),
1796 "top-level sync execution has no parent"
1797 );
1798 }
1799
1800 #[tokio::test]
1801 async fn start_sync_execution_invalid_input() {
1802 let svc = StepFunctionsService::new(make_state());
1803 let arn = create_express_sm(&svc, "bad-input-sync");
1804
1805 let body = json!({
1806 "stateMachineArn": arn,
1807 "input": "not json",
1808 });
1809 let req = make_request("StartSyncExecution", &body.to_string());
1810 let err = expect_err(svc.start_sync_execution(&req).await);
1811 assert!(err.to_string().contains("InvalidExecutionInput"));
1812 }
1813
1814 #[test]
1816 fn snapshot_hook_is_none_without_store() {
1817 let svc = StepFunctionsService::new(make_state());
1818 assert!(svc.snapshot_hook().is_none());
1819 }
1820
1821 #[tokio::test]
1825 async fn snapshot_hook_fires_with_store() {
1826 let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1827 Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1828 let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1829 let hook = svc
1830 .snapshot_hook()
1831 .expect("hook present when a store is set");
1832 hook().await;
1833 }
1834
1835 fn make_execution(arn: &str, status: ExecutionStatus) -> Execution {
1836 Execution {
1837 execution_arn: arn.to_string(),
1838 state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:sm".to_string(),
1839 state_machine_name: "sm".to_string(),
1840 name: arn.to_string(),
1841 status,
1842 input: None,
1843 output: None,
1844 start_date: Utc::now(),
1845 stop_date: None,
1846 error: None,
1847 cause: None,
1848 history_events: vec![],
1849 parent_execution_arn: None,
1850 is_sync: false,
1851 billed_duration_ms: None,
1852 billed_memory_mb: None,
1853 }
1854 }
1855
1856 #[test]
1857 fn reconcile_aborts_running_executions_on_restart() {
1858 let state = make_state();
1862 {
1863 let mut accounts = state.write();
1864 let s = accounts.get_or_create("123456789012");
1865 s.executions.insert(
1866 "running".into(),
1867 make_execution("running", ExecutionStatus::Running),
1868 );
1869 s.executions.insert(
1870 "done".into(),
1871 make_execution("done", ExecutionStatus::Succeeded),
1872 );
1873 }
1874
1875 let n = reconcile_interrupted_executions(&state);
1876 assert_eq!(n, 1, "only the RUNNING execution is reconciled");
1877
1878 let accounts = state.read();
1879 let s = accounts.get("123456789012").unwrap();
1880 let running = &s.executions["running"];
1881 assert_eq!(running.status, ExecutionStatus::Aborted);
1882 assert!(running.stop_date.is_some());
1883 assert_eq!(running.error.as_deref(), Some("Fakecloud.Restart"));
1884 assert_eq!(s.executions["done"].status, ExecutionStatus::Succeeded);
1885 }
1886}
1887
1888#[cfg(test)]
1889mod pagination_reject_test {
1890 #[test]
1891 fn paginate_checked_rejects_invalid_token() {
1892 use fakecloud_core::pagination::paginate_checked;
1893 let items: Vec<i32> = (0..5).collect();
1894 assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1895 assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1896 }
1897}