1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26 "CreateStateMachine",
27 "DescribeStateMachine",
28 "ListStateMachines",
29 "DeleteStateMachine",
30 "UpdateStateMachine",
31 "TagResource",
32 "UntagResource",
33 "ListTagsForResource",
34 "StartExecution",
35 "StopExecution",
36 "DescribeExecution",
37 "ListExecutions",
38 "GetExecutionHistory",
39 "DescribeStateMachineForExecution",
40 "CreateActivity",
41 "DeleteActivity",
42 "DescribeActivity",
43 "ListActivities",
44 "GetActivityTask",
45 "SendTaskFailure",
46 "SendTaskHeartbeat",
47 "SendTaskSuccess",
48 "PublishStateMachineVersion",
49 "DeleteStateMachineVersion",
50 "ListStateMachineVersions",
51 "CreateStateMachineAlias",
52 "DeleteStateMachineAlias",
53 "DescribeStateMachineAlias",
54 "ListStateMachineAliases",
55 "UpdateStateMachineAlias",
56 "DescribeMapRun",
57 "ListMapRuns",
58 "UpdateMapRun",
59 "RedriveExecution",
60 "StartSyncExecution",
61 "TestState",
62 "ValidateStateMachineDefinition",
63];
64
65pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73 state: SharedStepFunctionsState,
74 delivery: Option<Arc<DeliveryBus>>,
75 dynamodb_state: Option<SharedDynamoDbState>,
76 registry: Option<SharedServiceRegistry>,
77 snapshot_store: Option<Arc<dyn SnapshotStore>>,
78 snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90 pub fn new(state: SharedStepFunctionsState) -> Self {
91 Self {
92 state,
93 delivery: None,
94 dynamodb_state: None,
95 registry: None,
96 snapshot_store: None,
97 snapshot_lock: Arc::new(AsyncMutex::new(())),
98 }
99 }
100
101 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102 self.delivery = Some(delivery);
103 self
104 }
105
106 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107 self.dynamodb_state = Some(dynamodb_state);
108 self
109 }
110
111 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116 self.registry = Some(registry);
117 self
118 }
119
120 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121 self.snapshot_store = Some(store);
122 self
123 }
124
125 async fn save_snapshot(&self) {
126 save_stepfunctions_snapshot(
127 &self.state,
128 self.snapshot_store.clone(),
129 &self.snapshot_lock,
130 )
131 .await;
132 }
133
134 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140 let store = self.snapshot_store.clone()?;
141 let state = self.state.clone();
142 let lock = self.snapshot_lock.clone();
143 Some(Arc::new(move || {
144 let state = state.clone();
145 let store = store.clone();
146 let lock = lock.clone();
147 Box::pin(async move {
148 save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149 })
150 }))
151 }
152}
153
154pub async fn save_stepfunctions_snapshot(
160 state: &SharedStepFunctionsState,
161 store: Option<Arc<dyn SnapshotStore>>,
162 lock: &AsyncMutex<()>,
163) {
164 let Some(store) = store else {
165 return;
166 };
167 let _guard = lock.lock().await;
168 let snapshot = StepFunctionsSnapshot {
169 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170 state: None,
171 accounts: Some(state.read().clone()),
172 };
173 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174 let bytes = serde_json::to_vec(&snapshot)
175 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176 store.save(&bytes)
177 })
178 .await;
179 match join {
180 Ok(Ok(())) => {}
181 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183 }
184}
185
186pub fn reconcile_interrupted_executions(state: &SharedStepFunctionsState) -> usize {
196 let now = Utc::now();
197 let mut count = 0;
198 let mut accounts = state.write();
199 let account_ids: Vec<String> = accounts.iter().map(|(id, _)| id.to_string()).collect();
200 for account_id in account_ids {
201 let Some(s) = accounts.get_mut(&account_id) else {
202 continue;
203 };
204 for exec in s.executions.values_mut() {
205 if matches!(
206 exec.status,
207 ExecutionStatus::Running | ExecutionStatus::PendingRedrive
208 ) {
209 exec.status = ExecutionStatus::Aborted;
210 exec.stop_date = Some(now);
211 exec.error = Some("Fakecloud.Restart".to_string());
212 exec.cause = Some(
213 "Execution was interrupted by a fakecloud restart and cannot be resumed"
214 .to_string(),
215 );
216 count += 1;
217 }
218 }
219 }
220 count
221}
222
223fn is_mutating_action(action: &str) -> bool {
224 matches!(
225 action,
226 "CreateStateMachine"
227 | "DeleteStateMachine"
228 | "UpdateStateMachine"
229 | "TagResource"
230 | "UntagResource"
231 | "StartExecution"
232 | "StopExecution"
233 | "CreateActivity"
234 | "DeleteActivity"
235 | "GetActivityTask"
236 | "SendTaskFailure"
237 | "SendTaskHeartbeat"
238 | "SendTaskSuccess"
239 | "PublishStateMachineVersion"
240 | "DeleteStateMachineVersion"
241 | "CreateStateMachineAlias"
242 | "DeleteStateMachineAlias"
243 | "UpdateStateMachineAlias"
244 | "UpdateMapRun"
245 | "RedriveExecution"
246 | "StartSyncExecution"
247 )
248}
249
250#[async_trait]
251impl AwsService for StepFunctionsService {
252 fn service_name(&self) -> &str {
253 "states"
254 }
255
256 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
257 let mutates = is_mutating_action(req.action.as_str());
258 let result = match req.action.as_str() {
259 "CreateStateMachine" => self.create_state_machine(&req),
260 "DescribeStateMachine" => self.describe_state_machine(&req),
261 "ListStateMachines" => self.list_state_machines(&req),
262 "DeleteStateMachine" => self.delete_state_machine(&req),
263 "UpdateStateMachine" => self.update_state_machine(&req),
264 "TagResource" => self.tag_resource(&req),
265 "UntagResource" => self.untag_resource(&req),
266 "ListTagsForResource" => self.list_tags_for_resource(&req),
267 "StartExecution" => self.start_execution(&req),
268 "StopExecution" => self.stop_execution(&req),
269 "DescribeExecution" => self.describe_execution(&req),
270 "ListExecutions" => self.list_executions(&req),
271 "GetExecutionHistory" => self.get_execution_history(&req),
272 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
273 "CreateActivity" => self.create_activity(&req),
274 "DeleteActivity" => self.delete_activity(&req),
275 "DescribeActivity" => self.describe_activity(&req),
276 "ListActivities" => self.list_activities(&req),
277 "GetActivityTask" => self.get_activity_task(&req).await,
278 "SendTaskFailure" => self.send_task_failure(&req),
279 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
280 "SendTaskSuccess" => self.send_task_success(&req),
281 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
282 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
283 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
284 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
285 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
286 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
287 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
288 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
289 "DescribeMapRun" => self.describe_map_run(&req),
290 "ListMapRuns" => self.list_map_runs(&req),
291 "UpdateMapRun" => self.update_map_run(&req),
292 "RedriveExecution" => self.redrive_execution(&req),
293 "StartSyncExecution" => self.start_sync_execution(&req).await,
294 "TestState" => self.test_state(&req),
295 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
296 _ => Err(AwsServiceError::action_not_implemented(
297 "states",
298 &req.action,
299 )),
300 };
301 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302 self.save_snapshot().await;
303 }
304 result
305 }
306
307 fn supported_actions(&self) -> &[&str] {
308 SUPPORTED
309 }
310}
311
312impl StepFunctionsService {
313 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
314 let body = req.json_body();
315 let raw_max_results = body["maxResults"].as_i64();
316 if let Some(mr) = raw_max_results {
317 validate_max_results(mr)?;
318 }
319 let next_token = body["nextToken"].as_str();
320 if let Some(t) = next_token {
321 validate_page_token(t)?;
322 }
323 let max_results = match raw_max_results.unwrap_or(0) {
327 0 => 100,
328 n => n as usize,
329 };
330 let accounts = self.state.read();
331 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
332 let state = accounts.get(&req.account_id).unwrap_or(&empty);
333 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
334 activities.sort_by(|a, b| a.name.cmp(&b.name));
335 let items: Vec<Value> = activities
336 .iter()
337 .map(|a| {
338 json!({
339 "activityArn": a.arn,
340 "name": a.name,
341 "creationDate": a.creation_date.timestamp(),
342 })
343 })
344 .collect();
345 let (page, token) =
346 paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
347 let mut resp = json!({ "activities": page });
348 if let Some(t) = token {
349 resp["nextToken"] = json!(t);
350 }
351 Ok(AwsResponse::ok_json(resp))
352 }
353}
354
355fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
356 json!({
357 "stateMachineAliasArn": alias.arn,
358 "name": alias.name,
359 "description": alias.description,
360 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
361 "stateMachineVersionArn": r.state_machine_version_arn,
362 "weight": r.weight,
363 })).collect::<Vec<_>>(),
364 "creationDate": alias.creation_date.timestamp(),
365 "updateDate": alias.update_date.timestamp(),
366 })
367}
368
369fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
370 json!({
371 "mapRunArn": mr.map_run_arn,
372 "executionArn": mr.execution_arn,
373 "maxConcurrency": mr.max_concurrency,
374 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
375 "toleratedFailureCount": mr.tolerated_failure_count,
376 "status": mr.status,
377 "startDate": mr.start_date.timestamp(),
378 "stopDate": mr.stop_date.map(|d| d.timestamp()),
379 })
380}
381
382fn state_machine_to_json(sm: &StateMachine) -> Value {
385 let mut resp = json!({
386 "name": sm.name,
387 "stateMachineArn": sm.arn,
388 "definition": sm.definition,
389 "roleArn": sm.role_arn,
390 "type": sm.machine_type.as_str(),
391 "status": sm.status.as_str(),
392 "creationDate": sm.creation_date.timestamp() as f64,
393 "updateDate": sm.update_date.timestamp() as f64,
394 "revisionId": sm.revision_id,
395 "label": sm.name,
396 });
397
398 if !sm.description.is_empty() {
399 resp["description"] = json!(sm.description);
400 }
401
402 if let Some(ref logging) = sm.logging_configuration {
403 resp["loggingConfiguration"] = logging.clone();
404 } else {
405 resp["loggingConfiguration"] = json!({
406 "level": "OFF",
407 "includeExecutionData": false,
408 "destinations": [],
409 });
410 }
411
412 if let Some(ref tracing) = sm.tracing_configuration {
413 resp["tracingConfiguration"] = tracing.clone();
414 } else {
415 resp["tracingConfiguration"] = json!({
416 "enabled": false,
417 });
418 }
419
420 resp
421}
422
423fn missing(name: &str) -> AwsServiceError {
424 AwsServiceError::aws_error(
425 StatusCode::BAD_REQUEST,
426 "ValidationException",
427 format!("The request must contain the parameter {name}."),
428 )
429}
430
431fn state_machine_not_found(arn: &str) -> AwsServiceError {
432 AwsServiceError::aws_error(
433 StatusCode::BAD_REQUEST,
434 "StateMachineDoesNotExist",
435 format!("State Machine Does Not Exist: '{arn}'"),
436 )
437}
438
439fn activity_not_found(arn: &str) -> AwsServiceError {
440 AwsServiceError::aws_error(
441 StatusCode::BAD_REQUEST,
442 "ActivityDoesNotExist",
443 format!("Activity does not exist: {arn}"),
444 )
445}
446
447fn task_does_not_exist(token: &str) -> AwsServiceError {
448 AwsServiceError::aws_error(
449 StatusCode::BAD_REQUEST,
450 "TaskDoesNotExist",
451 format!("Task does not exist: {token}"),
452 )
453}
454
455fn resource_not_found(arn: &str) -> AwsServiceError {
456 AwsServiceError::aws_error(
457 StatusCode::BAD_REQUEST,
458 "ResourceNotFound",
459 format!("Resource not found: '{arn}'"),
460 )
461}
462
463fn parse_routing_configuration(
468 routes: &[serde_json::Value],
469) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
470 if routes.is_empty() || routes.len() > 2 {
471 return Err(AwsServiceError::aws_error(
472 StatusCode::BAD_REQUEST,
473 "ValidationException",
474 "routingConfiguration must contain 1 or 2 routes.",
475 ));
476 }
477 let parsed: Vec<crate::state::AliasRoute> = routes
478 .iter()
479 .map(|r| {
480 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
481 AwsServiceError::aws_error(
482 StatusCode::BAD_REQUEST,
483 "ValidationException",
484 "routingConfiguration entries must contain stateMachineVersionArn.",
485 )
486 })?;
487 let weight = r["weight"].as_i64().ok_or_else(|| {
488 AwsServiceError::aws_error(
489 StatusCode::BAD_REQUEST,
490 "ValidationException",
491 "routingConfiguration entries must contain a numeric weight.",
492 )
493 })?;
494 if !(0..=100).contains(&weight) {
495 return Err(AwsServiceError::aws_error(
496 StatusCode::BAD_REQUEST,
497 "ValidationException",
498 format!("Invalid routing weight {weight}; must be 0-100."),
499 ));
500 }
501 Ok(crate::state::AliasRoute {
502 state_machine_version_arn: arn.to_string(),
503 weight: weight as i32,
504 })
505 })
506 .collect::<Result<_, _>>()?;
507 let total: i32 = parsed.iter().map(|r| r.weight).sum();
508 if total != 100 {
509 return Err(AwsServiceError::aws_error(
510 StatusCode::BAD_REQUEST,
511 "ValidationException",
512 format!("routingConfiguration weights must sum to 100, got {total}."),
513 ));
514 }
515 Ok(parsed)
516}
517
518fn validate_name(name: &str) -> Result<(), AwsServiceError> {
519 if name.is_empty() || name.len() > 80 {
520 return Err(AwsServiceError::aws_error(
521 StatusCode::BAD_REQUEST,
522 "InvalidName",
523 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
524 ));
525 }
526 if !name
528 .chars()
529 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
530 {
531 return Err(AwsServiceError::aws_error(
532 StatusCode::BAD_REQUEST,
533 "InvalidName",
534 format!(
535 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
536 ),
537 ));
538 }
539 Ok(())
540}
541
542fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
543 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
544 AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "InvalidDefinition",
547 format!("Invalid State Machine Definition: '{e}'"),
548 )
549 })?;
550
551 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
552 return Err(AwsServiceError::aws_error(
553 StatusCode::BAD_REQUEST,
554 "InvalidDefinition",
555 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
556 .to_string(),
557 ));
558 }
559
560 let states_obj = parsed
561 .get("States")
562 .and_then(|v| v.as_object())
563 .ok_or_else(|| {
564 AwsServiceError::aws_error(
565 StatusCode::BAD_REQUEST,
566 "InvalidDefinition",
567 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
568 .to_string(),
569 )
570 })?;
571
572 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
573 AwsServiceError::aws_error(
574 StatusCode::BAD_REQUEST,
575 "InvalidDefinition",
576 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
577 .to_string(),
578 )
579 })?;
580 if !states_obj.contains_key(start_at) {
581 return Err(AwsServiceError::aws_error(
582 StatusCode::BAD_REQUEST,
583 "InvalidDefinition",
584 format!(
585 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
586 (StartAt '{start_at}' does not reference a valid state)"
587 ),
588 ));
589 }
590
591 for (state_name, state_val) in states_obj {
595 validate_state_paths(state_name, state_val)?;
596 }
597
598 Ok(())
599}
600
601fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
605 for field in ["InputPath", "OutputPath", "ResultPath"] {
606 if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
607 if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
611 continue;
612 }
613 return Err(invalid_reference_path(state_name, field, p));
614 }
615 }
616
617 if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
618 if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
619 for rule in choices {
620 validate_choice_variables(state_name, rule)?;
621 }
622 }
623 }
624
625 Ok(())
626}
627
628fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
631 if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
632 if !is_valid_reference_path(v) {
633 return Err(invalid_reference_path(state_name, "Variable", v));
634 }
635 }
636 for combinator in ["And", "Or"] {
637 if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
638 for n in nested {
639 validate_choice_variables(state_name, n)?;
640 }
641 }
642 }
643 if let Some(n) = rule.get("Not") {
644 validate_choice_variables(state_name, n)?;
645 }
646 Ok(())
647}
648
649fn is_valid_reference_path(path: &str) -> bool {
653 if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
654 return false;
655 }
656 let body = path
657 .strip_prefix("$.")
658 .or_else(|| path.strip_prefix('$'))
659 .unwrap_or(path);
660 for part in body.split('.') {
661 if !segment_is_valid(part) {
662 return false;
663 }
664 }
665 true
666}
667
668fn segment_is_valid(part: &str) -> bool {
669 match part.find('[') {
670 None => !part.contains(']'),
671 Some(open) => {
672 if !part.ends_with(']') {
673 return false;
674 }
675 let inner = &part[open + 1..part.len() - 1];
676 !inner.is_empty() && inner.parse::<usize>().is_ok()
678 }
679 }
680}
681
682fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
683 AwsServiceError::aws_error(
684 StatusCode::BAD_REQUEST,
685 "InvalidDefinition",
686 format!(
687 "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
688 (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
689 ),
690 )
691}
692
693fn execution_not_found(arn: &str) -> AwsServiceError {
694 AwsServiceError::aws_error(
695 StatusCode::BAD_REQUEST,
696 "ExecutionDoesNotExist",
697 format!("Execution Does Not Exist: '{arn}'"),
698 )
699}
700
701fn execution_to_json(exec: &Execution) -> Value {
702 let mut resp = json!({
703 "executionArn": exec.execution_arn,
704 "stateMachineArn": exec.state_machine_arn,
705 "name": exec.name,
706 "status": exec.status.as_str(),
707 "startDate": exec.start_date.timestamp() as f64,
708 });
709
710 if let Some(ref input) = exec.input {
711 resp["input"] = json!(input);
712 }
713 if let Some(ref output) = exec.output {
714 resp["output"] = json!(output);
715 }
716 if let Some(stop) = exec.stop_date {
717 resp["stopDate"] = json!(stop.timestamp() as f64);
718 }
719 if let Some(ref error) = exec.error {
720 resp["error"] = json!(error);
721 }
722 if let Some(ref cause) = exec.cause {
723 resp["cause"] = json!(cause);
724 }
725
726 resp
727}
728
729fn camel_to_details_key(event_type: &str) -> String {
731 let mut chars = event_type.chars();
732 match chars.next() {
733 None => String::new(),
734 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
735 }
736}
737
738fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
739 if !arn.starts_with("arn:") {
740 return Err(AwsServiceError::aws_error(
741 StatusCode::BAD_REQUEST,
742 "InvalidArn",
743 format!("Invalid Arn: '{arn}'"),
744 ));
745 }
746 Ok(())
747}
748
749fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
753 if value.is_empty() || value.len() > max {
754 return Err(AwsServiceError::aws_error(
755 StatusCode::BAD_REQUEST,
756 "InvalidArn",
757 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
758 ));
759 }
760 Ok(())
761}
762
763pub(super) fn invalid_token() -> AwsServiceError {
767 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
768}
769
770fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
774 if value.is_empty() || value.len() > 1024 {
775 return Err(AwsServiceError::aws_error(
776 StatusCode::BAD_REQUEST,
777 "InvalidToken",
778 "nextToken must be 1..=1024 characters",
779 ));
780 }
781 Ok(())
782}
783
784fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
790 if !(0..=1000).contains(&value) {
791 return Err(AwsServiceError::aws_error(
792 StatusCode::BAD_REQUEST,
793 "InvalidToken",
794 format!("maxResults '{value}' is outside 0..=1000"),
795 ));
796 }
797 Ok(())
798}
799
800pub fn start_execution_from_delivery(
809 state: &SharedStepFunctionsState,
810 delivery: &Option<Arc<DeliveryBus>>,
811 dynamodb_state: &Option<SharedDynamoDbState>,
812 registry: &Option<SharedServiceRegistry>,
813 state_machine_arn: &str,
814 input: &str,
815) {
816 if serde_json::from_str::<serde_json::Value>(input).is_err() {
818 tracing::warn!(
819 state_machine_arn,
820 "Step Functions delivery: invalid JSON input, skipping execution"
821 );
822 return;
823 }
824
825 let execution_name = uuid::Uuid::new_v4().to_string();
826
827 let account_id = state_machine_arn
829 .split(':')
830 .nth(4)
831 .unwrap_or("000000000000")
832 .to_string();
833
834 let mut accounts = state.write();
835 let st = accounts.get_or_create(&account_id);
836 let sm = match st.state_machines.get(state_machine_arn) {
837 Some(sm) => sm,
838 None => {
839 tracing::warn!(
840 state_machine_arn,
841 "Step Functions delivery: state machine not found"
842 );
843 return;
844 }
845 };
846
847 let sm_name = sm.name.clone();
848 let definition = sm.definition.clone();
849 let exec_arn = st.execution_arn(&sm_name, &execution_name);
850
851 let now = Utc::now();
852 let execution = Execution {
853 execution_arn: exec_arn.clone(),
854 state_machine_arn: state_machine_arn.to_string(),
855 state_machine_name: sm_name,
856 name: execution_name,
857 status: ExecutionStatus::Running,
858 input: Some(input.to_string()),
859 output: None,
860 start_date: now,
861 stop_date: None,
862 error: None,
863 cause: None,
864 history_events: vec![],
865 parent_execution_arn: None,
866 is_sync: false,
867 billed_duration_ms: None,
868 billed_memory_mb: None,
869 };
870
871 st.executions.insert(exec_arn.clone(), execution);
872 let logging_config = sm.logging_configuration.clone();
873 drop(accounts);
874
875 let shared_state = state.clone();
876 let delivery = delivery.clone();
877 let dynamodb_state = dynamodb_state.clone();
878 let registry = registry.clone();
879 let input = Some(input.to_string());
880 tokio::spawn(async move {
881 interpreter::execute_state_machine(
882 shared_state,
883 exec_arn,
884 definition,
885 input,
886 delivery,
887 dynamodb_state,
888 registry,
889 logging_config,
890 )
891 .await;
892 });
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898 use http::{HeaderMap, Method};
899 use parking_lot::RwLock;
900 use serde_json::Value;
901 use std::collections::HashMap;
902 use std::sync::Arc;
903
904 fn make_state() -> SharedStepFunctionsState {
905 Arc::new(RwLock::new(
906 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
907 ))
908 }
909
910 fn make_request(action: &str, body: &str) -> AwsRequest {
911 AwsRequest {
912 service: "states".to_string(),
913 action: action.to_string(),
914 region: "us-east-1".to_string(),
915 account_id: "123456789012".to_string(),
916 request_id: "test-id".to_string(),
917 headers: HeaderMap::new(),
918 query_params: HashMap::new(),
919 body: body.as_bytes().to_vec().into(),
920 body_stream: parking_lot::Mutex::new(None),
921 path_segments: vec![],
922 raw_path: "/".to_string(),
923 raw_query: String::new(),
924 method: Method::POST,
925 is_query_protocol: false,
926 access_key_id: None,
927 principal: None,
928 }
929 }
930
931 fn body_json(resp: &AwsResponse) -> Value {
932 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
933 }
934
935 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
936 match result {
937 Err(e) => e,
938 Ok(_) => panic!("expected error, got Ok"),
939 }
940 }
941
942 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
943
944 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
945 let body = json!({
946 "name": name,
947 "definition": VALID_DEF,
948 "roleArn": "arn:aws:iam::123456789012:role/test",
949 });
950 let req = make_request("CreateStateMachine", &body.to_string());
951 let resp = svc.create_state_machine(&req).unwrap();
952 let b = body_json(&resp);
953 b["stateMachineArn"].as_str().unwrap().to_string()
954 }
955
956 #[test]
959 fn create_state_machine_basic() {
960 let svc = StepFunctionsService::new(make_state());
961 let arn = create_sm(&svc, "test-sm");
962 assert!(arn.contains("test-sm"));
963 }
964
965 #[test]
966 fn create_state_machine_with_express_type() {
967 let svc = StepFunctionsService::new(make_state());
968 let body = json!({
969 "name": "express-sm",
970 "definition": VALID_DEF,
971 "roleArn": "arn:aws:iam::123456789012:role/r",
972 "type": "EXPRESS",
973 });
974 let req = make_request("CreateStateMachine", &body.to_string());
975 let resp = svc.create_state_machine(&req).unwrap();
976 let b = body_json(&resp);
977 assert!(b["stateMachineArn"].as_str().is_some());
978 }
979
980 #[test]
981 fn create_state_machine_duplicate_fails() {
982 let svc = StepFunctionsService::new(make_state());
983 create_sm(&svc, "dup-sm");
984 let body = json!({
985 "name": "dup-sm",
986 "definition": VALID_DEF,
987 "roleArn": "arn:aws:iam::123456789012:role/r",
988 });
989 let req = make_request("CreateStateMachine", &body.to_string());
990 let err = expect_err(svc.create_state_machine(&req));
991 assert!(err.to_string().contains("StateMachineAlreadyExists"));
992 }
993
994 #[test]
995 fn create_state_machine_missing_name() {
996 let svc = StepFunctionsService::new(make_state());
997 let body = json!({
998 "definition": VALID_DEF,
999 "roleArn": "arn:aws:iam::123456789012:role/r",
1000 });
1001 let req = make_request("CreateStateMachine", &body.to_string());
1002 assert!(svc.create_state_machine(&req).is_err());
1003 }
1004
1005 #[test]
1006 fn create_state_machine_invalid_definition() {
1007 let svc = StepFunctionsService::new(make_state());
1008 let body = json!({
1009 "name": "bad-def",
1010 "definition": "not json",
1011 "roleArn": "arn:aws:iam::123456789012:role/r",
1012 });
1013 let req = make_request("CreateStateMachine", &body.to_string());
1014 let err = expect_err(svc.create_state_machine(&req));
1015 assert!(err.to_string().contains("InvalidDefinition"));
1016 }
1017
1018 #[test]
1019 fn create_state_machine_definition_missing_start_at() {
1020 let svc = StepFunctionsService::new(make_state());
1021 let body = json!({
1022 "name": "no-start",
1023 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1024 "roleArn": "arn:aws:iam::123456789012:role/r",
1025 });
1026 let req = make_request("CreateStateMachine", &body.to_string());
1027 let err = expect_err(svc.create_state_machine(&req));
1028 assert!(err.to_string().contains("InvalidDefinition"));
1029 }
1030
1031 #[test]
1032 fn create_state_machine_definition_missing_states() {
1033 let svc = StepFunctionsService::new(make_state());
1034 let body = json!({
1035 "name": "no-states",
1036 "definition": r#"{"StartAt":"S"}"#,
1037 "roleArn": "arn:aws:iam::123456789012:role/r",
1038 });
1039 let req = make_request("CreateStateMachine", &body.to_string());
1040 let err = expect_err(svc.create_state_machine(&req));
1041 assert!(err.to_string().contains("InvalidDefinition"));
1042 }
1043
1044 #[test]
1045 fn create_state_machine_definition_start_at_not_in_states() {
1046 let svc = StepFunctionsService::new(make_state());
1047 let body = json!({
1048 "name": "bad-start",
1049 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1050 "roleArn": "arn:aws:iam::123456789012:role/r",
1051 });
1052 let req = make_request("CreateStateMachine", &body.to_string());
1053 let err = expect_err(svc.create_state_machine(&req));
1054 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1055 }
1056
1057 #[test]
1058 fn create_state_machine_invalid_type() {
1059 let svc = StepFunctionsService::new(make_state());
1060 let body = json!({
1061 "name": "bad-type",
1062 "definition": VALID_DEF,
1063 "roleArn": "arn:aws:iam::123456789012:role/r",
1064 "type": "INVALID",
1065 });
1066 let req = make_request("CreateStateMachine", &body.to_string());
1067 assert!(svc.create_state_machine(&req).is_err());
1068 }
1069
1070 #[test]
1071 fn create_state_machine_invalid_arn() {
1072 let svc = StepFunctionsService::new(make_state());
1073 let body = json!({
1074 "name": "bad-arn",
1075 "definition": VALID_DEF,
1076 "roleArn": "not-an-arn",
1077 });
1078 let req = make_request("CreateStateMachine", &body.to_string());
1079 let err = expect_err(svc.create_state_machine(&req));
1080 assert!(err.to_string().contains("InvalidArn"));
1081 }
1082
1083 #[test]
1084 fn create_state_machine_invalid_name() {
1085 let svc = StepFunctionsService::new(make_state());
1086 let body = json!({
1087 "name": "has spaces!",
1088 "definition": VALID_DEF,
1089 "roleArn": "arn:aws:iam::123456789012:role/r",
1090 });
1091 let req = make_request("CreateStateMachine", &body.to_string());
1092 let err = expect_err(svc.create_state_machine(&req));
1093 assert!(err.to_string().contains("InvalidName"));
1094 }
1095
1096 #[test]
1097 fn create_state_machine_name_too_long() {
1098 let svc = StepFunctionsService::new(make_state());
1099 let long_name = "a".repeat(81);
1100 let body = json!({
1101 "name": long_name,
1102 "definition": VALID_DEF,
1103 "roleArn": "arn:aws:iam::123456789012:role/r",
1104 });
1105 let req = make_request("CreateStateMachine", &body.to_string());
1106 let err = expect_err(svc.create_state_machine(&req));
1107 assert!(err.to_string().contains("InvalidName"));
1108 }
1109
1110 #[test]
1113 fn describe_state_machine_found() {
1114 let svc = StepFunctionsService::new(make_state());
1115 let arn = create_sm(&svc, "desc-sm");
1116
1117 let req = make_request(
1118 "DescribeStateMachine",
1119 &json!({"stateMachineArn": arn}).to_string(),
1120 );
1121 let resp = svc.describe_state_machine(&req).unwrap();
1122 let b = body_json(&resp);
1123 assert_eq!(b["name"], "desc-sm");
1124 assert_eq!(b["status"], "ACTIVE");
1125 assert!(b["definition"].as_str().is_some());
1126 }
1127
1128 #[test]
1129 fn describe_state_machine_not_found() {
1130 let svc = StepFunctionsService::new(make_state());
1131 let req = make_request(
1132 "DescribeStateMachine",
1133 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1134 .to_string(),
1135 );
1136 let err = expect_err(svc.describe_state_machine(&req));
1137 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1138 }
1139
1140 #[test]
1143 fn list_state_machines_empty() {
1144 let svc = StepFunctionsService::new(make_state());
1145 let req = make_request("ListStateMachines", "{}");
1146 let resp = svc.list_state_machines(&req).unwrap();
1147 let b = body_json(&resp);
1148 assert!(b["stateMachines"].as_array().unwrap().is_empty());
1149 }
1150
1151 #[test]
1152 fn list_state_machines_returns_created() {
1153 let svc = StepFunctionsService::new(make_state());
1154 create_sm(&svc, "sm-1");
1155 create_sm(&svc, "sm-2");
1156
1157 let req = make_request("ListStateMachines", "{}");
1158 let resp = svc.list_state_machines(&req).unwrap();
1159 let b = body_json(&resp);
1160 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1161 }
1162
1163 #[test]
1166 fn delete_state_machine() {
1167 let svc = StepFunctionsService::new(make_state());
1168 let arn = create_sm(&svc, "del-sm");
1169
1170 let req = make_request(
1171 "DeleteStateMachine",
1172 &json!({"stateMachineArn": arn}).to_string(),
1173 );
1174 svc.delete_state_machine(&req).unwrap();
1175
1176 let req = make_request(
1178 "DescribeStateMachine",
1179 &json!({"stateMachineArn": arn}).to_string(),
1180 );
1181 assert!(svc.describe_state_machine(&req).is_err());
1182 }
1183
1184 #[test]
1185 fn delete_state_machine_nonexistent_succeeds() {
1186 let svc = StepFunctionsService::new(make_state());
1187 let req = make_request(
1188 "DeleteStateMachine",
1189 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1190 .to_string(),
1191 );
1192 svc.delete_state_machine(&req).unwrap();
1194 }
1195
1196 #[test]
1199 fn update_state_machine() {
1200 let svc = StepFunctionsService::new(make_state());
1201 let arn = create_sm(&svc, "upd-sm");
1202
1203 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1204 let body = json!({
1205 "stateMachineArn": arn,
1206 "definition": new_def,
1207 "description": "updated",
1208 });
1209 let req = make_request("UpdateStateMachine", &body.to_string());
1210 let resp = svc.update_state_machine(&req).unwrap();
1211 let b = body_json(&resp);
1212 assert!(b["updateDate"].as_f64().is_some());
1213
1214 let req = make_request(
1216 "DescribeStateMachine",
1217 &json!({"stateMachineArn": arn}).to_string(),
1218 );
1219 let resp = svc.describe_state_machine(&req).unwrap();
1220 let b = body_json(&resp);
1221 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1222 assert_eq!(b["description"], "updated");
1223 }
1224
1225 #[test]
1226 fn update_state_machine_not_found() {
1227 let svc = StepFunctionsService::new(make_state());
1228 let body = json!({
1229 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1230 "definition": VALID_DEF,
1231 });
1232 let req = make_request("UpdateStateMachine", &body.to_string());
1233 let err = expect_err(svc.update_state_machine(&req));
1234 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1235 }
1236
1237 #[tokio::test]
1240 async fn start_execution_basic() {
1241 let svc = StepFunctionsService::new(make_state());
1242 let arn = create_sm(&svc, "exec-sm");
1243
1244 let body = json!({
1245 "stateMachineArn": arn,
1246 "input": r#"{"key":"value"}"#,
1247 });
1248 let req = make_request("StartExecution", &body.to_string());
1249 let resp = svc.start_execution(&req).unwrap();
1250 let b = body_json(&resp);
1251 assert!(b["executionArn"].as_str().is_some());
1252 assert!(b["startDate"].as_f64().is_some());
1253 }
1254
1255 #[tokio::test]
1256 async fn start_execution_with_name() {
1257 let svc = StepFunctionsService::new(make_state());
1258 let arn = create_sm(&svc, "named-exec");
1259
1260 let body = json!({
1261 "stateMachineArn": arn,
1262 "name": "my-execution",
1263 });
1264 let req = make_request("StartExecution", &body.to_string());
1265 let resp = svc.start_execution(&req).unwrap();
1266 let b = body_json(&resp);
1267 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1268 }
1269
1270 #[tokio::test]
1271 async fn start_execution_sm_not_found() {
1272 let svc = StepFunctionsService::new(make_state());
1273 let body = json!({
1274 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1275 });
1276 let req = make_request("StartExecution", &body.to_string());
1277 let err = expect_err(svc.start_execution(&req));
1278 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1279 }
1280
1281 #[tokio::test]
1282 async fn start_execution_invalid_input() {
1283 let svc = StepFunctionsService::new(make_state());
1284 let arn = create_sm(&svc, "bad-input");
1285
1286 let body = json!({
1287 "stateMachineArn": arn,
1288 "input": "not json",
1289 });
1290 let req = make_request("StartExecution", &body.to_string());
1291 let err = expect_err(svc.start_execution(&req));
1292 assert!(err.to_string().contains("InvalidExecutionInput"));
1293 }
1294
1295 #[tokio::test]
1296 async fn start_execution_duplicate_name() {
1297 let svc = StepFunctionsService::new(make_state());
1298 let arn = create_sm(&svc, "dup-exec");
1299
1300 let body = json!({
1301 "stateMachineArn": arn,
1302 "name": "same-name",
1303 });
1304 let req = make_request("StartExecution", &body.to_string());
1305 svc.start_execution(&req).unwrap();
1306
1307 let req = make_request("StartExecution", &body.to_string());
1308 let err = expect_err(svc.start_execution(&req));
1309 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1310 }
1311
1312 #[tokio::test]
1315 async fn describe_execution_found() {
1316 let svc = StepFunctionsService::new(make_state());
1317 let sm_arn = create_sm(&svc, "desc-exec");
1318
1319 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1320 let req = make_request("StartExecution", &body.to_string());
1321 let resp = svc.start_execution(&req).unwrap();
1322 let exec_arn = body_json(&resp)["executionArn"]
1323 .as_str()
1324 .unwrap()
1325 .to_string();
1326
1327 let req = make_request(
1328 "DescribeExecution",
1329 &json!({"executionArn": exec_arn}).to_string(),
1330 );
1331 let resp = svc.describe_execution(&req).unwrap();
1332 let b = body_json(&resp);
1333 assert_eq!(b["name"], "e1");
1334 assert_eq!(b["status"], "RUNNING");
1335 }
1336
1337 #[tokio::test]
1338 async fn describe_execution_not_found() {
1339 let svc = StepFunctionsService::new(make_state());
1340 let req = make_request(
1341 "DescribeExecution",
1342 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1343 .to_string(),
1344 );
1345 let err = expect_err(svc.describe_execution(&req));
1346 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1347 }
1348
1349 #[tokio::test]
1352 async fn stop_execution() {
1353 let svc = StepFunctionsService::new(make_state());
1354 let sm_arn = create_sm(&svc, "stop-sm");
1355
1356 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1357 let req = make_request("StartExecution", &body.to_string());
1358 let resp = svc.start_execution(&req).unwrap();
1359 let exec_arn = body_json(&resp)["executionArn"]
1360 .as_str()
1361 .unwrap()
1362 .to_string();
1363
1364 let body = json!({
1365 "executionArn": exec_arn,
1366 "error": "UserAborted",
1367 "cause": "test stop",
1368 });
1369 let req = make_request("StopExecution", &body.to_string());
1370 let resp = svc.stop_execution(&req).unwrap();
1371 let b = body_json(&resp);
1372 assert!(b["stopDate"].as_f64().is_some());
1373
1374 let req = make_request(
1376 "DescribeExecution",
1377 &json!({"executionArn": exec_arn}).to_string(),
1378 );
1379 let resp = svc.describe_execution(&req).unwrap();
1380 let b = body_json(&resp);
1381 assert_eq!(b["status"], "ABORTED");
1382 assert_eq!(b["error"], "UserAborted");
1383 }
1384
1385 #[tokio::test]
1386 async fn stop_execution_not_found() {
1387 let svc = StepFunctionsService::new(make_state());
1388 let req = make_request(
1389 "StopExecution",
1390 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1391 .to_string(),
1392 );
1393 let err = expect_err(svc.stop_execution(&req));
1394 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1395 }
1396
1397 #[tokio::test]
1400 async fn list_executions() {
1401 let svc = StepFunctionsService::new(make_state());
1402 let sm_arn = create_sm(&svc, "list-exec");
1403
1404 for i in 0..3 {
1405 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1406 let req = make_request("StartExecution", &body.to_string());
1407 svc.start_execution(&req).unwrap();
1408 }
1409
1410 let req = make_request(
1411 "ListExecutions",
1412 &json!({"stateMachineArn": sm_arn}).to_string(),
1413 );
1414 let resp = svc.list_executions(&req).unwrap();
1415 let b = body_json(&resp);
1416 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1417 }
1418
1419 #[tokio::test]
1420 async fn list_executions_sm_not_found() {
1421 let svc = StepFunctionsService::new(make_state());
1422 let req = make_request(
1423 "ListExecutions",
1424 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1425 .to_string(),
1426 );
1427 let err = expect_err(svc.list_executions(&req));
1428 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1429 }
1430
1431 #[tokio::test]
1434 async fn get_execution_history_not_found() {
1435 let svc = StepFunctionsService::new(make_state());
1436 let req = make_request(
1437 "GetExecutionHistory",
1438 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1439 .to_string(),
1440 );
1441 let err = expect_err(svc.get_execution_history(&req));
1442 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1443 }
1444
1445 #[tokio::test]
1448 async fn describe_sm_for_execution() {
1449 let svc = StepFunctionsService::new(make_state());
1450 let sm_arn = create_sm(&svc, "sm-for-exec");
1451
1452 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1453 let req = make_request("StartExecution", &body.to_string());
1454 let resp = svc.start_execution(&req).unwrap();
1455 let exec_arn = body_json(&resp)["executionArn"]
1456 .as_str()
1457 .unwrap()
1458 .to_string();
1459
1460 let req = make_request(
1461 "DescribeStateMachineForExecution",
1462 &json!({"executionArn": exec_arn}).to_string(),
1463 );
1464 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1465 let b = body_json(&resp);
1466 assert_eq!(b["name"], "sm-for-exec");
1467 }
1468
1469 #[test]
1472 fn tag_untag_list_tags() {
1473 let svc = StepFunctionsService::new(make_state());
1474 let arn = create_sm(&svc, "tagged-sm");
1475
1476 let body = json!({
1478 "resourceArn": arn,
1479 "tags": [{"key": "env", "value": "prod"}],
1480 });
1481 let req = make_request("TagResource", &body.to_string());
1482 svc.tag_resource(&req).unwrap();
1483
1484 let req = make_request(
1486 "ListTagsForResource",
1487 &json!({"resourceArn": arn}).to_string(),
1488 );
1489 let resp = svc.list_tags_for_resource(&req).unwrap();
1490 let b = body_json(&resp);
1491 let tags = b["tags"].as_array().unwrap();
1492 assert_eq!(tags.len(), 1);
1493 assert_eq!(tags[0]["key"], "env");
1494
1495 let body = json!({
1497 "resourceArn": arn,
1498 "tagKeys": ["env"],
1499 });
1500 let req = make_request("UntagResource", &body.to_string());
1501 svc.untag_resource(&req).unwrap();
1502
1503 let req = make_request(
1505 "ListTagsForResource",
1506 &json!({"resourceArn": arn}).to_string(),
1507 );
1508 let resp = svc.list_tags_for_resource(&req).unwrap();
1509 let b = body_json(&resp);
1510 assert!(b["tags"].as_array().unwrap().is_empty());
1511 }
1512
1513 #[test]
1514 fn tag_resource_not_found() {
1515 let svc = StepFunctionsService::new(make_state());
1516 let body = json!({
1517 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1518 "tags": [{"key": "k", "value": "v"}],
1519 });
1520 let req = make_request("TagResource", &body.to_string());
1521 let err = expect_err(svc.tag_resource(&req));
1522 assert!(err.to_string().contains("ResourceNotFound"));
1523 }
1524
1525 #[test]
1528 fn test_validate_name() {
1529 assert!(validate_name("valid-name").is_ok());
1530 assert!(validate_name("under_score").is_ok());
1531 assert!(validate_name("").is_err());
1532 assert!(validate_name("has spaces").is_err());
1533 assert!(validate_name(&"a".repeat(81)).is_err());
1534 }
1535
1536 #[test]
1537 fn test_validate_definition() {
1538 assert!(validate_definition(VALID_DEF).is_ok());
1539 assert!(validate_definition("not json").is_err());
1540 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
1543
1544 #[test]
1545 fn test_validate_definition_rejects_malformed_paths() {
1546 let def =
1548 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1549 assert!(validate_definition(def).is_err());
1550
1551 let def =
1553 "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1554 assert!(validate_definition(def).is_err());
1555
1556 let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1558 assert!(validate_definition(def).is_err());
1559
1560 let def =
1562 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1563 assert!(validate_definition(def).is_err());
1564
1565 let def =
1567 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1568 assert!(validate_definition(def).is_err());
1569 }
1570
1571 #[test]
1572 fn test_validate_definition_accepts_well_formed_paths() {
1573 let def = r#"{"StartAt":"P","States":{
1576 "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1577 "C":{"Type":"Choice","Choices":[
1578 {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1579 ],"Default":"S"},
1580 "S":{"Type":"Succeed","InputPath":"null"}
1581 }}"#;
1582 assert!(validate_definition(def).is_ok());
1583 }
1584
1585 #[test]
1586 fn test_is_valid_reference_path() {
1587 assert!(is_valid_reference_path("$"));
1588 assert!(is_valid_reference_path("$.foo"));
1589 assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1590 assert!(is_valid_reference_path("$[0]"));
1591 assert!(!is_valid_reference_path("$.arr["));
1592 assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1593 assert!(!is_valid_reference_path("$.x[]"));
1594 assert!(!is_valid_reference_path("$.x[abc]"));
1595 assert!(!is_valid_reference_path("foo.bar"));
1596 assert!(!is_valid_reference_path(""));
1597 }
1598
1599 #[test]
1600 fn test_validate_arn() {
1601 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1602 assert!(validate_arn("not-an-arn").is_err());
1603 }
1604
1605 #[test]
1606 fn test_camel_to_details_key() {
1607 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1608 assert_eq!(camel_to_details_key(""), "");
1609 }
1610
1611 #[test]
1612 fn test_is_mutating_action() {
1613 assert!(is_mutating_action("CreateStateMachine"));
1614 assert!(is_mutating_action("StartExecution"));
1615 assert!(!is_mutating_action("DescribeStateMachine"));
1616 assert!(!is_mutating_action("ListStateMachines"));
1617 }
1618
1619 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1622 let body = json!({
1623 "name": name,
1624 "definition": VALID_DEF,
1625 "roleArn": "arn:aws:iam::123456789012:role/test",
1626 "type": "EXPRESS",
1627 });
1628 let req = make_request("CreateStateMachine", &body.to_string());
1629 let resp = svc.create_state_machine(&req).unwrap();
1630 let b = body_json(&resp);
1631 b["stateMachineArn"].as_str().unwrap().to_string()
1632 }
1633
1634 #[tokio::test]
1635 async fn start_sync_execution_basic() {
1636 let svc = StepFunctionsService::new(make_state());
1637 let arn = create_express_sm(&svc, "sync-sm");
1638
1639 let body = json!({
1640 "stateMachineArn": arn,
1641 "input": r#"{"key":"value"}"#,
1642 });
1643 let req = make_request("StartSyncExecution", &body.to_string());
1644 let resp = svc.start_sync_execution(&req).await.unwrap();
1645 let b = body_json(&resp);
1646 assert!(b["executionArn"]
1647 .as_str()
1648 .unwrap()
1649 .contains("express:sync-sm"));
1650 assert_eq!(b["stateMachineArn"], arn);
1651 assert_eq!(b["status"], "SUCCEEDED");
1652 assert!(b["startDate"].as_i64().is_some());
1653 assert!(b["stopDate"].as_i64().is_some());
1654 assert!(b["output"].as_str().is_some());
1655 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1656 .as_i64()
1657 .is_some());
1658 }
1659
1660 #[tokio::test]
1661 async fn start_sync_execution_not_express() {
1662 let svc = StepFunctionsService::new(make_state());
1663 let arn = create_sm(&svc, "std-sm");
1664
1665 let body = json!({"stateMachineArn": arn});
1666 let req = make_request("StartSyncExecution", &body.to_string());
1667 let err = expect_err(svc.start_sync_execution(&req).await);
1668 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1669 }
1670
1671 #[tokio::test]
1672 async fn start_sync_execution_sm_not_found() {
1673 let svc = StepFunctionsService::new(make_state());
1674 let body = json!({
1675 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1676 });
1677 let req = make_request("StartSyncExecution", &body.to_string());
1678 let err = expect_err(svc.start_sync_execution(&req).await);
1679 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1680 }
1681
1682 #[tokio::test]
1683 async fn start_sync_execution_records_introspection_fields() {
1684 let svc = StepFunctionsService::new(make_state());
1685 let arn = create_express_sm(&svc, "sync-introspect");
1686
1687 let body = json!({"stateMachineArn": arn, "input": "{}"});
1688 let req = make_request("StartSyncExecution", &body.to_string());
1689 let resp = svc.start_sync_execution(&req).await.unwrap();
1690 let b = body_json(&resp);
1691 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1692
1693 let accounts = svc.state.read();
1694 let state = accounts.get("123456789012").unwrap();
1695 let stored = state
1696 .executions
1697 .get(&exec_arn)
1698 .expect("sync execution should be persisted for introspection");
1699 assert!(stored.is_sync, "sync executions must be marked is_sync");
1700 assert_eq!(stored.billed_memory_mb, Some(64));
1701 assert!(
1702 stored.billed_duration_ms.is_some(),
1703 "billed_duration_ms must be populated after sync run"
1704 );
1705 assert!(
1706 stored.parent_execution_arn.is_none(),
1707 "top-level sync execution has no parent"
1708 );
1709 }
1710
1711 #[tokio::test]
1712 async fn start_sync_execution_invalid_input() {
1713 let svc = StepFunctionsService::new(make_state());
1714 let arn = create_express_sm(&svc, "bad-input-sync");
1715
1716 let body = json!({
1717 "stateMachineArn": arn,
1718 "input": "not json",
1719 });
1720 let req = make_request("StartSyncExecution", &body.to_string());
1721 let err = expect_err(svc.start_sync_execution(&req).await);
1722 assert!(err.to_string().contains("InvalidExecutionInput"));
1723 }
1724
1725 #[test]
1727 fn snapshot_hook_is_none_without_store() {
1728 let svc = StepFunctionsService::new(make_state());
1729 assert!(svc.snapshot_hook().is_none());
1730 }
1731
1732 #[tokio::test]
1736 async fn snapshot_hook_fires_with_store() {
1737 let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1738 Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1739 let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1740 let hook = svc
1741 .snapshot_hook()
1742 .expect("hook present when a store is set");
1743 hook().await;
1744 }
1745
1746 fn make_execution(arn: &str, status: ExecutionStatus) -> Execution {
1747 Execution {
1748 execution_arn: arn.to_string(),
1749 state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:sm".to_string(),
1750 state_machine_name: "sm".to_string(),
1751 name: arn.to_string(),
1752 status,
1753 input: None,
1754 output: None,
1755 start_date: Utc::now(),
1756 stop_date: None,
1757 error: None,
1758 cause: None,
1759 history_events: vec![],
1760 parent_execution_arn: None,
1761 is_sync: false,
1762 billed_duration_ms: None,
1763 billed_memory_mb: None,
1764 }
1765 }
1766
1767 #[test]
1768 fn reconcile_aborts_running_executions_on_restart() {
1769 let state = make_state();
1773 {
1774 let mut accounts = state.write();
1775 let s = accounts.get_or_create("123456789012");
1776 s.executions.insert(
1777 "running".into(),
1778 make_execution("running", ExecutionStatus::Running),
1779 );
1780 s.executions.insert(
1781 "done".into(),
1782 make_execution("done", ExecutionStatus::Succeeded),
1783 );
1784 }
1785
1786 let n = reconcile_interrupted_executions(&state);
1787 assert_eq!(n, 1, "only the RUNNING execution is reconciled");
1788
1789 let accounts = state.read();
1790 let s = accounts.get("123456789012").unwrap();
1791 let running = &s.executions["running"];
1792 assert_eq!(running.status, ExecutionStatus::Aborted);
1793 assert!(running.stop_date.is_some());
1794 assert_eq!(running.error.as_deref(), Some("Fakecloud.Restart"));
1795 assert_eq!(s.executions["done"].status, ExecutionStatus::Succeeded);
1796 }
1797}
1798
1799#[cfg(test)]
1800mod pagination_reject_test {
1801 #[test]
1802 fn paginate_checked_rejects_invalid_token() {
1803 use fakecloud_core::pagination::paginate_checked;
1804 let items: Vec<i32> = (0..5).collect();
1805 assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1806 assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1807 }
1808}