Skip to main content

fakecloud_stepfunctions/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Action names this service reports through `supported_actions`, in the
/// same order the dispatcher in `handle` lists them.
const SUPPORTED: &[&str] = &[
    // State machines
    "CreateStateMachine",
    "DescribeStateMachine",
    "ListStateMachines",
    "DeleteStateMachine",
    "UpdateStateMachine",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Executions
    "StartExecution",
    "StopExecution",
    "DescribeExecution",
    "ListExecutions",
    "GetExecutionHistory",
    "DescribeStateMachineForExecution",
    // Activities
    "CreateActivity",
    "DeleteActivity",
    "DescribeActivity",
    "ListActivities",
    // Activity tasks
    "GetActivityTask",
    "SendTaskFailure",
    "SendTaskHeartbeat",
    "SendTaskSuccess",
    // Versions
    "PublishStateMachineVersion",
    "DeleteStateMachineVersion",
    "ListStateMachineVersions",
    // Aliases
    "CreateStateMachineAlias",
    "DeleteStateMachineAlias",
    "DescribeStateMachineAlias",
    "ListStateMachineAliases",
    "UpdateStateMachineAlias",
    // Map runs
    "DescribeMapRun",
    "ListMapRuns",
    "UpdateMapRun",
    // Redrive, synchronous execution, and definition tooling
    "RedriveExecution",
    "StartSyncExecution",
    "TestState",
    "ValidateStateMachineDefinition",
];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73    state: SharedStepFunctionsState,
74    delivery: Option<Arc<DeliveryBus>>,
75    dynamodb_state: Option<SharedDynamoDbState>,
76    registry: Option<SharedServiceRegistry>,
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90    pub fn new(state: SharedStepFunctionsState) -> Self {
91        Self {
92            state,
93            delivery: None,
94            dynamodb_state: None,
95            registry: None,
96            snapshot_store: None,
97            snapshot_lock: Arc::new(AsyncMutex::new(())),
98        }
99    }
100
101    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102        self.delivery = Some(delivery);
103        self
104    }
105
106    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107        self.dynamodb_state = Some(dynamodb_state);
108        self
109    }
110
111    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
112    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
113    /// has been registered; until then the interpreter falls back to its
114    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
115    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116        self.registry = Some(registry);
117        self
118    }
119
120    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121        self.snapshot_store = Some(store);
122        self
123    }
124
125    async fn save_snapshot(&self) {
126        save_stepfunctions_snapshot(
127            &self.state,
128            self.snapshot_store.clone(),
129            &self.snapshot_lock,
130        )
131        .await;
132    }
133
134    /// Build a hook that persists the current Step Functions state when invoked,
135    /// or `None` in memory mode (no snapshot store). The CloudFormation
136    /// provisioner mutates `state` directly and uses this to write a
137    /// CFN-provisioned resource through to disk, the same way a direct mutating
138    /// API call would.
139    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140        let store = self.snapshot_store.clone()?;
141        let state = self.state.clone();
142        let lock = self.snapshot_lock.clone();
143        Some(Arc::new(move || {
144            let state = state.clone();
145            let store = store.clone();
146            let lock = lock.clone();
147            Box::pin(async move {
148                save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149            })
150        }))
151    }
152}
153
154/// Persist the current Step Functions state as a snapshot. Offloads the serde +
155/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
156/// (memory mode). Shared by `StepFunctionsService::save_snapshot` and the
157/// CloudFormation provisioner's post-provision persist hook so both route
158/// through the same serialize-and-write path.
159pub async fn save_stepfunctions_snapshot(
160    state: &SharedStepFunctionsState,
161    store: Option<Arc<dyn SnapshotStore>>,
162    lock: &AsyncMutex<()>,
163) {
164    let Some(store) = store else {
165        return;
166    };
167    let _guard = lock.lock().await;
168    let snapshot = StepFunctionsSnapshot {
169        schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170        state: None,
171        accounts: Some(state.read().clone()),
172    };
173    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174        let bytes = serde_json::to_vec(&snapshot)
175            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176        store.save(&bytes)
177    })
178    .await;
179    match join {
180        Ok(Ok(())) => {}
181        Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182        Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183    }
184}
185
186/// Abort executions that were RUNNING (or PENDING_REDRIVE) when the server
187/// stopped, called once after a persistence snapshot is loaded on startup.
188///
189/// The in-memory interpreter that was driving each one is gone after a restart,
190/// so the execution can never advance. Real Step Functions resumes from durable
191/// interpreter state; fakecloud can't, so leaving them RUNNING would strand them
192/// forever (DescribeExecution would report RUNNING with no way to ever finish).
193/// Aborting with a terminal stop date + error/cause is the honest outcome
194/// (bug-audit 2026-06-20, 0.A2). Returns the number reconciled.
195pub fn reconcile_interrupted_executions(state: &SharedStepFunctionsState) -> usize {
196    let now = Utc::now();
197    let mut count = 0;
198    let mut accounts = state.write();
199    let account_ids: Vec<String> = accounts.iter().map(|(id, _)| id.to_string()).collect();
200    for account_id in account_ids {
201        let Some(s) = accounts.get_mut(&account_id) else {
202            continue;
203        };
204        for exec in s.executions.values_mut() {
205            if matches!(
206                exec.status,
207                ExecutionStatus::Running | ExecutionStatus::PendingRedrive
208            ) {
209                exec.status = ExecutionStatus::Aborted;
210                exec.stop_date = Some(now);
211                exec.error = Some("Fakecloud.Restart".to_string());
212                exec.cause = Some(
213                    "Execution was interrupted by a fakecloud restart and cannot be resumed"
214                        .to_string(),
215                );
216                count += 1;
217            }
218        }
219    }
220    count
221}
222
/// Report whether `action` is one of the actions after which `handle`
/// persists a snapshot (on a successful response).
fn is_mutating_action(action: &str) -> bool {
    const MUTATING: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];
    MUTATING.contains(&action)
}
249
250#[async_trait]
251impl AwsService for StepFunctionsService {
252    fn service_name(&self) -> &str {
253        "states"
254    }
255
256    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
257        let mutates = is_mutating_action(req.action.as_str());
258        let result = match req.action.as_str() {
259            "CreateStateMachine" => self.create_state_machine(&req),
260            "DescribeStateMachine" => self.describe_state_machine(&req),
261            "ListStateMachines" => self.list_state_machines(&req),
262            "DeleteStateMachine" => self.delete_state_machine(&req),
263            "UpdateStateMachine" => self.update_state_machine(&req),
264            "TagResource" => self.tag_resource(&req),
265            "UntagResource" => self.untag_resource(&req),
266            "ListTagsForResource" => self.list_tags_for_resource(&req),
267            "StartExecution" => self.start_execution(&req),
268            "StopExecution" => self.stop_execution(&req),
269            "DescribeExecution" => self.describe_execution(&req),
270            "ListExecutions" => self.list_executions(&req),
271            "GetExecutionHistory" => self.get_execution_history(&req),
272            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
273            "CreateActivity" => self.create_activity(&req),
274            "DeleteActivity" => self.delete_activity(&req),
275            "DescribeActivity" => self.describe_activity(&req),
276            "ListActivities" => self.list_activities(&req),
277            "GetActivityTask" => self.get_activity_task(&req).await,
278            "SendTaskFailure" => self.send_task_failure(&req),
279            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
280            "SendTaskSuccess" => self.send_task_success(&req),
281            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
282            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
283            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
284            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
285            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
286            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
287            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
288            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
289            "DescribeMapRun" => self.describe_map_run(&req),
290            "ListMapRuns" => self.list_map_runs(&req),
291            "UpdateMapRun" => self.update_map_run(&req),
292            "RedriveExecution" => self.redrive_execution(&req),
293            "StartSyncExecution" => self.start_sync_execution(&req).await,
294            "TestState" => self.test_state(&req),
295            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
296            _ => Err(AwsServiceError::action_not_implemented(
297                "states",
298                &req.action,
299            )),
300        };
301        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302            self.save_snapshot().await;
303        }
304        result
305    }
306
307    fn supported_actions(&self) -> &[&str] {
308        SUPPORTED
309    }
310}
311
312impl StepFunctionsService {
313    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
314        let body = req.json_body();
315        let raw_max_results = body["maxResults"].as_i64();
316        if let Some(mr) = raw_max_results {
317            validate_max_results(mr)?;
318        }
319        let next_token = body["nextToken"].as_str();
320        if let Some(t) = next_token {
321            validate_page_token(t)?;
322        }
323        // Smithy default: maxResults=0 means "use the service default"
324        // (100 for Step Functions), which we honour by treating both
325        // `None` and `0` as 100.
326        let max_results = match raw_max_results.unwrap_or(0) {
327            0 => 100,
328            n => n as usize,
329        };
330        let accounts = self.state.read();
331        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
332        let state = accounts.get(&req.account_id).unwrap_or(&empty);
333        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
334        activities.sort_by(|a, b| a.name.cmp(&b.name));
335        let items: Vec<Value> = activities
336            .iter()
337            .map(|a| {
338                json!({
339                    "activityArn": a.arn,
340                    "name": a.name,
341                    "creationDate": a.creation_date.timestamp(),
342                })
343            })
344            .collect();
345        let (page, token) =
346            paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
347        let mut resp = json!({ "activities": page });
348        if let Some(t) = token {
349            resp["nextToken"] = json!(t);
350        }
351        Ok(AwsResponse::ok_json(resp))
352    }
353}
354
355fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
356    json!({
357        "stateMachineAliasArn": alias.arn,
358        "name": alias.name,
359        "description": alias.description,
360        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
361            "stateMachineVersionArn": r.state_machine_version_arn,
362            "weight": r.weight,
363        })).collect::<Vec<_>>(),
364        "creationDate": alias.creation_date.timestamp(),
365        "updateDate": alias.update_date.timestamp(),
366    })
367}
368
369fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
370    json!({
371        "mapRunArn": mr.map_run_arn,
372        "executionArn": mr.execution_arn,
373        "maxConcurrency": mr.max_concurrency,
374        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
375        "toleratedFailureCount": mr.tolerated_failure_count,
376        "status": mr.status,
377        "startDate": mr.start_date.timestamp(),
378        "stopDate": mr.stop_date.map(|d| d.timestamp()),
379    })
380}
381
382// ─── Helpers ────────────────────────────────────────────────────────────
383
384fn state_machine_to_json(sm: &StateMachine) -> Value {
385    let mut resp = json!({
386        "name": sm.name,
387        "stateMachineArn": sm.arn,
388        "definition": sm.definition,
389        "roleArn": sm.role_arn,
390        "type": sm.machine_type.as_str(),
391        "status": sm.status.as_str(),
392        "creationDate": sm.creation_date.timestamp() as f64,
393        "updateDate": sm.update_date.timestamp() as f64,
394        "revisionId": sm.revision_id,
395        "label": sm.name,
396    });
397
398    if !sm.description.is_empty() {
399        resp["description"] = json!(sm.description);
400    }
401
402    if let Some(ref logging) = sm.logging_configuration {
403        resp["loggingConfiguration"] = logging.clone();
404    } else {
405        resp["loggingConfiguration"] = json!({
406            "level": "OFF",
407            "includeExecutionData": false,
408            "destinations": [],
409        });
410    }
411
412    if let Some(ref tracing) = sm.tracing_configuration {
413        resp["tracingConfiguration"] = tracing.clone();
414    } else {
415        resp["tracingConfiguration"] = json!({
416            "enabled": false,
417        });
418    }
419
420    resp
421}
422
423fn missing(name: &str) -> AwsServiceError {
424    AwsServiceError::aws_error(
425        StatusCode::BAD_REQUEST,
426        "ValidationException",
427        format!("The request must contain the parameter {name}."),
428    )
429}
430
431fn state_machine_not_found(arn: &str) -> AwsServiceError {
432    AwsServiceError::aws_error(
433        StatusCode::BAD_REQUEST,
434        "StateMachineDoesNotExist",
435        format!("State Machine Does Not Exist: '{arn}'"),
436    )
437}
438
439fn activity_not_found(arn: &str) -> AwsServiceError {
440    AwsServiceError::aws_error(
441        StatusCode::BAD_REQUEST,
442        "ActivityDoesNotExist",
443        format!("Activity does not exist: {arn}"),
444    )
445}
446
447fn task_does_not_exist(token: &str) -> AwsServiceError {
448    AwsServiceError::aws_error(
449        StatusCode::BAD_REQUEST,
450        "TaskDoesNotExist",
451        format!("Task does not exist: {token}"),
452    )
453}
454
455fn resource_not_found(arn: &str) -> AwsServiceError {
456    AwsServiceError::aws_error(
457        StatusCode::BAD_REQUEST,
458        "ResourceNotFound",
459        format!("Resource not found: '{arn}'"),
460    )
461}
462
463/// Parse + validate an alias `routingConfiguration` array.
464///
465/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
466/// route must include `stateMachineVersionArn`.
467fn parse_routing_configuration(
468    routes: &[serde_json::Value],
469) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
470    if routes.is_empty() || routes.len() > 2 {
471        return Err(AwsServiceError::aws_error(
472            StatusCode::BAD_REQUEST,
473            "ValidationException",
474            "routingConfiguration must contain 1 or 2 routes.",
475        ));
476    }
477    let parsed: Vec<crate::state::AliasRoute> = routes
478        .iter()
479        .map(|r| {
480            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
481                AwsServiceError::aws_error(
482                    StatusCode::BAD_REQUEST,
483                    "ValidationException",
484                    "routingConfiguration entries must contain stateMachineVersionArn.",
485                )
486            })?;
487            let weight = r["weight"].as_i64().ok_or_else(|| {
488                AwsServiceError::aws_error(
489                    StatusCode::BAD_REQUEST,
490                    "ValidationException",
491                    "routingConfiguration entries must contain a numeric weight.",
492                )
493            })?;
494            if !(0..=100).contains(&weight) {
495                return Err(AwsServiceError::aws_error(
496                    StatusCode::BAD_REQUEST,
497                    "ValidationException",
498                    format!("Invalid routing weight {weight}; must be 0-100."),
499                ));
500            }
501            Ok(crate::state::AliasRoute {
502                state_machine_version_arn: arn.to_string(),
503                weight: weight as i32,
504            })
505        })
506        .collect::<Result<_, _>>()?;
507    let total: i32 = parsed.iter().map(|r| r.weight).sum();
508    if total != 100 {
509        return Err(AwsServiceError::aws_error(
510            StatusCode::BAD_REQUEST,
511            "ValidationException",
512            format!("routingConfiguration weights must sum to 100, got {total}."),
513        ));
514    }
515    Ok(parsed)
516}
517
518fn validate_name(name: &str) -> Result<(), AwsServiceError> {
519    if name.is_empty() || name.len() > 80 {
520        return Err(AwsServiceError::aws_error(
521            StatusCode::BAD_REQUEST,
522            "InvalidName",
523            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
524        ));
525    }
526    // Only allow alphanumeric, hyphens, and underscores
527    if !name
528        .chars()
529        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
530    {
531        return Err(AwsServiceError::aws_error(
532            StatusCode::BAD_REQUEST,
533            "InvalidName",
534            format!(
535                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
536            ),
537        ));
538    }
539    Ok(())
540}
541
542fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
543    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
544        AwsServiceError::aws_error(
545            StatusCode::BAD_REQUEST,
546            "InvalidDefinition",
547            format!("Invalid State Machine Definition: '{e}'"),
548        )
549    })?;
550
551    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
552        return Err(AwsServiceError::aws_error(
553            StatusCode::BAD_REQUEST,
554            "InvalidDefinition",
555            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
556                .to_string(),
557        ));
558    }
559
560    let states_obj = parsed
561        .get("States")
562        .and_then(|v| v.as_object())
563        .ok_or_else(|| {
564            AwsServiceError::aws_error(
565                StatusCode::BAD_REQUEST,
566                "InvalidDefinition",
567                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
568                    .to_string(),
569            )
570        })?;
571
572    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
573        AwsServiceError::aws_error(
574            StatusCode::BAD_REQUEST,
575            "InvalidDefinition",
576            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
577                .to_string(),
578        )
579    })?;
580    if !states_obj.contains_key(start_at) {
581        return Err(AwsServiceError::aws_error(
582            StatusCode::BAD_REQUEST,
583            "InvalidDefinition",
584            format!(
585                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
586                 (StartAt '{start_at}' does not reference a valid state)"
587            ),
588        ));
589    }
590
591    // Reject malformed JSONPath reference fields. AWS rejects bad reference
592    // paths at CreateStateMachine; accepting them here lets a panic-inducing
593    // path reach the interpreter at execution time.
594    for (state_name, state_val) in states_obj {
595        validate_state_paths(state_name, state_val)?;
596    }
597
598    Ok(())
599}
600
601/// Validate the JSONPath reference fields of a single state. Recurses into the
602/// `Choices` array of Choice states. Returns an `InvalidDefinition` error for
603/// any syntactically malformed path.
604fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
605    for field in ["InputPath", "OutputPath", "ResultPath"] {
606        if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
607            // `InputPath`/`OutputPath` accept the literal "null" to mean "no
608            // value"; `ResultPath` accepts JSON null (handled separately) but
609            // not the string "null". Both accept "$"-rooted reference paths.
610            if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
611                continue;
612            }
613            return Err(invalid_reference_path(state_name, field, p));
614        }
615    }
616
617    if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
618        if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
619            for rule in choices {
620                validate_choice_variables(state_name, rule)?;
621            }
622        }
623    }
624
625    Ok(())
626}
627
628/// Validate `Variable` reference paths inside a Choice rule, recursing through
629/// nested `And` / `Or` / `Not` boolean combinators.
630fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
631    if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
632        if !is_valid_reference_path(v) {
633            return Err(invalid_reference_path(state_name, "Variable", v));
634        }
635    }
636    for combinator in ["And", "Or"] {
637        if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
638            for n in nested {
639                validate_choice_variables(state_name, n)?;
640            }
641        }
642    }
643    if let Some(n) = rule.get("Not") {
644        validate_choice_variables(state_name, n)?;
645    }
646    Ok(())
647}
648
649/// A reference path must start with `$` and every `[...]` index segment must be
650/// a balanced, non-empty, decimal-integer index. This mirrors the subset of
651/// JSONPath the interpreter understands; anything it cannot parse is rejected.
652fn is_valid_reference_path(path: &str) -> bool {
653    if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
654        return false;
655    }
656    let body = path
657        .strip_prefix("$.")
658        .or_else(|| path.strip_prefix('$'))
659        .unwrap_or(path);
660    for part in body.split('.') {
661        if !segment_is_valid(part) {
662            return false;
663        }
664    }
665    true
666}
667
/// Validate one dot-separated segment of a reference path.
///
/// A segment without `[` is valid as long as it has no stray `]`. A segment
/// with `[` must end in `]`, and the text between the first `[` and that
/// final `]` must be a non-empty run of ASCII digits that fits in a `usize`.
fn segment_is_valid(part: &str) -> bool {
    let Some(open) = part.find('[') else {
        return !part.contains(']');
    };
    // `[` is a single byte, so `open + 1` is always a valid slice boundary.
    let Some(index) = part[open + 1..].strip_suffix(']') else {
        return false;
    };
    // `str::parse::<usize>` tolerates a leading `+`, which is not a decimal
    // index, so require digits only before using `parse` as the overflow check.
    !index.is_empty()
        && index.bytes().all(|b| b.is_ascii_digit())
        && index.parse::<usize>().is_ok()
}
681
682fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
683    AwsServiceError::aws_error(
684        StatusCode::BAD_REQUEST,
685        "InvalidDefinition",
686        format!(
687            "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
688             (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
689        ),
690    )
691}
692
693fn execution_not_found(arn: &str) -> AwsServiceError {
694    AwsServiceError::aws_error(
695        StatusCode::BAD_REQUEST,
696        "ExecutionDoesNotExist",
697        format!("Execution Does Not Exist: '{arn}'"),
698    )
699}
700
701fn execution_to_json(exec: &Execution) -> Value {
702    let mut resp = json!({
703        "executionArn": exec.execution_arn,
704        "stateMachineArn": exec.state_machine_arn,
705        "name": exec.name,
706        "status": exec.status.as_str(),
707        "startDate": exec.start_date.timestamp() as f64,
708    });
709
710    if let Some(ref input) = exec.input {
711        resp["input"] = json!(input);
712    }
713    if let Some(ref output) = exec.output {
714        resp["output"] = json!(output);
715    }
716    if let Some(stop) = exec.stop_date {
717        resp["stopDate"] = json!(stop.timestamp() as f64);
718    }
719    if let Some(ref error) = exec.error {
720        resp["error"] = json!(error);
721    }
722    if let Some(ref cause) = exec.cause {
723        resp["cause"] = json!(cause);
724    }
725
726    resp
727}
728
/// Lower-case the first character of an event type to get its details key,
/// e.g. "PassStateEntered" becomes "passStateEntered". An empty input yields
/// an empty string.
fn camel_to_details_key(event_type: &str) -> String {
    let mut rest = event_type.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };
    let mut key = String::with_capacity(event_type.len());
    key.extend(first.to_lowercase());
    key.push_str(rest.as_str());
    key
}
737
738fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
739    if !arn.starts_with("arn:") {
740        return Err(AwsServiceError::aws_error(
741            StatusCode::BAD_REQUEST,
742            "InvalidArn",
743            format!("Invalid Arn: '{arn}'"),
744        ));
745    }
746    Ok(())
747}
748
749/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
750/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
751/// every ARN-bearing Step Functions operation declares.
752fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
753    if value.is_empty() || value.len() > max {
754        return Err(AwsServiceError::aws_error(
755            StatusCode::BAD_REQUEST,
756            "InvalidArn",
757            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
758        ));
759    }
760    Ok(())
761}
762
763/// Shared error for a malformed (unparseable) `nextToken` reaching the
764/// pagination helper — `InvalidToken` is the only pagination error code
765/// declared on every list op.
766pub(super) fn invalid_token() -> AwsServiceError {
767    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
768}
769
770/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
771/// error code declared on every paginated list op is `InvalidToken`, so
772/// route both empty and oversize tokens through it.
773fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
774    if value.is_empty() || value.len() > 1024 {
775        return Err(AwsServiceError::aws_error(
776            StatusCode::BAD_REQUEST,
777            "InvalidToken",
778            "nextToken must be 1..=1024 characters",
779        ));
780    }
781    Ok(())
782}
783
784/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
785/// probes flip below-min / above-max; since the only error declared on
786/// `ListActivities` is `InvalidToken`, we map page-size violations to
787/// the same code (matches how AWS surfaces malformed pagination input
788/// for ops that don't model a separate `ValidationException`).
789fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
790    if !(0..=1000).contains(&value) {
791        return Err(AwsServiceError::aws_error(
792            StatusCode::BAD_REQUEST,
793            "InvalidToken",
794            format!("maxResults '{value}' is outside 0..=1000"),
795        ));
796    }
797    Ok(())
798}
799
/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
///
/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
808pub fn start_execution_from_delivery(
809    state: &SharedStepFunctionsState,
810    delivery: &Option<Arc<DeliveryBus>>,
811    dynamodb_state: &Option<SharedDynamoDbState>,
812    registry: &Option<SharedServiceRegistry>,
813    state_machine_arn: &str,
814    input: &str,
815) {
816    // Validate input is valid JSON
817    if serde_json::from_str::<serde_json::Value>(input).is_err() {
818        tracing::warn!(
819            state_machine_arn,
820            "Step Functions delivery: invalid JSON input, skipping execution"
821        );
822        return;
823    }
824
825    let execution_name = uuid::Uuid::new_v4().to_string();
826
827    // Extract account_id from the state machine ARN
828    let account_id = state_machine_arn
829        .split(':')
830        .nth(4)
831        .unwrap_or("000000000000")
832        .to_string();
833
834    let mut accounts = state.write();
835    let st = accounts.get_or_create(&account_id);
836    let sm = match st.state_machines.get(state_machine_arn) {
837        Some(sm) => sm,
838        None => {
839            tracing::warn!(
840                state_machine_arn,
841                "Step Functions delivery: state machine not found"
842            );
843            return;
844        }
845    };
846
847    let sm_name = sm.name.clone();
848    let definition = sm.definition.clone();
849    let exec_arn = st.execution_arn(&sm_name, &execution_name);
850
851    let now = Utc::now();
852    let execution = Execution {
853        execution_arn: exec_arn.clone(),
854        state_machine_arn: state_machine_arn.to_string(),
855        state_machine_name: sm_name,
856        name: execution_name,
857        status: ExecutionStatus::Running,
858        input: Some(input.to_string()),
859        output: None,
860        start_date: now,
861        stop_date: None,
862        error: None,
863        cause: None,
864        history_events: vec![],
865        parent_execution_arn: None,
866        is_sync: false,
867        billed_duration_ms: None,
868        billed_memory_mb: None,
869    };
870
871    st.executions.insert(exec_arn.clone(), execution);
872    let logging_config = sm.logging_configuration.clone();
873    drop(accounts);
874
875    let shared_state = state.clone();
876    let delivery = delivery.clone();
877    let dynamodb_state = dynamodb_state.clone();
878    let registry = registry.clone();
879    let input = Some(input.to_string());
880    tokio::spawn(async move {
881        interpreter::execute_state_machine(
882            shared_state,
883            exec_arn,
884            definition,
885            input,
886            delivery,
887            dynamodb_state,
888            registry,
889            logging_config,
890        )
891        .await;
892    });
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898    use http::{HeaderMap, Method};
899    use parking_lot::RwLock;
900    use serde_json::Value;
901    use std::collections::HashMap;
902    use std::sync::Arc;
903
904    fn make_state() -> SharedStepFunctionsState {
905        Arc::new(RwLock::new(
906            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
907        ))
908    }
909
910    fn make_request(action: &str, body: &str) -> AwsRequest {
911        AwsRequest {
912            service: "states".to_string(),
913            action: action.to_string(),
914            region: "us-east-1".to_string(),
915            account_id: "123456789012".to_string(),
916            request_id: "test-id".to_string(),
917            headers: HeaderMap::new(),
918            query_params: HashMap::new(),
919            body: body.as_bytes().to_vec().into(),
920            body_stream: parking_lot::Mutex::new(None),
921            path_segments: vec![],
922            raw_path: "/".to_string(),
923            raw_query: String::new(),
924            method: Method::POST,
925            is_query_protocol: false,
926            access_key_id: None,
927            principal: None,
928        }
929    }
930
931    fn body_json(resp: &AwsResponse) -> Value {
932        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
933    }
934
935    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
936        match result {
937            Err(e) => e,
938            Ok(_) => panic!("expected error, got Ok"),
939        }
940    }
941
942    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
943
944    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
945        let body = json!({
946            "name": name,
947            "definition": VALID_DEF,
948            "roleArn": "arn:aws:iam::123456789012:role/test",
949        });
950        let req = make_request("CreateStateMachine", &body.to_string());
951        let resp = svc.create_state_machine(&req).unwrap();
952        let b = body_json(&resp);
953        b["stateMachineArn"].as_str().unwrap().to_string()
954    }
955
956    // ── CreateStateMachine ──
957
958    #[test]
959    fn create_state_machine_basic() {
960        let svc = StepFunctionsService::new(make_state());
961        let arn = create_sm(&svc, "test-sm");
962        assert!(arn.contains("test-sm"));
963    }
964
965    #[test]
966    fn create_state_machine_with_express_type() {
967        let svc = StepFunctionsService::new(make_state());
968        let body = json!({
969            "name": "express-sm",
970            "definition": VALID_DEF,
971            "roleArn": "arn:aws:iam::123456789012:role/r",
972            "type": "EXPRESS",
973        });
974        let req = make_request("CreateStateMachine", &body.to_string());
975        let resp = svc.create_state_machine(&req).unwrap();
976        let b = body_json(&resp);
977        assert!(b["stateMachineArn"].as_str().is_some());
978    }
979
980    #[test]
981    fn create_state_machine_duplicate_fails() {
982        let svc = StepFunctionsService::new(make_state());
983        create_sm(&svc, "dup-sm");
984        let body = json!({
985            "name": "dup-sm",
986            "definition": VALID_DEF,
987            "roleArn": "arn:aws:iam::123456789012:role/r",
988        });
989        let req = make_request("CreateStateMachine", &body.to_string());
990        let err = expect_err(svc.create_state_machine(&req));
991        assert!(err.to_string().contains("StateMachineAlreadyExists"));
992    }
993
994    #[test]
995    fn create_state_machine_missing_name() {
996        let svc = StepFunctionsService::new(make_state());
997        let body = json!({
998            "definition": VALID_DEF,
999            "roleArn": "arn:aws:iam::123456789012:role/r",
1000        });
1001        let req = make_request("CreateStateMachine", &body.to_string());
1002        assert!(svc.create_state_machine(&req).is_err());
1003    }
1004
1005    #[test]
1006    fn create_state_machine_invalid_definition() {
1007        let svc = StepFunctionsService::new(make_state());
1008        let body = json!({
1009            "name": "bad-def",
1010            "definition": "not json",
1011            "roleArn": "arn:aws:iam::123456789012:role/r",
1012        });
1013        let req = make_request("CreateStateMachine", &body.to_string());
1014        let err = expect_err(svc.create_state_machine(&req));
1015        assert!(err.to_string().contains("InvalidDefinition"));
1016    }
1017
1018    #[test]
1019    fn create_state_machine_definition_missing_start_at() {
1020        let svc = StepFunctionsService::new(make_state());
1021        let body = json!({
1022            "name": "no-start",
1023            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1024            "roleArn": "arn:aws:iam::123456789012:role/r",
1025        });
1026        let req = make_request("CreateStateMachine", &body.to_string());
1027        let err = expect_err(svc.create_state_machine(&req));
1028        assert!(err.to_string().contains("InvalidDefinition"));
1029    }
1030
1031    #[test]
1032    fn create_state_machine_definition_missing_states() {
1033        let svc = StepFunctionsService::new(make_state());
1034        let body = json!({
1035            "name": "no-states",
1036            "definition": r#"{"StartAt":"S"}"#,
1037            "roleArn": "arn:aws:iam::123456789012:role/r",
1038        });
1039        let req = make_request("CreateStateMachine", &body.to_string());
1040        let err = expect_err(svc.create_state_machine(&req));
1041        assert!(err.to_string().contains("InvalidDefinition"));
1042    }
1043
1044    #[test]
1045    fn create_state_machine_definition_start_at_not_in_states() {
1046        let svc = StepFunctionsService::new(make_state());
1047        let body = json!({
1048            "name": "bad-start",
1049            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1050            "roleArn": "arn:aws:iam::123456789012:role/r",
1051        });
1052        let req = make_request("CreateStateMachine", &body.to_string());
1053        let err = expect_err(svc.create_state_machine(&req));
1054        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1055    }
1056
1057    #[test]
1058    fn create_state_machine_invalid_type() {
1059        let svc = StepFunctionsService::new(make_state());
1060        let body = json!({
1061            "name": "bad-type",
1062            "definition": VALID_DEF,
1063            "roleArn": "arn:aws:iam::123456789012:role/r",
1064            "type": "INVALID",
1065        });
1066        let req = make_request("CreateStateMachine", &body.to_string());
1067        assert!(svc.create_state_machine(&req).is_err());
1068    }
1069
1070    #[test]
1071    fn create_state_machine_invalid_arn() {
1072        let svc = StepFunctionsService::new(make_state());
1073        let body = json!({
1074            "name": "bad-arn",
1075            "definition": VALID_DEF,
1076            "roleArn": "not-an-arn",
1077        });
1078        let req = make_request("CreateStateMachine", &body.to_string());
1079        let err = expect_err(svc.create_state_machine(&req));
1080        assert!(err.to_string().contains("InvalidArn"));
1081    }
1082
1083    #[test]
1084    fn create_state_machine_invalid_name() {
1085        let svc = StepFunctionsService::new(make_state());
1086        let body = json!({
1087            "name": "has spaces!",
1088            "definition": VALID_DEF,
1089            "roleArn": "arn:aws:iam::123456789012:role/r",
1090        });
1091        let req = make_request("CreateStateMachine", &body.to_string());
1092        let err = expect_err(svc.create_state_machine(&req));
1093        assert!(err.to_string().contains("InvalidName"));
1094    }
1095
1096    #[test]
1097    fn create_state_machine_name_too_long() {
1098        let svc = StepFunctionsService::new(make_state());
1099        let long_name = "a".repeat(81);
1100        let body = json!({
1101            "name": long_name,
1102            "definition": VALID_DEF,
1103            "roleArn": "arn:aws:iam::123456789012:role/r",
1104        });
1105        let req = make_request("CreateStateMachine", &body.to_string());
1106        let err = expect_err(svc.create_state_machine(&req));
1107        assert!(err.to_string().contains("InvalidName"));
1108    }
1109
1110    // ── DescribeStateMachine ──
1111
1112    #[test]
1113    fn describe_state_machine_found() {
1114        let svc = StepFunctionsService::new(make_state());
1115        let arn = create_sm(&svc, "desc-sm");
1116
1117        let req = make_request(
1118            "DescribeStateMachine",
1119            &json!({"stateMachineArn": arn}).to_string(),
1120        );
1121        let resp = svc.describe_state_machine(&req).unwrap();
1122        let b = body_json(&resp);
1123        assert_eq!(b["name"], "desc-sm");
1124        assert_eq!(b["status"], "ACTIVE");
1125        assert!(b["definition"].as_str().is_some());
1126    }
1127
1128    #[test]
1129    fn describe_state_machine_not_found() {
1130        let svc = StepFunctionsService::new(make_state());
1131        let req = make_request(
1132            "DescribeStateMachine",
1133            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1134                .to_string(),
1135        );
1136        let err = expect_err(svc.describe_state_machine(&req));
1137        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1138    }
1139
1140    // ── ListStateMachines ──
1141
1142    #[test]
1143    fn list_state_machines_empty() {
1144        let svc = StepFunctionsService::new(make_state());
1145        let req = make_request("ListStateMachines", "{}");
1146        let resp = svc.list_state_machines(&req).unwrap();
1147        let b = body_json(&resp);
1148        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1149    }
1150
1151    #[test]
1152    fn list_state_machines_returns_created() {
1153        let svc = StepFunctionsService::new(make_state());
1154        create_sm(&svc, "sm-1");
1155        create_sm(&svc, "sm-2");
1156
1157        let req = make_request("ListStateMachines", "{}");
1158        let resp = svc.list_state_machines(&req).unwrap();
1159        let b = body_json(&resp);
1160        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1161    }
1162
1163    // ── DeleteStateMachine ──
1164
1165    #[test]
1166    fn delete_state_machine() {
1167        let svc = StepFunctionsService::new(make_state());
1168        let arn = create_sm(&svc, "del-sm");
1169
1170        let req = make_request(
1171            "DeleteStateMachine",
1172            &json!({"stateMachineArn": arn}).to_string(),
1173        );
1174        svc.delete_state_machine(&req).unwrap();
1175
1176        // Describe should fail
1177        let req = make_request(
1178            "DescribeStateMachine",
1179            &json!({"stateMachineArn": arn}).to_string(),
1180        );
1181        assert!(svc.describe_state_machine(&req).is_err());
1182    }
1183
1184    #[test]
1185    fn delete_state_machine_nonexistent_succeeds() {
1186        let svc = StepFunctionsService::new(make_state());
1187        let req = make_request(
1188            "DeleteStateMachine",
1189            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1190                .to_string(),
1191        );
1192        // AWS returns success even for nonexistent
1193        svc.delete_state_machine(&req).unwrap();
1194    }
1195
1196    // ── UpdateStateMachine ──
1197
1198    #[test]
1199    fn update_state_machine() {
1200        let svc = StepFunctionsService::new(make_state());
1201        let arn = create_sm(&svc, "upd-sm");
1202
1203        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1204        let body = json!({
1205            "stateMachineArn": arn,
1206            "definition": new_def,
1207            "description": "updated",
1208        });
1209        let req = make_request("UpdateStateMachine", &body.to_string());
1210        let resp = svc.update_state_machine(&req).unwrap();
1211        let b = body_json(&resp);
1212        assert!(b["updateDate"].as_f64().is_some());
1213
1214        // Verify
1215        let req = make_request(
1216            "DescribeStateMachine",
1217            &json!({"stateMachineArn": arn}).to_string(),
1218        );
1219        let resp = svc.describe_state_machine(&req).unwrap();
1220        let b = body_json(&resp);
1221        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1222        assert_eq!(b["description"], "updated");
1223    }
1224
1225    #[test]
1226    fn update_state_machine_not_found() {
1227        let svc = StepFunctionsService::new(make_state());
1228        let body = json!({
1229            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1230            "definition": VALID_DEF,
1231        });
1232        let req = make_request("UpdateStateMachine", &body.to_string());
1233        let err = expect_err(svc.update_state_machine(&req));
1234        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1235    }
1236
1237    // ── StartExecution ──
1238
1239    #[tokio::test]
1240    async fn start_execution_basic() {
1241        let svc = StepFunctionsService::new(make_state());
1242        let arn = create_sm(&svc, "exec-sm");
1243
1244        let body = json!({
1245            "stateMachineArn": arn,
1246            "input": r#"{"key":"value"}"#,
1247        });
1248        let req = make_request("StartExecution", &body.to_string());
1249        let resp = svc.start_execution(&req).unwrap();
1250        let b = body_json(&resp);
1251        assert!(b["executionArn"].as_str().is_some());
1252        assert!(b["startDate"].as_f64().is_some());
1253    }
1254
1255    #[tokio::test]
1256    async fn start_execution_with_name() {
1257        let svc = StepFunctionsService::new(make_state());
1258        let arn = create_sm(&svc, "named-exec");
1259
1260        let body = json!({
1261            "stateMachineArn": arn,
1262            "name": "my-execution",
1263        });
1264        let req = make_request("StartExecution", &body.to_string());
1265        let resp = svc.start_execution(&req).unwrap();
1266        let b = body_json(&resp);
1267        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1268    }
1269
1270    #[tokio::test]
1271    async fn start_execution_sm_not_found() {
1272        let svc = StepFunctionsService::new(make_state());
1273        let body = json!({
1274            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1275        });
1276        let req = make_request("StartExecution", &body.to_string());
1277        let err = expect_err(svc.start_execution(&req));
1278        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1279    }
1280
1281    #[tokio::test]
1282    async fn start_execution_invalid_input() {
1283        let svc = StepFunctionsService::new(make_state());
1284        let arn = create_sm(&svc, "bad-input");
1285
1286        let body = json!({
1287            "stateMachineArn": arn,
1288            "input": "not json",
1289        });
1290        let req = make_request("StartExecution", &body.to_string());
1291        let err = expect_err(svc.start_execution(&req));
1292        assert!(err.to_string().contains("InvalidExecutionInput"));
1293    }
1294
1295    #[tokio::test]
1296    async fn start_execution_duplicate_name() {
1297        let svc = StepFunctionsService::new(make_state());
1298        let arn = create_sm(&svc, "dup-exec");
1299
1300        let body = json!({
1301            "stateMachineArn": arn,
1302            "name": "same-name",
1303        });
1304        let req = make_request("StartExecution", &body.to_string());
1305        svc.start_execution(&req).unwrap();
1306
1307        let req = make_request("StartExecution", &body.to_string());
1308        let err = expect_err(svc.start_execution(&req));
1309        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1310    }
1311
1312    // ── DescribeExecution ──
1313
1314    #[tokio::test]
1315    async fn describe_execution_found() {
1316        let svc = StepFunctionsService::new(make_state());
1317        let sm_arn = create_sm(&svc, "desc-exec");
1318
1319        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1320        let req = make_request("StartExecution", &body.to_string());
1321        let resp = svc.start_execution(&req).unwrap();
1322        let exec_arn = body_json(&resp)["executionArn"]
1323            .as_str()
1324            .unwrap()
1325            .to_string();
1326
1327        let req = make_request(
1328            "DescribeExecution",
1329            &json!({"executionArn": exec_arn}).to_string(),
1330        );
1331        let resp = svc.describe_execution(&req).unwrap();
1332        let b = body_json(&resp);
1333        assert_eq!(b["name"], "e1");
1334        assert_eq!(b["status"], "RUNNING");
1335    }
1336
1337    #[tokio::test]
1338    async fn describe_execution_not_found() {
1339        let svc = StepFunctionsService::new(make_state());
1340        let req = make_request(
1341            "DescribeExecution",
1342            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1343                .to_string(),
1344        );
1345        let err = expect_err(svc.describe_execution(&req));
1346        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1347    }
1348
1349    // ── StopExecution ──
1350
1351    #[tokio::test]
1352    async fn stop_execution() {
1353        let svc = StepFunctionsService::new(make_state());
1354        let sm_arn = create_sm(&svc, "stop-sm");
1355
1356        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1357        let req = make_request("StartExecution", &body.to_string());
1358        let resp = svc.start_execution(&req).unwrap();
1359        let exec_arn = body_json(&resp)["executionArn"]
1360            .as_str()
1361            .unwrap()
1362            .to_string();
1363
1364        let body = json!({
1365            "executionArn": exec_arn,
1366            "error": "UserAborted",
1367            "cause": "test stop",
1368        });
1369        let req = make_request("StopExecution", &body.to_string());
1370        let resp = svc.stop_execution(&req).unwrap();
1371        let b = body_json(&resp);
1372        assert!(b["stopDate"].as_f64().is_some());
1373
1374        // Verify aborted
1375        let req = make_request(
1376            "DescribeExecution",
1377            &json!({"executionArn": exec_arn}).to_string(),
1378        );
1379        let resp = svc.describe_execution(&req).unwrap();
1380        let b = body_json(&resp);
1381        assert_eq!(b["status"], "ABORTED");
1382        assert_eq!(b["error"], "UserAborted");
1383    }
1384
1385    #[tokio::test]
1386    async fn stop_execution_not_found() {
1387        let svc = StepFunctionsService::new(make_state());
1388        let req = make_request(
1389            "StopExecution",
1390            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1391                .to_string(),
1392        );
1393        let err = expect_err(svc.stop_execution(&req));
1394        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1395    }
1396
1397    // ── ListExecutions ──
1398
1399    #[tokio::test]
1400    async fn list_executions() {
1401        let svc = StepFunctionsService::new(make_state());
1402        let sm_arn = create_sm(&svc, "list-exec");
1403
1404        for i in 0..3 {
1405            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1406            let req = make_request("StartExecution", &body.to_string());
1407            svc.start_execution(&req).unwrap();
1408        }
1409
1410        let req = make_request(
1411            "ListExecutions",
1412            &json!({"stateMachineArn": sm_arn}).to_string(),
1413        );
1414        let resp = svc.list_executions(&req).unwrap();
1415        let b = body_json(&resp);
1416        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1417    }
1418
1419    #[tokio::test]
1420    async fn list_executions_sm_not_found() {
1421        let svc = StepFunctionsService::new(make_state());
1422        let req = make_request(
1423            "ListExecutions",
1424            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1425                .to_string(),
1426        );
1427        let err = expect_err(svc.list_executions(&req));
1428        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1429    }
1430
1431    // ── GetExecutionHistory ──
1432
1433    #[tokio::test]
1434    async fn get_execution_history_not_found() {
1435        let svc = StepFunctionsService::new(make_state());
1436        let req = make_request(
1437            "GetExecutionHistory",
1438            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1439                .to_string(),
1440        );
1441        let err = expect_err(svc.get_execution_history(&req));
1442        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1443    }
1444
1445    // ── DescribeStateMachineForExecution ──
1446
1447    #[tokio::test]
1448    async fn describe_sm_for_execution() {
1449        let svc = StepFunctionsService::new(make_state());
1450        let sm_arn = create_sm(&svc, "sm-for-exec");
1451
1452        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1453        let req = make_request("StartExecution", &body.to_string());
1454        let resp = svc.start_execution(&req).unwrap();
1455        let exec_arn = body_json(&resp)["executionArn"]
1456            .as_str()
1457            .unwrap()
1458            .to_string();
1459
1460        let req = make_request(
1461            "DescribeStateMachineForExecution",
1462            &json!({"executionArn": exec_arn}).to_string(),
1463        );
1464        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1465        let b = body_json(&resp);
1466        assert_eq!(b["name"], "sm-for-exec");
1467    }
1468
1469    // ── Tags ──
1470
1471    #[test]
1472    fn tag_untag_list_tags() {
1473        let svc = StepFunctionsService::new(make_state());
1474        let arn = create_sm(&svc, "tagged-sm");
1475
1476        // Tag
1477        let body = json!({
1478            "resourceArn": arn,
1479            "tags": [{"key": "env", "value": "prod"}],
1480        });
1481        let req = make_request("TagResource", &body.to_string());
1482        svc.tag_resource(&req).unwrap();
1483
1484        // List
1485        let req = make_request(
1486            "ListTagsForResource",
1487            &json!({"resourceArn": arn}).to_string(),
1488        );
1489        let resp = svc.list_tags_for_resource(&req).unwrap();
1490        let b = body_json(&resp);
1491        let tags = b["tags"].as_array().unwrap();
1492        assert_eq!(tags.len(), 1);
1493        assert_eq!(tags[0]["key"], "env");
1494
1495        // Untag
1496        let body = json!({
1497            "resourceArn": arn,
1498            "tagKeys": ["env"],
1499        });
1500        let req = make_request("UntagResource", &body.to_string());
1501        svc.untag_resource(&req).unwrap();
1502
1503        // Verify empty
1504        let req = make_request(
1505            "ListTagsForResource",
1506            &json!({"resourceArn": arn}).to_string(),
1507        );
1508        let resp = svc.list_tags_for_resource(&req).unwrap();
1509        let b = body_json(&resp);
1510        assert!(b["tags"].as_array().unwrap().is_empty());
1511    }
1512
1513    #[test]
1514    fn tag_resource_not_found() {
1515        let svc = StepFunctionsService::new(make_state());
1516        let body = json!({
1517            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1518            "tags": [{"key": "k", "value": "v"}],
1519        });
1520        let req = make_request("TagResource", &body.to_string());
1521        let err = expect_err(svc.tag_resource(&req));
1522        assert!(err.to_string().contains("ResourceNotFound"));
1523    }
1524
1525    // ── Helper function tests ──
1526
1527    #[test]
1528    fn test_validate_name() {
1529        assert!(validate_name("valid-name").is_ok());
1530        assert!(validate_name("under_score").is_ok());
1531        assert!(validate_name("").is_err());
1532        assert!(validate_name("has spaces").is_err());
1533        assert!(validate_name(&"a".repeat(81)).is_err());
1534    }
1535
1536    #[test]
1537    fn test_validate_definition() {
1538        assert!(validate_definition(VALID_DEF).is_ok());
1539        assert!(validate_definition("not json").is_err());
1540        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
1541        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
1542    }
1543
1544    #[test]
1545    fn test_validate_definition_rejects_malformed_paths() {
1546        // Unterminated bracket in InputPath.
1547        let def =
1548            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1549        assert!(validate_definition(def).is_err());
1550
1551        // Multibyte char where the close bracket would be, in OutputPath.
1552        let def =
1553            "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1554        assert!(validate_definition(def).is_err());
1555
1556        // Malformed Choice Variable.
1557        let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1558        assert!(validate_definition(def).is_err());
1559
1560        // Path not rooted at $.
1561        let def =
1562            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1563        assert!(validate_definition(def).is_err());
1564
1565        // Empty index brackets.
1566        let def =
1567            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1568        assert!(validate_definition(def).is_err());
1569    }
1570
1571    #[test]
1572    fn test_validate_definition_accepts_well_formed_paths() {
1573        // Valid reference paths, the literal "null" for Input/OutputPath, and
1574        // nested Choice combinators must all be accepted.
1575        let def = r#"{"StartAt":"P","States":{
1576            "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1577            "C":{"Type":"Choice","Choices":[
1578                {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1579            ],"Default":"S"},
1580            "S":{"Type":"Succeed","InputPath":"null"}
1581        }}"#;
1582        assert!(validate_definition(def).is_ok());
1583    }
1584
1585    #[test]
1586    fn test_is_valid_reference_path() {
1587        assert!(is_valid_reference_path("$"));
1588        assert!(is_valid_reference_path("$.foo"));
1589        assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1590        assert!(is_valid_reference_path("$[0]"));
1591        assert!(!is_valid_reference_path("$.arr["));
1592        assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1593        assert!(!is_valid_reference_path("$.x[]"));
1594        assert!(!is_valid_reference_path("$.x[abc]"));
1595        assert!(!is_valid_reference_path("foo.bar"));
1596        assert!(!is_valid_reference_path(""));
1597    }
1598
1599    #[test]
1600    fn test_validate_arn() {
1601        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1602        assert!(validate_arn("not-an-arn").is_err());
1603    }
1604
1605    #[test]
1606    fn test_camel_to_details_key() {
1607        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1608        assert_eq!(camel_to_details_key(""), "");
1609    }
1610
1611    #[test]
1612    fn test_is_mutating_action() {
1613        assert!(is_mutating_action("CreateStateMachine"));
1614        assert!(is_mutating_action("StartExecution"));
1615        assert!(!is_mutating_action("DescribeStateMachine"));
1616        assert!(!is_mutating_action("ListStateMachines"));
1617    }
1618
1619    // ── StartSyncExecution ──
1620
1621    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1622        let body = json!({
1623            "name": name,
1624            "definition": VALID_DEF,
1625            "roleArn": "arn:aws:iam::123456789012:role/test",
1626            "type": "EXPRESS",
1627        });
1628        let req = make_request("CreateStateMachine", &body.to_string());
1629        let resp = svc.create_state_machine(&req).unwrap();
1630        let b = body_json(&resp);
1631        b["stateMachineArn"].as_str().unwrap().to_string()
1632    }
1633
1634    #[tokio::test]
1635    async fn start_sync_execution_basic() {
1636        let svc = StepFunctionsService::new(make_state());
1637        let arn = create_express_sm(&svc, "sync-sm");
1638
1639        let body = json!({
1640            "stateMachineArn": arn,
1641            "input": r#"{"key":"value"}"#,
1642        });
1643        let req = make_request("StartSyncExecution", &body.to_string());
1644        let resp = svc.start_sync_execution(&req).await.unwrap();
1645        let b = body_json(&resp);
1646        assert!(b["executionArn"]
1647            .as_str()
1648            .unwrap()
1649            .contains("express:sync-sm"));
1650        assert_eq!(b["stateMachineArn"], arn);
1651        assert_eq!(b["status"], "SUCCEEDED");
1652        assert!(b["startDate"].as_i64().is_some());
1653        assert!(b["stopDate"].as_i64().is_some());
1654        assert!(b["output"].as_str().is_some());
1655        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1656            .as_i64()
1657            .is_some());
1658    }
1659
1660    #[tokio::test]
1661    async fn start_sync_execution_not_express() {
1662        let svc = StepFunctionsService::new(make_state());
1663        let arn = create_sm(&svc, "std-sm");
1664
1665        let body = json!({"stateMachineArn": arn});
1666        let req = make_request("StartSyncExecution", &body.to_string());
1667        let err = expect_err(svc.start_sync_execution(&req).await);
1668        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1669    }
1670
1671    #[tokio::test]
1672    async fn start_sync_execution_sm_not_found() {
1673        let svc = StepFunctionsService::new(make_state());
1674        let body = json!({
1675            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1676        });
1677        let req = make_request("StartSyncExecution", &body.to_string());
1678        let err = expect_err(svc.start_sync_execution(&req).await);
1679        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1680    }
1681
1682    #[tokio::test]
1683    async fn start_sync_execution_records_introspection_fields() {
1684        let svc = StepFunctionsService::new(make_state());
1685        let arn = create_express_sm(&svc, "sync-introspect");
1686
1687        let body = json!({"stateMachineArn": arn, "input": "{}"});
1688        let req = make_request("StartSyncExecution", &body.to_string());
1689        let resp = svc.start_sync_execution(&req).await.unwrap();
1690        let b = body_json(&resp);
1691        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1692
1693        let accounts = svc.state.read();
1694        let state = accounts.get("123456789012").unwrap();
1695        let stored = state
1696            .executions
1697            .get(&exec_arn)
1698            .expect("sync execution should be persisted for introspection");
1699        assert!(stored.is_sync, "sync executions must be marked is_sync");
1700        assert_eq!(stored.billed_memory_mb, Some(64));
1701        assert!(
1702            stored.billed_duration_ms.is_some(),
1703            "billed_duration_ms must be populated after sync run"
1704        );
1705        assert!(
1706            stored.parent_execution_arn.is_none(),
1707            "top-level sync execution has no parent"
1708        );
1709    }
1710
1711    #[tokio::test]
1712    async fn start_sync_execution_invalid_input() {
1713        let svc = StepFunctionsService::new(make_state());
1714        let arn = create_express_sm(&svc, "bad-input-sync");
1715
1716        let body = json!({
1717            "stateMachineArn": arn,
1718            "input": "not json",
1719        });
1720        let req = make_request("StartSyncExecution", &body.to_string());
1721        let err = expect_err(svc.start_sync_execution(&req).await);
1722        assert!(err.to_string().contains("InvalidExecutionInput"));
1723    }
1724
1725    /// No snapshot store (memory mode) -> no persist hook for the CFN provisioner.
1726    #[test]
1727    fn snapshot_hook_is_none_without_store() {
1728        let svc = StepFunctionsService::new(make_state());
1729        assert!(svc.snapshot_hook().is_none());
1730    }
1731
1732    /// With a store, the hook is present and invoking it runs the whole-state
1733    /// persist path the CloudFormation provisioner uses after mutating Step
1734    /// Functions state directly.
1735    #[tokio::test]
1736    async fn snapshot_hook_fires_with_store() {
1737        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1738            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1739        let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1740        let hook = svc
1741            .snapshot_hook()
1742            .expect("hook present when a store is set");
1743        hook().await;
1744    }
1745
1746    fn make_execution(arn: &str, status: ExecutionStatus) -> Execution {
1747        Execution {
1748            execution_arn: arn.to_string(),
1749            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:sm".to_string(),
1750            state_machine_name: "sm".to_string(),
1751            name: arn.to_string(),
1752            status,
1753            input: None,
1754            output: None,
1755            start_date: Utc::now(),
1756            stop_date: None,
1757            error: None,
1758            cause: None,
1759            history_events: vec![],
1760            parent_execution_arn: None,
1761            is_sync: false,
1762            billed_duration_ms: None,
1763            billed_memory_mb: None,
1764        }
1765    }
1766
1767    #[test]
1768    fn reconcile_aborts_running_executions_on_restart() {
1769        // After a restart a RUNNING execution has no interpreter driving it, so
1770        // it must be aborted rather than left RUNNING forever (0.A2). A
1771        // completed execution is untouched.
1772        let state = make_state();
1773        {
1774            let mut accounts = state.write();
1775            let s = accounts.get_or_create("123456789012");
1776            s.executions.insert(
1777                "running".into(),
1778                make_execution("running", ExecutionStatus::Running),
1779            );
1780            s.executions.insert(
1781                "done".into(),
1782                make_execution("done", ExecutionStatus::Succeeded),
1783            );
1784        }
1785
1786        let n = reconcile_interrupted_executions(&state);
1787        assert_eq!(n, 1, "only the RUNNING execution is reconciled");
1788
1789        let accounts = state.read();
1790        let s = accounts.get("123456789012").unwrap();
1791        let running = &s.executions["running"];
1792        assert_eq!(running.status, ExecutionStatus::Aborted);
1793        assert!(running.stop_date.is_some());
1794        assert_eq!(running.error.as_deref(), Some("Fakecloud.Restart"));
1795        assert_eq!(s.executions["done"].status, ExecutionStatus::Succeeded);
1796    }
1797}
1798
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` returns an error for the token `"bad"` and succeeds
    /// for the token `"2"` over a five-item list with a page size of 3.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items = (0..5).collect::<Vec<i32>>();

        let rejected = paginate_checked(&items, Some("bad"), 3);
        assert!(rejected.is_err());

        let accepted = paginate_checked(&items, Some("2"), 3);
        assert!(accepted.is_ok());
    }
}