1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20 Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21 StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22 STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26 "CreateStateMachine",
27 "DescribeStateMachine",
28 "ListStateMachines",
29 "DeleteStateMachine",
30 "UpdateStateMachine",
31 "TagResource",
32 "UntagResource",
33 "ListTagsForResource",
34 "StartExecution",
35 "StopExecution",
36 "DescribeExecution",
37 "ListExecutions",
38 "GetExecutionHistory",
39 "DescribeStateMachineForExecution",
40 "CreateActivity",
41 "DeleteActivity",
42 "DescribeActivity",
43 "ListActivities",
44 "GetActivityTask",
45 "SendTaskFailure",
46 "SendTaskHeartbeat",
47 "SendTaskSuccess",
48 "PublishStateMachineVersion",
49 "DeleteStateMachineVersion",
50 "ListStateMachineVersions",
51 "CreateStateMachineAlias",
52 "DeleteStateMachineAlias",
53 "DescribeStateMachineAlias",
54 "ListStateMachineAliases",
55 "UpdateStateMachineAlias",
56 "DescribeMapRun",
57 "ListMapRuns",
58 "UpdateMapRun",
59 "RedriveExecution",
60 "StartSyncExecution",
61 "TestState",
62 "ValidateStateMachineDefinition",
63];
64
65pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73 state: SharedStepFunctionsState,
74 delivery: Option<Arc<DeliveryBus>>,
75 dynamodb_state: Option<SharedDynamoDbState>,
76 registry: Option<SharedServiceRegistry>,
77 snapshot_store: Option<Arc<dyn SnapshotStore>>,
78 snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90 pub fn new(state: SharedStepFunctionsState) -> Self {
91 Self {
92 state,
93 delivery: None,
94 dynamodb_state: None,
95 registry: None,
96 snapshot_store: None,
97 snapshot_lock: Arc::new(AsyncMutex::new(())),
98 }
99 }
100
101 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102 self.delivery = Some(delivery);
103 self
104 }
105
106 pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107 self.dynamodb_state = Some(dynamodb_state);
108 self
109 }
110
111 pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116 self.registry = Some(registry);
117 self
118 }
119
120 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121 self.snapshot_store = Some(store);
122 self
123 }
124
125 async fn save_snapshot(&self) {
126 save_stepfunctions_snapshot(
127 &self.state,
128 self.snapshot_store.clone(),
129 &self.snapshot_lock,
130 )
131 .await;
132 }
133
134 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140 let store = self.snapshot_store.clone()?;
141 let state = self.state.clone();
142 let lock = self.snapshot_lock.clone();
143 Some(Arc::new(move || {
144 let state = state.clone();
145 let store = store.clone();
146 let lock = lock.clone();
147 Box::pin(async move {
148 save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149 })
150 }))
151 }
152}
153
154pub async fn save_stepfunctions_snapshot(
160 state: &SharedStepFunctionsState,
161 store: Option<Arc<dyn SnapshotStore>>,
162 lock: &AsyncMutex<()>,
163) {
164 let Some(store) = store else {
165 return;
166 };
167 let _guard = lock.lock().await;
168 let snapshot = StepFunctionsSnapshot {
169 schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170 state: None,
171 accounts: Some(state.read().clone()),
172 };
173 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174 let bytes = serde_json::to_vec(&snapshot)
175 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176 store.save(&bytes)
177 })
178 .await;
179 match join {
180 Ok(Ok(())) => {}
181 Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182 Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183 }
184}
185
186fn is_mutating_action(action: &str) -> bool {
187 matches!(
188 action,
189 "CreateStateMachine"
190 | "DeleteStateMachine"
191 | "UpdateStateMachine"
192 | "TagResource"
193 | "UntagResource"
194 | "StartExecution"
195 | "StopExecution"
196 | "CreateActivity"
197 | "DeleteActivity"
198 | "GetActivityTask"
199 | "SendTaskFailure"
200 | "SendTaskHeartbeat"
201 | "SendTaskSuccess"
202 | "PublishStateMachineVersion"
203 | "DeleteStateMachineVersion"
204 | "CreateStateMachineAlias"
205 | "DeleteStateMachineAlias"
206 | "UpdateStateMachineAlias"
207 | "UpdateMapRun"
208 | "RedriveExecution"
209 | "StartSyncExecution"
210 )
211}
212
213#[async_trait]
214impl AwsService for StepFunctionsService {
215 fn service_name(&self) -> &str {
216 "states"
217 }
218
219 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
220 let mutates = is_mutating_action(req.action.as_str());
221 let result = match req.action.as_str() {
222 "CreateStateMachine" => self.create_state_machine(&req),
223 "DescribeStateMachine" => self.describe_state_machine(&req),
224 "ListStateMachines" => self.list_state_machines(&req),
225 "DeleteStateMachine" => self.delete_state_machine(&req),
226 "UpdateStateMachine" => self.update_state_machine(&req),
227 "TagResource" => self.tag_resource(&req),
228 "UntagResource" => self.untag_resource(&req),
229 "ListTagsForResource" => self.list_tags_for_resource(&req),
230 "StartExecution" => self.start_execution(&req),
231 "StopExecution" => self.stop_execution(&req),
232 "DescribeExecution" => self.describe_execution(&req),
233 "ListExecutions" => self.list_executions(&req),
234 "GetExecutionHistory" => self.get_execution_history(&req),
235 "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
236 "CreateActivity" => self.create_activity(&req),
237 "DeleteActivity" => self.delete_activity(&req),
238 "DescribeActivity" => self.describe_activity(&req),
239 "ListActivities" => self.list_activities(&req),
240 "GetActivityTask" => self.get_activity_task(&req).await,
241 "SendTaskFailure" => self.send_task_failure(&req),
242 "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
243 "SendTaskSuccess" => self.send_task_success(&req),
244 "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
245 "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
246 "ListStateMachineVersions" => self.list_state_machine_versions(&req),
247 "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
248 "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
249 "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
250 "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
251 "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
252 "DescribeMapRun" => self.describe_map_run(&req),
253 "ListMapRuns" => self.list_map_runs(&req),
254 "UpdateMapRun" => self.update_map_run(&req),
255 "RedriveExecution" => self.redrive_execution(&req),
256 "StartSyncExecution" => self.start_sync_execution(&req).await,
257 "TestState" => self.test_state(&req),
258 "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
259 _ => Err(AwsServiceError::action_not_implemented(
260 "states",
261 &req.action,
262 )),
263 };
264 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
265 self.save_snapshot().await;
266 }
267 result
268 }
269
270 fn supported_actions(&self) -> &[&str] {
271 SUPPORTED
272 }
273}
274
275impl StepFunctionsService {
276 fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277 let body = req.json_body();
278 let raw_max_results = body["maxResults"].as_i64();
279 if let Some(mr) = raw_max_results {
280 validate_max_results(mr)?;
281 }
282 let next_token = body["nextToken"].as_str();
283 if let Some(t) = next_token {
284 validate_page_token(t)?;
285 }
286 let max_results = match raw_max_results.unwrap_or(0) {
290 0 => 100,
291 n => n as usize,
292 };
293 let accounts = self.state.read();
294 let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
295 let state = accounts.get(&req.account_id).unwrap_or(&empty);
296 let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
297 activities.sort_by(|a, b| a.name.cmp(&b.name));
298 let items: Vec<Value> = activities
299 .iter()
300 .map(|a| {
301 json!({
302 "activityArn": a.arn,
303 "name": a.name,
304 "creationDate": a.creation_date.timestamp(),
305 })
306 })
307 .collect();
308 let (page, token) =
309 paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
310 let mut resp = json!({ "activities": page });
311 if let Some(t) = token {
312 resp["nextToken"] = json!(t);
313 }
314 Ok(AwsResponse::ok_json(resp))
315 }
316}
317
318fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
319 json!({
320 "stateMachineAliasArn": alias.arn,
321 "name": alias.name,
322 "description": alias.description,
323 "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
324 "stateMachineVersionArn": r.state_machine_version_arn,
325 "weight": r.weight,
326 })).collect::<Vec<_>>(),
327 "creationDate": alias.creation_date.timestamp(),
328 "updateDate": alias.update_date.timestamp(),
329 })
330}
331
332fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
333 json!({
334 "mapRunArn": mr.map_run_arn,
335 "executionArn": mr.execution_arn,
336 "maxConcurrency": mr.max_concurrency,
337 "toleratedFailurePercentage": mr.tolerated_failure_percentage,
338 "toleratedFailureCount": mr.tolerated_failure_count,
339 "status": mr.status,
340 "startDate": mr.start_date.timestamp(),
341 "stopDate": mr.stop_date.map(|d| d.timestamp()),
342 })
343}
344
345fn state_machine_to_json(sm: &StateMachine) -> Value {
348 let mut resp = json!({
349 "name": sm.name,
350 "stateMachineArn": sm.arn,
351 "definition": sm.definition,
352 "roleArn": sm.role_arn,
353 "type": sm.machine_type.as_str(),
354 "status": sm.status.as_str(),
355 "creationDate": sm.creation_date.timestamp() as f64,
356 "updateDate": sm.update_date.timestamp() as f64,
357 "revisionId": sm.revision_id,
358 "label": sm.name,
359 });
360
361 if !sm.description.is_empty() {
362 resp["description"] = json!(sm.description);
363 }
364
365 if let Some(ref logging) = sm.logging_configuration {
366 resp["loggingConfiguration"] = logging.clone();
367 } else {
368 resp["loggingConfiguration"] = json!({
369 "level": "OFF",
370 "includeExecutionData": false,
371 "destinations": [],
372 });
373 }
374
375 if let Some(ref tracing) = sm.tracing_configuration {
376 resp["tracingConfiguration"] = tracing.clone();
377 } else {
378 resp["tracingConfiguration"] = json!({
379 "enabled": false,
380 });
381 }
382
383 resp
384}
385
386fn missing(name: &str) -> AwsServiceError {
387 AwsServiceError::aws_error(
388 StatusCode::BAD_REQUEST,
389 "ValidationException",
390 format!("The request must contain the parameter {name}."),
391 )
392}
393
394fn state_machine_not_found(arn: &str) -> AwsServiceError {
395 AwsServiceError::aws_error(
396 StatusCode::BAD_REQUEST,
397 "StateMachineDoesNotExist",
398 format!("State Machine Does Not Exist: '{arn}'"),
399 )
400}
401
402fn activity_not_found(arn: &str) -> AwsServiceError {
403 AwsServiceError::aws_error(
404 StatusCode::BAD_REQUEST,
405 "ActivityDoesNotExist",
406 format!("Activity does not exist: {arn}"),
407 )
408}
409
410fn task_does_not_exist(token: &str) -> AwsServiceError {
411 AwsServiceError::aws_error(
412 StatusCode::BAD_REQUEST,
413 "TaskDoesNotExist",
414 format!("Task does not exist: {token}"),
415 )
416}
417
418fn resource_not_found(arn: &str) -> AwsServiceError {
419 AwsServiceError::aws_error(
420 StatusCode::BAD_REQUEST,
421 "ResourceNotFound",
422 format!("Resource not found: '{arn}'"),
423 )
424}
425
426fn parse_routing_configuration(
431 routes: &[serde_json::Value],
432) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
433 if routes.is_empty() || routes.len() > 2 {
434 return Err(AwsServiceError::aws_error(
435 StatusCode::BAD_REQUEST,
436 "ValidationException",
437 "routingConfiguration must contain 1 or 2 routes.",
438 ));
439 }
440 let parsed: Vec<crate::state::AliasRoute> = routes
441 .iter()
442 .map(|r| {
443 let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
444 AwsServiceError::aws_error(
445 StatusCode::BAD_REQUEST,
446 "ValidationException",
447 "routingConfiguration entries must contain stateMachineVersionArn.",
448 )
449 })?;
450 let weight = r["weight"].as_i64().ok_or_else(|| {
451 AwsServiceError::aws_error(
452 StatusCode::BAD_REQUEST,
453 "ValidationException",
454 "routingConfiguration entries must contain a numeric weight.",
455 )
456 })?;
457 if !(0..=100).contains(&weight) {
458 return Err(AwsServiceError::aws_error(
459 StatusCode::BAD_REQUEST,
460 "ValidationException",
461 format!("Invalid routing weight {weight}; must be 0-100."),
462 ));
463 }
464 Ok(crate::state::AliasRoute {
465 state_machine_version_arn: arn.to_string(),
466 weight: weight as i32,
467 })
468 })
469 .collect::<Result<_, _>>()?;
470 let total: i32 = parsed.iter().map(|r| r.weight).sum();
471 if total != 100 {
472 return Err(AwsServiceError::aws_error(
473 StatusCode::BAD_REQUEST,
474 "ValidationException",
475 format!("routingConfiguration weights must sum to 100, got {total}."),
476 ));
477 }
478 Ok(parsed)
479}
480
481fn validate_name(name: &str) -> Result<(), AwsServiceError> {
482 if name.is_empty() || name.len() > 80 {
483 return Err(AwsServiceError::aws_error(
484 StatusCode::BAD_REQUEST,
485 "InvalidName",
486 format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
487 ));
488 }
489 if !name
491 .chars()
492 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
493 {
494 return Err(AwsServiceError::aws_error(
495 StatusCode::BAD_REQUEST,
496 "InvalidName",
497 format!(
498 "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
499 ),
500 ));
501 }
502 Ok(())
503}
504
505fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
506 let parsed: Value = serde_json::from_str(definition).map_err(|e| {
507 AwsServiceError::aws_error(
508 StatusCode::BAD_REQUEST,
509 "InvalidDefinition",
510 format!("Invalid State Machine Definition: '{e}'"),
511 )
512 })?;
513
514 if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
515 return Err(AwsServiceError::aws_error(
516 StatusCode::BAD_REQUEST,
517 "InvalidDefinition",
518 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
519 .to_string(),
520 ));
521 }
522
523 let states_obj = parsed
524 .get("States")
525 .and_then(|v| v.as_object())
526 .ok_or_else(|| {
527 AwsServiceError::aws_error(
528 StatusCode::BAD_REQUEST,
529 "InvalidDefinition",
530 "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
531 .to_string(),
532 )
533 })?;
534
535 let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
536 AwsServiceError::aws_error(
537 StatusCode::BAD_REQUEST,
538 "InvalidDefinition",
539 "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
540 .to_string(),
541 )
542 })?;
543 if !states_obj.contains_key(start_at) {
544 return Err(AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "InvalidDefinition",
547 format!(
548 "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
549 (StartAt '{start_at}' does not reference a valid state)"
550 ),
551 ));
552 }
553
554 for (state_name, state_val) in states_obj {
558 validate_state_paths(state_name, state_val)?;
559 }
560
561 Ok(())
562}
563
564fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
568 for field in ["InputPath", "OutputPath", "ResultPath"] {
569 if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
570 if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
574 continue;
575 }
576 return Err(invalid_reference_path(state_name, field, p));
577 }
578 }
579
580 if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
581 if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
582 for rule in choices {
583 validate_choice_variables(state_name, rule)?;
584 }
585 }
586 }
587
588 Ok(())
589}
590
591fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
594 if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
595 if !is_valid_reference_path(v) {
596 return Err(invalid_reference_path(state_name, "Variable", v));
597 }
598 }
599 for combinator in ["And", "Or"] {
600 if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
601 for n in nested {
602 validate_choice_variables(state_name, n)?;
603 }
604 }
605 }
606 if let Some(n) = rule.get("Not") {
607 validate_choice_variables(state_name, n)?;
608 }
609 Ok(())
610}
611
612fn is_valid_reference_path(path: &str) -> bool {
616 if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
617 return false;
618 }
619 let body = path
620 .strip_prefix("$.")
621 .or_else(|| path.strip_prefix('$'))
622 .unwrap_or(path);
623 for part in body.split('.') {
624 if !segment_is_valid(part) {
625 return false;
626 }
627 }
628 true
629}
630
631fn segment_is_valid(part: &str) -> bool {
632 match part.find('[') {
633 None => !part.contains(']'),
634 Some(open) => {
635 if !part.ends_with(']') {
636 return false;
637 }
638 let inner = &part[open + 1..part.len() - 1];
639 !inner.is_empty() && inner.parse::<usize>().is_ok()
641 }
642 }
643}
644
645fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
646 AwsServiceError::aws_error(
647 StatusCode::BAD_REQUEST,
648 "InvalidDefinition",
649 format!(
650 "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
651 (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
652 ),
653 )
654}
655
656fn execution_not_found(arn: &str) -> AwsServiceError {
657 AwsServiceError::aws_error(
658 StatusCode::BAD_REQUEST,
659 "ExecutionDoesNotExist",
660 format!("Execution Does Not Exist: '{arn}'"),
661 )
662}
663
664fn execution_to_json(exec: &Execution) -> Value {
665 let mut resp = json!({
666 "executionArn": exec.execution_arn,
667 "stateMachineArn": exec.state_machine_arn,
668 "name": exec.name,
669 "status": exec.status.as_str(),
670 "startDate": exec.start_date.timestamp() as f64,
671 });
672
673 if let Some(ref input) = exec.input {
674 resp["input"] = json!(input);
675 }
676 if let Some(ref output) = exec.output {
677 resp["output"] = json!(output);
678 }
679 if let Some(stop) = exec.stop_date {
680 resp["stopDate"] = json!(stop.timestamp() as f64);
681 }
682 if let Some(ref error) = exec.error {
683 resp["error"] = json!(error);
684 }
685 if let Some(ref cause) = exec.cause {
686 resp["cause"] = json!(cause);
687 }
688
689 resp
690}
691
692fn camel_to_details_key(event_type: &str) -> String {
694 let mut chars = event_type.chars();
695 match chars.next() {
696 None => String::new(),
697 Some(c) => c.to_lowercase().to_string() + chars.as_str(),
698 }
699}
700
701fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
702 if !arn.starts_with("arn:") {
703 return Err(AwsServiceError::aws_error(
704 StatusCode::BAD_REQUEST,
705 "InvalidArn",
706 format!("Invalid Arn: '{arn}'"),
707 ));
708 }
709 Ok(())
710}
711
712fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
716 if value.is_empty() || value.len() > max {
717 return Err(AwsServiceError::aws_error(
718 StatusCode::BAD_REQUEST,
719 "InvalidArn",
720 format!("Invalid Arn at '{field}': must be 1..={max} characters"),
721 ));
722 }
723 Ok(())
724}
725
726pub(super) fn invalid_token() -> AwsServiceError {
730 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
731}
732
733fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
737 if value.is_empty() || value.len() > 1024 {
738 return Err(AwsServiceError::aws_error(
739 StatusCode::BAD_REQUEST,
740 "InvalidToken",
741 "nextToken must be 1..=1024 characters",
742 ));
743 }
744 Ok(())
745}
746
747fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
753 if !(0..=1000).contains(&value) {
754 return Err(AwsServiceError::aws_error(
755 StatusCode::BAD_REQUEST,
756 "InvalidToken",
757 format!("maxResults '{value}' is outside 0..=1000"),
758 ));
759 }
760 Ok(())
761}
762
763pub fn start_execution_from_delivery(
772 state: &SharedStepFunctionsState,
773 delivery: &Option<Arc<DeliveryBus>>,
774 dynamodb_state: &Option<SharedDynamoDbState>,
775 registry: &Option<SharedServiceRegistry>,
776 state_machine_arn: &str,
777 input: &str,
778) {
779 if serde_json::from_str::<serde_json::Value>(input).is_err() {
781 tracing::warn!(
782 state_machine_arn,
783 "Step Functions delivery: invalid JSON input, skipping execution"
784 );
785 return;
786 }
787
788 let execution_name = uuid::Uuid::new_v4().to_string();
789
790 let account_id = state_machine_arn
792 .split(':')
793 .nth(4)
794 .unwrap_or("000000000000")
795 .to_string();
796
797 let mut accounts = state.write();
798 let st = accounts.get_or_create(&account_id);
799 let sm = match st.state_machines.get(state_machine_arn) {
800 Some(sm) => sm,
801 None => {
802 tracing::warn!(
803 state_machine_arn,
804 "Step Functions delivery: state machine not found"
805 );
806 return;
807 }
808 };
809
810 let sm_name = sm.name.clone();
811 let definition = sm.definition.clone();
812 let exec_arn = st.execution_arn(&sm_name, &execution_name);
813
814 let now = Utc::now();
815 let execution = Execution {
816 execution_arn: exec_arn.clone(),
817 state_machine_arn: state_machine_arn.to_string(),
818 state_machine_name: sm_name,
819 name: execution_name,
820 status: ExecutionStatus::Running,
821 input: Some(input.to_string()),
822 output: None,
823 start_date: now,
824 stop_date: None,
825 error: None,
826 cause: None,
827 history_events: vec![],
828 parent_execution_arn: None,
829 is_sync: false,
830 billed_duration_ms: None,
831 billed_memory_mb: None,
832 };
833
834 st.executions.insert(exec_arn.clone(), execution);
835 let logging_config = sm.logging_configuration.clone();
836 drop(accounts);
837
838 let shared_state = state.clone();
839 let delivery = delivery.clone();
840 let dynamodb_state = dynamodb_state.clone();
841 let registry = registry.clone();
842 let input = Some(input.to_string());
843 tokio::spawn(async move {
844 interpreter::execute_state_machine(
845 shared_state,
846 exec_arn,
847 definition,
848 input,
849 delivery,
850 dynamodb_state,
851 registry,
852 logging_config,
853 )
854 .await;
855 });
856}
857
858#[cfg(test)]
859mod tests {
860 use super::*;
861 use http::{HeaderMap, Method};
862 use parking_lot::RwLock;
863 use serde_json::Value;
864 use std::collections::HashMap;
865 use std::sync::Arc;
866
867 fn make_state() -> SharedStepFunctionsState {
868 Arc::new(RwLock::new(
869 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
870 ))
871 }
872
873 fn make_request(action: &str, body: &str) -> AwsRequest {
874 AwsRequest {
875 service: "states".to_string(),
876 action: action.to_string(),
877 region: "us-east-1".to_string(),
878 account_id: "123456789012".to_string(),
879 request_id: "test-id".to_string(),
880 headers: HeaderMap::new(),
881 query_params: HashMap::new(),
882 body: body.as_bytes().to_vec().into(),
883 body_stream: parking_lot::Mutex::new(None),
884 path_segments: vec![],
885 raw_path: "/".to_string(),
886 raw_query: String::new(),
887 method: Method::POST,
888 is_query_protocol: false,
889 access_key_id: None,
890 principal: None,
891 }
892 }
893
894 fn body_json(resp: &AwsResponse) -> Value {
895 serde_json::from_slice(resp.body.expect_bytes()).unwrap()
896 }
897
898 fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
899 match result {
900 Err(e) => e,
901 Ok(_) => panic!("expected error, got Ok"),
902 }
903 }
904
905 const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
906
907 fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
908 let body = json!({
909 "name": name,
910 "definition": VALID_DEF,
911 "roleArn": "arn:aws:iam::123456789012:role/test",
912 });
913 let req = make_request("CreateStateMachine", &body.to_string());
914 let resp = svc.create_state_machine(&req).unwrap();
915 let b = body_json(&resp);
916 b["stateMachineArn"].as_str().unwrap().to_string()
917 }
918
919 #[test]
922 fn create_state_machine_basic() {
923 let svc = StepFunctionsService::new(make_state());
924 let arn = create_sm(&svc, "test-sm");
925 assert!(arn.contains("test-sm"));
926 }
927
928 #[test]
929 fn create_state_machine_with_express_type() {
930 let svc = StepFunctionsService::new(make_state());
931 let body = json!({
932 "name": "express-sm",
933 "definition": VALID_DEF,
934 "roleArn": "arn:aws:iam::123456789012:role/r",
935 "type": "EXPRESS",
936 });
937 let req = make_request("CreateStateMachine", &body.to_string());
938 let resp = svc.create_state_machine(&req).unwrap();
939 let b = body_json(&resp);
940 assert!(b["stateMachineArn"].as_str().is_some());
941 }
942
943 #[test]
944 fn create_state_machine_duplicate_fails() {
945 let svc = StepFunctionsService::new(make_state());
946 create_sm(&svc, "dup-sm");
947 let body = json!({
948 "name": "dup-sm",
949 "definition": VALID_DEF,
950 "roleArn": "arn:aws:iam::123456789012:role/r",
951 });
952 let req = make_request("CreateStateMachine", &body.to_string());
953 let err = expect_err(svc.create_state_machine(&req));
954 assert!(err.to_string().contains("StateMachineAlreadyExists"));
955 }
956
957 #[test]
958 fn create_state_machine_missing_name() {
959 let svc = StepFunctionsService::new(make_state());
960 let body = json!({
961 "definition": VALID_DEF,
962 "roleArn": "arn:aws:iam::123456789012:role/r",
963 });
964 let req = make_request("CreateStateMachine", &body.to_string());
965 assert!(svc.create_state_machine(&req).is_err());
966 }
967
968 #[test]
969 fn create_state_machine_invalid_definition() {
970 let svc = StepFunctionsService::new(make_state());
971 let body = json!({
972 "name": "bad-def",
973 "definition": "not json",
974 "roleArn": "arn:aws:iam::123456789012:role/r",
975 });
976 let req = make_request("CreateStateMachine", &body.to_string());
977 let err = expect_err(svc.create_state_machine(&req));
978 assert!(err.to_string().contains("InvalidDefinition"));
979 }
980
981 #[test]
982 fn create_state_machine_definition_missing_start_at() {
983 let svc = StepFunctionsService::new(make_state());
984 let body = json!({
985 "name": "no-start",
986 "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
987 "roleArn": "arn:aws:iam::123456789012:role/r",
988 });
989 let req = make_request("CreateStateMachine", &body.to_string());
990 let err = expect_err(svc.create_state_machine(&req));
991 assert!(err.to_string().contains("InvalidDefinition"));
992 }
993
994 #[test]
995 fn create_state_machine_definition_missing_states() {
996 let svc = StepFunctionsService::new(make_state());
997 let body = json!({
998 "name": "no-states",
999 "definition": r#"{"StartAt":"S"}"#,
1000 "roleArn": "arn:aws:iam::123456789012:role/r",
1001 });
1002 let req = make_request("CreateStateMachine", &body.to_string());
1003 let err = expect_err(svc.create_state_machine(&req));
1004 assert!(err.to_string().contains("InvalidDefinition"));
1005 }
1006
1007 #[test]
1008 fn create_state_machine_definition_start_at_not_in_states() {
1009 let svc = StepFunctionsService::new(make_state());
1010 let body = json!({
1011 "name": "bad-start",
1012 "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1013 "roleArn": "arn:aws:iam::123456789012:role/r",
1014 });
1015 let req = make_request("CreateStateMachine", &body.to_string());
1016 let err = expect_err(svc.create_state_machine(&req));
1017 assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1018 }
1019
1020 #[test]
1021 fn create_state_machine_invalid_type() {
1022 let svc = StepFunctionsService::new(make_state());
1023 let body = json!({
1024 "name": "bad-type",
1025 "definition": VALID_DEF,
1026 "roleArn": "arn:aws:iam::123456789012:role/r",
1027 "type": "INVALID",
1028 });
1029 let req = make_request("CreateStateMachine", &body.to_string());
1030 assert!(svc.create_state_machine(&req).is_err());
1031 }
1032
1033 #[test]
1034 fn create_state_machine_invalid_arn() {
1035 let svc = StepFunctionsService::new(make_state());
1036 let body = json!({
1037 "name": "bad-arn",
1038 "definition": VALID_DEF,
1039 "roleArn": "not-an-arn",
1040 });
1041 let req = make_request("CreateStateMachine", &body.to_string());
1042 let err = expect_err(svc.create_state_machine(&req));
1043 assert!(err.to_string().contains("InvalidArn"));
1044 }
1045
1046 #[test]
1047 fn create_state_machine_invalid_name() {
1048 let svc = StepFunctionsService::new(make_state());
1049 let body = json!({
1050 "name": "has spaces!",
1051 "definition": VALID_DEF,
1052 "roleArn": "arn:aws:iam::123456789012:role/r",
1053 });
1054 let req = make_request("CreateStateMachine", &body.to_string());
1055 let err = expect_err(svc.create_state_machine(&req));
1056 assert!(err.to_string().contains("InvalidName"));
1057 }
1058
1059 #[test]
1060 fn create_state_machine_name_too_long() {
1061 let svc = StepFunctionsService::new(make_state());
1062 let long_name = "a".repeat(81);
1063 let body = json!({
1064 "name": long_name,
1065 "definition": VALID_DEF,
1066 "roleArn": "arn:aws:iam::123456789012:role/r",
1067 });
1068 let req = make_request("CreateStateMachine", &body.to_string());
1069 let err = expect_err(svc.create_state_machine(&req));
1070 assert!(err.to_string().contains("InvalidName"));
1071 }
1072
1073 #[test]
1076 fn describe_state_machine_found() {
1077 let svc = StepFunctionsService::new(make_state());
1078 let arn = create_sm(&svc, "desc-sm");
1079
1080 let req = make_request(
1081 "DescribeStateMachine",
1082 &json!({"stateMachineArn": arn}).to_string(),
1083 );
1084 let resp = svc.describe_state_machine(&req).unwrap();
1085 let b = body_json(&resp);
1086 assert_eq!(b["name"], "desc-sm");
1087 assert_eq!(b["status"], "ACTIVE");
1088 assert!(b["definition"].as_str().is_some());
1089 }
1090
1091 #[test]
1092 fn describe_state_machine_not_found() {
1093 let svc = StepFunctionsService::new(make_state());
1094 let req = make_request(
1095 "DescribeStateMachine",
1096 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1097 .to_string(),
1098 );
1099 let err = expect_err(svc.describe_state_machine(&req));
1100 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1101 }
1102
1103 #[test]
1106 fn list_state_machines_empty() {
1107 let svc = StepFunctionsService::new(make_state());
1108 let req = make_request("ListStateMachines", "{}");
1109 let resp = svc.list_state_machines(&req).unwrap();
1110 let b = body_json(&resp);
1111 assert!(b["stateMachines"].as_array().unwrap().is_empty());
1112 }
1113
1114 #[test]
1115 fn list_state_machines_returns_created() {
1116 let svc = StepFunctionsService::new(make_state());
1117 create_sm(&svc, "sm-1");
1118 create_sm(&svc, "sm-2");
1119
1120 let req = make_request("ListStateMachines", "{}");
1121 let resp = svc.list_state_machines(&req).unwrap();
1122 let b = body_json(&resp);
1123 assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1124 }
1125
1126 #[test]
1129 fn delete_state_machine() {
1130 let svc = StepFunctionsService::new(make_state());
1131 let arn = create_sm(&svc, "del-sm");
1132
1133 let req = make_request(
1134 "DeleteStateMachine",
1135 &json!({"stateMachineArn": arn}).to_string(),
1136 );
1137 svc.delete_state_machine(&req).unwrap();
1138
1139 let req = make_request(
1141 "DescribeStateMachine",
1142 &json!({"stateMachineArn": arn}).to_string(),
1143 );
1144 assert!(svc.describe_state_machine(&req).is_err());
1145 }
1146
1147 #[test]
1148 fn delete_state_machine_nonexistent_succeeds() {
1149 let svc = StepFunctionsService::new(make_state());
1150 let req = make_request(
1151 "DeleteStateMachine",
1152 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1153 .to_string(),
1154 );
1155 svc.delete_state_machine(&req).unwrap();
1157 }
1158
1159 #[test]
1162 fn update_state_machine() {
1163 let svc = StepFunctionsService::new(make_state());
1164 let arn = create_sm(&svc, "upd-sm");
1165
1166 let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1167 let body = json!({
1168 "stateMachineArn": arn,
1169 "definition": new_def,
1170 "description": "updated",
1171 });
1172 let req = make_request("UpdateStateMachine", &body.to_string());
1173 let resp = svc.update_state_machine(&req).unwrap();
1174 let b = body_json(&resp);
1175 assert!(b["updateDate"].as_f64().is_some());
1176
1177 let req = make_request(
1179 "DescribeStateMachine",
1180 &json!({"stateMachineArn": arn}).to_string(),
1181 );
1182 let resp = svc.describe_state_machine(&req).unwrap();
1183 let b = body_json(&resp);
1184 assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1185 assert_eq!(b["description"], "updated");
1186 }
1187
1188 #[test]
1189 fn update_state_machine_not_found() {
1190 let svc = StepFunctionsService::new(make_state());
1191 let body = json!({
1192 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1193 "definition": VALID_DEF,
1194 });
1195 let req = make_request("UpdateStateMachine", &body.to_string());
1196 let err = expect_err(svc.update_state_machine(&req));
1197 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1198 }
1199
1200 #[tokio::test]
1203 async fn start_execution_basic() {
1204 let svc = StepFunctionsService::new(make_state());
1205 let arn = create_sm(&svc, "exec-sm");
1206
1207 let body = json!({
1208 "stateMachineArn": arn,
1209 "input": r#"{"key":"value"}"#,
1210 });
1211 let req = make_request("StartExecution", &body.to_string());
1212 let resp = svc.start_execution(&req).unwrap();
1213 let b = body_json(&resp);
1214 assert!(b["executionArn"].as_str().is_some());
1215 assert!(b["startDate"].as_f64().is_some());
1216 }
1217
1218 #[tokio::test]
1219 async fn start_execution_with_name() {
1220 let svc = StepFunctionsService::new(make_state());
1221 let arn = create_sm(&svc, "named-exec");
1222
1223 let body = json!({
1224 "stateMachineArn": arn,
1225 "name": "my-execution",
1226 });
1227 let req = make_request("StartExecution", &body.to_string());
1228 let resp = svc.start_execution(&req).unwrap();
1229 let b = body_json(&resp);
1230 assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1231 }
1232
1233 #[tokio::test]
1234 async fn start_execution_sm_not_found() {
1235 let svc = StepFunctionsService::new(make_state());
1236 let body = json!({
1237 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1238 });
1239 let req = make_request("StartExecution", &body.to_string());
1240 let err = expect_err(svc.start_execution(&req));
1241 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1242 }
1243
1244 #[tokio::test]
1245 async fn start_execution_invalid_input() {
1246 let svc = StepFunctionsService::new(make_state());
1247 let arn = create_sm(&svc, "bad-input");
1248
1249 let body = json!({
1250 "stateMachineArn": arn,
1251 "input": "not json",
1252 });
1253 let req = make_request("StartExecution", &body.to_string());
1254 let err = expect_err(svc.start_execution(&req));
1255 assert!(err.to_string().contains("InvalidExecutionInput"));
1256 }
1257
1258 #[tokio::test]
1259 async fn start_execution_duplicate_name() {
1260 let svc = StepFunctionsService::new(make_state());
1261 let arn = create_sm(&svc, "dup-exec");
1262
1263 let body = json!({
1264 "stateMachineArn": arn,
1265 "name": "same-name",
1266 });
1267 let req = make_request("StartExecution", &body.to_string());
1268 svc.start_execution(&req).unwrap();
1269
1270 let req = make_request("StartExecution", &body.to_string());
1271 let err = expect_err(svc.start_execution(&req));
1272 assert!(err.to_string().contains("ExecutionAlreadyExists"));
1273 }
1274
1275 #[tokio::test]
1278 async fn describe_execution_found() {
1279 let svc = StepFunctionsService::new(make_state());
1280 let sm_arn = create_sm(&svc, "desc-exec");
1281
1282 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1283 let req = make_request("StartExecution", &body.to_string());
1284 let resp = svc.start_execution(&req).unwrap();
1285 let exec_arn = body_json(&resp)["executionArn"]
1286 .as_str()
1287 .unwrap()
1288 .to_string();
1289
1290 let req = make_request(
1291 "DescribeExecution",
1292 &json!({"executionArn": exec_arn}).to_string(),
1293 );
1294 let resp = svc.describe_execution(&req).unwrap();
1295 let b = body_json(&resp);
1296 assert_eq!(b["name"], "e1");
1297 assert_eq!(b["status"], "RUNNING");
1298 }
1299
1300 #[tokio::test]
1301 async fn describe_execution_not_found() {
1302 let svc = StepFunctionsService::new(make_state());
1303 let req = make_request(
1304 "DescribeExecution",
1305 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1306 .to_string(),
1307 );
1308 let err = expect_err(svc.describe_execution(&req));
1309 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1310 }
1311
1312 #[tokio::test]
1315 async fn stop_execution() {
1316 let svc = StepFunctionsService::new(make_state());
1317 let sm_arn = create_sm(&svc, "stop-sm");
1318
1319 let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1320 let req = make_request("StartExecution", &body.to_string());
1321 let resp = svc.start_execution(&req).unwrap();
1322 let exec_arn = body_json(&resp)["executionArn"]
1323 .as_str()
1324 .unwrap()
1325 .to_string();
1326
1327 let body = json!({
1328 "executionArn": exec_arn,
1329 "error": "UserAborted",
1330 "cause": "test stop",
1331 });
1332 let req = make_request("StopExecution", &body.to_string());
1333 let resp = svc.stop_execution(&req).unwrap();
1334 let b = body_json(&resp);
1335 assert!(b["stopDate"].as_f64().is_some());
1336
1337 let req = make_request(
1339 "DescribeExecution",
1340 &json!({"executionArn": exec_arn}).to_string(),
1341 );
1342 let resp = svc.describe_execution(&req).unwrap();
1343 let b = body_json(&resp);
1344 assert_eq!(b["status"], "ABORTED");
1345 assert_eq!(b["error"], "UserAborted");
1346 }
1347
1348 #[tokio::test]
1349 async fn stop_execution_not_found() {
1350 let svc = StepFunctionsService::new(make_state());
1351 let req = make_request(
1352 "StopExecution",
1353 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1354 .to_string(),
1355 );
1356 let err = expect_err(svc.stop_execution(&req));
1357 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1358 }
1359
1360 #[tokio::test]
1363 async fn list_executions() {
1364 let svc = StepFunctionsService::new(make_state());
1365 let sm_arn = create_sm(&svc, "list-exec");
1366
1367 for i in 0..3 {
1368 let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1369 let req = make_request("StartExecution", &body.to_string());
1370 svc.start_execution(&req).unwrap();
1371 }
1372
1373 let req = make_request(
1374 "ListExecutions",
1375 &json!({"stateMachineArn": sm_arn}).to_string(),
1376 );
1377 let resp = svc.list_executions(&req).unwrap();
1378 let b = body_json(&resp);
1379 assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1380 }
1381
1382 #[tokio::test]
1383 async fn list_executions_sm_not_found() {
1384 let svc = StepFunctionsService::new(make_state());
1385 let req = make_request(
1386 "ListExecutions",
1387 &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1388 .to_string(),
1389 );
1390 let err = expect_err(svc.list_executions(&req));
1391 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1392 }
1393
1394 #[tokio::test]
1397 async fn get_execution_history_not_found() {
1398 let svc = StepFunctionsService::new(make_state());
1399 let req = make_request(
1400 "GetExecutionHistory",
1401 &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1402 .to_string(),
1403 );
1404 let err = expect_err(svc.get_execution_history(&req));
1405 assert!(err.to_string().contains("ExecutionDoesNotExist"));
1406 }
1407
1408 #[tokio::test]
1411 async fn describe_sm_for_execution() {
1412 let svc = StepFunctionsService::new(make_state());
1413 let sm_arn = create_sm(&svc, "sm-for-exec");
1414
1415 let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1416 let req = make_request("StartExecution", &body.to_string());
1417 let resp = svc.start_execution(&req).unwrap();
1418 let exec_arn = body_json(&resp)["executionArn"]
1419 .as_str()
1420 .unwrap()
1421 .to_string();
1422
1423 let req = make_request(
1424 "DescribeStateMachineForExecution",
1425 &json!({"executionArn": exec_arn}).to_string(),
1426 );
1427 let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1428 let b = body_json(&resp);
1429 assert_eq!(b["name"], "sm-for-exec");
1430 }
1431
1432 #[test]
1435 fn tag_untag_list_tags() {
1436 let svc = StepFunctionsService::new(make_state());
1437 let arn = create_sm(&svc, "tagged-sm");
1438
1439 let body = json!({
1441 "resourceArn": arn,
1442 "tags": [{"key": "env", "value": "prod"}],
1443 });
1444 let req = make_request("TagResource", &body.to_string());
1445 svc.tag_resource(&req).unwrap();
1446
1447 let req = make_request(
1449 "ListTagsForResource",
1450 &json!({"resourceArn": arn}).to_string(),
1451 );
1452 let resp = svc.list_tags_for_resource(&req).unwrap();
1453 let b = body_json(&resp);
1454 let tags = b["tags"].as_array().unwrap();
1455 assert_eq!(tags.len(), 1);
1456 assert_eq!(tags[0]["key"], "env");
1457
1458 let body = json!({
1460 "resourceArn": arn,
1461 "tagKeys": ["env"],
1462 });
1463 let req = make_request("UntagResource", &body.to_string());
1464 svc.untag_resource(&req).unwrap();
1465
1466 let req = make_request(
1468 "ListTagsForResource",
1469 &json!({"resourceArn": arn}).to_string(),
1470 );
1471 let resp = svc.list_tags_for_resource(&req).unwrap();
1472 let b = body_json(&resp);
1473 assert!(b["tags"].as_array().unwrap().is_empty());
1474 }
1475
1476 #[test]
1477 fn tag_resource_not_found() {
1478 let svc = StepFunctionsService::new(make_state());
1479 let body = json!({
1480 "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1481 "tags": [{"key": "k", "value": "v"}],
1482 });
1483 let req = make_request("TagResource", &body.to_string());
1484 let err = expect_err(svc.tag_resource(&req));
1485 assert!(err.to_string().contains("ResourceNotFound"));
1486 }
1487
1488 #[test]
1491 fn test_validate_name() {
1492 assert!(validate_name("valid-name").is_ok());
1493 assert!(validate_name("under_score").is_ok());
1494 assert!(validate_name("").is_err());
1495 assert!(validate_name("has spaces").is_err());
1496 assert!(validate_name(&"a".repeat(81)).is_err());
1497 }
1498
1499 #[test]
1500 fn test_validate_definition() {
1501 assert!(validate_definition(VALID_DEF).is_ok());
1502 assert!(validate_definition("not json").is_err());
1503 assert!(validate_definition(r#"{"States":{}}"#).is_err()); assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); }
1506
1507 #[test]
1508 fn test_validate_definition_rejects_malformed_paths() {
1509 let def =
1511 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1512 assert!(validate_definition(def).is_err());
1513
1514 let def =
1516 "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1517 assert!(validate_definition(def).is_err());
1518
1519 let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1521 assert!(validate_definition(def).is_err());
1522
1523 let def =
1525 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1526 assert!(validate_definition(def).is_err());
1527
1528 let def =
1530 r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1531 assert!(validate_definition(def).is_err());
1532 }
1533
1534 #[test]
1535 fn test_validate_definition_accepts_well_formed_paths() {
1536 let def = r#"{"StartAt":"P","States":{
1539 "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1540 "C":{"Type":"Choice","Choices":[
1541 {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1542 ],"Default":"S"},
1543 "S":{"Type":"Succeed","InputPath":"null"}
1544 }}"#;
1545 assert!(validate_definition(def).is_ok());
1546 }
1547
1548 #[test]
1549 fn test_is_valid_reference_path() {
1550 assert!(is_valid_reference_path("$"));
1551 assert!(is_valid_reference_path("$.foo"));
1552 assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1553 assert!(is_valid_reference_path("$[0]"));
1554 assert!(!is_valid_reference_path("$.arr["));
1555 assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1556 assert!(!is_valid_reference_path("$.x[]"));
1557 assert!(!is_valid_reference_path("$.x[abc]"));
1558 assert!(!is_valid_reference_path("foo.bar"));
1559 assert!(!is_valid_reference_path(""));
1560 }
1561
1562 #[test]
1563 fn test_validate_arn() {
1564 assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1565 assert!(validate_arn("not-an-arn").is_err());
1566 }
1567
1568 #[test]
1569 fn test_camel_to_details_key() {
1570 assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1571 assert_eq!(camel_to_details_key(""), "");
1572 }
1573
1574 #[test]
1575 fn test_is_mutating_action() {
1576 assert!(is_mutating_action("CreateStateMachine"));
1577 assert!(is_mutating_action("StartExecution"));
1578 assert!(!is_mutating_action("DescribeStateMachine"));
1579 assert!(!is_mutating_action("ListStateMachines"));
1580 }
1581
1582 fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1585 let body = json!({
1586 "name": name,
1587 "definition": VALID_DEF,
1588 "roleArn": "arn:aws:iam::123456789012:role/test",
1589 "type": "EXPRESS",
1590 });
1591 let req = make_request("CreateStateMachine", &body.to_string());
1592 let resp = svc.create_state_machine(&req).unwrap();
1593 let b = body_json(&resp);
1594 b["stateMachineArn"].as_str().unwrap().to_string()
1595 }
1596
1597 #[tokio::test]
1598 async fn start_sync_execution_basic() {
1599 let svc = StepFunctionsService::new(make_state());
1600 let arn = create_express_sm(&svc, "sync-sm");
1601
1602 let body = json!({
1603 "stateMachineArn": arn,
1604 "input": r#"{"key":"value"}"#,
1605 });
1606 let req = make_request("StartSyncExecution", &body.to_string());
1607 let resp = svc.start_sync_execution(&req).await.unwrap();
1608 let b = body_json(&resp);
1609 assert!(b["executionArn"]
1610 .as_str()
1611 .unwrap()
1612 .contains("express:sync-sm"));
1613 assert_eq!(b["stateMachineArn"], arn);
1614 assert_eq!(b["status"], "SUCCEEDED");
1615 assert!(b["startDate"].as_i64().is_some());
1616 assert!(b["stopDate"].as_i64().is_some());
1617 assert!(b["output"].as_str().is_some());
1618 assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1619 .as_i64()
1620 .is_some());
1621 }
1622
1623 #[tokio::test]
1624 async fn start_sync_execution_not_express() {
1625 let svc = StepFunctionsService::new(make_state());
1626 let arn = create_sm(&svc, "std-sm");
1627
1628 let body = json!({"stateMachineArn": arn});
1629 let req = make_request("StartSyncExecution", &body.to_string());
1630 let err = expect_err(svc.start_sync_execution(&req).await);
1631 assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1632 }
1633
1634 #[tokio::test]
1635 async fn start_sync_execution_sm_not_found() {
1636 let svc = StepFunctionsService::new(make_state());
1637 let body = json!({
1638 "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1639 });
1640 let req = make_request("StartSyncExecution", &body.to_string());
1641 let err = expect_err(svc.start_sync_execution(&req).await);
1642 assert!(err.to_string().contains("StateMachineDoesNotExist"));
1643 }
1644
1645 #[tokio::test]
1646 async fn start_sync_execution_records_introspection_fields() {
1647 let svc = StepFunctionsService::new(make_state());
1648 let arn = create_express_sm(&svc, "sync-introspect");
1649
1650 let body = json!({"stateMachineArn": arn, "input": "{}"});
1651 let req = make_request("StartSyncExecution", &body.to_string());
1652 let resp = svc.start_sync_execution(&req).await.unwrap();
1653 let b = body_json(&resp);
1654 let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1655
1656 let accounts = svc.state.read();
1657 let state = accounts.get("123456789012").unwrap();
1658 let stored = state
1659 .executions
1660 .get(&exec_arn)
1661 .expect("sync execution should be persisted for introspection");
1662 assert!(stored.is_sync, "sync executions must be marked is_sync");
1663 assert_eq!(stored.billed_memory_mb, Some(64));
1664 assert!(
1665 stored.billed_duration_ms.is_some(),
1666 "billed_duration_ms must be populated after sync run"
1667 );
1668 assert!(
1669 stored.parent_execution_arn.is_none(),
1670 "top-level sync execution has no parent"
1671 );
1672 }
1673
1674 #[tokio::test]
1675 async fn start_sync_execution_invalid_input() {
1676 let svc = StepFunctionsService::new(make_state());
1677 let arn = create_express_sm(&svc, "bad-input-sync");
1678
1679 let body = json!({
1680 "stateMachineArn": arn,
1681 "input": "not json",
1682 });
1683 let req = make_request("StartSyncExecution", &body.to_string());
1684 let err = expect_err(svc.start_sync_execution(&req).await);
1685 assert!(err.to_string().contains("InvalidExecutionInput"));
1686 }
1687
1688 #[test]
1690 fn snapshot_hook_is_none_without_store() {
1691 let svc = StepFunctionsService::new(make_state());
1692 assert!(svc.snapshot_hook().is_none());
1693 }
1694
1695 #[tokio::test]
1699 async fn snapshot_hook_fires_with_store() {
1700 let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1701 Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1702 let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1703 let hook = svc
1704 .snapshot_hook()
1705 .expect("hook present when a store is set");
1706 hook().await;
1707 }
1708}
1709
1710#[cfg(test)]
1711mod pagination_reject_test {
1712 #[test]
1713 fn paginate_checked_rejects_invalid_token() {
1714 use fakecloud_core::pagination::paginate_checked;
1715 let items: Vec<i32> = (0..5).collect();
1716 assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1717 assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1718 }
1719}