Skip to main content

fakecloud_stepfunctions/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26    "CreateStateMachine",
27    "DescribeStateMachine",
28    "ListStateMachines",
29    "DeleteStateMachine",
30    "UpdateStateMachine",
31    "TagResource",
32    "UntagResource",
33    "ListTagsForResource",
34    "StartExecution",
35    "StopExecution",
36    "DescribeExecution",
37    "ListExecutions",
38    "GetExecutionHistory",
39    "DescribeStateMachineForExecution",
40    "CreateActivity",
41    "DeleteActivity",
42    "DescribeActivity",
43    "ListActivities",
44    "GetActivityTask",
45    "SendTaskFailure",
46    "SendTaskHeartbeat",
47    "SendTaskSuccess",
48    "PublishStateMachineVersion",
49    "DeleteStateMachineVersion",
50    "ListStateMachineVersions",
51    "CreateStateMachineAlias",
52    "DeleteStateMachineAlias",
53    "DescribeStateMachineAlias",
54    "ListStateMachineAliases",
55    "UpdateStateMachineAlias",
56    "DescribeMapRun",
57    "ListMapRuns",
58    "UpdateMapRun",
59    "RedriveExecution",
60    "StartSyncExecution",
61    "TestState",
62    "ValidateStateMachineDefinition",
63];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73    state: SharedStepFunctionsState,
74    delivery: Option<Arc<DeliveryBus>>,
75    dynamodb_state: Option<SharedDynamoDbState>,
76    registry: Option<SharedServiceRegistry>,
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90    pub fn new(state: SharedStepFunctionsState) -> Self {
91        Self {
92            state,
93            delivery: None,
94            dynamodb_state: None,
95            registry: None,
96            snapshot_store: None,
97            snapshot_lock: Arc::new(AsyncMutex::new(())),
98        }
99    }
100
101    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102        self.delivery = Some(delivery);
103        self
104    }
105
106    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107        self.dynamodb_state = Some(dynamodb_state);
108        self
109    }
110
111    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
112    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
113    /// has been registered; until then the interpreter falls back to its
114    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
115    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116        self.registry = Some(registry);
117        self
118    }
119
120    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121        self.snapshot_store = Some(store);
122        self
123    }
124
125    async fn save_snapshot(&self) {
126        save_stepfunctions_snapshot(
127            &self.state,
128            self.snapshot_store.clone(),
129            &self.snapshot_lock,
130        )
131        .await;
132    }
133
134    /// Build a hook that persists the current Step Functions state when invoked,
135    /// or `None` in memory mode (no snapshot store). The CloudFormation
136    /// provisioner mutates `state` directly and uses this to write a
137    /// CFN-provisioned resource through to disk, the same way a direct mutating
138    /// API call would.
139    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
140        let store = self.snapshot_store.clone()?;
141        let state = self.state.clone();
142        let lock = self.snapshot_lock.clone();
143        Some(Arc::new(move || {
144            let state = state.clone();
145            let store = store.clone();
146            let lock = lock.clone();
147            Box::pin(async move {
148                save_stepfunctions_snapshot(&state, Some(store), &lock).await;
149            })
150        }))
151    }
152}
153
154/// Persist the current Step Functions state as a snapshot. Offloads the serde +
155/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
156/// (memory mode). Shared by `StepFunctionsService::save_snapshot` and the
157/// CloudFormation provisioner's post-provision persist hook so both route
158/// through the same serialize-and-write path.
159pub async fn save_stepfunctions_snapshot(
160    state: &SharedStepFunctionsState,
161    store: Option<Arc<dyn SnapshotStore>>,
162    lock: &AsyncMutex<()>,
163) {
164    let Some(store) = store else {
165        return;
166    };
167    let _guard = lock.lock().await;
168    let snapshot = StepFunctionsSnapshot {
169        schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
170        state: None,
171        accounts: Some(state.read().clone()),
172    };
173    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
174        let bytes = serde_json::to_vec(&snapshot)
175            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
176        store.save(&bytes)
177    })
178    .await;
179    match join {
180        Ok(Ok(())) => {}
181        Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
182        Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
183    }
184}
185
186/// Abort executions that were RUNNING (or PENDING_REDRIVE) when the server
187/// stopped, called once after a persistence snapshot is loaded on startup.
188///
189/// The in-memory interpreter that was driving each one is gone after a restart,
190/// so the execution can never advance. Real Step Functions resumes from durable
191/// interpreter state; fakecloud can't, so leaving them RUNNING would strand them
192/// forever (DescribeExecution would report RUNNING with no way to ever finish).
193/// Aborting with a terminal stop date + error/cause is the honest outcome
194/// (bug-audit 2026-06-20, 0.A2). Returns the number reconciled.
195pub fn reconcile_interrupted_executions(state: &SharedStepFunctionsState) -> usize {
196    let now = Utc::now();
197    let mut count = 0;
198    let mut accounts = state.write();
199    let account_ids: Vec<String> = accounts.iter().map(|(id, _)| id.to_string()).collect();
200    for account_id in account_ids {
201        let Some(s) = accounts.get_mut(&account_id) else {
202            continue;
203        };
204        for exec in s.executions.values_mut() {
205            if matches!(
206                exec.status,
207                ExecutionStatus::Running | ExecutionStatus::PendingRedrive
208            ) {
209                exec.status = ExecutionStatus::Aborted;
210                exec.stop_date = Some(now);
211                exec.error = Some("Fakecloud.Restart".to_string());
212                exec.cause = Some(
213                    "Execution was interrupted by a fakecloud restart and cannot be resumed"
214                        .to_string(),
215                );
216                count += 1;
217            }
218        }
219    }
220    count
221}
222
223fn is_mutating_action(action: &str) -> bool {
224    matches!(
225        action,
226        "CreateStateMachine"
227            | "DeleteStateMachine"
228            | "UpdateStateMachine"
229            | "TagResource"
230            | "UntagResource"
231            | "StartExecution"
232            | "StopExecution"
233            | "CreateActivity"
234            | "DeleteActivity"
235            | "GetActivityTask"
236            | "SendTaskFailure"
237            | "SendTaskHeartbeat"
238            | "SendTaskSuccess"
239            | "PublishStateMachineVersion"
240            | "DeleteStateMachineVersion"
241            | "CreateStateMachineAlias"
242            | "DeleteStateMachineAlias"
243            | "UpdateStateMachineAlias"
244            | "UpdateMapRun"
245            | "RedriveExecution"
246            | "StartSyncExecution"
247    )
248}
249
250#[async_trait]
251impl AwsService for StepFunctionsService {
252    fn service_name(&self) -> &str {
253        "states"
254    }
255
256    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
257        let mutates = is_mutating_action(req.action.as_str());
258        let result = match req.action.as_str() {
259            "CreateStateMachine" => self.create_state_machine(&req),
260            "DescribeStateMachine" => self.describe_state_machine(&req),
261            "ListStateMachines" => self.list_state_machines(&req),
262            "DeleteStateMachine" => self.delete_state_machine(&req),
263            "UpdateStateMachine" => self.update_state_machine(&req),
264            "TagResource" => self.tag_resource(&req),
265            "UntagResource" => self.untag_resource(&req),
266            "ListTagsForResource" => self.list_tags_for_resource(&req),
267            "StartExecution" => self.start_execution(&req),
268            "StopExecution" => self.stop_execution(&req),
269            "DescribeExecution" => self.describe_execution(&req),
270            "ListExecutions" => self.list_executions(&req),
271            "GetExecutionHistory" => self.get_execution_history(&req),
272            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
273            "CreateActivity" => self.create_activity(&req),
274            "DeleteActivity" => self.delete_activity(&req),
275            "DescribeActivity" => self.describe_activity(&req),
276            "ListActivities" => self.list_activities(&req),
277            "GetActivityTask" => self.get_activity_task(&req).await,
278            "SendTaskFailure" => self.send_task_failure(&req),
279            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
280            "SendTaskSuccess" => self.send_task_success(&req),
281            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
282            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
283            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
284            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
285            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
286            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
287            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
288            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
289            "DescribeMapRun" => self.describe_map_run(&req),
290            "ListMapRuns" => self.list_map_runs(&req),
291            "UpdateMapRun" => self.update_map_run(&req),
292            "RedriveExecution" => self.redrive_execution(&req),
293            "StartSyncExecution" => self.start_sync_execution(&req).await,
294            "TestState" => self.test_state(&req),
295            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
296            _ => Err(AwsServiceError::action_not_implemented(
297                "states",
298                &req.action,
299            )),
300        };
301        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302            self.save_snapshot().await;
303        }
304        result
305    }
306
307    fn supported_actions(&self) -> &[&str] {
308        SUPPORTED
309    }
310}
311
312impl StepFunctionsService {
313    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
314        let body = req.json_body();
315        let raw_max_results = body["maxResults"].as_i64();
316        if let Some(mr) = raw_max_results {
317            validate_max_results(mr)?;
318        }
319        let next_token = body["nextToken"].as_str();
320        if let Some(t) = next_token {
321            validate_page_token(t)?;
322        }
323        // Smithy default: maxResults=0 means "use the service default"
324        // (100 for Step Functions), which we honour by treating both
325        // `None` and `0` as 100.
326        let max_results = match raw_max_results.unwrap_or(0) {
327            0 => 100,
328            n => n as usize,
329        };
330        let accounts = self.state.read();
331        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
332        let state = accounts.get(&req.account_id).unwrap_or(&empty);
333        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
334        activities.sort_by(|a, b| a.name.cmp(&b.name));
335        let items: Vec<Value> = activities
336            .iter()
337            .map(|a| {
338                json!({
339                    "activityArn": a.arn,
340                    "name": a.name,
341                    "creationDate": a.creation_date.timestamp(),
342                })
343            })
344            .collect();
345        let (page, token) =
346            paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
347        let mut resp = json!({ "activities": page });
348        if let Some(t) = token {
349            resp["nextToken"] = json!(t);
350        }
351        Ok(AwsResponse::ok_json(resp))
352    }
353}
354
355fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
356    json!({
357        "stateMachineAliasArn": alias.arn,
358        "name": alias.name,
359        "description": alias.description,
360        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
361            "stateMachineVersionArn": r.state_machine_version_arn,
362            "weight": r.weight,
363        })).collect::<Vec<_>>(),
364        "creationDate": alias.creation_date.timestamp(),
365        "updateDate": alias.update_date.timestamp(),
366    })
367}
368
369fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
370    json!({
371        "mapRunArn": mr.map_run_arn,
372        "executionArn": mr.execution_arn,
373        "maxConcurrency": mr.max_concurrency,
374        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
375        "toleratedFailureCount": mr.tolerated_failure_count,
376        "status": mr.status,
377        "startDate": mr.start_date.timestamp(),
378        "stopDate": mr.stop_date.map(|d| d.timestamp()),
379    })
380}
381
382// ─── Helpers ────────────────────────────────────────────────────────────
383
384fn state_machine_to_json(sm: &StateMachine) -> Value {
385    let mut resp = json!({
386        "name": sm.name,
387        "stateMachineArn": sm.arn,
388        "definition": sm.definition,
389        "roleArn": sm.role_arn,
390        "type": sm.machine_type.as_str(),
391        "status": sm.status.as_str(),
392        "creationDate": sm.creation_date.timestamp() as f64,
393        "updateDate": sm.update_date.timestamp() as f64,
394        "revisionId": sm.revision_id,
395        "label": sm.name,
396    });
397
398    if !sm.description.is_empty() {
399        resp["description"] = json!(sm.description);
400    }
401
402    if let Some(ref logging) = sm.logging_configuration {
403        resp["loggingConfiguration"] = logging.clone();
404    } else {
405        resp["loggingConfiguration"] = json!({
406            "level": "OFF",
407            "includeExecutionData": false,
408            "destinations": [],
409        });
410    }
411
412    if let Some(ref tracing) = sm.tracing_configuration {
413        resp["tracingConfiguration"] = tracing.clone();
414    } else {
415        resp["tracingConfiguration"] = json!({
416            "enabled": false,
417        });
418    }
419
420    resp
421}
422
423fn missing(name: &str) -> AwsServiceError {
424    AwsServiceError::aws_error(
425        StatusCode::BAD_REQUEST,
426        "ValidationException",
427        format!("The request must contain the parameter {name}."),
428    )
429}
430
431fn state_machine_not_found(arn: &str) -> AwsServiceError {
432    AwsServiceError::aws_error(
433        StatusCode::BAD_REQUEST,
434        "StateMachineDoesNotExist",
435        format!("State Machine Does Not Exist: '{arn}'"),
436    )
437}
438
439fn activity_not_found(arn: &str) -> AwsServiceError {
440    AwsServiceError::aws_error(
441        StatusCode::BAD_REQUEST,
442        "ActivityDoesNotExist",
443        format!("Activity does not exist: {arn}"),
444    )
445}
446
447fn task_does_not_exist(token: &str) -> AwsServiceError {
448    AwsServiceError::aws_error(
449        StatusCode::BAD_REQUEST,
450        "TaskDoesNotExist",
451        format!("Task does not exist: {token}"),
452    )
453}
454
455fn resource_not_found(arn: &str) -> AwsServiceError {
456    AwsServiceError::aws_error(
457        StatusCode::BAD_REQUEST,
458        "ResourceNotFound",
459        format!("Resource not found: '{arn}'"),
460    )
461}
462
463/// Parse + validate an alias `routingConfiguration` array.
464///
465/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
466/// route must include `stateMachineVersionArn`.
467fn parse_routing_configuration(
468    routes: &[serde_json::Value],
469) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
470    if routes.is_empty() || routes.len() > 2 {
471        return Err(AwsServiceError::aws_error(
472            StatusCode::BAD_REQUEST,
473            "ValidationException",
474            "routingConfiguration must contain 1 or 2 routes.",
475        ));
476    }
477    let parsed: Vec<crate::state::AliasRoute> = routes
478        .iter()
479        .map(|r| {
480            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
481                AwsServiceError::aws_error(
482                    StatusCode::BAD_REQUEST,
483                    "ValidationException",
484                    "routingConfiguration entries must contain stateMachineVersionArn.",
485                )
486            })?;
487            let weight = r["weight"].as_i64().ok_or_else(|| {
488                AwsServiceError::aws_error(
489                    StatusCode::BAD_REQUEST,
490                    "ValidationException",
491                    "routingConfiguration entries must contain a numeric weight.",
492                )
493            })?;
494            if !(0..=100).contains(&weight) {
495                return Err(AwsServiceError::aws_error(
496                    StatusCode::BAD_REQUEST,
497                    "ValidationException",
498                    format!("Invalid routing weight {weight}; must be 0-100."),
499                ));
500            }
501            Ok(crate::state::AliasRoute {
502                state_machine_version_arn: arn.to_string(),
503                weight: weight as i32,
504            })
505        })
506        .collect::<Result<_, _>>()?;
507    let total: i32 = parsed.iter().map(|r| r.weight).sum();
508    if total != 100 {
509        return Err(AwsServiceError::aws_error(
510            StatusCode::BAD_REQUEST,
511            "ValidationException",
512            format!("routingConfiguration weights must sum to 100, got {total}."),
513        ));
514    }
515    Ok(parsed)
516}
517
518fn validate_name(name: &str) -> Result<(), AwsServiceError> {
519    if name.is_empty() || name.len() > 80 {
520        return Err(AwsServiceError::aws_error(
521            StatusCode::BAD_REQUEST,
522            "InvalidName",
523            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
524        ));
525    }
526    // Only allow alphanumeric, hyphens, and underscores
527    if !name
528        .chars()
529        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
530    {
531        return Err(AwsServiceError::aws_error(
532            StatusCode::BAD_REQUEST,
533            "InvalidName",
534            format!(
535                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
536            ),
537        ));
538    }
539    Ok(())
540}
541
542fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
543    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
544        AwsServiceError::aws_error(
545            StatusCode::BAD_REQUEST,
546            "InvalidDefinition",
547            format!("Invalid State Machine Definition: '{e}'"),
548        )
549    })?;
550
551    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
552        return Err(AwsServiceError::aws_error(
553            StatusCode::BAD_REQUEST,
554            "InvalidDefinition",
555            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
556                .to_string(),
557        ));
558    }
559
560    let states_obj = parsed
561        .get("States")
562        .and_then(|v| v.as_object())
563        .ok_or_else(|| {
564            AwsServiceError::aws_error(
565                StatusCode::BAD_REQUEST,
566                "InvalidDefinition",
567                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
568                    .to_string(),
569            )
570        })?;
571
572    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
573        AwsServiceError::aws_error(
574            StatusCode::BAD_REQUEST,
575            "InvalidDefinition",
576            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
577                .to_string(),
578        )
579    })?;
580    if !states_obj.contains_key(start_at) {
581        return Err(AwsServiceError::aws_error(
582            StatusCode::BAD_REQUEST,
583            "InvalidDefinition",
584            format!(
585                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
586                 (StartAt '{start_at}' does not reference a valid state)"
587            ),
588        ));
589    }
590
591    // Reject malformed JSONPath reference fields. AWS rejects bad reference
592    // paths at CreateStateMachine; accepting them here lets a panic-inducing
593    // path reach the interpreter at execution time.
594    for (state_name, state_val) in states_obj {
595        validate_state_paths(state_name, state_val)?;
596    }
597
598    Ok(())
599}
600
601/// Validate the JSONPath reference fields of a single state. Recurses into the
602/// `Choices` array of Choice states. Returns an `InvalidDefinition` error for
603/// any syntactically malformed path.
604fn validate_state_paths(state_name: &str, state: &Value) -> Result<(), AwsServiceError> {
605    for field in ["InputPath", "OutputPath", "ResultPath"] {
606        if let Some(p) = state.get(field).and_then(|v| v.as_str()) {
607            // `InputPath`/`OutputPath` accept the literal "null" to mean "no
608            // value"; `ResultPath` accepts JSON null (handled separately) but
609            // not the string "null". Both accept "$"-rooted reference paths.
610            if (field != "ResultPath" && p == "null") || is_valid_reference_path(p) {
611                continue;
612            }
613            return Err(invalid_reference_path(state_name, field, p));
614        }
615    }
616
617    if state.get("Type").and_then(|v| v.as_str()) == Some("Choice") {
618        if let Some(choices) = state.get("Choices").and_then(|v| v.as_array()) {
619            for rule in choices {
620                validate_choice_variables(state_name, rule)?;
621            }
622        }
623    }
624
625    Ok(())
626}
627
628/// Validate `Variable` reference paths inside a Choice rule, recursing through
629/// nested `And` / `Or` / `Not` boolean combinators.
630fn validate_choice_variables(state_name: &str, rule: &Value) -> Result<(), AwsServiceError> {
631    if let Some(v) = rule.get("Variable").and_then(|v| v.as_str()) {
632        if !is_valid_reference_path(v) {
633            return Err(invalid_reference_path(state_name, "Variable", v));
634        }
635    }
636    for combinator in ["And", "Or"] {
637        if let Some(nested) = rule.get(combinator).and_then(|v| v.as_array()) {
638            for n in nested {
639                validate_choice_variables(state_name, n)?;
640            }
641        }
642    }
643    if let Some(n) = rule.get("Not") {
644        validate_choice_variables(state_name, n)?;
645    }
646    Ok(())
647}
648
649/// A reference path must start with `$` and every `[...]` index segment must be
650/// a balanced, non-empty, decimal-integer index. This mirrors the subset of
651/// JSONPath the interpreter understands; anything it cannot parse is rejected.
652fn is_valid_reference_path(path: &str) -> bool {
653    if path != "$" && !path.starts_with("$.") && !path.starts_with("$[") {
654        return false;
655    }
656    let body = path
657        .strip_prefix("$.")
658        .or_else(|| path.strip_prefix('$'))
659        .unwrap_or(path);
660    for part in body.split('.') {
661        if !segment_is_valid(part) {
662            return false;
663        }
664    }
665    true
666}
667
668fn segment_is_valid(part: &str) -> bool {
669    match part.find('[') {
670        None => !part.contains(']'),
671        Some(open) => {
672            if !part.ends_with(']') {
673                return false;
674            }
675            let inner = &part[open + 1..part.len() - 1];
676            // Empty `[]` or non-integer (incl. multibyte/garbage) indices are invalid.
677            !inner.is_empty() && inner.parse::<usize>().is_ok()
678        }
679    }
680}
681
682fn invalid_reference_path(state_name: &str, field: &str, path: &str) -> AwsServiceError {
683    AwsServiceError::aws_error(
684        StatusCode::BAD_REQUEST,
685        "InvalidDefinition",
686        format!(
687            "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED' \
688             (The {field} field of state '{state_name}' is not a valid reference path: '{path}')"
689        ),
690    )
691}
692
693fn execution_not_found(arn: &str) -> AwsServiceError {
694    AwsServiceError::aws_error(
695        StatusCode::BAD_REQUEST,
696        "ExecutionDoesNotExist",
697        format!("Execution Does Not Exist: '{arn}'"),
698    )
699}
700
701fn execution_to_json(exec: &Execution) -> Value {
702    let mut resp = json!({
703        "executionArn": exec.execution_arn,
704        "stateMachineArn": exec.state_machine_arn,
705        "name": exec.name,
706        "status": exec.status.as_str(),
707        "startDate": exec.start_date.timestamp() as f64,
708    });
709
710    if let Some(ref input) = exec.input {
711        resp["input"] = json!(input);
712    }
713    if let Some(ref output) = exec.output {
714        resp["output"] = json!(output);
715    }
716    if let Some(stop) = exec.stop_date {
717        resp["stopDate"] = json!(stop.timestamp() as f64);
718    }
719    if let Some(ref error) = exec.error {
720        resp["error"] = json!(error);
721    }
722    if let Some(ref cause) = exec.cause {
723        resp["cause"] = json!(cause);
724    }
725
726    resp
727}
728
729/// Convert an event type to the JSON key prefix used in the `HistoryEvent`
730/// shape (the full key is `<prefix>EventDetails`).
731///
732/// AWS collapses every `*StateEntered` event type (Pass/Task/Choice/Wait/Map/
733/// Parallel/Succeed/...) into a single `stateEnteredEventDetails` member, and
734/// every `*StateExited` into `stateExitedEventDetails`. All other event types
735/// map by lowercasing the first character (`TaskScheduled` ->
736/// `taskScheduledEventDetails`). Without the collapse, SDK deserializers see an
737/// unknown key like `passStateEnteredEventDetails` and return null name/input.
738fn camel_to_details_key(event_type: &str) -> String {
739    if event_type.ends_with("StateEntered") {
740        return "stateEntered".to_string();
741    }
742    if event_type.ends_with("StateExited") {
743        return "stateExited".to_string();
744    }
745    let mut chars = event_type.chars();
746    match chars.next() {
747        None => String::new(),
748        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
749    }
750}
751
752/// Compare two StartExecution inputs for STANDARD idempotency. A missing input
753/// defaults to `{}` (AWS's default execution input). Comparison is structural
754/// (parsed JSON), so insignificant whitespace differences still dedup; inputs
755/// that fail to parse fall back to an exact string match.
756fn execution_input_matches(stored: Option<&str>, incoming: Option<&str>) -> bool {
757    let stored = stored.unwrap_or("{}");
758    let incoming = incoming.unwrap_or("{}");
759    match (
760        serde_json::from_str::<Value>(stored),
761        serde_json::from_str::<Value>(incoming),
762    ) {
763        (Ok(a), Ok(b)) => a == b,
764        _ => stored == incoming,
765    }
766}
767
768fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
769    if !arn.starts_with("arn:") {
770        return Err(AwsServiceError::aws_error(
771            StatusCode::BAD_REQUEST,
772            "InvalidArn",
773            format!("Invalid Arn: '{arn}'"),
774        ));
775    }
776    Ok(())
777}
778
779/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
780/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
781/// every ARN-bearing Step Functions operation declares.
782fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
783    if value.is_empty() || value.len() > max {
784        return Err(AwsServiceError::aws_error(
785            StatusCode::BAD_REQUEST,
786            "InvalidArn",
787            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
788        ));
789    }
790    Ok(())
791}
792
793/// Shared error for a malformed (unparseable) `nextToken` reaching the
794/// pagination helper — `InvalidToken` is the only pagination error code
795/// declared on every list op.
796pub(super) fn invalid_token() -> AwsServiceError {
797    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
798}
799
800/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
801/// error code declared on every paginated list op is `InvalidToken`, so
802/// route both empty and oversize tokens through it.
803fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
804    if value.is_empty() || value.len() > 1024 {
805        return Err(AwsServiceError::aws_error(
806            StatusCode::BAD_REQUEST,
807            "InvalidToken",
808            "nextToken must be 1..=1024 characters",
809        ));
810    }
811    Ok(())
812}
813
814/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
815/// probes flip below-min / above-max; since the only error declared on
816/// `ListActivities` is `InvalidToken`, we map page-size violations to
817/// the same code (matches how AWS surfaces malformed pagination input
818/// for ops that don't model a separate `ValidationException`).
819fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
820    if !(0..=1000).contains(&value) {
821        return Err(AwsServiceError::aws_error(
822            StatusCode::BAD_REQUEST,
823            "InvalidToken",
824            format!("maxResults '{value}' is outside 0..=1000"),
825        ));
826    }
827    Ok(())
828}
829
830/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
831///
832/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
833/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
834/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
835///
836/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
837/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
838pub fn start_execution_from_delivery(
839    state: &SharedStepFunctionsState,
840    delivery: &Option<Arc<DeliveryBus>>,
841    dynamodb_state: &Option<SharedDynamoDbState>,
842    registry: &Option<SharedServiceRegistry>,
843    state_machine_arn: &str,
844    input: &str,
845) {
846    // Validate input is valid JSON
847    if serde_json::from_str::<serde_json::Value>(input).is_err() {
848        tracing::warn!(
849            state_machine_arn,
850            "Step Functions delivery: invalid JSON input, skipping execution"
851        );
852        return;
853    }
854
855    let execution_name = uuid::Uuid::new_v4().to_string();
856
857    // Extract account_id from the state machine ARN
858    let account_id = state_machine_arn
859        .split(':')
860        .nth(4)
861        .unwrap_or("000000000000")
862        .to_string();
863
864    let mut accounts = state.write();
865    let st = accounts.get_or_create(&account_id);
866    let sm = match st.state_machines.get(state_machine_arn) {
867        Some(sm) => sm,
868        None => {
869            tracing::warn!(
870                state_machine_arn,
871                "Step Functions delivery: state machine not found"
872            );
873            return;
874        }
875    };
876
877    let sm_name = sm.name.clone();
878    let definition = sm.definition.clone();
879    let exec_arn = st.execution_arn(&sm_name, &execution_name);
880
881    let now = Utc::now();
882    let execution = Execution {
883        execution_arn: exec_arn.clone(),
884        state_machine_arn: state_machine_arn.to_string(),
885        state_machine_name: sm_name,
886        name: execution_name,
887        status: ExecutionStatus::Running,
888        input: Some(input.to_string()),
889        output: None,
890        start_date: now,
891        stop_date: None,
892        error: None,
893        cause: None,
894        history_events: vec![],
895        parent_execution_arn: None,
896        is_sync: false,
897        billed_duration_ms: None,
898        billed_memory_mb: None,
899    };
900
901    st.executions.insert(exec_arn.clone(), execution);
902    let logging_config = sm.logging_configuration.clone();
903    drop(accounts);
904
905    let shared_state = state.clone();
906    let delivery = delivery.clone();
907    let dynamodb_state = dynamodb_state.clone();
908    let registry = registry.clone();
909    let input = Some(input.to_string());
910    tokio::spawn(async move {
911        interpreter::execute_state_machine(
912            shared_state,
913            exec_arn,
914            definition,
915            input,
916            delivery,
917            dynamodb_state,
918            registry,
919            logging_config,
920        )
921        .await;
922    });
923}
924
925#[cfg(test)]
926mod tests {
927    use super::*;
928    use http::{HeaderMap, Method};
929    use parking_lot::RwLock;
930    use serde_json::Value;
931    use std::collections::HashMap;
932    use std::sync::Arc;
933
934    fn make_state() -> SharedStepFunctionsState {
935        Arc::new(RwLock::new(
936            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
937        ))
938    }
939
940    fn make_request(action: &str, body: &str) -> AwsRequest {
941        AwsRequest {
942            service: "states".to_string(),
943            action: action.to_string(),
944            region: "us-east-1".to_string(),
945            account_id: "123456789012".to_string(),
946            request_id: "test-id".to_string(),
947            headers: HeaderMap::new(),
948            query_params: HashMap::new(),
949            body: body.as_bytes().to_vec().into(),
950            body_stream: parking_lot::Mutex::new(None),
951            path_segments: vec![],
952            raw_path: "/".to_string(),
953            raw_query: String::new(),
954            method: Method::POST,
955            is_query_protocol: false,
956            access_key_id: None,
957            principal: None,
958        }
959    }
960
961    fn body_json(resp: &AwsResponse) -> Value {
962        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
963    }
964
965    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
966        match result {
967            Err(e) => e,
968            Ok(_) => panic!("expected error, got Ok"),
969        }
970    }
971
972    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
973
974    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
975        let body = json!({
976            "name": name,
977            "definition": VALID_DEF,
978            "roleArn": "arn:aws:iam::123456789012:role/test",
979        });
980        let req = make_request("CreateStateMachine", &body.to_string());
981        let resp = svc.create_state_machine(&req).unwrap();
982        let b = body_json(&resp);
983        b["stateMachineArn"].as_str().unwrap().to_string()
984    }
985
986    // ── CreateStateMachine ──
987
988    #[test]
989    fn create_state_machine_basic() {
990        let svc = StepFunctionsService::new(make_state());
991        let arn = create_sm(&svc, "test-sm");
992        assert!(arn.contains("test-sm"));
993    }
994
995    #[test]
996    fn create_state_machine_with_express_type() {
997        let svc = StepFunctionsService::new(make_state());
998        let body = json!({
999            "name": "express-sm",
1000            "definition": VALID_DEF,
1001            "roleArn": "arn:aws:iam::123456789012:role/r",
1002            "type": "EXPRESS",
1003        });
1004        let req = make_request("CreateStateMachine", &body.to_string());
1005        let resp = svc.create_state_machine(&req).unwrap();
1006        let b = body_json(&resp);
1007        assert!(b["stateMachineArn"].as_str().is_some());
1008    }
1009
1010    #[test]
1011    fn create_state_machine_duplicate_fails() {
1012        let svc = StepFunctionsService::new(make_state());
1013        create_sm(&svc, "dup-sm");
1014        let body = json!({
1015            "name": "dup-sm",
1016            "definition": VALID_DEF,
1017            "roleArn": "arn:aws:iam::123456789012:role/r",
1018        });
1019        let req = make_request("CreateStateMachine", &body.to_string());
1020        let err = expect_err(svc.create_state_machine(&req));
1021        assert!(err.to_string().contains("StateMachineAlreadyExists"));
1022    }
1023
1024    #[test]
1025    fn create_state_machine_missing_name() {
1026        let svc = StepFunctionsService::new(make_state());
1027        let body = json!({
1028            "definition": VALID_DEF,
1029            "roleArn": "arn:aws:iam::123456789012:role/r",
1030        });
1031        let req = make_request("CreateStateMachine", &body.to_string());
1032        assert!(svc.create_state_machine(&req).is_err());
1033    }
1034
1035    #[test]
1036    fn create_state_machine_invalid_definition() {
1037        let svc = StepFunctionsService::new(make_state());
1038        let body = json!({
1039            "name": "bad-def",
1040            "definition": "not json",
1041            "roleArn": "arn:aws:iam::123456789012:role/r",
1042        });
1043        let req = make_request("CreateStateMachine", &body.to_string());
1044        let err = expect_err(svc.create_state_machine(&req));
1045        assert!(err.to_string().contains("InvalidDefinition"));
1046    }
1047
1048    #[test]
1049    fn create_state_machine_definition_missing_start_at() {
1050        let svc = StepFunctionsService::new(make_state());
1051        let body = json!({
1052            "name": "no-start",
1053            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
1054            "roleArn": "arn:aws:iam::123456789012:role/r",
1055        });
1056        let req = make_request("CreateStateMachine", &body.to_string());
1057        let err = expect_err(svc.create_state_machine(&req));
1058        assert!(err.to_string().contains("InvalidDefinition"));
1059    }
1060
1061    #[test]
1062    fn create_state_machine_definition_missing_states() {
1063        let svc = StepFunctionsService::new(make_state());
1064        let body = json!({
1065            "name": "no-states",
1066            "definition": r#"{"StartAt":"S"}"#,
1067            "roleArn": "arn:aws:iam::123456789012:role/r",
1068        });
1069        let req = make_request("CreateStateMachine", &body.to_string());
1070        let err = expect_err(svc.create_state_machine(&req));
1071        assert!(err.to_string().contains("InvalidDefinition"));
1072    }
1073
1074    #[test]
1075    fn create_state_machine_definition_start_at_not_in_states() {
1076        let svc = StepFunctionsService::new(make_state());
1077        let body = json!({
1078            "name": "bad-start",
1079            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
1080            "roleArn": "arn:aws:iam::123456789012:role/r",
1081        });
1082        let req = make_request("CreateStateMachine", &body.to_string());
1083        let err = expect_err(svc.create_state_machine(&req));
1084        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
1085    }
1086
1087    #[test]
1088    fn create_state_machine_invalid_type() {
1089        let svc = StepFunctionsService::new(make_state());
1090        let body = json!({
1091            "name": "bad-type",
1092            "definition": VALID_DEF,
1093            "roleArn": "arn:aws:iam::123456789012:role/r",
1094            "type": "INVALID",
1095        });
1096        let req = make_request("CreateStateMachine", &body.to_string());
1097        assert!(svc.create_state_machine(&req).is_err());
1098    }
1099
1100    #[test]
1101    fn create_state_machine_invalid_arn() {
1102        let svc = StepFunctionsService::new(make_state());
1103        let body = json!({
1104            "name": "bad-arn",
1105            "definition": VALID_DEF,
1106            "roleArn": "not-an-arn",
1107        });
1108        let req = make_request("CreateStateMachine", &body.to_string());
1109        let err = expect_err(svc.create_state_machine(&req));
1110        assert!(err.to_string().contains("InvalidArn"));
1111    }
1112
1113    #[test]
1114    fn create_state_machine_invalid_name() {
1115        let svc = StepFunctionsService::new(make_state());
1116        let body = json!({
1117            "name": "has spaces!",
1118            "definition": VALID_DEF,
1119            "roleArn": "arn:aws:iam::123456789012:role/r",
1120        });
1121        let req = make_request("CreateStateMachine", &body.to_string());
1122        let err = expect_err(svc.create_state_machine(&req));
1123        assert!(err.to_string().contains("InvalidName"));
1124    }
1125
1126    #[test]
1127    fn create_state_machine_name_too_long() {
1128        let svc = StepFunctionsService::new(make_state());
1129        let long_name = "a".repeat(81);
1130        let body = json!({
1131            "name": long_name,
1132            "definition": VALID_DEF,
1133            "roleArn": "arn:aws:iam::123456789012:role/r",
1134        });
1135        let req = make_request("CreateStateMachine", &body.to_string());
1136        let err = expect_err(svc.create_state_machine(&req));
1137        assert!(err.to_string().contains("InvalidName"));
1138    }
1139
1140    // ── DescribeStateMachine ──
1141
1142    #[test]
1143    fn describe_state_machine_found() {
1144        let svc = StepFunctionsService::new(make_state());
1145        let arn = create_sm(&svc, "desc-sm");
1146
1147        let req = make_request(
1148            "DescribeStateMachine",
1149            &json!({"stateMachineArn": arn}).to_string(),
1150        );
1151        let resp = svc.describe_state_machine(&req).unwrap();
1152        let b = body_json(&resp);
1153        assert_eq!(b["name"], "desc-sm");
1154        assert_eq!(b["status"], "ACTIVE");
1155        assert!(b["definition"].as_str().is_some());
1156    }
1157
1158    #[test]
1159    fn describe_state_machine_not_found() {
1160        let svc = StepFunctionsService::new(make_state());
1161        let req = make_request(
1162            "DescribeStateMachine",
1163            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1164                .to_string(),
1165        );
1166        let err = expect_err(svc.describe_state_machine(&req));
1167        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1168    }
1169
1170    // ── ListStateMachines ──
1171
1172    #[test]
1173    fn list_state_machines_empty() {
1174        let svc = StepFunctionsService::new(make_state());
1175        let req = make_request("ListStateMachines", "{}");
1176        let resp = svc.list_state_machines(&req).unwrap();
1177        let b = body_json(&resp);
1178        assert!(b["stateMachines"].as_array().unwrap().is_empty());
1179    }
1180
1181    #[test]
1182    fn list_state_machines_returns_created() {
1183        let svc = StepFunctionsService::new(make_state());
1184        create_sm(&svc, "sm-1");
1185        create_sm(&svc, "sm-2");
1186
1187        let req = make_request("ListStateMachines", "{}");
1188        let resp = svc.list_state_machines(&req).unwrap();
1189        let b = body_json(&resp);
1190        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
1191    }
1192
1193    // ── DeleteStateMachine ──
1194
1195    #[test]
1196    fn delete_state_machine() {
1197        let svc = StepFunctionsService::new(make_state());
1198        let arn = create_sm(&svc, "del-sm");
1199
1200        let req = make_request(
1201            "DeleteStateMachine",
1202            &json!({"stateMachineArn": arn}).to_string(),
1203        );
1204        svc.delete_state_machine(&req).unwrap();
1205
1206        // Describe should fail
1207        let req = make_request(
1208            "DescribeStateMachine",
1209            &json!({"stateMachineArn": arn}).to_string(),
1210        );
1211        assert!(svc.describe_state_machine(&req).is_err());
1212    }
1213
1214    #[test]
1215    fn delete_state_machine_nonexistent_succeeds() {
1216        let svc = StepFunctionsService::new(make_state());
1217        let req = make_request(
1218            "DeleteStateMachine",
1219            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1220                .to_string(),
1221        );
1222        // AWS returns success even for nonexistent
1223        svc.delete_state_machine(&req).unwrap();
1224    }
1225
1226    // ── UpdateStateMachine ──
1227
1228    #[test]
1229    fn update_state_machine() {
1230        let svc = StepFunctionsService::new(make_state());
1231        let arn = create_sm(&svc, "upd-sm");
1232
1233        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1234        let body = json!({
1235            "stateMachineArn": arn,
1236            "definition": new_def,
1237            "description": "updated",
1238        });
1239        let req = make_request("UpdateStateMachine", &body.to_string());
1240        let resp = svc.update_state_machine(&req).unwrap();
1241        let b = body_json(&resp);
1242        assert!(b["updateDate"].as_f64().is_some());
1243
1244        // Verify
1245        let req = make_request(
1246            "DescribeStateMachine",
1247            &json!({"stateMachineArn": arn}).to_string(),
1248        );
1249        let resp = svc.describe_state_machine(&req).unwrap();
1250        let b = body_json(&resp);
1251        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1252        assert_eq!(b["description"], "updated");
1253    }
1254
1255    #[test]
1256    fn update_state_machine_not_found() {
1257        let svc = StepFunctionsService::new(make_state());
1258        let body = json!({
1259            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1260            "definition": VALID_DEF,
1261        });
1262        let req = make_request("UpdateStateMachine", &body.to_string());
1263        let err = expect_err(svc.update_state_machine(&req));
1264        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1265    }
1266
1267    // ── StartExecution ──
1268
1269    #[tokio::test]
1270    async fn start_execution_basic() {
1271        let svc = StepFunctionsService::new(make_state());
1272        let arn = create_sm(&svc, "exec-sm");
1273
1274        let body = json!({
1275            "stateMachineArn": arn,
1276            "input": r#"{"key":"value"}"#,
1277        });
1278        let req = make_request("StartExecution", &body.to_string());
1279        let resp = svc.start_execution(&req).unwrap();
1280        let b = body_json(&resp);
1281        assert!(b["executionArn"].as_str().is_some());
1282        assert!(b["startDate"].as_f64().is_some());
1283    }
1284
1285    #[tokio::test]
1286    async fn start_execution_with_name() {
1287        let svc = StepFunctionsService::new(make_state());
1288        let arn = create_sm(&svc, "named-exec");
1289
1290        let body = json!({
1291            "stateMachineArn": arn,
1292            "name": "my-execution",
1293        });
1294        let req = make_request("StartExecution", &body.to_string());
1295        let resp = svc.start_execution(&req).unwrap();
1296        let b = body_json(&resp);
1297        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1298    }
1299
1300    #[tokio::test]
1301    async fn start_execution_sm_not_found() {
1302        let svc = StepFunctionsService::new(make_state());
1303        let body = json!({
1304            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1305        });
1306        let req = make_request("StartExecution", &body.to_string());
1307        let err = expect_err(svc.start_execution(&req));
1308        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1309    }
1310
1311    #[tokio::test]
1312    async fn start_execution_invalid_input() {
1313        let svc = StepFunctionsService::new(make_state());
1314        let arn = create_sm(&svc, "bad-input");
1315
1316        let body = json!({
1317            "stateMachineArn": arn,
1318            "input": "not json",
1319        });
1320        let req = make_request("StartExecution", &body.to_string());
1321        let err = expect_err(svc.start_execution(&req));
1322        assert!(err.to_string().contains("InvalidExecutionInput"));
1323    }
1324
1325    #[tokio::test]
1326    async fn start_execution_same_name_same_input_is_idempotent() {
1327        let svc = StepFunctionsService::new(make_state());
1328        let arn = create_sm(&svc, "dup-exec");
1329
1330        let body = json!({
1331            "stateMachineArn": arn,
1332            "name": "same-name",
1333            "input": "{\"a\":1}",
1334        });
1335        let req = make_request("StartExecution", &body.to_string());
1336        let first = body_json(&svc.start_execution(&req).unwrap());
1337
1338        // Same name AND same input -> 200 with the existing executionArn.
1339        let req = make_request("StartExecution", &body.to_string());
1340        let second = body_json(&svc.start_execution(&req).unwrap());
1341        assert_eq!(first["executionArn"], second["executionArn"]);
1342        assert_eq!(first["startDate"], second["startDate"]);
1343    }
1344
1345    #[tokio::test]
1346    async fn start_execution_same_name_different_input_conflicts() {
1347        let svc = StepFunctionsService::new(make_state());
1348        let arn = create_sm(&svc, "dup-exec-diff");
1349
1350        let req = make_request(
1351            "StartExecution",
1352            &json!({
1353                "stateMachineArn": arn,
1354                "name": "same-name",
1355                "input": "{\"a\":1}",
1356            })
1357            .to_string(),
1358        );
1359        svc.start_execution(&req).unwrap();
1360
1361        // Same name, DIFFERENT input -> 400 ExecutionAlreadyExists.
1362        let req = make_request(
1363            "StartExecution",
1364            &json!({
1365                "stateMachineArn": arn,
1366                "name": "same-name",
1367                "input": "{\"a\":2}",
1368            })
1369            .to_string(),
1370        );
1371        let err = expect_err(svc.start_execution(&req));
1372        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1373    }
1374
1375    #[tokio::test]
1376    async fn start_execution_express_name_collision_never_idempotent() {
1377        let svc = StepFunctionsService::new(make_state());
1378        let arn = create_express_sm(&svc, "dup-exec-express");
1379
1380        let body = json!({
1381            "stateMachineArn": arn,
1382            "name": "same-name",
1383            "input": "{\"a\":1}",
1384        });
1385        let req = make_request("StartExecution", &body.to_string());
1386        svc.start_execution(&req).unwrap();
1387
1388        // EXPRESS has no idempotency: even an identical re-issue conflicts.
1389        let req = make_request("StartExecution", &body.to_string());
1390        let err = expect_err(svc.start_execution(&req));
1391        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1392    }
1393
1394    // ── DescribeExecution ──
1395
1396    #[tokio::test]
1397    async fn describe_execution_found() {
1398        let svc = StepFunctionsService::new(make_state());
1399        let sm_arn = create_sm(&svc, "desc-exec");
1400
1401        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1402        let req = make_request("StartExecution", &body.to_string());
1403        let resp = svc.start_execution(&req).unwrap();
1404        let exec_arn = body_json(&resp)["executionArn"]
1405            .as_str()
1406            .unwrap()
1407            .to_string();
1408
1409        let req = make_request(
1410            "DescribeExecution",
1411            &json!({"executionArn": exec_arn}).to_string(),
1412        );
1413        let resp = svc.describe_execution(&req).unwrap();
1414        let b = body_json(&resp);
1415        assert_eq!(b["name"], "e1");
1416        assert_eq!(b["status"], "RUNNING");
1417    }
1418
1419    #[tokio::test]
1420    async fn describe_execution_not_found() {
1421        let svc = StepFunctionsService::new(make_state());
1422        let req = make_request(
1423            "DescribeExecution",
1424            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1425                .to_string(),
1426        );
1427        let err = expect_err(svc.describe_execution(&req));
1428        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1429    }
1430
1431    // ── StopExecution ──
1432
1433    #[tokio::test]
1434    async fn stop_execution() {
1435        let svc = StepFunctionsService::new(make_state());
1436        let sm_arn = create_sm(&svc, "stop-sm");
1437
1438        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1439        let req = make_request("StartExecution", &body.to_string());
1440        let resp = svc.start_execution(&req).unwrap();
1441        let exec_arn = body_json(&resp)["executionArn"]
1442            .as_str()
1443            .unwrap()
1444            .to_string();
1445
1446        let body = json!({
1447            "executionArn": exec_arn,
1448            "error": "UserAborted",
1449            "cause": "test stop",
1450        });
1451        let req = make_request("StopExecution", &body.to_string());
1452        let resp = svc.stop_execution(&req).unwrap();
1453        let b = body_json(&resp);
1454        assert!(b["stopDate"].as_f64().is_some());
1455
1456        // Verify aborted
1457        let req = make_request(
1458            "DescribeExecution",
1459            &json!({"executionArn": exec_arn}).to_string(),
1460        );
1461        let resp = svc.describe_execution(&req).unwrap();
1462        let b = body_json(&resp);
1463        assert_eq!(b["status"], "ABORTED");
1464        assert_eq!(b["error"], "UserAborted");
1465    }
1466
1467    #[tokio::test]
1468    async fn stop_execution_not_found() {
1469        let svc = StepFunctionsService::new(make_state());
1470        let req = make_request(
1471            "StopExecution",
1472            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1473                .to_string(),
1474        );
1475        let err = expect_err(svc.stop_execution(&req));
1476        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1477    }
1478
1479    // ── ListExecutions ──
1480
1481    #[tokio::test]
1482    async fn list_executions() {
1483        let svc = StepFunctionsService::new(make_state());
1484        let sm_arn = create_sm(&svc, "list-exec");
1485
1486        for i in 0..3 {
1487            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1488            let req = make_request("StartExecution", &body.to_string());
1489            svc.start_execution(&req).unwrap();
1490        }
1491
1492        let req = make_request(
1493            "ListExecutions",
1494            &json!({"stateMachineArn": sm_arn}).to_string(),
1495        );
1496        let resp = svc.list_executions(&req).unwrap();
1497        let b = body_json(&resp);
1498        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1499    }
1500
1501    #[tokio::test]
1502    async fn list_executions_sm_not_found() {
1503        let svc = StepFunctionsService::new(make_state());
1504        let req = make_request(
1505            "ListExecutions",
1506            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1507                .to_string(),
1508        );
1509        let err = expect_err(svc.list_executions(&req));
1510        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1511    }
1512
1513    // ── GetExecutionHistory ──
1514
1515    #[tokio::test]
1516    async fn get_execution_history_not_found() {
1517        let svc = StepFunctionsService::new(make_state());
1518        let req = make_request(
1519            "GetExecutionHistory",
1520            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1521                .to_string(),
1522        );
1523        let err = expect_err(svc.get_execution_history(&req));
1524        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1525    }
1526
1527    // ── DescribeStateMachineForExecution ──
1528
1529    #[tokio::test]
1530    async fn describe_sm_for_execution() {
1531        let svc = StepFunctionsService::new(make_state());
1532        let sm_arn = create_sm(&svc, "sm-for-exec");
1533
1534        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1535        let req = make_request("StartExecution", &body.to_string());
1536        let resp = svc.start_execution(&req).unwrap();
1537        let exec_arn = body_json(&resp)["executionArn"]
1538            .as_str()
1539            .unwrap()
1540            .to_string();
1541
1542        let req = make_request(
1543            "DescribeStateMachineForExecution",
1544            &json!({"executionArn": exec_arn}).to_string(),
1545        );
1546        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1547        let b = body_json(&resp);
1548        assert_eq!(b["name"], "sm-for-exec");
1549    }
1550
1551    // ── Tags ──
1552
1553    #[test]
1554    fn tag_untag_list_tags() {
1555        let svc = StepFunctionsService::new(make_state());
1556        let arn = create_sm(&svc, "tagged-sm");
1557
1558        // Tag
1559        let body = json!({
1560            "resourceArn": arn,
1561            "tags": [{"key": "env", "value": "prod"}],
1562        });
1563        let req = make_request("TagResource", &body.to_string());
1564        svc.tag_resource(&req).unwrap();
1565
1566        // List
1567        let req = make_request(
1568            "ListTagsForResource",
1569            &json!({"resourceArn": arn}).to_string(),
1570        );
1571        let resp = svc.list_tags_for_resource(&req).unwrap();
1572        let b = body_json(&resp);
1573        let tags = b["tags"].as_array().unwrap();
1574        assert_eq!(tags.len(), 1);
1575        assert_eq!(tags[0]["key"], "env");
1576
1577        // Untag
1578        let body = json!({
1579            "resourceArn": arn,
1580            "tagKeys": ["env"],
1581        });
1582        let req = make_request("UntagResource", &body.to_string());
1583        svc.untag_resource(&req).unwrap();
1584
1585        // Verify empty
1586        let req = make_request(
1587            "ListTagsForResource",
1588            &json!({"resourceArn": arn}).to_string(),
1589        );
1590        let resp = svc.list_tags_for_resource(&req).unwrap();
1591        let b = body_json(&resp);
1592        assert!(b["tags"].as_array().unwrap().is_empty());
1593    }
1594
1595    #[test]
1596    fn tag_resource_not_found() {
1597        let svc = StepFunctionsService::new(make_state());
1598        let body = json!({
1599            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1600            "tags": [{"key": "k", "value": "v"}],
1601        });
1602        let req = make_request("TagResource", &body.to_string());
1603        let err = expect_err(svc.tag_resource(&req));
1604        assert!(err.to_string().contains("ResourceNotFound"));
1605    }
1606
1607    // ── Helper function tests ──
1608
1609    #[test]
1610    fn test_validate_name() {
1611        assert!(validate_name("valid-name").is_ok());
1612        assert!(validate_name("under_score").is_ok());
1613        assert!(validate_name("").is_err());
1614        assert!(validate_name("has spaces").is_err());
1615        assert!(validate_name(&"a".repeat(81)).is_err());
1616    }
1617
1618    #[test]
1619    fn test_validate_definition() {
1620        assert!(validate_definition(VALID_DEF).is_ok());
1621        assert!(validate_definition("not json").is_err());
1622        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
1623        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
1624    }
1625
1626    #[test]
1627    fn test_validate_definition_rejects_malformed_paths() {
1628        // Unterminated bracket in InputPath.
1629        let def =
1630            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"$.arr[","End":true}}}"#;
1631        assert!(validate_definition(def).is_err());
1632
1633        // Multibyte char where the close bracket would be, in OutputPath.
1634        let def =
1635            "{\"StartAt\":\"P\",\"States\":{\"P\":{\"Type\":\"Pass\",\"OutputPath\":\"$.x[\u{00e9}\",\"End\":true}}}";
1636        assert!(validate_definition(def).is_err());
1637
1638        // Malformed Choice Variable.
1639        let def = r#"{"StartAt":"C","States":{"C":{"Type":"Choice","Choices":[{"Variable":"$.n[","NumericEquals":1,"Next":"P"}]},"P":{"Type":"Pass","End":true}}}"#;
1640        assert!(validate_definition(def).is_err());
1641
1642        // Path not rooted at $.
1643        let def =
1644            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","InputPath":"foo.bar","End":true}}}"#;
1645        assert!(validate_definition(def).is_err());
1646
1647        // Empty index brackets.
1648        let def =
1649            r#"{"StartAt":"P","States":{"P":{"Type":"Pass","ResultPath":"$.x[]","End":true}}}"#;
1650        assert!(validate_definition(def).is_err());
1651    }
1652
1653    #[test]
1654    fn test_validate_definition_accepts_well_formed_paths() {
1655        // Valid reference paths, the literal "null" for Input/OutputPath, and
1656        // nested Choice combinators must all be accepted.
1657        let def = r#"{"StartAt":"P","States":{
1658            "P":{"Type":"Pass","InputPath":"$.a.b[0].c","OutputPath":"$","ResultPath":"$.out","Next":"C"},
1659            "C":{"Type":"Choice","Choices":[
1660                {"And":[{"Variable":"$.items[2]","NumericEquals":1},{"Variable":"$.flag","BooleanEquals":true}],"Next":"S"}
1661            ],"Default":"S"},
1662            "S":{"Type":"Succeed","InputPath":"null"}
1663        }}"#;
1664        assert!(validate_definition(def).is_ok());
1665    }
1666
1667    #[test]
1668    fn test_is_valid_reference_path() {
1669        assert!(is_valid_reference_path("$"));
1670        assert!(is_valid_reference_path("$.foo"));
1671        assert!(is_valid_reference_path("$.foo.bar[3].baz"));
1672        assert!(is_valid_reference_path("$[0]"));
1673        assert!(!is_valid_reference_path("$.arr["));
1674        assert!(!is_valid_reference_path("$.x[\u{00e9}"));
1675        assert!(!is_valid_reference_path("$.x[]"));
1676        assert!(!is_valid_reference_path("$.x[abc]"));
1677        assert!(!is_valid_reference_path("foo.bar"));
1678        assert!(!is_valid_reference_path(""));
1679    }
1680
1681    #[test]
1682    fn test_validate_arn() {
1683        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1684        assert!(validate_arn("not-an-arn").is_err());
1685    }
1686
1687    #[test]
1688    fn test_camel_to_details_key() {
1689        // All *StateEntered / *StateExited types collapse to one key each.
1690        assert_eq!(camel_to_details_key("PassStateEntered"), "stateEntered");
1691        assert_eq!(camel_to_details_key("TaskStateEntered"), "stateEntered");
1692        assert_eq!(camel_to_details_key("ChoiceStateExited"), "stateExited");
1693        assert_eq!(camel_to_details_key("MapStateExited"), "stateExited");
1694        // Other event types keep first-char-lowercased prefix.
1695        assert_eq!(camel_to_details_key("TaskScheduled"), "taskScheduled");
1696        assert_eq!(camel_to_details_key("ExecutionStarted"), "executionStarted");
1697        assert_eq!(camel_to_details_key(""), "");
1698    }
1699
1700    #[test]
1701    fn test_is_mutating_action() {
1702        assert!(is_mutating_action("CreateStateMachine"));
1703        assert!(is_mutating_action("StartExecution"));
1704        assert!(!is_mutating_action("DescribeStateMachine"));
1705        assert!(!is_mutating_action("ListStateMachines"));
1706    }
1707
1708    // ── StartSyncExecution ──
1709
1710    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1711        let body = json!({
1712            "name": name,
1713            "definition": VALID_DEF,
1714            "roleArn": "arn:aws:iam::123456789012:role/test",
1715            "type": "EXPRESS",
1716        });
1717        let req = make_request("CreateStateMachine", &body.to_string());
1718        let resp = svc.create_state_machine(&req).unwrap();
1719        let b = body_json(&resp);
1720        b["stateMachineArn"].as_str().unwrap().to_string()
1721    }
1722
1723    #[tokio::test]
1724    async fn start_sync_execution_basic() {
1725        let svc = StepFunctionsService::new(make_state());
1726        let arn = create_express_sm(&svc, "sync-sm");
1727
1728        let body = json!({
1729            "stateMachineArn": arn,
1730            "input": r#"{"key":"value"}"#,
1731        });
1732        let req = make_request("StartSyncExecution", &body.to_string());
1733        let resp = svc.start_sync_execution(&req).await.unwrap();
1734        let b = body_json(&resp);
1735        assert!(b["executionArn"]
1736            .as_str()
1737            .unwrap()
1738            .contains("express:sync-sm"));
1739        assert_eq!(b["stateMachineArn"], arn);
1740        assert_eq!(b["status"], "SUCCEEDED");
1741        assert!(b["startDate"].as_i64().is_some());
1742        assert!(b["stopDate"].as_i64().is_some());
1743        assert!(b["output"].as_str().is_some());
1744        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1745            .as_i64()
1746            .is_some());
1747    }
1748
1749    #[tokio::test]
1750    async fn start_sync_execution_not_express() {
1751        let svc = StepFunctionsService::new(make_state());
1752        let arn = create_sm(&svc, "std-sm");
1753
1754        let body = json!({"stateMachineArn": arn});
1755        let req = make_request("StartSyncExecution", &body.to_string());
1756        let err = expect_err(svc.start_sync_execution(&req).await);
1757        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1758    }
1759
1760    #[tokio::test]
1761    async fn start_sync_execution_sm_not_found() {
1762        let svc = StepFunctionsService::new(make_state());
1763        let body = json!({
1764            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1765        });
1766        let req = make_request("StartSyncExecution", &body.to_string());
1767        let err = expect_err(svc.start_sync_execution(&req).await);
1768        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1769    }
1770
1771    #[tokio::test]
1772    async fn start_sync_execution_records_introspection_fields() {
1773        let svc = StepFunctionsService::new(make_state());
1774        let arn = create_express_sm(&svc, "sync-introspect");
1775
1776        let body = json!({"stateMachineArn": arn, "input": "{}"});
1777        let req = make_request("StartSyncExecution", &body.to_string());
1778        let resp = svc.start_sync_execution(&req).await.unwrap();
1779        let b = body_json(&resp);
1780        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1781
1782        let accounts = svc.state.read();
1783        let state = accounts.get("123456789012").unwrap();
1784        let stored = state
1785            .executions
1786            .get(&exec_arn)
1787            .expect("sync execution should be persisted for introspection");
1788        assert!(stored.is_sync, "sync executions must be marked is_sync");
1789        assert_eq!(stored.billed_memory_mb, Some(64));
1790        assert!(
1791            stored.billed_duration_ms.is_some(),
1792            "billed_duration_ms must be populated after sync run"
1793        );
1794        assert!(
1795            stored.parent_execution_arn.is_none(),
1796            "top-level sync execution has no parent"
1797        );
1798    }
1799
1800    #[tokio::test]
1801    async fn start_sync_execution_invalid_input() {
1802        let svc = StepFunctionsService::new(make_state());
1803        let arn = create_express_sm(&svc, "bad-input-sync");
1804
1805        let body = json!({
1806            "stateMachineArn": arn,
1807            "input": "not json",
1808        });
1809        let req = make_request("StartSyncExecution", &body.to_string());
1810        let err = expect_err(svc.start_sync_execution(&req).await);
1811        assert!(err.to_string().contains("InvalidExecutionInput"));
1812    }
1813
1814    /// No snapshot store (memory mode) -> no persist hook for the CFN provisioner.
1815    #[test]
1816    fn snapshot_hook_is_none_without_store() {
1817        let svc = StepFunctionsService::new(make_state());
1818        assert!(svc.snapshot_hook().is_none());
1819    }
1820
1821    /// With a store, the hook is present and invoking it runs the whole-state
1822    /// persist path the CloudFormation provisioner uses after mutating Step
1823    /// Functions state directly.
1824    #[tokio::test]
1825    async fn snapshot_hook_fires_with_store() {
1826        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
1827            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
1828        let svc = StepFunctionsService::new(make_state()).with_snapshot_store(store);
1829        let hook = svc
1830            .snapshot_hook()
1831            .expect("hook present when a store is set");
1832        hook().await;
1833    }
1834
1835    fn make_execution(arn: &str, status: ExecutionStatus) -> Execution {
1836        Execution {
1837            execution_arn: arn.to_string(),
1838            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:sm".to_string(),
1839            state_machine_name: "sm".to_string(),
1840            name: arn.to_string(),
1841            status,
1842            input: None,
1843            output: None,
1844            start_date: Utc::now(),
1845            stop_date: None,
1846            error: None,
1847            cause: None,
1848            history_events: vec![],
1849            parent_execution_arn: None,
1850            is_sync: false,
1851            billed_duration_ms: None,
1852            billed_memory_mb: None,
1853        }
1854    }
1855
1856    #[test]
1857    fn reconcile_aborts_running_executions_on_restart() {
1858        // After a restart a RUNNING execution has no interpreter driving it, so
1859        // it must be aborted rather than left RUNNING forever (0.A2). A
1860        // completed execution is untouched.
1861        let state = make_state();
1862        {
1863            let mut accounts = state.write();
1864            let s = accounts.get_or_create("123456789012");
1865            s.executions.insert(
1866                "running".into(),
1867                make_execution("running", ExecutionStatus::Running),
1868            );
1869            s.executions.insert(
1870                "done".into(),
1871                make_execution("done", ExecutionStatus::Succeeded),
1872            );
1873        }
1874
1875        let n = reconcile_interrupted_executions(&state);
1876        assert_eq!(n, 1, "only the RUNNING execution is reconciled");
1877
1878        let accounts = state.read();
1879        let s = accounts.get("123456789012").unwrap();
1880        let running = &s.executions["running"];
1881        assert_eq!(running.status, ExecutionStatus::Aborted);
1882        assert!(running.stop_date.is_some());
1883        assert_eq!(running.error.as_deref(), Some("Fakecloud.Restart"));
1884        assert_eq!(s.executions["done"].status, ExecutionStatus::Succeeded);
1885    }
1886}
1887
1888#[cfg(test)]
1889mod pagination_reject_test {
1890    #[test]
1891    fn paginate_checked_rejects_invalid_token() {
1892        use fakecloud_core::pagination::paginate_checked;
1893        let items: Vec<i32> = (0..5).collect();
1894        assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1895        assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1896    }
1897}