Skip to main content

fakecloud_stepfunctions/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::registry::ServiceRegistry;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_core::validation::*;
15use fakecloud_dynamodb::SharedDynamoDbState;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::interpreter;
19use crate::state::{
20    Execution, ExecutionStatus, SharedStepFunctionsState, StateMachine, StateMachineStatus,
21    StateMachineType, StepFunctionsSnapshot, StepFunctionsState,
22    STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
23};
24
/// Action names this service reports through `supported_actions`.
///
/// Each entry has a matching dispatch arm in `handle`; the two lists are
/// written in the same order, so keep them in step when adding an action.
25const SUPPORTED: &[&str] = &[
26    "CreateStateMachine",
27    "DescribeStateMachine",
28    "ListStateMachines",
29    "DeleteStateMachine",
30    "UpdateStateMachine",
31    "TagResource",
32    "UntagResource",
33    "ListTagsForResource",
34    "StartExecution",
35    "StopExecution",
36    "DescribeExecution",
37    "ListExecutions",
38    "GetExecutionHistory",
39    "DescribeStateMachineForExecution",
40    "CreateActivity",
41    "DeleteActivity",
42    "DescribeActivity",
43    "ListActivities",
44    "GetActivityTask",
45    "SendTaskFailure",
46    "SendTaskHeartbeat",
47    "SendTaskSuccess",
48    "PublishStateMachineVersion",
49    "DeleteStateMachineVersion",
50    "ListStateMachineVersions",
51    "CreateStateMachineAlias",
52    "DeleteStateMachineAlias",
53    "DescribeStateMachineAlias",
54    "ListStateMachineAliases",
55    "UpdateStateMachineAlias",
56    "DescribeMapRun",
57    "ListMapRuns",
58    "UpdateMapRun",
59    "RedriveExecution",
60    "StartSyncExecution",
61    "TestState",
62    "ValidateStateMachineDefinition",
63];
64
65/// Handle to the central service registry, set by `main.rs` after every service
66/// has been registered. Wrapped in `OnceLock` so `StepFunctionsService` can be
67/// constructed (and registered into the very registry it later reads back) before
68/// the registry itself is finalized. The interpreter snapshots the inner `Arc`
69/// when it needs to dispatch generic `aws-sdk:*` Task integrations.
///
/// `StepFunctionsService::with_registry` stores this handle on the service, and
/// `start_execution_from_delivery` forwards a clone of it to the interpreter.
70pub type SharedServiceRegistry = Arc<std::sync::OnceLock<Arc<ServiceRegistry>>>;
71
/// Step Functions service (registered under the name `states`).
///
/// Built with `new` and then optionally extended through the `with_*`
/// builders; every collaborator other than `state` starts out as `None`.
72pub struct StepFunctionsService {
    /// Shared Step Functions state, looked up per account id by the handlers.
73    state: SharedStepFunctionsState,
    /// Optional delivery bus, attached by `with_delivery`.
74    delivery: Option<Arc<DeliveryBus>>,
    /// Optional shared DynamoDB state, attached by `with_dynamodb`.
75    dynamodb_state: Option<SharedDynamoDbState>,
    /// Optional deferred-fill registry handle, attached by `with_registry`.
76    registry: Option<SharedServiceRegistry>,
    /// Optional snapshot sink; when absent `save_snapshot` does nothing.
77    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held by `save_snapshot` while a snapshot is built and written.
78    snapshot_lock: Arc<AsyncMutex<()>>,
79}
80
81mod activities;
82mod executions;
83mod map_runs;
84mod state_machines;
85mod tags;
86mod tasks;
87mod validation;
88
89impl StepFunctionsService {
90    pub fn new(state: SharedStepFunctionsState) -> Self {
91        Self {
92            state,
93            delivery: None,
94            dynamodb_state: None,
95            registry: None,
96            snapshot_store: None,
97            snapshot_lock: Arc::new(AsyncMutex::new(())),
98        }
99    }
100
101    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
102        self.delivery = Some(delivery);
103        self
104    }
105
106    pub fn with_dynamodb(mut self, dynamodb_state: SharedDynamoDbState) -> Self {
107        self.dynamodb_state = Some(dynamodb_state);
108        self
109    }
110
111    /// Hand the service a deferred-fill handle to the central [`ServiceRegistry`].
112    /// `main.rs` calls [`OnceLock::set`] on the inner cell after every service
113    /// has been registered; until then the interpreter falls back to its
114    /// hand-coded SDK integrations (lambda invoke, sqs sendMessage, …).
115    pub fn with_registry(mut self, registry: SharedServiceRegistry) -> Self {
116        self.registry = Some(registry);
117        self
118    }
119
120    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
121        self.snapshot_store = Some(store);
122        self
123    }
124
125    async fn save_snapshot(&self) {
126        let Some(store) = self.snapshot_store.clone() else {
127            return;
128        };
129        let _guard = self.snapshot_lock.lock().await;
130        let snapshot = StepFunctionsSnapshot {
131            schema_version: STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION,
132            state: None,
133            accounts: Some(self.state.read().clone()),
134        };
135        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
136            let bytes = serde_json::to_vec(&snapshot)
137                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
138            store.save(&bytes)
139        })
140        .await;
141        match join {
142            Ok(Ok(())) => {}
143            Ok(Err(err)) => tracing::error!(%err, "failed to write stepfunctions snapshot"),
144            Err(err) => tracing::error!(%err, "stepfunctions snapshot task panicked"),
145        }
146    }
147}
148
/// Report whether `action` is one of the actions that change service state.
///
/// `handle` consults this to decide whether a successful response should be
/// followed by a snapshot write. Matching is exact and case-sensitive; any
/// name not listed here is treated as read-only.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING: &[&str] = &[
        "CreateStateMachine",
        "DeleteStateMachine",
        "UpdateStateMachine",
        "TagResource",
        "UntagResource",
        "StartExecution",
        "StopExecution",
        "CreateActivity",
        "DeleteActivity",
        "GetActivityTask",
        "SendTaskFailure",
        "SendTaskHeartbeat",
        "SendTaskSuccess",
        "PublishStateMachineVersion",
        "DeleteStateMachineVersion",
        "CreateStateMachineAlias",
        "DeleteStateMachineAlias",
        "UpdateStateMachineAlias",
        "UpdateMapRun",
        "RedriveExecution",
        "StartSyncExecution",
    ];
    MUTATING.contains(&action)
}
175
176#[async_trait]
177impl AwsService for StepFunctionsService {
178    fn service_name(&self) -> &str {
179        "states"
180    }
181
182    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
183        let mutates = is_mutating_action(req.action.as_str());
184        let result = match req.action.as_str() {
185            "CreateStateMachine" => self.create_state_machine(&req),
186            "DescribeStateMachine" => self.describe_state_machine(&req),
187            "ListStateMachines" => self.list_state_machines(&req),
188            "DeleteStateMachine" => self.delete_state_machine(&req),
189            "UpdateStateMachine" => self.update_state_machine(&req),
190            "TagResource" => self.tag_resource(&req),
191            "UntagResource" => self.untag_resource(&req),
192            "ListTagsForResource" => self.list_tags_for_resource(&req),
193            "StartExecution" => self.start_execution(&req),
194            "StopExecution" => self.stop_execution(&req),
195            "DescribeExecution" => self.describe_execution(&req),
196            "ListExecutions" => self.list_executions(&req),
197            "GetExecutionHistory" => self.get_execution_history(&req),
198            "DescribeStateMachineForExecution" => self.describe_state_machine_for_execution(&req),
199            "CreateActivity" => self.create_activity(&req),
200            "DeleteActivity" => self.delete_activity(&req),
201            "DescribeActivity" => self.describe_activity(&req),
202            "ListActivities" => self.list_activities(&req),
203            "GetActivityTask" => self.get_activity_task(&req).await,
204            "SendTaskFailure" => self.send_task_failure(&req),
205            "SendTaskHeartbeat" => self.send_task_heartbeat(&req),
206            "SendTaskSuccess" => self.send_task_success(&req),
207            "PublishStateMachineVersion" => self.publish_state_machine_version(&req),
208            "DeleteStateMachineVersion" => self.delete_state_machine_version(&req),
209            "ListStateMachineVersions" => self.list_state_machine_versions(&req),
210            "CreateStateMachineAlias" => self.create_state_machine_alias(&req),
211            "DeleteStateMachineAlias" => self.delete_state_machine_alias(&req),
212            "DescribeStateMachineAlias" => self.describe_state_machine_alias(&req),
213            "ListStateMachineAliases" => self.list_state_machine_aliases(&req),
214            "UpdateStateMachineAlias" => self.update_state_machine_alias(&req),
215            "DescribeMapRun" => self.describe_map_run(&req),
216            "ListMapRuns" => self.list_map_runs(&req),
217            "UpdateMapRun" => self.update_map_run(&req),
218            "RedriveExecution" => self.redrive_execution(&req),
219            "StartSyncExecution" => self.start_sync_execution(&req).await,
220            "TestState" => self.test_state(&req),
221            "ValidateStateMachineDefinition" => self.validate_state_machine_definition(&req),
222            _ => Err(AwsServiceError::action_not_implemented(
223                "states",
224                &req.action,
225            )),
226        };
227        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
228            self.save_snapshot().await;
229        }
230        result
231    }
232
233    fn supported_actions(&self) -> &[&str] {
234        SUPPORTED
235    }
236}
237
238impl StepFunctionsService {
239    fn list_activities(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
240        let body = req.json_body();
241        let raw_max_results = body["maxResults"].as_i64();
242        if let Some(mr) = raw_max_results {
243            validate_max_results(mr)?;
244        }
245        let next_token = body["nextToken"].as_str();
246        if let Some(t) = next_token {
247            validate_page_token(t)?;
248        }
249        // Smithy default: maxResults=0 means "use the service default"
250        // (100 for Step Functions), which we honour by treating both
251        // `None` and `0` as 100.
252        let max_results = match raw_max_results.unwrap_or(0) {
253            0 => 100,
254            n => n as usize,
255        };
256        let accounts = self.state.read();
257        let empty = crate::state::StepFunctionsState::new(&req.account_id, &req.region);
258        let state = accounts.get(&req.account_id).unwrap_or(&empty);
259        let mut activities: Vec<&crate::state::Activity> = state.activities.values().collect();
260        activities.sort_by(|a, b| a.name.cmp(&b.name));
261        let items: Vec<Value> = activities
262            .iter()
263            .map(|a| {
264                json!({
265                    "activityArn": a.arn,
266                    "name": a.name,
267                    "creationDate": a.creation_date.timestamp(),
268                })
269            })
270            .collect();
271        let (page, token) = paginate(&items, next_token, max_results);
272        let mut resp = json!({ "activities": page });
273        if let Some(t) = token {
274            resp["nextToken"] = json!(t);
275        }
276        Ok(AwsResponse::ok_json(resp))
277    }
278}
279
280fn state_machine_alias_to_json(alias: &crate::state::StateMachineAlias) -> Value {
281    json!({
282        "stateMachineAliasArn": alias.arn,
283        "name": alias.name,
284        "description": alias.description,
285        "routingConfiguration": alias.routing_configuration.iter().map(|r| json!({
286            "stateMachineVersionArn": r.state_machine_version_arn,
287            "weight": r.weight,
288        })).collect::<Vec<_>>(),
289        "creationDate": alias.creation_date.timestamp(),
290        "updateDate": alias.update_date.timestamp(),
291    })
292}
293
294fn map_run_to_json(mr: &crate::state::MapRun) -> Value {
295    json!({
296        "mapRunArn": mr.map_run_arn,
297        "executionArn": mr.execution_arn,
298        "maxConcurrency": mr.max_concurrency,
299        "toleratedFailurePercentage": mr.tolerated_failure_percentage,
300        "toleratedFailureCount": mr.tolerated_failure_count,
301        "status": mr.status,
302        "startDate": mr.start_date.timestamp(),
303        "stopDate": mr.stop_date.map(|d| d.timestamp()),
304    })
305}
306
307// ─── Helpers ────────────────────────────────────────────────────────────
308
309fn state_machine_to_json(sm: &StateMachine) -> Value {
310    let mut resp = json!({
311        "name": sm.name,
312        "stateMachineArn": sm.arn,
313        "definition": sm.definition,
314        "roleArn": sm.role_arn,
315        "type": sm.machine_type.as_str(),
316        "status": sm.status.as_str(),
317        "creationDate": sm.creation_date.timestamp() as f64,
318        "updateDate": sm.update_date.timestamp() as f64,
319        "revisionId": sm.revision_id,
320        "label": sm.name,
321    });
322
323    if !sm.description.is_empty() {
324        resp["description"] = json!(sm.description);
325    }
326
327    if let Some(ref logging) = sm.logging_configuration {
328        resp["loggingConfiguration"] = logging.clone();
329    } else {
330        resp["loggingConfiguration"] = json!({
331            "level": "OFF",
332            "includeExecutionData": false,
333            "destinations": [],
334        });
335    }
336
337    if let Some(ref tracing) = sm.tracing_configuration {
338        resp["tracingConfiguration"] = tracing.clone();
339    } else {
340        resp["tracingConfiguration"] = json!({
341            "enabled": false,
342        });
343    }
344
345    resp
346}
347
348fn missing(name: &str) -> AwsServiceError {
349    AwsServiceError::aws_error(
350        StatusCode::BAD_REQUEST,
351        "ValidationException",
352        format!("The request must contain the parameter {name}."),
353    )
354}
355
356fn state_machine_not_found(arn: &str) -> AwsServiceError {
357    AwsServiceError::aws_error(
358        StatusCode::BAD_REQUEST,
359        "StateMachineDoesNotExist",
360        format!("State Machine Does Not Exist: '{arn}'"),
361    )
362}
363
364fn activity_not_found(arn: &str) -> AwsServiceError {
365    AwsServiceError::aws_error(
366        StatusCode::BAD_REQUEST,
367        "ActivityDoesNotExist",
368        format!("Activity does not exist: {arn}"),
369    )
370}
371
372fn task_does_not_exist(token: &str) -> AwsServiceError {
373    AwsServiceError::aws_error(
374        StatusCode::BAD_REQUEST,
375        "TaskDoesNotExist",
376        format!("Task does not exist: {token}"),
377    )
378}
379
380fn resource_not_found(arn: &str) -> AwsServiceError {
381    AwsServiceError::aws_error(
382        StatusCode::BAD_REQUEST,
383        "ResourceNotFound",
384        format!("Resource not found: '{arn}'"),
385    )
386}
387
388/// Parse + validate an alias `routingConfiguration` array.
389///
390/// AWS rules: 1 or 2 routes; weights are 0-100 and sum to 100; each
391/// route must include `stateMachineVersionArn`.
392fn parse_routing_configuration(
393    routes: &[serde_json::Value],
394) -> Result<Vec<crate::state::AliasRoute>, AwsServiceError> {
395    if routes.is_empty() || routes.len() > 2 {
396        return Err(AwsServiceError::aws_error(
397            StatusCode::BAD_REQUEST,
398            "ValidationException",
399            "routingConfiguration must contain 1 or 2 routes.",
400        ));
401    }
402    let parsed: Vec<crate::state::AliasRoute> = routes
403        .iter()
404        .map(|r| {
405            let arn = r["stateMachineVersionArn"].as_str().ok_or_else(|| {
406                AwsServiceError::aws_error(
407                    StatusCode::BAD_REQUEST,
408                    "ValidationException",
409                    "routingConfiguration entries must contain stateMachineVersionArn.",
410                )
411            })?;
412            let weight = r["weight"].as_i64().ok_or_else(|| {
413                AwsServiceError::aws_error(
414                    StatusCode::BAD_REQUEST,
415                    "ValidationException",
416                    "routingConfiguration entries must contain a numeric weight.",
417                )
418            })?;
419            if !(0..=100).contains(&weight) {
420                return Err(AwsServiceError::aws_error(
421                    StatusCode::BAD_REQUEST,
422                    "ValidationException",
423                    format!("Invalid routing weight {weight}; must be 0-100."),
424                ));
425            }
426            Ok(crate::state::AliasRoute {
427                state_machine_version_arn: arn.to_string(),
428                weight: weight as i32,
429            })
430        })
431        .collect::<Result<_, _>>()?;
432    let total: i32 = parsed.iter().map(|r| r.weight).sum();
433    if total != 100 {
434        return Err(AwsServiceError::aws_error(
435            StatusCode::BAD_REQUEST,
436            "ValidationException",
437            format!("routingConfiguration weights must sum to 100, got {total}."),
438        ));
439    }
440    Ok(parsed)
441}
442
443fn validate_name(name: &str) -> Result<(), AwsServiceError> {
444    if name.is_empty() || name.len() > 80 {
445        return Err(AwsServiceError::aws_error(
446            StatusCode::BAD_REQUEST,
447            "InvalidName",
448            format!("Invalid Name: '{name}' (length must be between 1 and 80 characters)"),
449        ));
450    }
451    // Only allow alphanumeric, hyphens, and underscores
452    if !name
453        .chars()
454        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
455    {
456        return Err(AwsServiceError::aws_error(
457            StatusCode::BAD_REQUEST,
458            "InvalidName",
459            format!(
460                "Invalid Name: '{name}' (must only contain alphanumeric characters, hyphens, and underscores)"
461            ),
462        ));
463    }
464    Ok(())
465}
466
467fn validate_definition(definition: &str) -> Result<(), AwsServiceError> {
468    let parsed: Value = serde_json::from_str(definition).map_err(|e| {
469        AwsServiceError::aws_error(
470            StatusCode::BAD_REQUEST,
471            "InvalidDefinition",
472            format!("Invalid State Machine Definition: '{e}'"),
473        )
474    })?;
475
476    if parsed.get("StartAt").and_then(|v| v.as_str()).is_none() {
477        return Err(AwsServiceError::aws_error(
478            StatusCode::BAD_REQUEST,
479            "InvalidDefinition",
480            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
481                .to_string(),
482        ));
483    }
484
485    let states_obj = parsed
486        .get("States")
487        .and_then(|v| v.as_object())
488        .ok_or_else(|| {
489            AwsServiceError::aws_error(
490                StatusCode::BAD_REQUEST,
491                "InvalidDefinition",
492                "Invalid State Machine Definition: 'MISSING_STATES' (States field is required)"
493                    .to_string(),
494            )
495        })?;
496
497    let start_at = parsed["StartAt"].as_str().ok_or_else(|| {
498        AwsServiceError::aws_error(
499            StatusCode::BAD_REQUEST,
500            "InvalidDefinition",
501            "Invalid State Machine Definition: 'MISSING_START_AT' (StartAt field is required)"
502                .to_string(),
503        )
504    })?;
505    if !states_obj.contains_key(start_at) {
506        return Err(AwsServiceError::aws_error(
507            StatusCode::BAD_REQUEST,
508            "InvalidDefinition",
509            format!(
510                "Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET' \
511                 (StartAt '{start_at}' does not reference a valid state)"
512            ),
513        ));
514    }
515
516    Ok(())
517}
518
519fn execution_not_found(arn: &str) -> AwsServiceError {
520    AwsServiceError::aws_error(
521        StatusCode::BAD_REQUEST,
522        "ExecutionDoesNotExist",
523        format!("Execution Does Not Exist: '{arn}'"),
524    )
525}
526
527fn execution_to_json(exec: &Execution) -> Value {
528    let mut resp = json!({
529        "executionArn": exec.execution_arn,
530        "stateMachineArn": exec.state_machine_arn,
531        "name": exec.name,
532        "status": exec.status.as_str(),
533        "startDate": exec.start_date.timestamp() as f64,
534    });
535
536    if let Some(ref input) = exec.input {
537        resp["input"] = json!(input);
538    }
539    if let Some(ref output) = exec.output {
540        resp["output"] = json!(output);
541    }
542    if let Some(stop) = exec.stop_date {
543        resp["stopDate"] = json!(stop.timestamp() as f64);
544    }
545    if let Some(ref error) = exec.error {
546        resp["error"] = json!(error);
547    }
548    if let Some(ref cause) = exec.cause {
549        resp["cause"] = json!(cause);
550    }
551
552    resp
553}
554
/// Lower-case the first character of an event type to form its details key,
/// e.g. `"PassStateEntered"` becomes `"passStateEntered"`.
///
/// The rest of the string is copied unchanged; an empty input yields an empty
/// string.
fn camel_to_details_key(event_type: &str) -> String {
    let mut rest = event_type.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };
    let mut key = String::with_capacity(event_type.len());
    // `to_lowercase` may expand to more than one char, hence `extend`.
    key.extend(first.to_lowercase());
    key.push_str(rest.as_str());
    key
}
563
564fn validate_arn(arn: &str) -> Result<(), AwsServiceError> {
565    if !arn.starts_with("arn:") {
566        return Err(AwsServiceError::aws_error(
567            StatusCode::BAD_REQUEST,
568            "InvalidArn",
569            format!("Invalid Arn: '{arn}'"),
570        ));
571    }
572    Ok(())
573}
574
575/// Enforce the Smithy @length on an ARN-typed field (`Arn` = 1..=256,
576/// `LongArn` = 1..=2000). Empty / oversize ARNs map to `InvalidArn`, which
577/// every ARN-bearing Step Functions operation declares.
578fn validate_arn_length(field: &str, value: &str, max: usize) -> Result<(), AwsServiceError> {
579    if value.is_empty() || value.len() > max {
580        return Err(AwsServiceError::aws_error(
581            StatusCode::BAD_REQUEST,
582            "InvalidArn",
583            format!("Invalid Arn at '{field}': must be 1..={max} characters"),
584        ));
585    }
586    Ok(())
587}
588
589/// `nextToken` is declared as `PageToken` (length 1..=1024). The only
590/// error code declared on every paginated list op is `InvalidToken`, so
591/// route both empty and oversize tokens through it.
592fn validate_page_token(value: &str) -> Result<(), AwsServiceError> {
593    if value.is_empty() || value.len() > 1024 {
594        return Err(AwsServiceError::aws_error(
595            StatusCode::BAD_REQUEST,
596            "InvalidToken",
597            "nextToken must be 1..=1024 characters",
598        ));
599    }
600    Ok(())
601}
602
603/// `maxResults` is typed as `PageSize` (range 0..=1000). The negative
604/// probes flip below-min / above-max; since the only error declared on
605/// `ListActivities` is `InvalidToken`, we map page-size violations to
606/// the same code (matches how AWS surfaces malformed pagination input
607/// for ops that don't model a separate `ValidationException`).
608fn validate_max_results(value: i64) -> Result<(), AwsServiceError> {
609    if !(0..=1000).contains(&value) {
610        return Err(AwsServiceError::aws_error(
611            StatusCode::BAD_REQUEST,
612            "InvalidToken",
613            format!("maxResults '{value}' is outside 0..=1000"),
614        ));
615    }
616    Ok(())
617}
618
619/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
620///
621/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
622/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
623/// Start a Step Functions execution from a cross-service delivery (e.g. EventBridge).
624///
625/// This is the public entry point used by `StepFunctionsDeliveryImpl` in the server crate.
626/// It mirrors the logic from `StartExecution` but without the AWS request/response wrapper.
627pub fn start_execution_from_delivery(
628    state: &SharedStepFunctionsState,
629    delivery: &Option<Arc<DeliveryBus>>,
630    dynamodb_state: &Option<SharedDynamoDbState>,
631    registry: &Option<SharedServiceRegistry>,
632    state_machine_arn: &str,
633    input: &str,
634) {
635    // Validate input is valid JSON
636    if serde_json::from_str::<serde_json::Value>(input).is_err() {
637        tracing::warn!(
638            state_machine_arn,
639            "Step Functions delivery: invalid JSON input, skipping execution"
640        );
641        return;
642    }
643
644    let execution_name = uuid::Uuid::new_v4().to_string();
645
646    // Extract account_id from the state machine ARN
647    let account_id = state_machine_arn
648        .split(':')
649        .nth(4)
650        .unwrap_or("000000000000")
651        .to_string();
652
653    let mut accounts = state.write();
654    let st = accounts.get_or_create(&account_id);
655    let sm = match st.state_machines.get(state_machine_arn) {
656        Some(sm) => sm,
657        None => {
658            tracing::warn!(
659                state_machine_arn,
660                "Step Functions delivery: state machine not found"
661            );
662            return;
663        }
664    };
665
666    let sm_name = sm.name.clone();
667    let definition = sm.definition.clone();
668    let exec_arn = st.execution_arn(&sm_name, &execution_name);
669
670    let now = Utc::now();
671    let execution = Execution {
672        execution_arn: exec_arn.clone(),
673        state_machine_arn: state_machine_arn.to_string(),
674        state_machine_name: sm_name,
675        name: execution_name,
676        status: ExecutionStatus::Running,
677        input: Some(input.to_string()),
678        output: None,
679        start_date: now,
680        stop_date: None,
681        error: None,
682        cause: None,
683        history_events: vec![],
684        parent_execution_arn: None,
685        is_sync: false,
686        billed_duration_ms: None,
687        billed_memory_mb: None,
688    };
689
690    st.executions.insert(exec_arn.clone(), execution);
691    let logging_config = sm.logging_configuration.clone();
692    drop(accounts);
693
694    let shared_state = state.clone();
695    let delivery = delivery.clone();
696    let dynamodb_state = dynamodb_state.clone();
697    let registry = registry.clone();
698    let input = Some(input.to_string());
699    tokio::spawn(async move {
700        interpreter::execute_state_machine(
701            shared_state,
702            exec_arn,
703            definition,
704            input,
705            delivery,
706            dynamodb_state,
707            registry,
708            logging_config,
709        )
710        .await;
711    });
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use http::{HeaderMap, Method};
718    use parking_lot::RwLock;
719    use serde_json::Value;
720    use std::collections::HashMap;
721    use std::sync::Arc;
722
723    fn make_state() -> SharedStepFunctionsState {
724        Arc::new(RwLock::new(
725            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
726        ))
727    }
728
729    fn make_request(action: &str, body: &str) -> AwsRequest {
730        AwsRequest {
731            service: "states".to_string(),
732            action: action.to_string(),
733            region: "us-east-1".to_string(),
734            account_id: "123456789012".to_string(),
735            request_id: "test-id".to_string(),
736            headers: HeaderMap::new(),
737            query_params: HashMap::new(),
738            body: body.as_bytes().to_vec().into(),
739            body_stream: parking_lot::Mutex::new(None),
740            path_segments: vec![],
741            raw_path: "/".to_string(),
742            raw_query: String::new(),
743            method: Method::POST,
744            is_query_protocol: false,
745            access_key_id: None,
746            principal: None,
747        }
748    }
749
750    fn body_json(resp: &AwsResponse) -> Value {
751        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
752    }
753
754    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
755        match result {
756            Err(e) => e,
757            Ok(_) => panic!("expected error, got Ok"),
758        }
759    }
760
    /// Minimal definition accepted by `validate_definition`: `StartAt` names the
    /// single `Pass` state, which ends the machine.
