Skip to main content

fakecloud_stepfunctions/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Action names this service reports through `AwsService::supported_actions`.
///
/// Every entry has a matching arm in the dispatch `match` of
/// `StepFunctionsService::handle`; keep the two lists in sync when adding or
/// removing an operation.
const SUPPORTED: &[&str] = &[
    // State machine lifecycle
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Executions
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
    // Activities and task callbacks
    "CreateActivity",
    "DeleteActivity",
    "DescribeActivity",
    "ListActivities",
    "GetActivityTask",
    "SendTaskFailure",
    "SendTaskHeartbeat",
    "SendTaskSuccess",
    // Versions and aliases
    "PublishStateMachineVersion",
    "DeleteStateMachineVersion",
    "ListStateMachineVersions",
    "CreateStateMachineAlias",
    "DeleteStateMachineAlias",
    "DescribeStateMachineAlias",
    "ListStateMachineAliases",
    "UpdateStateMachineAlias",
    // Map runs
    "DescribeMapRun",
    "ListMapRuns",
    "UpdateMapRun",
    // Remaining operations
    "RedriveExecution",
    "StartSyncExecution",
    "TestState",
    "ValidateStateMachineDefinition",
];
64
/// Deferred-fill handle to the central service registry.
///
/// `main.rs` sets the inner cell after every service has been registered.
/// Wrapping the registry in a `OnceLock` lets `StepFunctionsService` be
/// constructed (and registered into the very registry it later reads back)
/// before the registry itself is finalized. The interpreter snapshots the
/// inner `Arc` when it needs to dispatch generic `aws-sdk:*` Task integrations.
pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
/// Step Functions service front end: dispatches actions against the shared
/// state and persists a snapshot after successful mutating calls.
pub struct StepFunctionsService {
    /// Shared Step Functions state the handlers read and mutate.
    state: SharedStepFunctionsState,
    /// Optional delivery bus, attached with `with_delivery`.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional DynamoDB state handle, attached with `with_dynamodb`.
    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional deferred-fill registry handle, attached with `with_registry`.
    registry: Option<SharedServiceRegistry>,
    /// Snapshot backend; `None` means memory mode (nothing is persisted).
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the duration of each snapshot save.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90    pub fn new(state: SharedStepFunctionsState) -> Self {
91        Self {
92            state,
93            delivery: None,
94            dynamodb_state: None,
95            registry: None,
96            snapshot_store: None,
97            snapshot_lock: Arc::new(AsyncMutex::new(())),
98        }
99    }
100
101    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102        self.delivery = Some(delivery);
103        self
104    }
105
106    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107        self.dynamodb_state = Some(dynamodb_state);
108        self
109    }
110
111    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
112    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
113    /// has been registered; until then the interpreter falls back to its
114    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
115    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116        self.registry = Some(registry);
117        self
118    }
119
120    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121        self.snapshot_store = Some(store);
122        self
123    }
124
125    async fn save_snapshot(&self) {
126        save_stepfunctions_snapshot(
127            &self.state,
128            self.snapshot_store.clone(),
129            &self.snapshot_lock,
130        )
131        .await;
132    }
133
134    /// Build a hook that persists the current Step Functions state when invoked,
135    /// or `None` in memory mode (no snapshot store). The CloudFormation
136    /// provisioner mutates `state` directly and uses this to write a
137    /// CFN-provisioned resource through to disk, the same way a direct mutating
138    /// API call would.
139    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140        let store = self.snapshot_store.clone()?;
141        let state = self.state.clone();
142        let lock = self.snapshot_lock.clone();
143        Some(Arc::new(move || {
144            let state = state.clone();
145            let store = store.clone();
146            let lock = lock.clone();
147            Box::pin(async move {
148                save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149            })
150        }))
151    }
152}
153
154/// Persist the current Step Functions state as a snapshot. Offloads the serde +
155/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
156/// (memory mode). Shared by `StepFunctionsService::save_snapshot` and the
157/// CloudFormation provisioner's post-provision persist hook so both route
158/// through the same serialize-and-write path.
159pub async fn save_stepfunctions_snapshot(
160    state: &SharedStepFunctionsState,
161    store: Option<Arc<dyn SnapshotStore>>,
162    lock: &AsyncMutex<()>,
163) {
164    let Some(store) = store else {
165        return;
166    };
167    let _guard = lock.lock().await;
168    let snapshot = StepFunctionsSnapshot {
169        schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170        state: None,
171        accounts: Some(state.read().clone()),
172    };
173    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174        let bytes = serde_json::to_vec(&snapshot)
175            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176        store.save(&bytes)
177    })
178    .await;
179    match join {
180        Ok(Ok(())) => {}
181        Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182        Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183    }
184}
185
/// Report whether `action` is one of the operations after which `handle`
/// persists a snapshot. Matching is exact and case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];
    MUTATING.iter().any(|candidate| *candidate == action)
}
212
213#[async_trait]
214impl AwsService for StepFunctionsService {
215    fn service_name(&self) -> &str {
216        "states"
217    }
218
219    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
220        let mutates = is_mutating_action(req.action.as_str());
221        let result = match req.action.as_str() {
222            "CreateStateMachine" => self.create_state_machine(&req),
223            "DescribeStateMachine" => self.describe_state_machine(&req),
224            "ListStateMachines" => self.list_state_machines(&req),
225            "DeleteStateMachine" => self.delete_state_machine(&req),
226            "UpdateStateMachine" => self.update_state_machine(&req),
227            "TagResource" => self.tag_resource(&req),
228            "UntagResource" => self.untag_resource(&req),
229            "ListTagsForResource" => self.list_tags_for_resource(&req),
230            "StartExecution" => self.start_execution(&req),
231            "StopExecution" => self.stop_execution(&req),
232            "DescribeExecution" => self.describe_execution(&req),
233            "ListExecutions" => self.list_executions(&req),
234            "GetExecutionHistory" => self.get_execution_history(&req),
235            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
236            "CreateActivity" => self.create_activity(&req),
237            "DeleteActivity" => self.delete_activity(&req),
238            "DescribeActivity" => self.describe_activity(&req),
239            "ListActivities" => self.list_activities(&req),
240            "GetActivityTask" => self.get_activity_task(&req).await,
241            "SendTaskFailure" => self.send_task_failure(&req),
242            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
243            "SendTaskSuccess" => self.send_task_success(&req),
244            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
245            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
246            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
247            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
248            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
249            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
250            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
251            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
252            "DescribeMapRun" => self.describe_map_run(&req),
253            "ListMapRuns" => self.list_map_runs(&req),
254            "UpdateMapRun" => self.update_map_run(&req),
255            "RedriveExecution" => self.redrive_execution(&req),
256            "StartSyncExecution" => self.start_sync_execution(&req).await,
257            "TestState" => self.test_state(&req),
258            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
259            _ => Err(AwsServiceError::action_not_implemented(
260                "states",
261                &req.action,
262            )),
263        };
264        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
265            self.save_snapshot().await;
266        }
267        result
268    }
269
270    fn supported_actions(&self) -> &[&str] {
271        SUPPORTED
272    }
273}
274
275impl StepFunctionsService {
276    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277        let body = req.json_body();
278        let raw_max_results = body["maxResults"].as_i64();
279        if let Some(mr) = raw_max_results {
280            validate_max_results(mr)?;
281        }
282        let next_token = body["nextToken"].as_str();
283        if let Some(t) = next_token {
284            validate_page_token(t)?;
285        }
286        // Smithy default: maxResults=0 means "use the service default"
287        // (100 for Step Functions), which we honour by treating both
288        // `None` and `0` as 100.
289        let max_results = match raw_max_results.unwrap_or(0) {
290            0 => 100,
291            n => n as usize,
292        };
293        let accounts = self.state.read();
294        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
295        let state = accounts.get(&req.account_id).unwrap_or(&empty);
296        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
297        activities.sort_by(|a, b| a.name.cmp(&b.name));
298        let items: Vec<Value> = activities
299            .iter()
300            .map(|a| {
301                json!({
302                    "activityArn": a.arn,
303                    "name": a.name,
304                    "creationDate": a.creation_date.timestamp(),
305                })
306            })
307            .collect();
308        let (page, token) =
309            paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
310        let mut resp = json!({ "activities": page });
311        if let Some(t) = token {
312            resp["nextToken"] = json!(t);
313        }
314        Ok(AwsResponse::ok_json(resp))
315    }
316}
317
318fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
319    json!({
320        "stateMachineAliasArn": alias.arn,
321        "name": alias.name,
322        "description": alias.description,
323        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
324            "stateMachineVersionArn": r.state_machine_version_arn,
325            "weight": r.weight,
326        })).collect::<Vec<_>>(),
327        "creationDate": alias.creation_date.timestamp(),
328        "updateDate": alias.update_date.timestamp(),
329    })
330}
331
332fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
333    json!({
334        "mapRunArn": mr.map_run_arn,
335        "executionArn": mr.execution_arn,
336        "maxConcurrency": mr.max_concurrency,
337        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
338        "toleratedFailureCount": mr.tolerated_failure_count,
339        "status": mr.status,
340        "startDate": mr.start_date.timestamp(),
341        "stopDate": mr.stop_date.map(|d| d.timestamp()),
342    })
343}
344
345// ─── Helpers ────────────────────────────────────────────────────────────
346
347fn state_machine_to_json(sm: &StateMachine) -> Value {
348    let mut resp = json!({
349        "name": sm.name,
350        "stateMachineArn": sm.arn,
351        "definition": sm.definition,
352        "roleArn": sm.role_arn,
353        "type": sm.machine_type.as_str(),
354        "status": sm.status.as_str(),
355        "creationDate": sm.creation_date.timestamp() as f64,
356        "updateDate": sm.update_date.timestamp() as f64,
357        "revisionId": sm.revision_id,
358        "label": sm.name,
359    });
360
361    if !sm.description.is_empty() {
362        resp["description"] = json!(sm.description);
363    }
364
365    if let Some(ref logging) = sm.logging_configuration {
366        resp["loggingConfiguration"] = logging.clone();
367    } else {
368        resp["loggingConfiguration"] = json!({
369            "level": "OFF",
370            "includeExecutionData": false,
371            "destinations": [],
372        });
373    }
374
375    if let Some(ref tracing) = sm.tracing_configuration {
376        resp["tracingConfiguration"] = tracing.clone();
377    } else {
378        resp["tracingConfiguration"] = json!({
379            "enabled": false,
380        });
381    }
382
383    resp
384}
385
386fn missing(name: &str) -> AwsServiceError {
387    AwsServiceError::aws_error(
388        StatusCode::BAD_REQUEST,
389        "ValidationException",
390        format!("The request must contain the parameter {name}."),
391    )
392}
393
394fn state_machine_not_found(arn: &str) -> AwsServiceError {
395    AwsServiceError::aws_error(
396        StatusCode::BAD_REQUEST,
397        "StateMachineDoesNotExist",
398        format!("State Machine Does Not Exist: '{arn}'"),
399    )
400}
401
402fn activity_not_found(arn: &str) -> AwsServiceError {
403    AwsServiceError::aws_error(
404        StatusCode::BAD_REQUEST,
405        "ActivityDoesNotExist",
406        format!("Activity does not exist: {arn}"),
407    )
408}
409
410fn task_does_not_exist(token: &str) -> AwsServiceError {
411    AwsServiceError::aws_error(
412        StatusCode::BAD_REQUEST,
413        "TaskDoesNotExist",
414        format!("Task does not exist: {token}"),
415    )
416}
417
418fn resource_not_found(arn: &str) -> AwsServiceError {
419    AwsServiceError::aws_error(
420        StatusCode::BAD_REQUEST,
421        "ResourceNotFound",
422        format!("Resource not found: '{arn}'"),
423    )
424}
425
426/// Parse + validate an alias `routingConfiguration` array.
427///
428/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
429/// route must include `stateMachineVersionArn`.
430fn parse_routing_configuration(
431    routes: &[serde_json::Value],
432) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
433    if routes.is_empty() || routes.len() > 2 {
434        return Err(AwsServiceError::aws_error(
435            StatusCode::BAD_REQUEST,
436            "ValidationException",
437            "routingConfiguration must contain 1 or 2 routes.",
438        ));
439    }
440    let parsed: Vec<crate::state::AliasRoute> = routes
441        .iter()
442        .map(|r| {
443            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
444                AwsServiceError::aws_error(
445                    StatusCode::BAD_REQUEST,
446                    "ValidationException",
447                    "routingConfiguration entries must contain stateMachineVersionArn.",
448                )
449            })?;
450            let weight = r["weight"].as_i64().ok_or_else(|| {
451                AwsServiceError::aws_error(
452                    StatusCode::BAD_REQUEST,
453                    "ValidationException",
454                    "routingConfiguration entries must contain a numeric weight.",
455                )
456            })?;
457            if !(0..=100).contains(&weight) {
458                return Err(AwsServiceError::aws_error(
459                    StatusCode::BAD_REQUEST,
460                    "ValidationException",
461                    format!("Invalid routing weight {weight}; must be 0-100."),
462                ));
463            }
464            Ok(crate::state::AliasRoute {
465                state_machine_version_arn: arn.to_string(),
466                weight: weight as i32,
467            })
468        })
469        .collect::<Result<_, _>>()?;
470    let total: i32 = parsed.iter().map(|r| r.weight).sum();
471    if total != 100 {
472        return Err(AwsServiceError::aws_error(
473            StatusCode::BAD_REQUEST,
474            "ValidationException",
475            format!("routingConfiguration weights must sum to 100, got {total}."),
476        ));
477    }
478    Ok(parsed)
479}
480
481fn validate_name(name: &str) -> Result<(), AwsServiceError> {
482    if name.is_empty() || name.len() > 80 {
483        return Err(AwsServiceError::aws_error(
484            StatusCode::BAD_REQUEST,
485            "InvalidName",
486            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
487        ));
488    }
489    // Only allow alphanumeric, hyphens, and underscores
490    if !name
491        .chars()
492        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
493    {
494        return Err(AwsServiceError::aws_error(
495            StatusCode::BAD_REQUEST,
496            "InvalidName",
497            format!(
498                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
499            ),
500        ));
501    }
502    Ok(())
503}
504
505fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
506    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
507        AwsServiceError::aws_error(
508            StatusCode::BAD_REQUEST,
509            "InvalidDefinition",
510            format!("Invalid State Machine Definition: '{e}'"),
511        )
512    })?;
513
514    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
515        return Err(AwsServiceError::aws_error(
516            StatusCode::BAD_REQUEST,
517            "InvalidDefinition",
518            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
519                .to_string(),
520        ));
521    }
522
523    let states_obj = parsed
524        .get("States")
525        .and_then(|v| v.as_object())
526        .ok_or_else(|| {
527            AwsServiceError::aws_error(
528                StatusCode::BAD_REQUEST,
529                "InvalidDefinition",
530                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
531                    .to_string(),
532            )
533        })?;
534
535    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
536        AwsServiceError::aws_error(
537            StatusCode::BAD_REQUEST,
538            "InvalidDefinition",
539            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
540                .to_string(),
541        )
542    })?;
543    if !states_obj.contains_key(start_at) {
544        return Err(AwsServiceError::aws_error(
545            StatusCode::BAD_REQUEST,
546            "InvalidDefinition",
547            format!(
548                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
549                 (StartAt '{start_at}' does not reference a valid state)"
550            ),
551        ));
552    }
553
554    // Reject malformed JSONPath reference fields. AWS rejects bad reference
555    // paths at CreateStateMachine; accepting them here lets a panic-inducing
556    // path reach the interpreter at execution time.
557    for (state_name, state_val) in states_obj {
558        validate_state_paths(state_name, state_val)?;
559    }
560
561    Ok(())
562}
563
564/// Validate the JSONPath reference fields of a single state. Recurses into the
565/// `Choices` array of Choice states. Returns an `InvalidDefinition` error for
566/// any syntactically malformed path.
567fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
568    for field in ["InputPath", "OutputPath", "ResultPath"] {
569        if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
570            // `InputPath`/`OutputPath` accept the literal "null" to mean "no
571            // value"; `ResultPath` accepts JSON null (handled separately) but
572            // not the string "null". Both accept "$"-rooted reference paths.
573            if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
574                continue;
575            }
576            return Err(invalid_reference_path(state_name, field, p));
577        }
578    }
579
580    if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
581        if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
582            for rule in choices {
583                validate_choice_variables(state_name, rule)?;
584            }
585        }
586    }
587
588    Ok(())
589}
590
591/// Validate `Variable` reference paths inside a Choice rule, recursing through
592/// nested `And` / `Or` / `Not` boolean combinators.
593fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
594    if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
595        if !is_valid_reference_path(v) {
596            return Err(invalid_reference_path(state_name, "Variable", v));
597        }
598    }
599    for combinator in ["And", "Or"] {
600        if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
601            for n in nested {
602                validate_choice_variables(state_name, n)?;
603            }
604        }
605    }
606    if let Some(n) = rule.get("Not") {
607        validate_choice_variables(state_name, n)?;
608    }
609    Ok(())
610}
611
612/// A reference path must start with `$` and every `[...]` index segment must be
613/// a balanced, non-empty, decimal-integer index. This mirrors the subset of
614/// JSONPath the interpreter understands; anything it cannot parse is rejected.
615fn is_valid_reference_path(path: &str) -> bool {
616    if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
617        return false;
618    }
619    let body = path
620        .strip_prefix("$.")
621        .or_else(|| path.strip_prefix('$'))
622        .unwrap_or(path);
623    for part in body.split('.') {
624        if !segment_is_valid(part) {
625            return false;
626        }
627    }
628    true
629}
630
/// Validate one dot-separated segment of a reference path.
///
/// A segment is either a bare name containing no brackets, or a name followed
/// by a single `[N]` suffix where `N` is a non-empty run of ASCII decimal
/// digits that fits in `usize`.
///
/// Rejected: an unclosed or unopened bracket, an empty `[]`, any non-digit
/// index (including a signed one such as `[+1]` or `[-1]`), anything after
/// the closing `]`, and a stray `]` before the opening `[`.
fn segment_is_valid(part: &str) -> bool {
    let Some(open) = part.find('[') else {
        // No index at all: only a dangling `]` can make the segment invalid.
        return !part.contains(']');
    };

    let (name, bracketed) = part.split_at(open);
    if name.contains(']') {
        // A `]` ahead of the first `[` leaves the brackets unbalanced.
        return false;
    }

    let Some(index) = bracketed
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
    else {
        return false;
    };

    // `str::parse::<usize>` on its own tolerates a leading `+`, so require
    // pure digits first; the parse then only guards against overflow.
    !index.is_empty()
        && index.bytes().all(|b| b.is_ascii_digit())
        && index.parse::<usize>().is_ok()
}
644
645fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
646    AwsServiceError::aws_error(
647        StatusCode::BAD_REQUEST,
648        "InvalidDefinition",
649        format!(
650            "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
651             (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
652        ),
653    )
654}
655
656fn execution_not_found(arn: &str) -> AwsServiceError {
657    AwsServiceError::aws_error(
658        StatusCode::BAD_REQUEST,
659        "ExecutionDoesNotExist",
660        format!("Execution Does Not Exist: '{arn}'"),
661    )
662}
663
664fn execution_to_json(exec: &Execution) -> Value {
665    let mut resp = json!({
666        "executionArn": exec.execution_arn,
667        "stateMachineArn": exec.state_machine_arn,
668        "name": exec.name,
669        "status": exec.status.as_str(),
670        "startDate": exec.start_date.timestamp() as f64,
671    });
672
673    if let Some(ref input) = exec.input {
674        resp["input"] = json!(input);
675    }
676    if let Some(ref output) = exec.output {
677        resp["output"] = json!(output);
678    }
679    if let Some(stop) = exec.stop_date {
680        resp["stopDate"] = json!(stop.timestamp() as f64);
681    }
682    if let Some(ref error) = exec.error {
683        resp["error"] = json!(error);
684    }
685    if let Some(ref cause) = exec.cause {
686        resp["cause"] = json!(cause);
687    }
688
689    resp
690}
691
/// Lower-case the first character of an event type so it becomes the matching
/// details key, e.g. "PassStateEntered" -> "passStateEntered". The remainder
/// of the string is copied unchanged; an empty input gives an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    let mut rest = event_type.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };
    let mut key = String::with_capacity(event_type.len());
    // Lower-casing one char may expand to several, hence `extend`.
    key.extend(first.to_lowercase());
    key.push_str(rest.as_str());
    key
}
700
701fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
702    if !arn.starts_with("arn:") {
703        return Err(AwsServiceError::aws_error(
704            StatusCode::BAD_REQUEST,
705            "InvalidArn",
706            format!("Invalid Arn: '{arn}'"),
707        ));
708    }
709    Ok(())
710}
711
712/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
713/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
714/// every ARN-bearing Step Functions operation declares.
715fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
716    if value.is_empty() || value.len() > max {
717        return Err(AwsServiceError::aws_error(
718            StatusCode::BAD_REQUEST,
719            "InvalidArn",
720            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
721        ));
722    }
723    Ok(())
724}
725
726/// Shared error for a malformed (unparseable) `nextToken` reaching the
727/// pagination helper — `InvalidToken` is the only pagination error code
728/// declared on every list op.
729pub(super) fn invalid_token() -> AwsServiceError {
730    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
731}
732
733/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
734/// error code declared on every paginated list op is `InvalidToken`, so
735/// route both empty and oversize tokens through it.
736fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
737    if value.is_empty() || value.len() > 1024 {
738        return Err(AwsServiceError::aws_error(
739            StatusCode::BAD_REQUEST,
740            "InvalidToken",
741            "nextToken must be 1..=1024 characters",
742        ));
743    }
744    Ok(())
745}
746
747/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
748/// probes flip below-min / above-max; since the only error declared on
749/// `ListActivities` is `InvalidToken`, we map page-size violations to
750/// the same code (matches how AWS surfaces malformed pagination input
751/// for ops that don't model a separate `ValidationException`).
752fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
753    if !(0..=1000).contains(&value) {
754        return Err(AwsServiceError::aws_error(
755            StatusCode::BAD_REQUEST,
756            "InvalidToken",
757            format!("maxResults '{value}' is outside 0..=1000"),
758        ));
759    }
760    Ok(())
761}
762
763/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
764///
765/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
766/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
767/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
768///
769/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
770/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
771pub fn start_execution_from_delivery(
772    state: &SharedStepFunctionsState,
773    delivery: &Option<Arc<DeliveryBus>>,
774    dynamodb_state: &Option<SharedDynamoDbState>,
775    registry: &Option<SharedServiceRegistry>,
776    state_machine_arn: &str,
777    input: &str,
778) {
779    // Validate input is valid JSON
780    if serde_json::from_str::<serde_json::Value>(input).is_err() {
781        tracing::warn!(
782            state_machine_arn,
783            "Step Functions delivery: invalid JSON input, skipping execution"
784        );
785        return;
786    }
787
788    let execution_name = uuid::Uuid::new_v4().to_string();
789
790    // Extract account_id from the state machine ARN
791    let account_id = state_machine_arn
792        .split(':')
793        .nth(4)
794        .unwrap_or("000000000000")
795        .to_string();
796
797    let mut accounts = state.write();
798    let st = accounts.get_or_create(&account_id);
799    let sm = match st.state_machines.get(state_machine_arn) {
800        Some(sm) => sm,
801        None => {
802            tracing::warn!(
803                state_machine_arn,
804                "Step Functions delivery: state machine not found"
805            );
806            return;
807        }
808    };
809
810    let sm_name = sm.name.clone();
811    let definition = sm.definition.clone();
812    let exec_arn = st.execution_arn(&sm_name, &execution_name);
813
814    let now = Utc::now();
815    let execution = Execution {
816        execution_arn: exec_arn.clone(),
817        state_machine_arn: state_machine_arn.to_string(),
818        state_machine_name: sm_name,
819        name: execution_name,
820        status: ExecutionStatus::Running,
821        input: Some(input.to_string()),
822        output: None,
823        start_date: now,
824        stop_date: None,
825        error: None,
826        cause: None,
827        history_events: vec![],
828        parent_execution_arn: None,
829        is_sync: false,
830        billed_duration_ms: None,
831        billed_memory_mb: None,
832    };
833
834    st.executions.insert(exec_arn.clone(), execution);
835    let logging_config = sm.logging_configuration.clone();
836    drop(accounts);
837
838    let shared_state = state.clone();
839    let delivery = delivery.clone();
840    let dynamodb_state = dynamodb_state.clone();
841    let registry = registry.clone();
842    let input = Some(input.to_string());
843    tokio::spawn(async move {
844        interpreter::execute_state_machine(
845            shared_state,
846            exec_arn,
847            definition,
848            input,
849            delivery,
850            dynamodb_state,
851            registry,
852            logging_config,
853        )
854        .await;
855    });
856}
857
858#[cfg(test)]
859mod tests {
860    use super::*;
861    use http::{HeaderMap, Method};
862    use parking_lot::RwLock;
863    use serde_json::Value;
864    use std::collections::HashMap;
865    use std::sync::Arc;
866
867    fn make_state() -> SharedStepFunctionsState {
868        Arc::new(RwLock::new(
869            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
870        ))
871    }
872
873    fn make_request(action: &str, body: &str) -> AwsRequest {
874        AwsRequest {
875            service: "states".to_string(),
876            action: action.to_string(),
877            region: "us-east-1".to_string(),
878            account_id: "123456789012".to_string(),
879            request_id: "test-id".to_string(),
880            headers: HeaderMap::new(),
881            query_params: HashMap::new(),
882            body: body.as_bytes().to_vec().into(),
883            body_stream: parking_lot::Mutex::new(None),
884            path_segments: vec![],
885            raw_path: "/".to_string(),
886            raw_query: String::new(),
887            method: Method::POST,
888            is_query_protocol: false,
889            access_key_id: None,
890            principal: None,
891        }
892    }
893
894    fn body_json(resp: &AwsResponse) -> Value {
895        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
896    }
897
898    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
899        match result {
900            Err(e) => e,
901            Ok(_) => panic!("expected error, got Ok"),
902        }
903    }
904
905    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
906
907    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
908        let body = json!({
909            "name": name,
910            "definition": VALID_DEF,
911            "roleArn": "arn:aws:iam::123456789012:role/test",
912        });
913        let req = make_request("CreateStateMachine", &body.to_string());
914        let resp = svc.create_state_machine(&req).unwrap();
915        let b = body_json(&resp);
916        b["stateMachineArn"].as_str().unwrap().to_string()
917    }
918
919    // ── CreateStateMachine ──
920
921    #[test]
922    fn create_state_machine_basic() {
923        let svc = StepFunctionsService::new(make_state());
924        let arn = create_sm(&svc, "test-sm");
925        assert!(arn.contains("test-sm"));
926    }
927
928    #[test]
929    fn create_state_machine_with_express_type() {
930        let svc = StepFunctionsService::new(make_state());
931        let body = json!({
932            "name": "express-sm",
933            "definition": VALID_DEF,
934            "roleArn": "arn:aws:iam::123456789012:role/r",
935            "type": "EXPRESS",
936        });
937        let req = make_request("CreateStateMachine", &body.to_string());
938        let resp = svc.create_state_machine(&req).unwrap();
939        let b = body_json(&resp);
940        assert!(b["stateMachineArn"].as_str().is_some());
941    }
942
943    #[test]
944    fn create_state_machine_duplicate_fails() {
945        let svc = StepFunctionsService::new(make_state());
946        create_sm(&svc, "dup-sm");
947        let body = json!({
948            "name": "dup-sm",
949            "definition": VALID_DEF,
950            "roleArn": "arn:aws:iam::123456789012:role/r",
951        });
952        let req = make_request("CreateStateMachine", &body.to_string());
953        let err = expect_err(svc.create_state_machine(&req));
954        assert!(err.to_string().contains("StateMachineAlreadyExists"));
955    }
956
957    #[test]
958    fn create_state_machine_missing_name() {
959        let svc = StepFunctionsService::new(make_state());
960        let body = json!({
961            "definition": VALID_DEF,
962            "roleArn": "arn:aws:iam::123456789012:role/r",
963        });
964        let req = make_request("CreateStateMachine", &body.to_string());
965        assert!(svc.create_state_machine(&req).is_err());
966    }
967
968    #[test]
969    fn create_state_machine_invalid_definition() {
970        let svc = StepFunctionsService::new(make_state());
971        let body = json!({
972            "name": "bad-def",
973            "definition": "not json",
974            "roleArn": "arn:aws:iam::123456789012:role/r",
975        });
976        let req = make_request("CreateStateMachine", &body.to_string());
977        let err = expect_err(svc.create_state_machine(&req));
978        assert!(err.to_string().contains("InvalidDefinition"));
979    }
980
981    #[test]
982    fn create_state_machine_definition_missing_start_at() {
983        let svc = StepFunctionsService::new(make_state());
984        let body = json!({
985            "name": "no-start",
986            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
987            "roleArn": "arn:aws:iam::123456789012:role/r",
988        });
989        let req = make_request("CreateStateMachine", &body.to_string());
990        let err = expect_err(svc.create_state_machine(&req));
991        assert!(err.to_string().contains("InvalidDefinition"));
992    }
993
994    #[test]
995    fn create_state_machine_definition_missing_states() {
996        let svc = StepFunctionsService::new(make_state());
997        let body = json!({
998            "name": "no-states",
999            "definition": r#"{"StartAt":"S"}"#,
1000            "roleArn": "arn:aws:iam::123456789012:role/r",
1001        });
1002        let req = make_request("CreateStateMachine", &body.to_string());
1003        let err = expect_err(svc.create_state_machine(&req));
1004        assert!(err.to_string().contains("InvalidDefinition"));
1005    }
1006
1007    #[test]
1008    fn create_state_machine_definition_start_at_not_in_states() {
1009        let svc = StepFunctionsService::new(make_state());
1010        let body = json!({
1011            "name": "bad-start",
1012            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1013            "roleArn": "arn:aws:iam::123456789012:role/r",
1014        });
1015        let req = make_request("CreateStateMachine", &body.to_string());
1016        let err = expect_err(svc.create_state_machine(&req));
1017        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1018    }
1019
1020    #[test]
1021    fn create_state_machine_invalid_type() {
1022        let svc = StepFunctionsService::new(make_state());
1023        let body = json!({
1024            "name": "bad-type",
1025            "definition": VALID_DEF,
1026            "roleArn": "arn:aws:iam::123456789012:role/r",
1027            "type": "INVALID",
1028        });
1029        let req = make_request("CreateStateMachine", &body.to_string());
1030        assert!(svc.create_state_machine(&req).is_err());
1031    }
1032
1033    #[test]
1034    fn create_state_machine_invalid_arn() {
1035        let svc = StepFunctionsService::new(make_state());
1036        let body = json!({
1037            "name": "bad-arn",
1038            "definition": VALID_DEF,
1039            "roleArn": "not-an-arn",
1040        });
1041        let req = make_request("CreateStateMachine", &body.to_string());
1042        let err = expect_err(svc.create_state_machine(&req));
1043        assert!(err.to_string().contains("InvalidArn"));
1044    }
1045
1046    #[test]
1047    fn create_state_machine_invalid_name() {
1048        let svc = StepFunctionsService::new(make_state());
1049        let body = json!({
1050            "name": "has spaces!",
1051            "definition": VALID_DEF,
1052            "roleArn": "arn:aws:iam::123456789012:role/r",
1053        });
1054        let req = make_request("CreateStateMachine", &body.to_string());
1055        let err = expect_err(svc.create_state_machine(&req));
1056        assert!(err.to_string().contains("InvalidName"));
1057    }
1058
1059    #[test]
1060    fn create_state_machine_name_too_long() {
1061        let svc = StepFunctionsService::new(make_state());
1062        let long_name = "a".repeat(81);
1063        let body = json!({
1064            "name": long_name,
1065            "definition": VALID_DEF,
1066            "roleArn": "arn:aws:iam::123456789012:role/r",
1067        });
1068        let req = make_request("CreateStateMachine", &body.to_string());
1069        let err = expect_err(svc.create_state_machine(&req));
1070        assert!(err.to_string().contains("InvalidName"));
1071    }
1072
1073    // ── DescribeStateMachine ──
1074
1075    #[test]
1076    fn describe_state_machine_found() {
1077        let svc = StepFunctionsService::new(make_state());
1078        let arn = create_sm(&svc, "desc-sm");
1079
1080        let req = make_request(
1081            "DescribeStateMachine",
1082            &json!({"stateMachineArn": arn}).to_string(),
1083        );
1084        let resp = svc.describe_state_machine(&req).unwrap();
1085        let b = body_json(&resp);
1086        assert_eq!(b["name"], "desc-sm");
1087        assert_eq!(b["status"], "ACTIVE");
1088        assert!(b["definition"].as_str().is_some());
1089    }
1090
1091    #[test]
1092    fn describe_state_machine_not_found() {
1093        let svc = StepFunctionsService::new(make_state());
1094        let req = make_request(
1095            "DescribeStateMachine",
1096            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1097                .to_string(),
1098        );
1099        let err = expect_err(svc.describe_state_machine(&req));
1100        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1101    }
1102
1103    // ── ListStateMachines ──
1104
1105    #[test]
1106    fn list_state_machines_empty() {
1107        let svc = StepFunctionsService::new(make_state());
1108        let req = make_request("ListStateMachines", "{}");
1109        let resp = svc.list_state_machines(&req).unwrap();
1110        let b = body_json(&resp);
1111        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1112    }
1113
1114    #[test]
1115    fn list_state_machines_returns_created() {
1116        let svc = StepFunctionsService::new(make_state());
1117        create_sm(&svc, "sm-1");
1118        create_sm(&svc, "sm-2");
1119
1120        let req = make_request("ListStateMachines", "{}");
1121        let resp = svc.list_state_machines(&req).unwrap();
1122        let b = body_json(&resp);
1123        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1124    }
1125
1126    // ── DeleteStateMachine ──
1127
1128    #[test]
1129    fn delete_state_machine() {
1130        let svc = StepFunctionsService::new(make_state());
1131        let arn = create_sm(&svc, "del-sm");
1132
1133        let req = make_request(
1134            "DeleteStateMachine",
1135            &json!({"stateMachineArn": arn}).to_string(),
1136        );
1137        svc.delete_state_machine(&req).unwrap();
1138
1139        // Describe should fail
1140        let req = make_request(
1141            "DescribeStateMachine",
1142            &json!({"stateMachineArn": arn}).to_string(),
1143        );
1144        assert!(svc.describe_state_machine(&req).is_err());
1145    }
1146
1147    #[test]
1148    fn delete_state_machine_nonexistent_succeeds() {
1149        let svc = StepFunctionsService::new(make_state());
1150        let req = make_request(
1151            "DeleteStateMachine",
1152            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1153                .to_string(),
1154        );
1155        // AWS returns success even for nonexistent
1156        svc.delete_state_machine(&req).unwrap();
1157    }
1158
1159    // ── UpdateStateMachine ──
1160
1161    #[test]
1162    fn update_state_machine() {
1163        let svc = StepFunctionsService::new(make_state());
1164        let arn = create_sm(&svc, "upd-sm");
1165
1166        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1167        let body = json!({
1168            "stateMachineArn": arn,
1169            "definition": new_def,
1170            "description": "updated",
1171        });
1172        let req = make_request("UpdateStateMachine", &body.to_string());
1173        let resp = svc.update_state_machine(&req).unwrap();
1174        let b = body_json(&resp);
1175        assert!(b["updateDate"].as_f64().is_some());
1176
1177        // Verify
1178        let req = make_request(
1179            "DescribeStateMachine",
1180            &json!({"stateMachineArn": arn}).to_string(),
1181        );
1182        let resp = svc.describe_state_machine(&req).unwrap();
1183        let b = body_json(&resp);
1184        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1185        assert_eq!(b["description"], "updated");
1186    }
1187
1188    #[test]
1189    fn update_state_machine_not_found() {
1190        let svc = StepFunctionsService::new(make_state());
1191        let body = json!({
1192            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1193            "definition": VALID_DEF,
1194        });
1195        let req = make_request("UpdateStateMachine", &body.to_string());
1196        let err = expect_err(svc.update_state_machine(&req));
1197        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1198    }
1199
1200    // ── StartExecution ──
1201
1202    #[tokio::test]
1203    async fn start_execution_basic() {
1204        let svc = StepFunctionsService::new(make_state());
1205        let arn = create_sm(&svc, "exec-sm");
1206
1207        let body = json!({
1208            "stateMachineArn": arn,
1209            "input": r#"{"key":"value"}"#,
1210        });
1211        let req = make_request("StartExecution", &body.to_string());
1212        let resp = svc.start_execution(&req).unwrap();
1213        let b = body_json(&resp);
1214        assert!(b["executionArn"].as_str().is_some());
1215        assert!(b["startDate"].as_f64().is_some());
1216    }
1217
1218    #[tokio::test]
1219    async fn start_execution_with_name() {
1220        let svc = StepFunctionsService::new(make_state());
1221        let arn = create_sm(&svc, "named-exec");
1222
1223        let body = json!({
1224            "stateMachineArn": arn,
1225            "name": "my-execution",
1226        });
1227        let req = make_request("StartExecution", &body.to_string());
1228        let resp = svc.start_execution(&req).unwrap();
1229        let b = body_json(&resp);
1230        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1231    }
1232
1233    #[tokio::test]
1234    async fn start_execution_sm_not_found() {
1235        let svc = StepFunctionsService::new(make_state());
1236        let body = json!({
1237            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1238        });
1239        let req = make_request("StartExecution", &body.to_string());
1240        let err = expect_err(svc.start_execution(&req));
1241        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1242    }
1243
1244    #[tokio::test]
1245    async fn start_execution_invalid_input() {
1246        let svc = StepFunctionsService::new(make_state());
1247        let arn = create_sm(&svc, "bad-input");
1248
1249        let body = json!({
1250            "stateMachineArn": arn,
1251            "input": "not json",
1252        });
1253        let req = make_request("StartExecution", &body.to_string());
1254        let err = expect_err(svc.start_execution(&req));
1255        assert!(err.to_string().contains("InvalidExecutionInput"));
1256    }
1257
1258    #[tokio::test]
1259    async fn start_execution_duplicate_name() {
1260        let svc = StepFunctionsService::new(make_state());
1261        let arn = create_sm(&svc, "dup-exec");
1262
1263        let body = json!({
1264            "stateMachineArn": arn,
1265            "name": "same-name",
1266        });
1267        let req = make_request("StartExecution", &body.to_string());
1268        svc.start_execution(&req).unwrap();
1269
1270        let req = make_request("StartExecution", &body.to_string());
1271        let err = expect_err(svc.start_execution(&req));
1272        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1273    }
1274
1275    // ── DescribeExecution ──
1276
1277    #[tokio::test]
1278    async fn describe_execution_found() {
1279        let svc = StepFunctionsService::new(make_state());
1280        let sm_arn = create_sm(&svc, "desc-exec");
1281
1282        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1283        let req = make_request("StartExecution", &body.to_string());
1284        let resp = svc.start_execution(&req).unwrap();
1285        let exec_arn = body_json(&resp)["executionArn"]
1286            .as_str()
1287            .unwrap()
1288            .to_string();
1289
1290        let req = make_request(
1291            "DescribeExecution",
1292            &json!({"executionArn": exec_arn}).to_string(),
1293        );
1294        let resp = svc.describe_execution(&req).unwrap();
1295        let b = body_json(&resp);
1296        assert_eq!(b["name"], "e1");
1297        assert_eq!(b["status"], "RUNNING");
1298    }
1299
1300    #[tokio::test]
1301    async fn describe_execution_not_found() {
1302        let svc = StepFunctionsService::new(make_state());
1303        let req = make_request(
1304            "DescribeExecution",
1305            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1306                .to_string(),
1307        );
1308        let err = expect_err(svc.describe_execution(&req));
1309        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1310    }
1311
1312    // ── StopExecution ──
1313
1314    #[tokio::test]
1315    async fn stop_execution() {
1316        let svc = StepFunctionsService::new(make_state());
1317        let sm_arn = create_sm(&svc, "stop-sm");
1318
1319        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1320        let req = make_request("StartExecution", &body.to_string());
1321        let resp = svc.start_execution(&req).unwrap();
1322        let exec_arn = body_json(&resp)["executionArn"]
1323            .as_str()
1324            .unwrap()
1325            .to_string();
1326
1327        let body = json!({
1328            "executionArn": exec_arn,
1329            "error": "UserAborted",
1330            "cause": "test stop",
1331        });
1332        let req = make_request("StopExecution", &body.to_string());
1333        let resp = svc.stop_execution(&req).unwrap();
1334        let b = body_json(&resp);
1335        assert!(b["stopDate"].as_f64().is_some());
1336
1337        // Verify aborted
1338        let req = make_request(
1339            "DescribeExecution",
1340            &json!({"executionArn": exec_arn}).to_string(),
1341        );
1342        let resp = svc.describe_execution(&req).unwrap();
1343        let b = body_json(&resp);
1344        assert_eq!(b["status"], "ABORTED");
1345        assert_eq!(b["error"], "UserAborted");
1346    }
1347
1348    #[tokio::test]
1349    async fn stop_execution_not_found() {
1350        let svc = StepFunctionsService::new(make_state());
1351        let req = make_request(
1352            "StopExecution",
1353            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1354                .to_string(),
1355        );
1356        let err = expect_err(svc.stop_execution(&req));
1357        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1358    }
1359
1360    // ── ListExecutions ──
1361
1362    #[tokio::test]
1363    async fn list_executions() {
1364        let svc = StepFunctionsService::new(make_state());
1365        let sm_arn = create_sm(&svc, "list-exec");
1366
1367        for i in 0..3 {
1368            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1369            let req = make_request("StartExecution", &body.to_string());
1370            svc.start_execution(&req).unwrap();
1371        }
1372
1373        let req = make_request(
1374            "ListExecutions",
1375            &json!({"stateMachineArn": sm_arn}).to_string(),
1376        );
1377        let resp = svc.list_executions(&req).unwrap();
1378        let b = body_json(&resp);
1379        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1380    }
1381
1382    #[tokio::test]
1383    async fn list_executions_sm_not_found() {
1384        let svc = StepFunctionsService::new(make_state());
1385        let req = make_request(
1386            "ListExecutions",
1387            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1388                .to_string(),
1389        );
1390        let err = expect_err(svc.list_executions(&req));
1391        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1392    }
1393
1394    // ── GetExecutionHistory ──
1395
1396    #[tokio::test]
1397    async fn get_execution_history_not_found() {
1398        let svc = StepFunctionsService::new(make_state());
1399        let req = make_request(
1400            "GetExecutionHistory",
1401            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1402                .to_string(),
1403        );
1404        let err = expect_err(svc.get_execution_history(&req));
1405        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1406    }
1407
1408    // ── DescribeStateMachineForExecution ──
1409
1410    #[tokio::test]
1411    async fn describe_sm_for_execution() {
1412        let svc = StepFunctionsService::new(make_state());
1413        let sm_arn = create_sm(&svc, "sm-for-exec");
1414
1415        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1416        let req = make_request("StartExecution", &body.to_string());
1417        let resp = svc.start_execution(&req).unwrap();
1418        let exec_arn = body_json(&resp)["executionArn"]
1419            .as_str()
1420            .unwrap()
1421            .to_string();
1422
1423        let req = make_request(
1424            "DescribeStateMachineForExecution",
1425            &json!({"executionArn": exec_arn}).to_string(),
1426        );
1427        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1428        let b = body_json(&resp);
1429        assert_eq!(b["name"], "sm-for-exec");
1430    }
1431
1432    // ── Tags ──
1433
1434    #[test]
1435    fn tag_untag_list_tags() {
1436        let svc = StepFunctionsService::new(make_state());
1437        let arn = create_sm(&svc, "tagged-sm");
1438
1439        // Tag
1440        let body = json!({
1441            "resourceArn": arn,
1442            "tags": [{"key": "env", "value": "prod"}],
1443        });
1444        let req = make_request("TagResource", &body.to_string());
1445        svc.tag_resource(&req).unwrap();
1446
1447        // List
1448        let req = make_request(
1449            "ListTagsForResource",
1450            &json!({"resourceArn": arn}).to_string(),
1451        );
1452        let resp = svc.list_tags_for_resource(&req).unwrap();
1453        let b = body_json(&resp);
1454        let tags = b["tags"].as_array().unwrap();
1455        assert_eq!(tags.len(), 1);
1456        assert_eq!(tags[0]["key"], "env");
1457
1458        // Untag
1459        let body = json!({
1460            "resourceArn": arn,
1461            "tagKeys": ["env"],
1462        });
1463        let req = make_request("UntagResource", &body.to_string());
1464        svc.untag_resource(&req).unwrap();
1465
1466        // Verify empty
1467        let req = make_request(
1468            "ListTagsForResource",
1469            &json!({"resourceArn": arn}).to_string(),
1470        );
1471        let resp = svc.list_tags_for_resource(&req).unwrap();
1472        let b = body_json(&resp);
1473        assert!(b["tags"].as_array().unwrap().is_empty());
1474    }
1475
1476    #[test]
1477    fn tag_resource_not_found() {
1478        let svc = StepFunctionsService::new(make_state());
1479        let body = json!({
1480            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1481            "tags": [{"key": "k", "value": "v"}],
1482        });
1483        let req = make_request("TagResource", &body.to_string());
1484        let err = expect_err(svc.tag_resource(&req));
1485        assert!(err.to_string().contains("ResourceNotFound"));
1486    }
1487
1488    // ── Helper function tests ──
1489
1490    #[test]
1491    fn test_validate_name() {
1492        assert!(validate_name("valid-name").is_ok());
1493        assert!(validate_name("under_score").is_ok());
1494        assert!(validate_name("").is_err());
1495        assert!(validate_name("has spaces").is_err());
1496        assert!(validate_name(&"a".repeat(81)).is_err());
1497    }
1498
1499    #[test]
1500    fn test_validate_definition() {
1501        assert!(validate_definition(VALID_DEF).is_ok());
1502        assert!(validate_definition("not json").is_err());
1503        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
1504        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
1505    }
1506
1507    #[test]
1508    fn test_validate_definition_rejects_malformed_paths() {
1509        // Unterminated bracket in InputPath.
1510        let def =
1511            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1512        assert!(validate_definition(def).is_err());
1513
1514        // Multibyte char where the close bracket would be, in OutputPath.
1515        let def =
1516            "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1517        assert!(validate_definition(def).is_err());
1518
1519        // Malformed Choice Variable.
1520        let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1521        assert!(validate_definition(def).is_err());
1522
1523        // Path not rooted at $.
1524        let def =
1525            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1526        assert!(validate_definition(def).is_err());
1527
1528        // Empty index brackets.
1529        let def =
1530            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1531        assert!(validate_definition(def).is_err());
1532    }
1533
1534    #[test]
1535    fn test_validate_definition_accepts_well_formed_paths() {
1536        // Valid reference paths, the literal "null" for Input/OutputPath, and
1537        // nested Choice combinators must all be accepted.
1538        let def = r#"{"StartAt":"P","States":{
1539            "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1540            "C":{"Type":"Choice","Choices":[
1541                {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1542            ],"Default":"S"},
1543            "S":{"Type":"Succeed","InputPath":"null"}
1544        }}"#;
1545        assert!(validate_definition(def).is_ok());
1546    }
1547
1548    #[test]
1549    fn test_is_valid_reference_path() {
1550        assert!(is_valid_reference_path("$"));
1551        assert!(is_valid_reference_path("$.foo"));
1552        assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1553        assert!(is_valid_reference_path("$[0]"));
1554        assert!(!is_valid_reference_path("$.arr["));
1555        assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1556        assert!(!is_valid_reference_path("$.x[]"));
1557        assert!(!is_valid_reference_path("$.x[abc]"));
1558        assert!(!is_valid_reference_path("foo.bar"));
1559        assert!(!is_valid_reference_path(""));
1560    }
1561
1562    #[test]
1563    fn test_validate_arn() {
1564        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1565        assert!(validate_arn("not-an-arn").is_err());
1566    }
1567
1568    #[test]
1569    fn test_camel_to_details_key() {
1570        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1571        assert_eq!(camel_to_details_key(""), "");
1572    }
1573
1574    #[test]
1575    fn test_is_mutating_action() {
1576        assert!(is_mutating_action("CreateStateMachine"));
1577        assert!(is_mutating_action("StartExecution"));
1578        assert!(!is_mutating_action("DescribeStateMachine"));
1579        assert!(!is_mutating_action("ListStateMachines"));
1580    }
1581
1582    // ── StartSyncExecution ──
1583
1584    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1585        let body = json!({
1586            "name": name,
1587            "definition": VALID_DEF,
1588            "roleArn": "arn:aws:iam::123456789012:role/test",
1589            "type": "EXPRESS",
1590        });
1591        let req = make_request("CreateStateMachine", &body.to_string());
1592        let resp = svc.create_state_machine(&req).unwrap();
1593        let b = body_json(&resp);
1594        b["stateMachineArn"].as_str().unwrap().to_string()
1595    }
1596
1597    #[tokio::test]
1598    async fn start_sync_execution_basic() {
1599        let svc = StepFunctionsService::new(make_state());
1600        let arn = create_express_sm(&svc, "sync-sm");
1601
1602        let body = json!({
1603            "stateMachineArn": arn,
1604            "input": r#"{"key":"value"}"#,
1605        });
1606        let req = make_request("StartSyncExecution", &body.to_string());
1607        let resp = svc.start_sync_execution(&req).await.unwrap();
1608        let b = body_json(&resp);
1609        assert!(b["executionArn"]
1610            .as_str()
1611            .unwrap()
1612            .contains("express:sync-sm"));
1613        assert_eq!(b["stateMachineArn"], arn);
1614        assert_eq!(b["status"], "SUCCEEDED");
1615        assert!(b["startDate"].as_i64().is_some());
1616        assert!(b["stopDate"].as_i64().is_some());
1617        assert!(b["output"].as_str().is_some());
1618        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1619            .as_i64()
1620            .is_some());
1621    }
1622
1623    #[tokio::test]
1624    async fn start_sync_execution_not_express() {
1625        let svc = StepFunctionsService::new(make_state());
1626        let arn = create_sm(&svc, "std-sm");
1627
1628        let body = json!({"stateMachineArn": arn});
1629        let req = make_request("StartSyncExecution", &body.to_string());
1630        let err = expect_err(svc.start_sync_execution(&req).await);
1631        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1632    }
1633
1634    #[tokio::test]
1635    async fn start_sync_execution_sm_not_found() {
1636        let svc = StepFunctionsService::new(make_state());
1637        let body = json!({
1638            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1639        });
1640        let req = make_request("StartSyncExecution", &body.to_string());
1641        let err = expect_err(svc.start_sync_execution(&req).await);
1642        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1643    }
1644
1645    #[tokio::test]
1646    async fn start_sync_execution_records_introspection_fields() {
1647        let svc = StepFunctionsService::new(make_state());
1648        let arn = create_express_sm(&svc, "sync-introspect");
1649
1650        let body = json!({"stateMachineArn": arn, "input": "{}"});
1651        let req = make_request("StartSyncExecution", &body.to_string());
1652        let resp = svc.start_sync_execution(&req).await.unwrap();
1653        let b = body_json(&resp);
1654        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1655
1656        let accounts = svc.state.read();
1657        let state = accounts.get("123456789012").unwrap();
1658        let stored = state
1659            .executions
1660            .get(&exec_arn)
1661            .expect("sync execution should be persisted for introspection");
1662        assert!(stored.is_sync, "sync executions must be marked is_sync");
1663        assert_eq!(stored.billed_memory_mb, Some(64));
1664        assert!(
1665            stored.billed_duration_ms.is_some(),
1666            "billed_duration_ms must be populated after sync run"
1667        );
1668        assert!(
1669            stored.parent_execution_arn.is_none(),
1670            "top-level sync execution has no parent"
1671        );
1672    }
1673
1674    #[tokio::test]
1675    async fn start_sync_execution_invalid_input() {
1676        let svc = StepFunctionsService::new(make_state());
1677        let arn = create_express_sm(&svc, "bad-input-sync");
1678
1679        let body = json!({
1680            "stateMachineArn": arn,
1681            "input": "not json",
1682        });
1683        let req = make_request("StartSyncExecution", &body.to_string());
1684        let err = expect_err(svc.start_sync_execution(&req).await);
1685        assert!(err.to_string().contains("InvalidExecutionInput"));
1686    }
1687
1688    /// No snapshot store (memory mode) -> no persist hook for the CFN provisioner.
1689    #[test]
1690    fn snapshot_hook_is_none_without_store() {
1691        let svc = StepFunctionsService::new(make_state());
1692        assert!(svc.snapshot_hook().is_none());
1693    }
1694
1695    /// With a store, the hook is present and invoking it runs the whole-state
1696    /// persist path the CloudFormation provisioner uses after mutating Step
1697    /// Functions state directly.
1698    #[tokio::test]
1699    async fn snapshot_hook_fires_with_store() {
1700        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1701            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1702        let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1703        let hook = svc
1704            .snapshot_hook()
1705            .expect("hook present when a store is set");
1706        hook().await;
1707    }
1708}
1709
/// Checks how `paginate_checked` treats a non-numeric versus a numeric token.
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// A non-numeric token (`"bad"`) must yield an error, while the numeric
    /// token `"2"` over a five-item list must be accepted.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items = vec![0_i32, 1, 2, 3, 4];

        let rejected = paginate_checked(&items, Some("bad"), 3);
        assert!(rejected.is_err());

        let accepted = paginate_checked(&items, Some("2"), 3);
        assert!(accepted.is_ok());
    }
}