Skip to main content

fakecloud_stepfunctions/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate_checked;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
25const SUPPORTED: &[&str] = &[
26    "CreateStateMachine",
27    "DescribeStateMachine",
28    "ListStateMachines",
29    "DeleteStateMachine",
30    "UpdateStateMachine",
31    "TagResource",
32    "UntagResource",
33    "ListTagsForResource",
34    "StartExecution",
35    "StopExecution",
36    "DescribeExecution",
37    "ListExecutions",
38    "GetExecutionHistory",
39    "DescribeStateMachineForExecution",
40    "CreateActivity",
41    "DeleteActivity",
42    "DescribeActivity",
43    "ListActivities",
44    "GetActivityTask",
45    "SendTaskFailure",
46    "SendTaskHeartbeat",
47    "SendTaskSuccess",
48    "PublishStateMachineVersion",
49    "DeleteStateMachineVersion",
50    "ListStateMachineVersions",
51    "CreateStateMachineAlias",
52    "DeleteStateMachineAlias",
53    "DescribeStateMachineAlias",
54    "ListStateMachineAliases",
55    "UpdateStateMachineAlias",
56    "DescribeMapRun",
57    "ListMapRuns",
58    "UpdateMapRun",
59    "RedriveExecution",
60    "StartSyncExecution",
61    "TestState",
62    "ValidateStateMachineDefinition",
63];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
72pub struct StepFunctionsService {
73    state: SharedStepFunctionsState,
74    delivery: Option<Arc<DeliveryBus>>,
75    dynamodb_state: Option<SharedDynamoDbState>,
76    registry: Option<SharedServiceRegistry>,
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90    pub fn new(state: SharedStepFunctionsState) -> Self {
91        Self {
92            state,
93            delivery: None,
94            dynamodb_state: None,
95            registry: None,
96            snapshot_store: None,
97            snapshot_lock: Arc::new(AsyncMutex::new(())),
98        }
99    }
100
101    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102        self.delivery = Some(delivery);
103        self
104    }
105
106    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107        self.dynamodb_state = Some(dynamodb_state);
108        self
109    }
110
111    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
112    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
113    /// has been registered; until then the interpreter falls back to its
114    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
115    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116        self.registry = Some(registry);
117        self
118    }
119
120    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121        self.snapshot_store = Some(store);
122        self
123    }
124
125    async fn save_snapshot(&self) {
126        let Some(store) = self.snapshot_store.clone() else {
127            return;
128        };
129        let _guard = self.snapshot_lock.lock().await;
130        let snapshot = StepFunctionsSnapshot {
131            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
132            state: None,
133            accounts: Some(self.state.read().clone()),
134        };
135        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
136            let bytes = serde_json::to_vec(&snapshot)
137                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
138            store.save(&bytes)
139        })
140        .await;
141        match join {
142            Ok(Ok(())) => {}
143            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
144            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
145        }
146    }
147}
148
149fn is_mutating_action(action: &str) -> bool {
150    matches!(
151        action,
152        "CreateStateMachine"
153            | "DeleteStateMachine"
154            | "UpdateStateMachine"
155            | "TagResource"
156            | "UntagResource"
157            | "StartExecution"
158            | "StopExecution"
159            | "CreateActivity"
160            | "DeleteActivity"
161            | "GetActivityTask"
162            | "SendTaskFailure"
163            | "SendTaskHeartbeat"
164            | "SendTaskSuccess"
165            | "PublishStateMachineVersion"
166            | "DeleteStateMachineVersion"
167            | "CreateStateMachineAlias"
168            | "DeleteStateMachineAlias"
169            | "UpdateStateMachineAlias"
170            | "UpdateMapRun"
171            | "RedriveExecution"
172            | "StartSyncExecution"
173    )
174}
175
176#[async_trait]
177impl AwsService for StepFunctionsService {
178    fn service_name(&self) -> &str {
179        "states"
180    }
181
182    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
183        let mutates = is_mutating_action(req.action.as_str());
184        let result = match req.action.as_str() {
185            "CreateStateMachine" => self.create_state_machine(&req),
186            "DescribeStateMachine" => self.describe_state_machine(&req),
187            "ListStateMachines" => self.list_state_machines(&req),
188            "DeleteStateMachine" => self.delete_state_machine(&req),
189            "UpdateStateMachine" => self.update_state_machine(&req),
190            "TagResource" => self.tag_resource(&req),
191            "UntagResource" => self.untag_resource(&req),
192            "ListTagsForResource" => self.list_tags_for_resource(&req),
193            "StartExecution" => self.start_execution(&req),
194            "StopExecution" => self.stop_execution(&req),
195            "DescribeExecution" => self.describe_execution(&req),
196            "ListExecutions" => self.list_executions(&req),
197            "GetExecutionHistory" => self.get_execution_history(&req),
198            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
199            "CreateActivity" => self.create_activity(&req),
200            "DeleteActivity" => self.delete_activity(&req),
201            "DescribeActivity" => self.describe_activity(&req),
202            "ListActivities" => self.list_activities(&req),
203            "GetActivityTask" => self.get_activity_task(&req).await,
204            "SendTaskFailure" => self.send_task_failure(&req),
205            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
206            "SendTaskSuccess" => self.send_task_success(&req),
207            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
208            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
209            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
210            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
211            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
212            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
213            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
214            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
215            "DescribeMapRun" => self.describe_map_run(&req),
216            "ListMapRuns" => self.list_map_runs(&req),
217            "UpdateMapRun" => self.update_map_run(&req),
218            "RedriveExecution" => self.redrive_execution(&req),
219            "StartSyncExecution" => self.start_sync_execution(&req).await,
220            "TestState" => self.test_state(&req),
221            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
222            _ => Err(AwsServiceError::action_not_implemented(
223                "states",
224                &req.action,
225            )),
226        };
227        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
228            self.save_snapshot().await;
229        }
230        result
231    }
232
233    fn supported_actions(&self) -> &[&str] {
234        SUPPORTED
235    }
236}
237
238impl StepFunctionsService {
239    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
240        let body = req.json_body();
241        let raw_max_results = body["maxResults"].as_i64();
242        if let Some(mr) = raw_max_results {
243            validate_max_results(mr)?;
244        }
245        let next_token = body["nextToken"].as_str();
246        if let Some(t) = next_token {
247            validate_page_token(t)?;
248        }
249        // Smithy default: maxResults=0 means "use the service default"
250        // (100 for Step Functions), which we honour by treating both
251        // `None` and `0` as 100.
252        let max_results = match raw_max_results.unwrap_or(0) {
253            0 => 100,
254            n => n as usize,
255        };
256        let accounts = self.state.read();
257        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
258        let state = accounts.get(&req.account_id).unwrap_or(&empty);
259        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
260        activities.sort_by(|a, b| a.name.cmp(&b.name));
261        let items: Vec<Value> = activities
262            .iter()
263            .map(|a| {
264                json!({
265                    "activityArn": a.arn,
266                    "name": a.name,
267                    "creationDate": a.creation_date.timestamp(),
268                })
269            })
270            .collect();
271        let (page, token) =
272            paginate_checked(&items, next_token, max_results).map_err(|_| invalid_token())?;
273        let mut resp = json!({ "activities": page });
274        if let Some(t) = token {
275            resp["nextToken"] = json!(t);
276        }
277        Ok(AwsResponse::ok_json(resp))
278    }
279}
280
281fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
282    json!({
283        "stateMachineAliasArn": alias.arn,
284        "name": alias.name,
285        "description": alias.description,
286        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
287            "stateMachineVersionArn": r.state_machine_version_arn,
288            "weight": r.weight,
289        })).collect::<Vec<_>>(),
290        "creationDate": alias.creation_date.timestamp(),
291        "updateDate": alias.update_date.timestamp(),
292    })
293}
294
295fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
296    json!({
297        "mapRunArn": mr.map_run_arn,
298        "executionArn": mr.execution_arn,
299        "maxConcurrency": mr.max_concurrency,
300        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
301        "toleratedFailureCount": mr.tolerated_failure_count,
302        "status": mr.status,
303        "startDate": mr.start_date.timestamp(),
304        "stopDate": mr.stop_date.map(|d| d.timestamp()),
305    })
306}
307
308// ─── Helpers ────────────────────────────────────────────────────────────
309
310fn state_machine_to_json(sm: &StateMachine) -> Value {
311    let mut resp = json!({
312        "name": sm.name,
313        "stateMachineArn": sm.arn,
314        "definition": sm.definition,
315        "roleArn": sm.role_arn,
316        "type": sm.machine_type.as_str(),
317        "status": sm.status.as_str(),
318        "creationDate": sm.creation_date.timestamp() as f64,
319        "updateDate": sm.update_date.timestamp() as f64,
320        "revisionId": sm.revision_id,
321        "label": sm.name,
322    });
323
324    if !sm.description.is_empty() {
325        resp["description"] = json!(sm.description);
326    }
327
328    if let Some(ref logging) = sm.logging_configuration {
329        resp["loggingConfiguration"] = logging.clone();
330    } else {
331        resp["loggingConfiguration"] = json!({
332            "level": "OFF",
333            "includeExecutionData": false,
334            "destinations": [],
335        });
336    }
337
338    if let Some(ref tracing) = sm.tracing_configuration {
339        resp["tracingConfiguration"] = tracing.clone();
340    } else {
341        resp["tracingConfiguration"] = json!({
342            "enabled": false,
343        });
344    }
345
346    resp
347}
348
349fn missing(name: &str) -> AwsServiceError {
350    AwsServiceError::aws_error(
351        StatusCode::BAD_REQUEST,
352        "ValidationException",
353        format!("The request must contain the parameter {name}."),
354    )
355}
356
357fn state_machine_not_found(arn: &str) -> AwsServiceError {
358    AwsServiceError::aws_error(
359        StatusCode::BAD_REQUEST,
360        "StateMachineDoesNotExist",
361        format!("State Machine Does Not Exist: '{arn}'"),
362    )
363}
364
365fn activity_not_found(arn: &str) -> AwsServiceError {
366    AwsServiceError::aws_error(
367        StatusCode::BAD_REQUEST,
368        "ActivityDoesNotExist",
369        format!("Activity does not exist: {arn}"),
370    )
371}
372
373fn task_does_not_exist(token: &str) -> AwsServiceError {
374    AwsServiceError::aws_error(
375        StatusCode::BAD_REQUEST,
376        "TaskDoesNotExist",
377        format!("Task does not exist: {token}"),
378    )
379}
380
381fn resource_not_found(arn: &str) -> AwsServiceError {
382    AwsServiceError::aws_error(
383        StatusCode::BAD_REQUEST,
384        "ResourceNotFound",
385        format!("Resource not found: '{arn}'"),
386    )
387}
388
389/// Parse + validate an alias `routingConfiguration` array.
390///
391/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
392/// route must include `stateMachineVersionArn`.
393fn parse_routing_configuration(
394    routes: &[serde_json::Value],
395) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
396    if routes.is_empty() || routes.len() > 2 {
397        return Err(AwsServiceError::aws_error(
398            StatusCode::BAD_REQUEST,
399            "ValidationException",
400            "routingConfiguration must contain 1 or 2 routes.",
401        ));
402    }
403    let parsed: Vec<crate::state::AliasRoute> = routes
404        .iter()
405        .map(|r| {
406            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
407                AwsServiceError::aws_error(
408                    StatusCode::BAD_REQUEST,
409                    "ValidationException",
410                    "routingConfiguration entries must contain stateMachineVersionArn.",
411                )
412            })?;
413            let weight = r["weight"].as_i64().ok_or_else(|| {
414                AwsServiceError::aws_error(
415                    StatusCode::BAD_REQUEST,
416                    "ValidationException",
417                    "routingConfiguration entries must contain a numeric weight.",
418                )
419            })?;
420            if !(0..=100).contains(&weight) {
421                return Err(AwsServiceError::aws_error(
422                    StatusCode::BAD_REQUEST,
423                    "ValidationException",
424                    format!("Invalid routing weight {weight}; must be 0-100."),
425                ));
426            }
427            Ok(crate::state::AliasRoute {
428                state_machine_version_arn: arn.to_string(),
429                weight: weight as i32,
430            })
431        })
432        .collect::<Result<_, _>>()?;
433    let total: i32 = parsed.iter().map(|r| r.weight).sum();
434    if total != 100 {
435        return Err(AwsServiceError::aws_error(
436            StatusCode::BAD_REQUEST,
437            "ValidationException",
438            format!("routingConfiguration weights must sum to 100, got {total}."),
439        ));
440    }
441    Ok(parsed)
442}
443
444fn validate_name(name: &str) -> Result<(), AwsServiceError> {
445    if name.is_empty() || name.len() > 80 {
446        return Err(AwsServiceError::aws_error(
447            StatusCode::BAD_REQUEST,
448            "InvalidName",
449            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
450        ));
451    }
452    // Only allow alphanumeric, hyphens, and underscores
453    if !name
454        .chars()
455        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
456    {
457        return Err(AwsServiceError::aws_error(
458            StatusCode::BAD_REQUEST,
459            "InvalidName",
460            format!(
461                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
462            ),
463        ));
464    }
465    Ok(())
466}
467
468fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
469    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
470        AwsServiceError::aws_error(
471            StatusCode::BAD_REQUEST,
472            "InvalidDefinition",
473            format!("Invalid State Machine Definition: '{e}'"),
474        )
475    })?;
476
477    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
478        return Err(AwsServiceError::aws_error(
479            StatusCode::BAD_REQUEST,
480            "InvalidDefinition",
481            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
482                .to_string(),
483        ));
484    }
485
486    let states_obj = parsed
487        .get("States")
488        .and_then(|v| v.as_object())
489        .ok_or_else(|| {
490            AwsServiceError::aws_error(
491                StatusCode::BAD_REQUEST,
492                "InvalidDefinition",
493                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
494                    .to_string(),
495            )
496        })?;
497
498    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
499        AwsServiceError::aws_error(
500            StatusCode::BAD_REQUEST,
501            "InvalidDefinition",
502            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
503                .to_string(),
504        )
505    })?;
506    if !states_obj.contains_key(start_at) {
507        return Err(AwsServiceError::aws_error(
508            StatusCode::BAD_REQUEST,
509            "InvalidDefinition",
510            format!(
511                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
512                 (StartAt '{start_at}' does not reference a valid state)"
513            ),
514        ));
515    }
516
517    Ok(())
518}
519
520fn execution_not_found(arn: &str) -> AwsServiceError {
521    AwsServiceError::aws_error(
522        StatusCode::BAD_REQUEST,
523        "ExecutionDoesNotExist",
524        format!("Execution Does Not Exist: '{arn}'"),
525    )
526}
527
528fn execution_to_json(exec: &Execution) -> Value {
529    let mut resp = json!({
530        "executionArn": exec.execution_arn,
531        "stateMachineArn": exec.state_machine_arn,
532        "name": exec.name,
533        "status": exec.status.as_str(),
534        "startDate": exec.start_date.timestamp() as f64,
535    });
536
537    if let Some(ref input) = exec.input {
538        resp["input"] = json!(input);
539    }
540    if let Some(ref output) = exec.output {
541        resp["output"] = json!(output);
542    }
543    if let Some(stop) = exec.stop_date {
544        resp["stopDate"] = json!(stop.timestamp() as f64);
545    }
546    if let Some(ref error) = exec.error {
547        resp["error"] = json!(error);
548    }
549    if let Some(ref cause) = exec.cause {
550        resp["cause"] = json!(cause);
551    }
552
553    resp
554}
555
556/// Convert event type like "PassStateEntered" to the details key format "passStateEntered".
557fn camel_to_details_key(event_type: &str) -> String {
558    let mut chars = event_type.chars();
559    match chars.next() {
560        None => String::new(),
561        Some(c) => c.to_lowercase().to_string() + chars.as_str(),
562    }
563}
564
565fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
566    if !arn.starts_with("arn:") {
567        return Err(AwsServiceError::aws_error(
568            StatusCode::BAD_REQUEST,
569            "InvalidArn",
570            format!("Invalid Arn: '{arn}'"),
571        ));
572    }
573    Ok(())
574}
575
576/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
577/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
578/// every ARN-bearing Step Functions operation declares.
579fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
580    if value.is_empty() || value.len() > max {
581        return Err(AwsServiceError::aws_error(
582            StatusCode::BAD_REQUEST,
583            "InvalidArn",
584            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
585        ));
586    }
587    Ok(())
588}
589
590/// Shared error for a malformed (unparseable) `nextToken` reaching the
591/// pagination helper — `InvalidToken` is the only pagination error code
592/// declared on every list op.
593pub(super) fn invalid_token() -> AwsServiceError {
594    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidToken", "Invalid nextToken")
595}
596
597/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
598/// error code declared on every paginated list op is `InvalidToken`, so
599/// route both empty and oversize tokens through it.
600fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
601    if value.is_empty() || value.len() > 1024 {
602        return Err(AwsServiceError::aws_error(
603            StatusCode::BAD_REQUEST,
604            "InvalidToken",
605            "nextToken must be 1..=1024 characters",
606        ));
607    }
608    Ok(())
609}
610
611/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
612/// probes flip below-min / above-max; since the only error declared on
613/// `ListActivities` is `InvalidToken`, we map page-size violations to
614/// the same code (matches how AWS surfaces malformed pagination input
615/// for ops that don't model a separate `ValidationException`).
616fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
617    if !(0..=1000).contains(&value) {
618        return Err(AwsServiceError::aws_error(
619            StatusCode::BAD_REQUEST,
620            "InvalidToken",
621            format!("maxResults '{value}' is outside 0..=1000"),
622        ));
623    }
624    Ok(())
625}
626
627/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
628///
629/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
630/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
631/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
632///
633/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
634/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
635pub fn start_execution_from_delivery(
636    state: &SharedStepFunctionsState,
637    delivery: &Option<Arc<DeliveryBus>>,
638    dynamodb_state: &Option<SharedDynamoDbState>,
639    registry: &Option<SharedServiceRegistry>,
640    state_machine_arn: &str,
641    input: &str,
642) {
643    // Validate input is valid JSON
644    if serde_json::from_str::<serde_json::Value>(input).is_err() {
645        tracing::warn!(
646            state_machine_arn,
647            "Step Functions delivery: invalid JSON input, skipping execution"
648        );
649        return;
650    }
651
652    let execution_name = uuid::Uuid::new_v4().to_string();
653
654    // Extract account_id from the state machine ARN
655    let account_id = state_machine_arn
656        .split(':')
657        .nth(4)
658        .unwrap_or("000000000000")
659        .to_string();
660
661    let mut accounts = state.write();
662    let st = accounts.get_or_create(&account_id);
663    let sm = match st.state_machines.get(state_machine_arn) {
664        Some(sm) => sm,
665        None => {
666            tracing::warn!(
667                state_machine_arn,
668                "Step Functions delivery: state machine not found"
669            );
670            return;
671        }
672    };
673
674    let sm_name = sm.name.clone();
675    let definition = sm.definition.clone();
676    let exec_arn = st.execution_arn(&sm_name, &execution_name);
677
678    let now = Utc::now();
679    let execution = Execution {
680        execution_arn: exec_arn.clone(),
681        state_machine_arn: state_machine_arn.to_string(),
682        state_machine_name: sm_name,
683        name: execution_name,
684        status: ExecutionStatus::Running,
685        input: Some(input.to_string()),
686        output: None,
687        start_date: now,
688        stop_date: None,
689        error: None,
690        cause: None,
691        history_events: vec![],
692        parent_execution_arn: None,
693        is_sync: false,
694        billed_duration_ms: None,
695        billed_memory_mb: None,
696    };
697
698    st.executions.insert(exec_arn.clone(), execution);
699    let logging_config = sm.logging_configuration.clone();
700    drop(accounts);
701
702    let shared_state = state.clone();
703    let delivery = delivery.clone();
704    let dynamodb_state = dynamodb_state.clone();
705    let registry = registry.clone();
706    let input = Some(input.to_string());
707    tokio::spawn(async move {
708        interpreter::execute_state_machine(
709            shared_state,
710            exec_arn,
711            definition,
712            input,
713            delivery,
714            dynamodb_state,
715            registry,
716            logging_config,
717        )
718        .await;
719    });
720}
721
722#[cfg(test)]
723mod tests {
724    use super::*;
725    use http::{HeaderMap, Method};
726    use parking_lot::RwLock;
727    use serde_json::Value;
728    use std::collections::HashMap;
729    use std::sync::Arc;
730
731    fn make_state() -> SharedStepFunctionsState {
732        Arc::new(RwLock::new(
733            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
734        ))
735    }
736
737    fn make_request(action: &str, body: &str) -> AwsRequest {
738        AwsRequest {
739            service: "states".to_string(),
740            action: action.to_string(),
741            region: "us-east-1".to_string(),
742            account_id: "123456789012".to_string(),
743            request_id: "test-id".to_string(),
744            headers: HeaderMap::new(),
745            query_params: HashMap::new(),
746            body: body.as_bytes().to_vec().into(),
747            body_stream: parking_lot::Mutex::new(None),
748            path_segments: vec![],
749            raw_path: "/".to_string(),
750            raw_query: String::new(),
751            method: Method::POST,
752            is_query_protocol: false,
753            access_key_id: None,
754            principal: None,
755        }
756    }
757
758    fn body_json(resp: &AwsResponse) -> Value {
759        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
760    }
761
762    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
763        match result {
764            Err(e) => e,
765            Ok(_) => panic!("expected error, got Ok"),
766        }
767    }
768
769    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
770
771    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
772        let body = json!({
773            "name": name,
774            "definition": VALID_DEF,
775            "roleArn": "arn:aws:iam::123456789012:role/test",
776        });
777        let req = make_request("CreateStateMachine", &body.to_string());
778        let resp = svc.create_state_machine(&req).unwrap();
779        let b = body_json(&resp);
780        b["stateMachineArn"].as_str().unwrap().to_string()
781    }
782
783    // ── CreateStateMachine ──
784
785    #[test]
786    fn create_state_machine_basic() {
787        let svc = StepFunctionsService::new(make_state());
788        let arn = create_sm(&svc, "test-sm");
789        assert!(arn.contains("test-sm"));
790    }
791
792    #[test]
793    fn create_state_machine_with_express_type() {
794        let svc = StepFunctionsService::new(make_state());
795        let body = json!({
796            "name": "express-sm",
797            "definition": VALID_DEF,
798            "roleArn": "arn:aws:iam::123456789012:role/r",
799            "type": "EXPRESS",
800        });
801        let req = make_request("CreateStateMachine", &body.to_string());
802        let resp = svc.create_state_machine(&req).unwrap();
803        let b = body_json(&resp);
804        assert!(b["stateMachineArn"].as_str().is_some());
805    }
806
807    #[test]
808    fn create_state_machine_duplicate_fails() {
809        let svc = StepFunctionsService::new(make_state());
810        create_sm(&svc, "dup-sm");
811        let body = json!({
812            "name": "dup-sm",
813            "definition": VALID_DEF,
814            "roleArn": "arn:aws:iam::123456789012:role/r",
815        });
816        let req = make_request("CreateStateMachine", &body.to_string());
817        let err = expect_err(svc.create_state_machine(&req));
818        assert!(err.to_string().contains("StateMachineAlreadyExists"));
819    }
820
821    #[test]
822    fn create_state_machine_missing_name() {
823        let svc = StepFunctionsService::new(make_state());
824        let body = json!({
825            "definition": VALID_DEF,
826            "roleArn": "arn:aws:iam::123456789012:role/r",
827        });
828        let req = make_request("CreateStateMachine", &body.to_string());
829        assert!(svc.create_state_machine(&req).is_err());
830    }
831
832    #[test]
833    fn create_state_machine_invalid_definition() {
834        let svc = StepFunctionsService::new(make_state());
835        let body = json!({
836            "name": "bad-def",
837            "definition": "not json",
838            "roleArn": "arn:aws:iam::123456789012:role/r",
839        });
840        let req = make_request("CreateStateMachine", &body.to_string());
841        let err = expect_err(svc.create_state_machine(&req));
842        assert!(err.to_string().contains("InvalidDefinition"));
843    }
844
845    #[test]
846    fn create_state_machine_definition_missing_start_at() {
847        let svc = StepFunctionsService::new(make_state());
848        let body = json!({
849            "name": "no-start",
850            "definition": r#"{"States":{"S":{"Type":"Pass","End":true}}}"#,
851            "roleArn": "arn:aws:iam::123456789012:role/r",
852        });
853        let req = make_request("CreateStateMachine", &body.to_string());
854        let err = expect_err(svc.create_state_machine(&req));
855        assert!(err.to_string().contains("InvalidDefinition"));
856    }
857
858    #[test]
859    fn create_state_machine_definition_missing_states() {
860        let svc = StepFunctionsService::new(make_state());
861        let body = json!({
862            "name": "no-states",
863            "definition": r#"{"StartAt":"S"}"#,
864            "roleArn": "arn:aws:iam::123456789012:role/r",
865        });
866        let req = make_request("CreateStateMachine", &body.to_string());
867        let err = expect_err(svc.create_state_machine(&req));
868        assert!(err.to_string().contains("InvalidDefinition"));
869    }
870
871    #[test]
872    fn create_state_machine_definition_start_at_not_in_states() {
873        let svc = StepFunctionsService::new(make_state());
874        let body = json!({
875            "name": "bad-start",
876            "definition": r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#,
877            "roleArn": "arn:aws:iam::123456789012:role/r",
878        });
879        let req = make_request("CreateStateMachine", &body.to_string());
880        let err = expect_err(svc.create_state_machine(&req));
881        assert!(err.to_string().contains("MISSING_TRANSITION_TARGET"));
882    }
883
884    #[test]
885    fn create_state_machine_invalid_type() {
886        let svc = StepFunctionsService::new(make_state());
887        let body = json!({
888            "name": "bad-type",
889            "definition": VALID_DEF,
890            "roleArn": "arn:aws:iam::123456789012:role/r",
891            "type": "INVALID",
892        });
893        let req = make_request("CreateStateMachine", &body.to_string());
894        assert!(svc.create_state_machine(&req).is_err());
895    }
896
897    #[test]
898    fn create_state_machine_invalid_arn() {
899        let svc = StepFunctionsService::new(make_state());
900        let body = json!({
901            "name": "bad-arn",
902            "definition": VALID_DEF,
903            "roleArn": "not-an-arn",
904        });
905        let req = make_request("CreateStateMachine", &body.to_string());
906        let err = expect_err(svc.create_state_machine(&req));
907        assert!(err.to_string().contains("InvalidArn"));
908    }
909
910    #[test]
911    fn create_state_machine_invalid_name() {
912        let svc = StepFunctionsService::new(make_state());
913        let body = json!({
914            "name": "has spaces!",
915            "definition": VALID_DEF,
916            "roleArn": "arn:aws:iam::123456789012:role/r",
917        });
918        let req = make_request("CreateStateMachine", &body.to_string());
919        let err = expect_err(svc.create_state_machine(&req));
920        assert!(err.to_string().contains("InvalidName"));
921    }
922
923    #[test]
924    fn create_state_machine_name_too_long() {
925        let svc = StepFunctionsService::new(make_state());
926        let long_name = "a".repeat(81);
927        let body = json!({
928            "name": long_name,
929            "definition": VALID_DEF,
930            "roleArn": "arn:aws:iam::123456789012:role/r",
931        });
932        let req = make_request("CreateStateMachine", &body.to_string());
933        let err = expect_err(svc.create_state_machine(&req));
934        assert!(err.to_string().contains("InvalidName"));
935    }
936
937    // ── DescribeStateMachine ──
938
939    #[test]
940    fn describe_state_machine_found() {
941        let svc = StepFunctionsService::new(make_state());
942        let arn = create_sm(&svc, "desc-sm");
943
944        let req = make_request(
945            "DescribeStateMachine",
946            &json!({"stateMachineArn": arn}).to_string(),
947        );
948        let resp = svc.describe_state_machine(&req).unwrap();
949        let b = body_json(&resp);
950        assert_eq!(b["name"], "desc-sm");
951        assert_eq!(b["status"], "ACTIVE");
952        assert!(b["definition"].as_str().is_some());
953    }
954
955    #[test]
956    fn describe_state_machine_not_found() {
957        let svc = StepFunctionsService::new(make_state());
958        let req = make_request(
959            "DescribeStateMachine",
960            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
961                .to_string(),
962        );
963        let err = expect_err(svc.describe_state_machine(&req));
964        assert!(err.to_string().contains("StateMachineDoesNotExist"));
965    }
966
967    // ── ListStateMachines ──
968
969    #[test]
970    fn list_state_machines_empty() {
971        let svc = StepFunctionsService::new(make_state());
972        let req = make_request("ListStateMachines", "{}");
973        let resp = svc.list_state_machines(&req).unwrap();
974        let b = body_json(&resp);
975        assert!(b["stateMachines"].as_array().unwrap().is_empty());
976    }
977
978    #[test]
979    fn list_state_machines_returns_created() {
980        let svc = StepFunctionsService::new(make_state());
981        create_sm(&svc, "sm-1");
982        create_sm(&svc, "sm-2");
983
984        let req = make_request("ListStateMachines", "{}");
985        let resp = svc.list_state_machines(&req).unwrap();
986        let b = body_json(&resp);
987        assert_eq!(b["stateMachines"].as_array().unwrap().len(), 2);
988    }
989
990    // ── DeleteStateMachine ──
991
992    #[test]
993    fn delete_state_machine() {
994        let svc = StepFunctionsService::new(make_state());
995        let arn = create_sm(&svc, "del-sm");
996
997        let req = make_request(
998            "DeleteStateMachine",
999            &json!({"stateMachineArn": arn}).to_string(),
1000        );
1001        svc.delete_state_machine(&req).unwrap();
1002
1003        // Describe should fail
1004        let req = make_request(
1005            "DescribeStateMachine",
1006            &json!({"stateMachineArn": arn}).to_string(),
1007        );
1008        assert!(svc.describe_state_machine(&req).is_err());
1009    }
1010
1011    #[test]
1012    fn delete_state_machine_nonexistent_succeeds() {
1013        let svc = StepFunctionsService::new(make_state());
1014        let req = make_request(
1015            "DeleteStateMachine",
1016            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1017                .to_string(),
1018        );
1019        // AWS returns success even for nonexistent
1020        svc.delete_state_machine(&req).unwrap();
1021    }
1022
1023    // ── UpdateStateMachine ──
1024
1025    #[test]
1026    fn update_state_machine() {
1027        let svc = StepFunctionsService::new(make_state());
1028        let arn = create_sm(&svc, "upd-sm");
1029
1030        let new_def = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
1031        let body = json!({
1032            "stateMachineArn": arn,
1033            "definition": new_def,
1034            "description": "updated",
1035        });
1036        let req = make_request("UpdateStateMachine", &body.to_string());
1037        let resp = svc.update_state_machine(&req).unwrap();
1038        let b = body_json(&resp);
1039        assert!(b["updateDate"].as_f64().is_some());
1040
1041        // Verify
1042        let req = make_request(
1043            "DescribeStateMachine",
1044            &json!({"stateMachineArn": arn}).to_string(),
1045        );
1046        let resp = svc.describe_state_machine(&req).unwrap();
1047        let b = body_json(&resp);
1048        assert!(b["definition"].as_str().unwrap().contains("NewPass"));
1049        assert_eq!(b["description"], "updated");
1050    }
1051
1052    #[test]
1053    fn update_state_machine_not_found() {
1054        let svc = StepFunctionsService::new(make_state());
1055        let body = json!({
1056            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1057            "definition": VALID_DEF,
1058        });
1059        let req = make_request("UpdateStateMachine", &body.to_string());
1060        let err = expect_err(svc.update_state_machine(&req));
1061        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1062    }
1063
1064    // ── StartExecution ──
1065
1066    #[tokio::test]
1067    async fn start_execution_basic() {
1068        let svc = StepFunctionsService::new(make_state());
1069        let arn = create_sm(&svc, "exec-sm");
1070
1071        let body = json!({
1072            "stateMachineArn": arn,
1073            "input": r#"{"key":"value"}"#,
1074        });
1075        let req = make_request("StartExecution", &body.to_string());
1076        let resp = svc.start_execution(&req).unwrap();
1077        let b = body_json(&resp);
1078        assert!(b["executionArn"].as_str().is_some());
1079        assert!(b["startDate"].as_f64().is_some());
1080    }
1081
1082    #[tokio::test]
1083    async fn start_execution_with_name() {
1084        let svc = StepFunctionsService::new(make_state());
1085        let arn = create_sm(&svc, "named-exec");
1086
1087        let body = json!({
1088            "stateMachineArn": arn,
1089            "name": "my-execution",
1090        });
1091        let req = make_request("StartExecution", &body.to_string());
1092        let resp = svc.start_execution(&req).unwrap();
1093        let b = body_json(&resp);
1094        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1095    }
1096
1097    #[tokio::test]
1098    async fn start_execution_sm_not_found() {
1099        let svc = StepFunctionsService::new(make_state());
1100        let body = json!({
1101            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1102        });
1103        let req = make_request("StartExecution", &body.to_string());
1104        let err = expect_err(svc.start_execution(&req));
1105        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1106    }
1107
1108    #[tokio::test]
1109    async fn start_execution_invalid_input() {
1110        let svc = StepFunctionsService::new(make_state());
1111        let arn = create_sm(&svc, "bad-input");
1112
1113        let body = json!({
1114            "stateMachineArn": arn,
1115            "input": "not json",
1116        });
1117        let req = make_request("StartExecution", &body.to_string());
1118        let err = expect_err(svc.start_execution(&req));
1119        assert!(err.to_string().contains("InvalidExecutionInput"));
1120    }
1121
1122    #[tokio::test]
1123    async fn start_execution_duplicate_name() {
1124        let svc = StepFunctionsService::new(make_state());
1125        let arn = create_sm(&svc, "dup-exec");
1126
1127        let body = json!({
1128            "stateMachineArn": arn,
1129            "name": "same-name",
1130        });
1131        let req = make_request("StartExecution", &body.to_string());
1132        svc.start_execution(&req).unwrap();
1133
1134        let req = make_request("StartExecution", &body.to_string());
1135        let err = expect_err(svc.start_execution(&req));
1136        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1137    }
1138
1139    // ── DescribeExecution ──
1140
1141    #[tokio::test]
1142    async fn describe_execution_found() {
1143        let svc = StepFunctionsService::new(make_state());
1144        let sm_arn = create_sm(&svc, "desc-exec");
1145
1146        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1147        let req = make_request("StartExecution", &body.to_string());
1148        let resp = svc.start_execution(&req).unwrap();
1149        let exec_arn = body_json(&resp)["executionArn"]
1150            .as_str()
1151            .unwrap()
1152            .to_string();
1153
1154        let req = make_request(
1155            "DescribeExecution",
1156            &json!({"executionArn": exec_arn}).to_string(),
1157        );
1158        let resp = svc.describe_execution(&req).unwrap();
1159        let b = body_json(&resp);
1160        assert_eq!(b["name"], "e1");
1161        assert_eq!(b["status"], "RUNNING");
1162    }
1163
1164    #[tokio::test]
1165    async fn describe_execution_not_found() {
1166        let svc = StepFunctionsService::new(make_state());
1167        let req = make_request(
1168            "DescribeExecution",
1169            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1170                .to_string(),
1171        );
1172        let err = expect_err(svc.describe_execution(&req));
1173        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1174    }
1175
1176    // ── StopExecution ──
1177
1178    #[tokio::test]
1179    async fn stop_execution() {
1180        let svc = StepFunctionsService::new(make_state());
1181        let sm_arn = create_sm(&svc, "stop-sm");
1182
1183        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1184        let req = make_request("StartExecution", &body.to_string());
1185        let resp = svc.start_execution(&req).unwrap();
1186        let exec_arn = body_json(&resp)["executionArn"]
1187            .as_str()
1188            .unwrap()
1189            .to_string();
1190
1191        let body = json!({
1192            "executionArn": exec_arn,
1193            "error": "UserAborted",
1194            "cause": "test stop",
1195        });
1196        let req = make_request("StopExecution", &body.to_string());
1197        let resp = svc.stop_execution(&req).unwrap();
1198        let b = body_json(&resp);
1199        assert!(b["stopDate"].as_f64().is_some());
1200
1201        // Verify aborted
1202        let req = make_request(
1203            "DescribeExecution",
1204            &json!({"executionArn": exec_arn}).to_string(),
1205        );
1206        let resp = svc.describe_execution(&req).unwrap();
1207        let b = body_json(&resp);
1208        assert_eq!(b["status"], "ABORTED");
1209        assert_eq!(b["error"], "UserAborted");
1210    }
1211
1212    #[tokio::test]
1213    async fn stop_execution_not_found() {
1214        let svc = StepFunctionsService::new(make_state());
1215        let req = make_request(
1216            "StopExecution",
1217            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1218                .to_string(),
1219        );
1220        let err = expect_err(svc.stop_execution(&req));
1221        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1222    }
1223
1224    // ── ListExecutions ──
1225
1226    #[tokio::test]
1227    async fn list_executions() {
1228        let svc = StepFunctionsService::new(make_state());
1229        let sm_arn = create_sm(&svc, "list-exec");
1230
1231        for i in 0..3 {
1232            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1233            let req = make_request("StartExecution", &body.to_string());
1234            svc.start_execution(&req).unwrap();
1235        }
1236
1237        let req = make_request(
1238            "ListExecutions",
1239            &json!({"stateMachineArn": sm_arn}).to_string(),
1240        );
1241        let resp = svc.list_executions(&req).unwrap();
1242        let b = body_json(&resp);
1243        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1244    }
1245
1246    #[tokio::test]
1247    async fn list_executions_sm_not_found() {
1248        let svc = StepFunctionsService::new(make_state());
1249        let req = make_request(
1250            "ListExecutions",
1251            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1252                .to_string(),
1253        );
1254        let err = expect_err(svc.list_executions(&req));
1255        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1256    }
1257
1258    // ── GetExecutionHistory ──
1259
1260    #[tokio::test]
1261    async fn get_execution_history_not_found() {
1262        let svc = StepFunctionsService::new(make_state());
1263        let req = make_request(
1264            "GetExecutionHistory",
1265            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1266                .to_string(),
1267        );
1268        let err = expect_err(svc.get_execution_history(&req));
1269        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1270    }
1271
1272    // ── DescribeStateMachineForExecution ──
1273
1274    #[tokio::test]
1275    async fn describe_sm_for_execution() {
1276        let svc = StepFunctionsService::new(make_state());
1277        let sm_arn = create_sm(&svc, "sm-for-exec");
1278
1279        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1280        let req = make_request("StartExecution", &body.to_string());
1281        let resp = svc.start_execution(&req).unwrap();
1282        let exec_arn = body_json(&resp)["executionArn"]
1283            .as_str()
1284            .unwrap()
1285            .to_string();
1286
1287        let req = make_request(
1288            "DescribeStateMachineForExecution",
1289            &json!({"executionArn": exec_arn}).to_string(),
1290        );
1291        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1292        let b = body_json(&resp);
1293        assert_eq!(b["name"], "sm-for-exec");
1294    }
1295
1296    // ── Tags ──
1297
1298    #[test]
1299    fn tag_untag_list_tags() {
1300        let svc = StepFunctionsService::new(make_state());
1301        let arn = create_sm(&svc, "tagged-sm");
1302
1303        // Tag
1304        let body = json!({
1305            "resourceArn": arn,
1306            "tags": [{"key": "env", "value": "prod"}],
1307        });
1308        let req = make_request("TagResource", &body.to_string());
1309        svc.tag_resource(&req).unwrap();
1310
1311        // List
1312        let req = make_request(
1313            "ListTagsForResource",
1314            &json!({"resourceArn": arn}).to_string(),
1315        );
1316        let resp = svc.list_tags_for_resource(&req).unwrap();
1317        let b = body_json(&resp);
1318        let tags = b["tags"].as_array().unwrap();
1319        assert_eq!(tags.len(), 1);
1320        assert_eq!(tags[0]["key"], "env");
1321
1322        // Untag
1323        let body = json!({
1324            "resourceArn": arn,
1325            "tagKeys": ["env"],
1326        });
1327        let req = make_request("UntagResource", &body.to_string());
1328        svc.untag_resource(&req).unwrap();
1329
1330        // Verify empty
1331        let req = make_request(
1332            "ListTagsForResource",
1333            &json!({"resourceArn": arn}).to_string(),
1334        );
1335        let resp = svc.list_tags_for_resource(&req).unwrap();
1336        let b = body_json(&resp);
1337        assert!(b["tags"].as_array().unwrap().is_empty());
1338    }
1339
1340    #[test]
1341    fn tag_resource_not_found() {
1342        let svc = StepFunctionsService::new(make_state());
1343        let body = json!({
1344            "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1345            "tags": [{"key": "k", "value": "v"}],
1346        });
1347        let req = make_request("TagResource", &body.to_string());
1348        let err = expect_err(svc.tag_resource(&req));
1349        assert!(err.to_string().contains("ResourceNotFound"));
1350    }
1351
1352    // ── Helper function tests ──
1353
1354    #[test]
1355    fn test_validate_name() {
1356        assert!(validate_name("valid-name").is_ok());
1357        assert!(validate_name("under_score").is_ok());
1358        assert!(validate_name("").is_err());
1359        assert!(validate_name("has spaces").is_err());
1360        assert!(validate_name(&"a".repeat(81)).is_err());
1361    }
1362
1363    #[test]
1364    fn test_validate_definition() {
1365        assert!(validate_definition(VALID_DEF).is_ok());
1366        assert!(validate_definition("not json").is_err());
1367        assert!(validate_definition(r#"{"States":{}}"#).is_err()); // missing StartAt
1368        assert!(validate_definition(r#"{"StartAt":"S"}"#).is_err()); // missing States
1369    }
1370
1371    #[test]
1372    fn test_validate_arn() {
1373        assert!(validate_arn("arn:aws:states:us-east-1:123:sm:test").is_ok());
1374        assert!(validate_arn("not-an-arn").is_err());
1375    }
1376
1377    #[test]
1378    fn test_camel_to_details_key() {
1379        assert_eq!(camel_to_details_key("PassStateEntered"), "passStateEntered");
1380        assert_eq!(camel_to_details_key(""), "");
1381    }
1382
1383    #[test]
1384    fn test_is_mutating_action() {
1385        assert!(is_mutating_action("CreateStateMachine"));
1386        assert!(is_mutating_action("StartExecution"));
1387        assert!(!is_mutating_action("DescribeStateMachine"));
1388        assert!(!is_mutating_action("ListStateMachines"));
1389    }
1390
1391    // ── StartSyncExecution ──
1392
1393    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1394        let body = json!({
1395            "name": name,
1396            "definition": VALID_DEF,
1397            "roleArn": "arn:aws:iam::123456789012:role/test",
1398            "type": "EXPRESS",
1399        });
1400        let req = make_request("CreateStateMachine", &body.to_string());
1401        let resp = svc.create_state_machine(&req).unwrap();
1402        let b = body_json(&resp);
1403        b["stateMachineArn"].as_str().unwrap().to_string()
1404    }
1405
1406    #[tokio::test]
1407    async fn start_sync_execution_basic() {
1408        let svc = StepFunctionsService::new(make_state());
1409        let arn = create_express_sm(&svc, "sync-sm");
1410
1411        let body = json!({
1412            "stateMachineArn": arn,
1413            "input": r#"{"key":"value"}"#,
1414        });
1415        let req = make_request("StartSyncExecution", &body.to_string());
1416        let resp = svc.start_sync_execution(&req).await.unwrap();
1417        let b = body_json(&resp);
1418        assert!(b["executionArn"]
1419            .as_str()
1420            .unwrap()
1421            .contains("express:sync-sm"));
1422        assert_eq!(b["stateMachineArn"], arn);
1423        assert_eq!(b["status"], "SUCCEEDED");
1424        assert!(b["startDate"].as_i64().is_some());
1425        assert!(b["stopDate"].as_i64().is_some());
1426        assert!(b["output"].as_str().is_some());
1427        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1428            .as_i64()
1429            .is_some());
1430    }
1431
1432    #[tokio::test]
1433    async fn start_sync_execution_not_express() {
1434        let svc = StepFunctionsService::new(make_state());
1435        let arn = create_sm(&svc, "std-sm");
1436
1437        let body = json!({"stateMachineArn": arn});
1438        let req = make_request("StartSyncExecution", &body.to_string());
1439        let err = expect_err(svc.start_sync_execution(&req).await);
1440        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1441    }
1442
1443    #[tokio::test]
1444    async fn start_sync_execution_sm_not_found() {
1445        let svc = StepFunctionsService::new(make_state());
1446        let body = json!({
1447            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1448        });
1449        let req = make_request("StartSyncExecution", &body.to_string());
1450        let err = expect_err(svc.start_sync_execution(&req).await);
1451        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1452    }
1453
1454    #[tokio::test]
1455    async fn start_sync_execution_records_introspection_fields() {
1456        let svc = StepFunctionsService::new(make_state());
1457        let arn = create_express_sm(&svc, "sync-introspect");
1458
1459        let body = json!({"stateMachineArn": arn, "input": "{}"});
1460        let req = make_request("StartSyncExecution", &body.to_string());
1461        let resp = svc.start_sync_execution(&req).await.unwrap();
1462        let b = body_json(&resp);
1463        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1464
1465        let accounts = svc.state.read();
1466        let state = accounts.get("123456789012").unwrap();
1467        let stored = state
1468            .executions
1469            .get(&exec_arn)
1470            .expect("sync execution should be persisted for introspection");
1471        assert!(stored.is_sync, "sync executions must be marked is_sync");
1472        assert_eq!(stored.billed_memory_mb, Some(64));
1473        assert!(
1474            stored.billed_duration_ms.is_some(),
1475            "billed_duration_ms must be populated after sync run"
1476        );
1477        assert!(
1478            stored.parent_execution_arn.is_none(),
1479            "top-level sync execution has no parent"
1480        );
1481    }
1482
1483    #[tokio::test]
1484    async fn start_sync_execution_invalid_input() {
1485        let svc = StepFunctionsService::new(make_state());
1486        let arn = create_express_sm(&svc, "bad-input-sync");
1487
1488        let body = json!({
1489            "stateMachineArn": arn,
1490            "input": "not json",
1491        });
1492        let req = make_request("StartSyncExecution", &body.to_string());
1493        let err = expect_err(svc.start_sync_execution(&req).await);
1494        assert!(err.to_string().contains("InvalidExecutionInput"));
1495    }
1496}
1497
1498#[cfg(test)]
1499mod pagination_reject_test {
1500    #[test]
1501    fn paginate_checked_rejects_invalid_token() {
1502        use fakecloud_core::pagination::paginate_checked;
1503        let items: Vec<i32> = (0..5).collect();
1504        assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1505        assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1506    }
1507}