761    const VALID_DEF: &str = r#"{"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}}"#;
762
763    fn create_sm(svc: &StepFunctionsService, name: &str) -> String {
764        let body = json!({
765            "name": name,
766            "definition": VALID_DEF,
767            "roleArn": "arn:aws:iam::123456789012:role/test",
768        });
769        let req = make_request("CreateStateMachine", &body.to_string());
770        let resp = svc.create_state_machine(&req).unwrap();
771        let b = body_json(&resp);
772        b["stateMachineArn"].as_str().unwrap().to_string()
773    }
774
775    // ── CreateStateMachine ──
776
777    #[test]
778    fn create_state_machine_basic() {
779        let svc = StepFunctionsService::new(make_state());
780        let arn = create_sm(&svc, "test-sm");
781        assert!(arn.contains("test-sm"));
782    }
783
784    #[test]
785    fn create_state_machine_with_express_type() {
786        let svc = StepFunctionsService::new(make_state());
787        let body = json!({
788            "name": "express-sm",
789            "definition": VALID_DEF,
790            "roleArn": "arn:aws:iam::123456789012:role/r",
791            "type": "EXPRESS",
792        });
793        let req = make_request("CreateStateMachine", &body.to_string());
794        let resp = svc.create_state_machine(&req).unwrap();
795        let b = body_json(&resp);
796        assert!(b["stateMachineArn"].as_str().is_some());
797    }
798
799    #[test]
800    fn create_state_machine_duplicate_fails() {
801        let svc = StepFunctionsService::new(make_state());
802        create_sm(&svc, "dup-sm");
803        let body = json!({
804            "name": "dup-sm",
805            "definition": VALID_DEF,
806            "roleArn": "arn:aws:iam::123456789012:role/r",
807        });
808        let req = make_request("CreateStateMachine", &body.to_string());
809        let err = expect_err(svc.create_state_machine(&req));
810        assert!(err.to_string().contains("StateMachineAlreadyExists"));
811    }
812
813    #[test]
814    fn create_state_machine_missing_name() {
815        let svc = StepFunctionsService::new(make_state());
816        let body = json!({
817            "definition": VALID_DEF,
818            "roleArn": "arn:aws:iam::123456789012:role/r",
819        });
820        let req = make_request("CreateStateMachine", &body.to_string());
821        assert!(svc.create_state_machine(&req).is_err());
822    }
823
824    #[test]
825    fn create_state_machine_invalid_definition() {
826        let svc = StepFunctionsService::new(make_state());
827        let body = json!({
828            "name": "bad-def",
829            "definition": "not json",
830            "roleArn": "arn:aws:iam::123456789012:role/r",
831        });
832        let req = make_request("CreateStateMachine", &body.to_string());
833        let err = expect_err(svc.create_state_machine(&req));
834        assert!(err.to_string().contains("InvalidDefinition"));
835    }
836
// A definition lacking `StartAt` must produce an `InvalidDefinition` error.
#[test]
fn create_state_machine_definition_missing_start_at() {
    let service = StepFunctionsService::new(make_state());
    let definition = r#"{"States":{"S":{"Type":"Pass","End":true}}}"#;
    let payload = json!({
        "name": "no-start",
        "definition": definition,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("InvalidDefinition"));
}
849
// A definition lacking `States` must produce an `InvalidDefinition` error.
#[test]
fn create_state_machine_definition_missing_states() {
    let service = StepFunctionsService::new(make_state());
    let definition = r#"{"StartAt":"S"}"#;
    let payload = json!({
        "name": "no-states",
        "definition": definition,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("InvalidDefinition"));
}
862
// When `StartAt` names a state absent from `States`, the error must mention
// `MISSING_TRANSITION_TARGET`.
#[test]
fn create_state_machine_definition_start_at_not_in_states() {
    let service = StepFunctionsService::new(make_state());
    let definition = r#"{"StartAt":"Missing","States":{"S":{"Type":"Pass","End":true}}}"#;
    let payload = json!({
        "name": "bad-start",
        "definition": definition,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("MISSING_TRANSITION_TARGET"));
}
875
// A `type` value of "INVALID" must cause CreateStateMachine to fail.
#[test]
fn create_state_machine_invalid_type() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-type",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
        "type": "INVALID",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let outcome = service.create_state_machine(&request);
    assert!(outcome.is_err());
}
888
// A `roleArn` that is not ARN-shaped must produce an `InvalidArn` error.
#[test]
fn create_state_machine_invalid_arn() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "bad-arn",
        "definition": VALID_DEF,
        "roleArn": "not-an-arn",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("InvalidArn"));
}
901
// The name "has spaces!" must be rejected with an `InvalidName` error.
#[test]
fn create_state_machine_invalid_name() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "name": "has spaces!",
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("InvalidName"));
}
914
// An 81-character name must be rejected with an `InvalidName` error.
#[test]
fn create_state_machine_name_too_long() {
    let service = StepFunctionsService::new(make_state());
    let oversized = "a".repeat(81);
    let payload = json!({
        "name": oversized,
        "definition": VALID_DEF,
        "roleArn": "arn:aws:iam::123456789012:role/r",
    })
    .to_string();
    let request = make_request("CreateStateMachine", &payload);

    let failure = expect_err(service.create_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("InvalidName"));
}
928
929    // ── DescribeStateMachine ──
930
// Describing a freshly created state machine returns its name, an ACTIVE
// status and a string-valued definition.
#[test]
fn describe_state_machine_found() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "desc-sm");

    let payload = json!({"stateMachineArn": machine_arn}).to_string();
    let request = make_request("DescribeStateMachine", &payload);
    let response = service.describe_state_machine(&request).unwrap();
    let described = body_json(&response);

    assert_eq!(described["name"], "desc-sm");
    assert_eq!(described["status"], "ACTIVE");
    assert!(described["definition"].is_string());
}
946
// Describing an ARN that was never created must yield
// `StateMachineDoesNotExist`.
#[test]
fn describe_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DescribeStateMachine", &payload);

    let failure = expect_err(service.describe_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
958
959    // ── ListStateMachines ──
960
// With nothing created, ListStateMachines returns an empty `stateMachines`
// array.
#[test]
fn list_state_machines_empty() {
    let service = StepFunctionsService::new(make_state());
    let request = make_request("ListStateMachines", "{}");

    let response = service.list_state_machines(&request).unwrap();
    let listing = body_json(&response);
    let machines = listing["stateMachines"].as_array().unwrap();
    assert!(machines.is_empty());
}
969
// After creating two state machines, ListStateMachines reports two entries.
#[test]
fn list_state_machines_returns_created() {
    let service = StepFunctionsService::new(make_state());
    create_sm(&service, "sm-1");
    create_sm(&service, "sm-2");

    let request = make_request("ListStateMachines", "{}");
    let response = service.list_state_machines(&request).unwrap();
    let listing = body_json(&response);
    let machines = listing["stateMachines"].as_array().unwrap();
    assert_eq!(machines.len(), 2);
}
981
982    // ── DeleteStateMachine ──
983
// Deleting a state machine succeeds, and a subsequent describe of the same
// ARN fails.
#[test]
fn delete_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "del-sm");

    // Both requests carry the same single-field body.
    let payload = json!({"stateMachineArn": machine_arn}).to_string();

    let delete_request = make_request("DeleteStateMachine", &payload);
    service.delete_state_machine(&delete_request).unwrap();

    // The machine must no longer be describable.
    let describe_request = make_request("DescribeStateMachine", &payload);
    let outcome = service.describe_state_machine(&describe_request);
    assert!(outcome.is_err());
}
1002
// Deleting an ARN that was never created must still succeed; the original
// test notes that AWS returns success even for a nonexistent machine.
#[test]
fn delete_state_machine_nonexistent_succeeds() {
    let service = StepFunctionsService::new(make_state());
    let payload =
        json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
            .to_string();
    let request = make_request("DeleteStateMachine", &payload);

    service.delete_state_machine(&request).unwrap();
}
1014
1015    // ── UpdateStateMachine ──
1016
// UpdateStateMachine returns a numeric `updateDate`, and the new definition
// and description are visible through DescribeStateMachine afterwards.
#[test]
fn update_state_machine() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "upd-sm");

    let replacement = r#"{"StartAt":"NewPass","States":{"NewPass":{"Type":"Pass","End":true}}}"#;
    let update_payload = json!({
        "stateMachineArn": machine_arn,
        "definition": replacement,
        "description": "updated",
    })
    .to_string();
    let update_request = make_request("UpdateStateMachine", &update_payload);
    let update_response = service.update_state_machine(&update_request).unwrap();
    let updated = body_json(&update_response);
    assert!(updated["updateDate"].as_f64().is_some());

    // Read the machine back and confirm the changes were stored.
    let describe_payload = json!({"stateMachineArn": machine_arn}).to_string();
    let describe_request = make_request("DescribeStateMachine", &describe_payload);
    let describe_response = service.describe_state_machine(&describe_request).unwrap();
    let described = body_json(&describe_response);
    let stored_definition = described["definition"].as_str().unwrap();
    assert!(stored_definition.contains("NewPass"));
    assert_eq!(described["description"], "updated");
}
1043
// Updating an ARN that was never created must yield
// `StateMachineDoesNotExist`.
#[test]
fn update_state_machine_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "definition": VALID_DEF,
    })
    .to_string();
    let request = make_request("UpdateStateMachine", &payload);

    let failure = expect_err(service.update_state_machine(&request));
    let message = failure.to_string();
    assert!(message.contains("StateMachineDoesNotExist"));
}
1055
1056    // ── StartExecution ──
1057
1058    #[tokio::test]
1059    async fn start_execution_basic() {
1060        let svc = StepFunctionsService::new(make_state());
1061        let arn = create_sm(&svc, "exec-sm");
1062
1063        let body = json!({
1064            "stateMachineArn": arn,
1065            "input": r#"{"key":"value"}"#,
1066        });
1067        let req = make_request("StartExecution", &body.to_string());
1068        let resp = svc.start_execution(&req).unwrap();
1069        let b = body_json(&resp);
1070        assert!(b["executionArn"].as_str().is_some());
1071        assert!(b["startDate"].as_f64().is_some());
1072    }
1073
1074    #[tokio::test]
1075    async fn start_execution_with_name() {
1076        let svc = StepFunctionsService::new(make_state());
1077        let arn = create_sm(&svc, "named-exec");
1078
1079        let body = json!({
1080            "stateMachineArn": arn,
1081            "name": "my-execution",
1082        });
1083        let req = make_request("StartExecution", &body.to_string());
1084        let resp = svc.start_execution(&req).unwrap();
1085        let b = body_json(&resp);
1086        assert!(b["executionArn"].as_str().unwrap().contains("my-execution"));
1087    }
1088
1089    #[tokio::test]
1090    async fn start_execution_sm_not_found() {
1091        let svc = StepFunctionsService::new(make_state());
1092        let body = json!({
1093            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1094        });
1095        let req = make_request("StartExecution", &body.to_string());
1096        let err = expect_err(svc.start_execution(&req));
1097        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1098    }
1099
1100    #[tokio::test]
1101    async fn start_execution_invalid_input() {
1102        let svc = StepFunctionsService::new(make_state());
1103        let arn = create_sm(&svc, "bad-input");
1104
1105        let body = json!({
1106            "stateMachineArn": arn,
1107            "input": "not json",
1108        });
1109        let req = make_request("StartExecution", &body.to_string());
1110        let err = expect_err(svc.start_execution(&req));
1111        assert!(err.to_string().contains("InvalidExecutionInput"));
1112    }
1113
1114    #[tokio::test]
1115    async fn start_execution_duplicate_name() {
1116        let svc = StepFunctionsService::new(make_state());
1117        let arn = create_sm(&svc, "dup-exec");
1118
1119        let body = json!({
1120            "stateMachineArn": arn,
1121            "name": "same-name",
1122        });
1123        let req = make_request("StartExecution", &body.to_string());
1124        svc.start_execution(&req).unwrap();
1125
1126        let req = make_request("StartExecution", &body.to_string());
1127        let err = expect_err(svc.start_execution(&req));
1128        assert!(err.to_string().contains("ExecutionAlreadyExists"));
1129    }
1130
1131    // ── DescribeExecution ──
1132
1133    #[tokio::test]
1134    async fn describe_execution_found() {
1135        let svc = StepFunctionsService::new(make_state());
1136        let sm_arn = create_sm(&svc, "desc-exec");
1137
1138        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1139        let req = make_request("StartExecution", &body.to_string());
1140        let resp = svc.start_execution(&req).unwrap();
1141        let exec_arn = body_json(&resp)["executionArn"]
1142            .as_str()
1143            .unwrap()
1144            .to_string();
1145
1146        let req = make_request(
1147            "DescribeExecution",
1148            &json!({"executionArn": exec_arn}).to_string(),
1149        );
1150        let resp = svc.describe_execution(&req).unwrap();
1151        let b = body_json(&resp);
1152        assert_eq!(b["name"], "e1");
1153        assert_eq!(b["status"], "RUNNING");
1154    }
1155
1156    #[tokio::test]
1157    async fn describe_execution_not_found() {
1158        let svc = StepFunctionsService::new(make_state());
1159        let req = make_request(
1160            "DescribeExecution",
1161            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1162                .to_string(),
1163        );
1164        let err = expect_err(svc.describe_execution(&req));
1165        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1166    }
1167
1168    // ── StopExecution ──
1169
1170    #[tokio::test]
1171    async fn stop_execution() {
1172        let svc = StepFunctionsService::new(make_state());
1173        let sm_arn = create_sm(&svc, "stop-sm");
1174
1175        let body = json!({"stateMachineArn": sm_arn, "name": "stop-e"});
1176        let req = make_request("StartExecution", &body.to_string());
1177        let resp = svc.start_execution(&req).unwrap();
1178        let exec_arn = body_json(&resp)["executionArn"]
1179            .as_str()
1180            .unwrap()
1181            .to_string();
1182
1183        let body = json!({
1184            "executionArn": exec_arn,
1185            "error": "UserAborted",
1186            "cause": "test stop",
1187        });
1188        let req = make_request("StopExecution", &body.to_string());
1189        let resp = svc.stop_execution(&req).unwrap();
1190        let b = body_json(&resp);
1191        assert!(b["stopDate"].as_f64().is_some());
1192
1193        // Verify aborted
1194        let req = make_request(
1195            "DescribeExecution",
1196            &json!({"executionArn": exec_arn}).to_string(),
1197        );
1198        let resp = svc.describe_execution(&req).unwrap();
1199        let b = body_json(&resp);
1200        assert_eq!(b["status"], "ABORTED");
1201        assert_eq!(b["error"], "UserAborted");
1202    }
1203
1204    #[tokio::test]
1205    async fn stop_execution_not_found() {
1206        let svc = StepFunctionsService::new(make_state());
1207        let req = make_request(
1208            "StopExecution",
1209            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1210                .to_string(),
1211        );
1212        let err = expect_err(svc.stop_execution(&req));
1213        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1214    }
1215
1216    // ── ListExecutions ──
1217
1218    #[tokio::test]
1219    async fn list_executions() {
1220        let svc = StepFunctionsService::new(make_state());
1221        let sm_arn = create_sm(&svc, "list-exec");
1222
1223        for i in 0..3 {
1224            let body = json!({"stateMachineArn": sm_arn, "name": format!("e{i}")});
1225            let req = make_request("StartExecution", &body.to_string());
1226            svc.start_execution(&req).unwrap();
1227        }
1228
1229        let req = make_request(
1230            "ListExecutions",
1231            &json!({"stateMachineArn": sm_arn}).to_string(),
1232        );
1233        let resp = svc.list_executions(&req).unwrap();
1234        let b = body_json(&resp);
1235        assert_eq!(b["executions"].as_array().unwrap().len(), 3);
1236    }
1237
1238    #[tokio::test]
1239    async fn list_executions_sm_not_found() {
1240        let svc = StepFunctionsService::new(make_state());
1241        let req = make_request(
1242            "ListExecutions",
1243            &json!({"stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope"})
1244                .to_string(),
1245        );
1246        let err = expect_err(svc.list_executions(&req));
1247        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1248    }
1249
1250    // ── GetExecutionHistory ──
1251
1252    #[tokio::test]
1253    async fn get_execution_history_not_found() {
1254        let svc = StepFunctionsService::new(make_state());
1255        let req = make_request(
1256            "GetExecutionHistory",
1257            &json!({"executionArn": "arn:aws:states:us-east-1:123456789012:execution:sm:nope"})
1258                .to_string(),
1259        );
1260        let err = expect_err(svc.get_execution_history(&req));
1261        assert!(err.to_string().contains("ExecutionDoesNotExist"));
1262    }
1263
1264    // ── DescribeStateMachineForExecution ──
1265
1266    #[tokio::test]
1267    async fn describe_sm_for_execution() {
1268        let svc = StepFunctionsService::new(make_state());
1269        let sm_arn = create_sm(&svc, "sm-for-exec");
1270
1271        let body = json!({"stateMachineArn": sm_arn, "name": "e1"});
1272        let req = make_request("StartExecution", &body.to_string());
1273        let resp = svc.start_execution(&req).unwrap();
1274        let exec_arn = body_json(&resp)["executionArn"]
1275            .as_str()
1276            .unwrap()
1277            .to_string();
1278
1279        let req = make_request(
1280            "DescribeStateMachineForExecution",
1281            &json!({"executionArn": exec_arn}).to_string(),
1282        );
1283        let resp = svc.describe_state_machine_for_execution(&req).unwrap();
1284        let b = body_json(&resp);
1285        assert_eq!(b["name"], "sm-for-exec");
1286    }
1287
1288    // ── Tags ──
1289
// Round-trips a single tag: TagResource adds it, ListTagsForResource shows
// it, UntagResource removes it, and a final listing is empty.
#[test]
fn tag_untag_list_tags() {
    let service = StepFunctionsService::new(make_state());
    let machine_arn = create_sm(&service, "tagged-sm");

    // Both listing calls below send the same single-field body.
    let list_payload = json!({"resourceArn": machine_arn}).to_string();

    // Attach one tag.
    let tag_payload = json!({
        "resourceArn": machine_arn,
        "tags": [{"key": "env", "value": "prod"}],
    })
    .to_string();
    let tag_request = make_request("TagResource", &tag_payload);
    service.tag_resource(&tag_request).unwrap();

    // The tag must be listed.
    let list_request = make_request("ListTagsForResource", &list_payload);
    let list_response = service.list_tags_for_resource(&list_request).unwrap();
    let listed = body_json(&list_response);
    let tags = listed["tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0]["key"], "env");

    // Remove the tag by key.
    let untag_payload = json!({
        "resourceArn": machine_arn,
        "tagKeys": ["env"],
    })
    .to_string();
    let untag_request = make_request("UntagResource", &untag_payload);
    service.untag_resource(&untag_request).unwrap();

    // Nothing must remain.
    let relist_request = make_request("ListTagsForResource", &list_payload);
    let relist_response = service.list_tags_for_resource(&relist_request).unwrap();
    let relisted = body_json(&relist_response);
    let remaining = relisted["tags"].as_array().unwrap();
    assert!(remaining.is_empty());
}
1331
// Tagging an ARN that was never created must yield `ResourceNotFound`.
#[test]
fn tag_resource_not_found() {
    let service = StepFunctionsService::new(make_state());
    let payload = json!({
        "resourceArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
        "tags": [{"key": "k", "value": "v"}],
    })
    .to_string();
    let request = make_request("TagResource", &payload);

    let failure = expect_err(service.tag_resource(&request));
    let message = failure.to_string();
    assert!(message.contains("ResourceNotFound"));
}
1343
1344    // ── Helper function tests ──
1345
// `validate_name` accepts hyphenated and underscored names, and rejects the
// empty string, a name containing a space, and an 81-character name.
#[test]
fn test_validate_name() {
    for accepted in ["valid-name", "under_score"] {
        assert!(validate_name(accepted).is_ok());
    }

    for rejected in ["", "has spaces"] {
        assert!(validate_name(rejected).is_err());
    }

    let oversized = "a".repeat(81);
    assert!(validate_name(&oversized).is_err());
}
1354
// `validate_definition` accepts `VALID_DEF` and rejects non-JSON text as well
// as definitions missing `StartAt` or `States`.
#[test]
fn test_validate_definition() {
    assert!(validate_definition(VALID_DEF).is_ok());

    let rejected = [
        "not json",
        r#"{"States":{}}"#,   // missing StartAt
        r#"{"StartAt":"S"}"#, // missing States
    ];
    for definition in rejected {
        assert!(validate_definition(definition).is_err());
    }
}
1362
// `validate_arn` accepts an `arn:`-prefixed string and rejects "not-an-arn".
#[test]
fn test_validate_arn() {
    let accepted = validate_arn("arn:aws:states:us-east-1:123:sm:test");
    assert!(accepted.is_ok());

    let rejected = validate_arn("not-an-arn");
    assert!(rejected.is_err());
}
1368
// `camel_to_details_key` lower-cases the leading letter of
// "PassStateEntered" and maps the empty string to itself.
#[test]
fn test_camel_to_details_key() {
    let converted = camel_to_details_key("PassStateEntered");
    assert_eq!(converted, "passStateEntered");

    let from_empty = camel_to_details_key("");
    assert_eq!(from_empty, "");
}
1374
// `is_mutating_action` is true for Create/Start actions and false for
// Describe/List actions.
#[test]
fn test_is_mutating_action() {
    for action in ["CreateStateMachine", "StartExecution"] {
        assert!(is_mutating_action(action));
    }

    for action in ["DescribeStateMachine", "ListStateMachines"] {
        assert!(!is_mutating_action(action));
    }
}
1382
1383    // ── StartSyncExecution ──
1384
1385    fn create_express_sm(svc: &StepFunctionsService, name: &str) -> String {
1386        let body = json!({
1387            "name": name,
1388            "definition": VALID_DEF,
1389            "roleArn": "arn:aws:iam::123456789012:role/test",
1390            "type": "EXPRESS",
1391        });
1392        let req = make_request("CreateStateMachine", &body.to_string());
1393        let resp = svc.create_state_machine(&req).unwrap();
1394        let b = body_json(&resp);
1395        b["stateMachineArn"].as_str().unwrap().to_string()
1396    }
1397
1398    #[tokio::test]
1399    async fn start_sync_execution_basic() {
1400        let svc = StepFunctionsService::new(make_state());
1401        let arn = create_express_sm(&svc, "sync-sm");
1402
1403        let body = json!({
1404            "stateMachineArn": arn,
1405            "input": r#"{"key":"value"}"#,
1406        });
1407        let req = make_request("StartSyncExecution", &body.to_string());
1408        let resp = svc.start_sync_execution(&req).await.unwrap();
1409        let b = body_json(&resp);
1410        assert!(b["executionArn"]
1411            .as_str()
1412            .unwrap()
1413            .contains("express:sync-sm"));
1414        assert_eq!(b["stateMachineArn"], arn);
1415        assert_eq!(b["status"], "SUCCEEDED");
1416        assert!(b["startDate"].as_i64().is_some());
1417        assert!(b["stopDate"].as_i64().is_some());
1418        assert!(b["output"].as_str().is_some());
1419        assert!(b["billingDetails"]["billedDurationInMilliseconds"]
1420            .as_i64()
1421            .is_some());
1422    }
1423
1424    #[tokio::test]
1425    async fn start_sync_execution_not_express() {
1426        let svc = StepFunctionsService::new(make_state());
1427        let arn = create_sm(&svc, "std-sm");
1428
1429        let body = json!({"stateMachineArn": arn});
1430        let req = make_request("StartSyncExecution", &body.to_string());
1431        let err = expect_err(svc.start_sync_execution(&req).await);
1432        assert!(err.to_string().contains("StateMachineTypeNotSupported"));
1433    }
1434
1435    #[tokio::test]
1436    async fn start_sync_execution_sm_not_found() {
1437        let svc = StepFunctionsService::new(make_state());
1438        let body = json!({
1439            "stateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:nope",
1440        });
1441        let req = make_request("StartSyncExecution", &body.to_string());
1442        let err = expect_err(svc.start_sync_execution(&req).await);
1443        assert!(err.to_string().contains("StateMachineDoesNotExist"));
1444    }
1445
1446    #[tokio::test]
1447    async fn start_sync_execution_records_introspection_fields() {
1448        let svc = StepFunctionsService::new(make_state());
1449        let arn = create_express_sm(&svc, "sync-introspect");
1450
1451        let body = json!({"stateMachineArn": arn, "input": "{}"});
1452        let req = make_request("StartSyncExecution", &body.to_string());
1453        let resp = svc.start_sync_execution(&req).await.unwrap();
1454        let b = body_json(&resp);
1455        let exec_arn = b["executionArn"].as_str().unwrap().to_string();
1456
1457        let accounts = svc.state.read();
1458        let state = accounts.get("123456789012").unwrap();
1459        let stored = state
1460            .executions
1461            .get(&exec_arn)
1462            .expect("sync execution should be persisted for introspection");
1463        assert!(stored.is_sync, "sync executions must be marked is_sync");
1464        assert_eq!(stored.billed_memory_mb, Some(64));
1465        assert!(
1466            stored.billed_duration_ms.is_some(),
1467            "billed_duration_ms must be populated after sync run"
1468        );
1469        assert!(
1470            stored.parent_execution_arn.is_none(),
1471            "top-level sync execution has no parent"
1472        );
1473    }
1474
1475    #[tokio::test]
1476    async fn start_sync_execution_invalid_input() {
1477        let svc = StepFunctionsService::new(make_state());
1478        let arn = create_express_sm(&svc, "bad-input-sync");
1479
1480        let body = json!({
1481            "stateMachineArn": arn,
1482            "input": "not json",
1483        });
1484        let req = make_request("StartSyncExecution", &body.to_string());
1485        let err = expect_err(svc.start_sync_execution(&req).await);
1486        assert!(err.to_string().contains("InvalidExecutionInput"));
1487    }
1488}