Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
29/// Canonical `Fn::GetAtt` attribute names per resource type. Used to
30/// supplement the eagerly-captured `ProvisionResult::attributes` with
31/// live-state lookups via `ResourceProvisioner::get_att`. Resources not
32/// listed here just use whatever the create handler captured.
33fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
34    match resource_type {
35        "AWS::S3::Bucket" => &[
36            "Arn",
37            "DomainName",
38            "RegionalDomainName",
39            "DualStackDomainName",
40            "WebsiteURL",
41        ],
42        "AWS::Lambda::Function" => &["Arn", "FunctionUrl", "Version"],
43        "AWS::IAM::Role" => &["Arn", "RoleId"],
44        "AWS::SQS::Queue" => &["Arn", "QueueName", "QueueUrl"],
45        "AWS::SNS::Topic" => &["TopicArn", "TopicName"],
46        "AWS::DynamoDB::Table" => &["Arn", "StreamArn"],
47        "AWS::KMS::Key" => &["Arn", "KeyId"],
48        "AWS::SecretsManager::Secret" => &["Arn", "Id"],
49        "AWS::CloudFront::Distribution" => &["DomainName", "Id"],
50        "AWS::EC2::VPC" => &["VpcId", "CidrBlock"],
51        "AWS::EC2::Subnet" => &["SubnetId", "AvailabilityZone", "VpcId", "CidrBlock"],
52        "AWS::EC2::SecurityGroup" => &["GroupId", "VpcId"],
53        "AWS::EC2::InternetGateway" => &["InternetGatewayId"],
54        "AWS::EC2::RouteTable" => &["RouteTableId"],
55        _ => &[],
56    }
57}
58
59/// Map a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
60/// snapshot-hook key for its owning service (the keys of
61/// `CloudFormationDeps::snapshot_hooks`). The key is the service segment, with
62/// aliases for the few services whose CloudFormation namespace differs from
63/// their fakecloud service name (e.g. `Events` -> `eventbridge`).
64///
65/// `AWS::S3::*` is intentionally absent: S3 buckets persist through the
66/// `S3Store` write-through path in the provisioner, not a whole-state snapshot
67/// hook. Returns `None` for malformed types and any service whose state is not
68/// snapshot-backed (those CFN resources have no persistence path either way).
69fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
70    let mut parts = resource_type.split("::");
71    let vendor = parts.next()?;
72    let service = parts.next()?;
73    // A real resource type has three segments; a 2-part string (or fewer) is
74    // not one.
75    parts.next()?;
76    if vendor != "AWS" {
77        return None;
78    }
79    Some(match service {
80        "Lambda" => "lambda",
81        "SecretsManager" => "secretsmanager",
82        "SQS" => "sqs",
83        "SNS" => "sns",
84        "DynamoDB" => "dynamodb",
85        "StepFunctions" => "stepfunctions",
86        "Events" => "eventbridge",
87        "SSM" => "ssm",
88        "Logs" => "logs",
89        "KMS" => "kms",
90        "Kinesis" => "kinesis",
91        "SES" => "ses",
92        "Cognito" => "cognito",
93        "RDS" => "rds",
94        "ElastiCache" => "elasticache",
95        "ECR" => "ecr",
96        "ECS" => "ecs",
97        "CloudWatch" => "cloudwatch",
98        "ApiGateway" => "apigateway",
99        "ApiGatewayV2" => "apigatewayv2",
100        "Bedrock" => "bedrock",
101        "Scheduler" => "scheduler",
102        "IAM" => "iam",
103        // Services whose CloudFormation namespace differs from their fakecloud
104        // service name, and which were missing from this table: their
105        // provisioner mutates snapshot-backed state directly, so CFN-created
106        // (and CFN-deleted) resources vanished on restart while their
107        // direct-API equivalents persisted (#1766 class). Each has a registered
108        // snapshot hook in the server.
109        "CertificateManager" => "acm",
110        "ElasticLoadBalancingV2" => "elbv2",
111        "CloudFront" => "cloudfront",
112        "Route53" => "route53",
113        "KinesisFirehose" => "firehose",
114        "Glue" => "glue",
115        "WAFv2" => "wafv2",
116        "Athena" => "athena",
117        "Organizations" => "organizations",
118        // EC2 and ApplicationAutoScaling were wired into the provisioner after
119        // this table was last audited (EC2: 2026-06-25 #1957;
120        // application-autoscaling: persistence sweep), so their CFN-created VPC
121        // / Subnet / SecurityGroup / scalable-target state mutated in memory and
122        // vanished on restart (#1766 class). Both have a registered snapshot
123        // hook in the server.
124        "EC2" => "ec2",
125        "AutoScaling" => "autoscaling",
126        "ApplicationAutoScaling" => "application-autoscaling",
127        _ => return None,
128    })
129}
130
131/// Persist each distinct snapshot-backed service touched by a stack op, once.
132///
133/// `resource_types` is the set of `resource_type` strings the op created /
134/// updated / deleted. The CloudFormation provisioner mutates services' shared
135/// state directly and never triggers their snapshot path; this writes that
136/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
137/// resource survives a restart. Services with no registered hook (memory mode,
138/// or non-snapshot-backed services) are skipped.
139async fn persist_touched_services<I>(
140    hooks: &BTreeMap<&'static str, SnapshotHook>,
141    resource_types: I,
142) where
143    I: IntoIterator<Item = String>,
144{
145    if hooks.is_empty() {
146        return;
147    }
148    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
149    for ty in resource_types {
150        if let Some(key) = service_key_for_type(&ty) {
151            keys.insert(key);
152        }
153    }
154    for key in keys {
155        if let Some(hook) = hooks.get(key) {
156            hook().await;
157        }
158    }
159}
160
161/// Multi-pass provisioning for all resources in a parsed template.
162///
163/// Resources may `Ref` each other in either direction, and JSON object
164/// iteration order isn't stable, so a single forward pass isn't enough
165/// to resolve them. We loop: each pass tries every pending resource, and
166/// any resource whose `Ref` targets are still unknown just stays pending
167/// for the next pass. When no pass makes progress we report the first
168/// pending failure and rollback.
169pub(crate) fn provision_stack_resources(
170    provisioner: &ResourceProvisioner,
171    resource_defs: &[template::ResourceDefinition],
172    template_body: &str,
173    parameters: &BTreeMap<String, String>,
174    imports: &BTreeMap<String, String>,
175) -> Result<Vec<StackResource>, AwsServiceError> {
176    let mut resources = Vec::new();
177    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
178    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
179    // Seed the work list in dependency order so a referenced resource is
180    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
181    // physical ids. The fixed-point retry loop below still recovers from any
182    // residual ordering the graph can't express.
183    let order = template::dependency_order(template_body, parameters, resource_defs);
184    let mut pending: Vec<&template::ResourceDefinition> =
185        order.iter().map(|&i| &resource_defs[i]).collect();
186    let max_passes = pending.len() + 1;
187
188    for _ in 0..max_passes {
189        if pending.is_empty() {
190            break;
191        }
192        let mut still_pending = Vec::new();
193        let mut made_progress = false;
194
195        for resource_def in pending {
196            let resolved_def = template::resolve_resource_properties_with_attrs(
197                resource_def,
198                template_body,
199                parameters,
200                &physical_ids,
201                &attributes,
202                imports,
203            )
204            .map_err(|e| {
205                // `ValidationError` isn't declared on CreateStack/UpdateStack;
206                // surface template resolve failures through the closest declared
207                // shape (`InsufficientCapabilitiesException`) instead.
208                AwsServiceError::aws_error(
209                    StatusCode::BAD_REQUEST,
210                    "InsufficientCapabilitiesException",
211                    e,
212                )
213            })?;
214
215            match provisioner.create_resource(&resolved_def) {
216                Ok(mut stack_resource) => {
217                    physical_ids.insert(
218                        stack_resource.logical_id.clone(),
219                        stack_resource.physical_id.clone(),
220                    );
221                    // Start with the eagerly-captured attribute set, then
222                    // overlay anything the provisioner can resolve from
223                    // live state (e.g. attributes that depend on side-effects
224                    // recorded after the create handler returned).
225                    let mut attr_map = stack_resource.attributes.clone();
226                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
227                        if attr_map.contains_key(*attr) {
228                            continue;
229                        }
230                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
231                            attr_map.insert((*attr).to_string(), v);
232                        }
233                    }
234                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
235                    // Persist the live-resolved attributes onto the stored
236                    // resource so later readers — notably resolve_template_outputs,
237                    // which reads StackResource.attributes directly without the
238                    // overlay — see the full GetAtt set, not just the eager subset.
239                    stack_resource.attributes = attr_map;
240                    resources.push(stack_resource);
241                    made_progress = true;
242                }
243                Err(_) => still_pending.push(resource_def),
244            }
245        }
246
247        pending = still_pending;
248        if !made_progress && !pending.is_empty() {
249            // No progress — report the first failure and rollback anything
250            // we already created.
251            let resource_def = pending[0];
252            let resolved_def = template::resolve_resource_properties_with_attrs(
253                resource_def,
254                template_body,
255                parameters,
256                &physical_ids,
257                &attributes,
258                imports,
259            )
260            .unwrap_or_else(|_| resource_def.clone());
261            let err = provisioner.create_resource(&resolved_def).unwrap_err();
262            for r in &resources {
263                let _ = provisioner.delete_resource(r);
264            }
265            return Err(AwsServiceError::aws_error(
266                StatusCode::BAD_REQUEST,
267                "ValidationError",
268                format!(
269                    "Failed to create resource {}: {err}",
270                    resource_def.logical_id
271                ),
272            ));
273        }
274    }
275
276    Ok(resources)
277}
278
279/// State references for every service CloudFormation can provision resources in.
280pub struct CloudFormationDeps {
281    pub sqs: SharedSqsState,
282    pub sns: SharedSnsState,
283    pub ssm: SharedSsmState,
284    pub iam: SharedIamState,
285    pub s3: SharedS3State,
286    pub eventbridge: SharedEventBridgeState,
287    pub dynamodb: SharedDynamoDbState,
288    pub logs: SharedLogsState,
289    pub lambda: fakecloud_lambda::SharedLambdaState,
290    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
291    pub kinesis: fakecloud_kinesis::SharedKinesisState,
292    pub kms: fakecloud_kms::SharedKmsState,
293    pub ecr: fakecloud_ecr::SharedEcrState,
294    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
295    pub elbv2: fakecloud_elbv2::SharedElbv2State,
296    pub organizations: fakecloud_organizations::SharedOrganizationsState,
297    pub cognito: fakecloud_cognito::SharedCognitoState,
298    pub rds: fakecloud_rds::SharedRdsState,
299    pub ec2: fakecloud_ec2::SharedEc2State,
300    pub autoscaling: fakecloud_autoscaling::SharedAutoScalingState,
301    pub ecs: fakecloud_ecs::SharedEcsState,
302    pub acm: fakecloud_acm::SharedAcmState,
303    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
304    pub route53: fakecloud_route53::SharedRoute53State,
305    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
306    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
307    pub wafv2: fakecloud_wafv2::SharedWafv2State,
308    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
309    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
310    pub ses: fakecloud_ses::SharedSesState,
311    pub application_autoscaling:
312        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
313    pub athena: fakecloud_athena::SharedAthenaState,
314    pub firehose: fakecloud_firehose::SharedFirehoseState,
315    pub glue: fakecloud_glue::SharedGlueState,
316    pub delivery: Arc<DeliveryBus>,
317    /// Lambda container runtime, when Docker/Podman is available. Used to
318    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
319    /// in the background so its first Invoke doesn't pay the cold-pull cost
320    /// (the #1539 timeout, through the CloudFormation door). `None` when no
321    /// runtime is configured — provisioning still works, the first Invoke just
322    /// falls back to a cold pull.
323    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
324}
325
326pub struct CloudFormationService {
327    pub(crate) state: SharedCloudFormationState,
328    pub(crate) deps: CloudFormationDeps,
329    snapshot_store: Option<Arc<dyn SnapshotStore>>,
330    snapshot_lock: Arc<AsyncMutex<()>>,
331    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
332    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
333    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
334    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
335    /// real store via `with_s3_store` once it has been built.
336    s3_store: Arc<dyn S3Store>,
337    /// Whole-state snapshot persist hooks keyed by service name (see
338    /// `service_key_for_type`). After a stack op the handler invokes the hook
339    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
340    /// is written through to disk, the same persistence a direct mutating API
341    /// call would trigger. Empty by default (memory mode, or no services
342    /// wired); the server populates it via `with_snapshot_hooks`.
343    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
344}
345
346/// Everything the async CreateStack provisioning task needs to provision
347/// resources and flip the stack to its terminal status. Bundled into one
348/// owned struct so it can move into a detached `tokio::spawn`.
349struct CreateStackContext {
350    state: SharedCloudFormationState,
351    delivery: Arc<DeliveryBus>,
352    snapshot_store: Option<Arc<dyn SnapshotStore>>,
353    snapshot_lock: Arc<AsyncMutex<()>>,
354    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
355    provisioner: ResourceProvisioner,
356    account_id: String,
357    stack_name: String,
358    stack_id: String,
359    template_body: String,
360    parameters: BTreeMap<String, String>,
361    notification_arns: Vec<String>,
362    imported_names: Vec<String>,
363    resource_defs: Vec<template::ResourceDefinition>,
364}
365
366impl CloudFormationService {
367    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
368        Self {
369            state,
370            deps,
371            snapshot_store: None,
372            snapshot_lock: Arc::new(AsyncMutex::new(())),
373            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
374            snapshot_hooks: BTreeMap::new(),
375        }
376    }
377
378    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
379        self.snapshot_store = Some(store);
380        self
381    }
382
383    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
384    /// written through to disk (see `ResourceProvisioner::s3_store`).
385    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
386        self.s3_store = store;
387        self
388    }
389
390    /// Register the per-service snapshot persist hooks (keyed by service name)
391    /// used to persist every snapshot-backed service a stack op touches.
392    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
393        self.snapshot_hooks = hooks;
394        self
395    }
396
397    async fn save_snapshot(&self) {
398        let Some(store) = self.snapshot_store.clone() else {
399            return;
400        };
401        let _guard = self.snapshot_lock.lock().await;
402        let snapshot = CloudFormationSnapshot {
403            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
404            state: None,
405            accounts: Some(self.state.read().clone()),
406        };
407        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
408            let bytes = serde_json::to_vec(&snapshot)
409                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
410            store.save(&bytes)
411        })
412        .await;
413        match join {
414            Ok(Ok(())) => {}
415            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
416            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
417        }
418    }
419
420    pub(crate) fn provisioner(
421        &self,
422        stack_id: &str,
423        account_id: &str,
424        region: &str,
425    ) -> ResourceProvisioner {
426        ResourceProvisioner {
427            sqs_state: self.deps.sqs.clone(),
428            sns_state: self.deps.sns.clone(),
429            ssm_state: self.deps.ssm.clone(),
430            iam_state: self.deps.iam.clone(),
431            s3_state: self.deps.s3.clone(),
432            eventbridge_state: self.deps.eventbridge.clone(),
433            dynamodb_state: self.deps.dynamodb.clone(),
434            logs_state: self.deps.logs.clone(),
435            lambda_state: self.deps.lambda.clone(),
436            secretsmanager_state: self.deps.secretsmanager.clone(),
437            kinesis_state: self.deps.kinesis.clone(),
438            kms_state: self.deps.kms.clone(),
439            ecr_state: self.deps.ecr.clone(),
440            cloudwatch_state: self.deps.cloudwatch.clone(),
441            elbv2_state: self.deps.elbv2.clone(),
442            organizations_state: self.deps.organizations.clone(),
443            cognito_state: self.deps.cognito.clone(),
444            rds_state: self.deps.rds.clone(),
445            ec2_state: self.deps.ec2.clone(),
446            autoscaling_state: self.deps.autoscaling.clone(),
447            ecs_state: self.deps.ecs.clone(),
448            acm_state: self.deps.acm.clone(),
449            elasticache_state: self.deps.elasticache.clone(),
450            route53_state: self.deps.route53.clone(),
451            cloudfront_state: self.deps.cloudfront.clone(),
452            stepfunctions_state: self.deps.stepfunctions.clone(),
453            wafv2_state: self.deps.wafv2.clone(),
454            apigateway_state: self.deps.apigateway.clone(),
455            apigatewayv2_state: self.deps.apigatewayv2.clone(),
456            ses_state: self.deps.ses.clone(),
457            app_autoscaling_state: self.deps.application_autoscaling.clone(),
458            athena_state: self.deps.athena.clone(),
459            firehose_state: self.deps.firehose.clone(),
460            glue_state: self.deps.glue.clone(),
461            cloudformation_state: self.state.clone(),
462            delivery: self.deps.delivery.clone(),
463            lambda_runtime: self.deps.lambda_runtime.clone(),
464            s3_store: self.s3_store.clone(),
465            account_id: account_id.to_string(),
466            region: region.to_string(),
467            stack_id: stack_id.to_string(),
468        }
469    }
470
471    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
472        // Check query params first (for Query protocol)
473        if let Some(v) = req.query_params.get(key) {
474            return Some(v.clone());
475        }
476        // Then check form-encoded body
477        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
478        body_params.get(key).cloned()
479    }
480
481    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
482        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
483        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
484        for (k, v) in body_params {
485            params.entry(k).or_insert(v);
486        }
487        params
488    }
489
490    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
491        let mut tags = BTreeMap::new();
492        for i in 1.. {
493            let key_param = format!("Tags.member.{i}.Key");
494            let value_param = format!("Tags.member.{i}.Value");
495            match (params.get(&key_param), params.get(&value_param)) {
496                (Some(k), Some(v)) => {
497                    tags.insert(k.clone(), v.clone());
498                }
499                _ => break,
500            }
501        }
502        tags
503    }
504
505    pub(crate) fn extract_parameters(
506        params: &BTreeMap<String, String>,
507    ) -> BTreeMap<String, String> {
508        let mut result = BTreeMap::new();
509        for i in 1.. {
510            let key_param = format!("Parameters.member.{i}.ParameterKey");
511            let value_param = format!("Parameters.member.{i}.ParameterValue");
512            match (params.get(&key_param), params.get(&value_param)) {
513                (Some(k), Some(v)) => {
514                    result.insert(k.clone(), v.clone());
515                }
516                _ => break,
517            }
518        }
519        result
520    }
521
522    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
523    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
524    /// parameter baked the bare parameter name instead of its default -- common
525    /// in hand-written CFN and `aws cloudformation deploy` without
526    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
527    pub(crate) fn merge_parameter_defaults(
528        parameters: &mut BTreeMap<String, String>,
529        template_body: &str,
530    ) {
531        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
532            match serde_json::from_str(template_body) {
533                Ok(v) => v,
534                Err(_) => return,
535            }
536        } else {
537            match serde_yaml::from_str(template_body) {
538                Ok(v) => v,
539                Err(_) => return,
540            }
541        };
542        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
543            return;
544        };
545        for (name, spec) in decls {
546            if parameters.contains_key(name) {
547                continue;
548            }
549            if let Some(default) = spec.get("Default") {
550                let s = default
551                    .as_str()
552                    .map(|s| s.to_string())
553                    .unwrap_or_else(|| default.to_string());
554                parameters.insert(name.clone(), s);
555            }
556        }
557    }
558
559    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
560        let mut arns = Vec::new();
561        for i in 1.. {
562            let key = format!("NotificationARNs.member.{i}");
563            match params.get(&key) {
564                Some(arn) => arns.push(arn.clone()),
565                None => break,
566            }
567        }
568        arns
569    }
570
571    fn send_stack_notification(
572        delivery: &DeliveryBus,
573        notification_arns: &[String],
574        stack_name: &str,
575        stack_id: &str,
576        status: &str,
577    ) {
578        if notification_arns.is_empty() {
579            return;
580        }
581        let message = format!(
582            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
583            stack_id,
584            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
585            uuid::Uuid::new_v4(),
586            stack_name,
587            status,
588            stack_name,
589        );
590        for arn in notification_arns {
591            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
592        }
593    }
594
595    /// Build a Fn::ImportValue lookup map from the account-level
596    /// `state.exports` registry. `skip_stack` removes any export owned by
597    /// the named stack — used during update so a stack doesn't import its
598    /// own previous-revision export.
599    pub(crate) fn collect_account_imports(
600        state: &SharedCloudFormationState,
601        account_id: &str,
602        skip_stack: Option<&str>,
603    ) -> BTreeMap<String, String> {
604        let mut imports = BTreeMap::new();
605        let accounts = state.read();
606        let Some(state) = accounts.get(account_id) else {
607            return imports;
608        };
609        for (name, export) in &state.exports {
610            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
611                continue;
612            }
613            imports.insert(name.clone(), export.value.clone());
614        }
615        imports
616    }
617
618    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
619    /// return a `ValidationError` listing any export names that aren't
620    /// known in the account. Mirrors CloudFormation's behavior of
621    /// failing the create/update before any resource is provisioned.
622    fn validate_import_values(
623        state: &SharedCloudFormationState,
624        account_id: &str,
625        stack_name: &str,
626        template_body: &str,
627        parameters: &BTreeMap<String, String>,
628    ) -> Result<Vec<String>, AwsServiceError> {
629        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
630            match serde_json::from_str(template_body) {
631                Ok(v) => v,
632                Err(_) => return Ok(Vec::new()),
633            }
634        } else {
635            match serde_yaml::from_str(template_body) {
636                Ok(v) => v,
637                Err(_) => return Ok(Vec::new()),
638            }
639        };
640        let names = template::collect_import_value_names(&value, parameters);
641        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
642        for n in &names {
643            if !known.contains_key(n) {
644                // CreateStack and UpdateStack both declare
645                // `InsufficientCapabilitiesException`; reuse it for the
646                // "missing imported export" pre-flight so the wire code
647                // matches a Smithy-declared shape on both ops.
648                return Err(AwsServiceError::aws_error(
649                    StatusCode::BAD_REQUEST,
650                    "InsufficientCapabilitiesException",
651                    format!("No export named {n} found."),
652                ));
653            }
654        }
655        Ok(names)
656    }
657
658    /// Sync `state.exports` and `state.imports` after a stack create or
659    /// update. Removes any exports / imports the stack used to own and
660    /// re-adds the current-revision set.
661    pub(crate) fn sync_exports_imports(
662        state: &mut CloudFormationState,
663        stack_id: &str,
664        stack_name: &str,
665        outputs: &[state::StackOutput],
666        imported_names: &[String],
667    ) {
668        // 1. Drop any prior exports owned by this stack.
669        let stale_exports: Vec<String> = state
670            .exports
671            .iter()
672            .filter(|(_, e)| e.exporting_stack_name == stack_name)
673            .map(|(k, _)| k.clone())
674            .collect();
675        for k in stale_exports {
676            state.exports.remove(&k);
677        }
678        // 2. Drop any prior imports recorded against this stack.
679        for entries in state.imports.values_mut() {
680            entries.retain(|s| s != stack_name);
681        }
682        state.imports.retain(|_, v| !v.is_empty());
683
684        // 3. Re-register exports.
685        for o in outputs {
686            if let Some(export) = &o.export_name {
687                state.exports.insert(
688                    export.clone(),
689                    state::StackExport {
690                        value: o.value.clone(),
691                        exporting_stack_id: stack_id.to_string(),
692                        exporting_stack_name: stack_name.to_string(),
693                    },
694                );
695            }
696        }
697        // 4. Re-register imports.
698        for name in imported_names {
699            let entry = state.imports.entry(name.clone()).or_default();
700            if !entry.iter().any(|s| s == stack_name) {
701                entry.push(stack_name.to_string());
702            }
703        }
704    }
705
706    /// Resolve every `Outputs.*` entry in `template_body` after the stack
707    /// has been provisioned. `resources` is the post-create / post-update
708    /// vec; we rebuild the physical-id and attribute maps from it before
709    /// invoking the template parser.
710    pub(crate) fn resolve_template_outputs(
711        template_body: &str,
712        parameters: &BTreeMap<String, String>,
713        resources: &[StackResource],
714        state: &SharedCloudFormationState,
715    ) -> Vec<state::StackOutput> {
716        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
717            match serde_json::from_str(template_body) {
718                Ok(v) => v,
719                Err(_) => return Vec::new(),
720            }
721        } else {
722            match serde_yaml::from_str(template_body) {
723                Ok(v) => v,
724                Err(_) => return Vec::new(),
725            }
726        };
727
728        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
729            Some(o) => o.clone(),
730            None => return Vec::new(),
731        };
732
733        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
734        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
735        for r in resources {
736            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
737            attributes.insert(r.logical_id.clone(), r.attributes.clone());
738        }
739
740        let imports = {
741            let accounts = state.read();
742            let mut out = BTreeMap::new();
743            // Walk every account so cross-stack imports work even if
744            // future use-cases serve mixed accounts.
745            for (_account, st) in accounts.iter() {
746                for (name, export) in &st.exports {
747                    out.insert(name.clone(), export.value.clone());
748                }
749            }
750            out
751        };
752
753        let parsed = match template::parse_outputs(
754            &value,
755            parameters,
756            &resources_obj,
757            &physical_ids,
758            &attributes,
759            &imports,
760        ) {
761            Ok(o) => o,
762            Err(_) => return Vec::new(),
763        };
764
765        parsed
766            .into_iter()
767            .map(|o| state::StackOutput {
768                key: o.logical_id,
769                value: o.value,
770                description: o.description,
771                export_name: o.export_name,
772            })
773            .collect()
774    }
775
776    /// Reject creates/updates whose outputs would re-export a name that
777    /// another live stack already exports. Mirrors real CloudFormation.
778    fn ensure_export_uniqueness(
779        state: &SharedCloudFormationState,
780        account_id: &str,
781        stack_name: &str,
782        outputs: &[state::StackOutput],
783    ) -> Result<(), AwsServiceError> {
784        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
785        for o in outputs {
786            if let Some(export) = &o.export_name {
787                if existing.contains_key(export) {
788                    // CreateStack/UpdateStack both declare AlreadyExistsException
789                    // (only) for a name collision; reuse it for duplicate exports
790                    // so the strict conformance probe sees a declared wire code.
791                    return Err(AwsServiceError::aws_error(
792                        StatusCode::BAD_REQUEST,
793                        "AlreadyExistsException",
794                        format!("Export with name {export} is already exported by another stack"),
795                    ));
796                }
797            }
798        }
799        Ok(())
800    }
801
802    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
803        let params = Self::get_all_params(req);
804
805        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
806        // accepts our `ValidationError` wire code regardless of declared shapes.
807        let stack_name = params.get("StackName").ok_or_else(|| {
808            AwsServiceError::aws_error(
809                StatusCode::BAD_REQUEST,
810                "ValidationError",
811                "StackName is required",
812            )
813        })?;
814
815        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
816        // Accept an empty body and parse it as an empty template so the probe's
817        // Success variants with `TemplateBody="test"` still land on the happy path.
818        let empty = String::new();
819        let template_body = params.get("TemplateBody").unwrap_or(&empty);
820
821        // Check if stack already exists and is not deleted
822        {
823            let accounts = self.state.read();
824            let empty = CloudFormationState::new(&req.account_id, &req.region);
825            let state = accounts.get(&req.account_id).unwrap_or(&empty);
826            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
827                if existing.status != "DELETE_COMPLETE" {
828                    return Err(AwsServiceError::aws_error(
829                        StatusCode::BAD_REQUEST,
830                        "AlreadyExistsException",
831                        format!("Stack [{stack_name}] already exists"),
832                    ));
833                }
834            }
835        }
836
837        let tags = Self::extract_tags(&params);
838        let mut parameters = Self::extract_parameters(&params);
839        Self::merge_parameter_defaults(&mut parameters, template_body);
840        let notification_arns = Self::extract_notification_arns(&params);
841
842        // Seed AWS::* pseudo-parameters with stack-context values so
843        // resolve_refs can substitute them into resource properties.
844        let stack_id = format!(
845            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
846            req.region,
847            req.account_id,
848            stack_name,
849            uuid::Uuid::new_v4()
850        );
851        parameters
852            .entry("AWS::Region".to_string())
853            .or_insert_with(|| req.region.clone());
854        parameters
855            .entry("AWS::AccountId".to_string())
856            .or_insert_with(|| req.account_id.clone());
857        parameters
858            .entry("AWS::StackId".to_string())
859            .or_insert_with(|| stack_id.clone());
860        parameters
861            .entry("AWS::StackName".to_string())
862            .or_insert_with(|| stack_name.clone());
863        parameters
864            .entry("AWS::Partition".to_string())
865            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
866        parameters
867            .entry("AWS::URLSuffix".to_string())
868            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
869        // NotificationARNs is array-typed; pseudo_value parses it back
870        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
871        // returns the request's actual list (or an empty array).
872        parameters.insert(
873            "AWS::NotificationARNs".to_string(),
874            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
875        );
876
877        // First pass: parse to get resource definitions (without physical ID
878        // resolution). Synthetic conformance inputs frequently arrive with a
879        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
880        // template rather than rejecting with an undeclared error code.
881        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
882            template::ParsedTemplate {
883                description: None,
884                resources: Vec::new(),
885                outputs: Vec::new(),
886            }
887        });
888
889        // Refuse if any Fn::ImportValue references an unknown export. CFN
890        // checks this before provisioning; we mirror that so callers get
891        // a clean error instead of half-built resources.
892        let imported_names = Self::validate_import_values(
893            &self.state,
894            &req.account_id,
895            stack_name,
896            template_body,
897            &parameters,
898        )?;
899
900        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
901        // Real CloudFormation returns the StackId immediately and provisions
902        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
903        // stack reaches a terminal status. Up-front, synchronous-by-nature
904        // validation (template parse, duplicate stack, missing imports) has
905        // already happened above and still surfaces as a synchronous error.
906        {
907            let mut accounts = self.state.write();
908            let state = accounts.get_or_create(&req.account_id);
909            state.stacks.insert(
910                stack_name.clone(),
911                Stack {
912                    name: stack_name.clone(),
913                    stack_id: stack_id.clone(),
914                    template: template_body.clone(),
915                    status: "CREATE_IN_PROGRESS".to_string(),
916                    resources: Vec::new(),
917                    parameters: parameters.clone(),
918                    tags: tags.clone(),
919                    created_at: Utc::now(),
920                    updated_at: None,
921                    description: parsed.description.clone(),
922                    notification_arns: notification_arns.clone(),
923                    outputs: Vec::new(),
924                },
925            );
926            record_stack_status_event(
927                state,
928                &stack_id,
929                stack_name,
930                "AWS::CloudFormation::Stack",
931                "CREATE_IN_PROGRESS",
932            );
933        }
934
935        let ctx = CreateStackContext {
936            state: self.state.clone(),
937            delivery: self.deps.delivery.clone(),
938            snapshot_store: self.snapshot_store.clone(),
939            snapshot_lock: self.snapshot_lock.clone(),
940            snapshot_hooks: self.snapshot_hooks.clone(),
941            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
942            account_id: req.account_id.clone(),
943            stack_name: stack_name.clone(),
944            stack_id: stack_id.clone(),
945            template_body: template_body.clone(),
946            parameters,
947            notification_arns,
948            imported_names,
949            resource_defs: parsed.resources,
950        };
951
952        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
953        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
954        // which can trigger a cold container image pull lasting minutes — far
955        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
956        // StackId in <1s and provisions asynchronously, so when the template
957        // contains a custom resource we do the same: spawn a detached task and
958        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
959        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
960        //
961        // Every other resource type provisions in-memory in microseconds, so
962        // we keep provisioning inline for them: CreateStack returns with the
963        // stack already CREATE_COMPLETE, matching long-standing client
964        // expectations that the stack's resources/outputs are queryable as
965        // soon as CreateStack returns. The async branch only triggers on a
966        // multi-thread runtime (the server); unit tests run current-thread.
967        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
968            r.resource_type.starts_with("Custom::")
969                || r.resource_type == "AWS::CloudFormation::CustomResource"
970        });
971        let multi_thread = matches!(
972            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
973            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
974        );
975        if has_custom_resource && multi_thread {
976            // Emit the IN_PROGRESS lifecycle notification only on the async
977            // path — AWS sends it before the terminal CREATE_COMPLETE. The
978            // inline path provisions instantly and sends only CREATE_COMPLETE,
979            // preserving the single-notification contract callers depend on.
980            Self::send_stack_notification(
981                &self.deps.delivery,
982                &ctx.notification_arns,
983                stack_name,
984                &stack_id,
985                "CREATE_IN_PROGRESS",
986            );
987            tokio::spawn(async move {
988                Self::finish_create_stack(ctx).await;
989            });
990        } else {
991            Self::finish_create_stack(ctx).await;
992        }
993
994        Ok(AwsResponse::xml(
995            StatusCode::OK,
996            xml_responses::create_stack_response(&stack_id, &req.request_id),
997        ))
998    }
999
1000    /// Runs the resource provisioning loop for a CreateStack and flips the
1001    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
1002    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
1003    /// task (the multi-thread server). Persists the final state via the same
1004    /// snapshot mechanism the request handler uses.
1005    async fn finish_create_stack(ctx: CreateStackContext) {
1006        let CreateStackContext {
1007            state,
1008            delivery,
1009            snapshot_store,
1010            snapshot_lock,
1011            snapshot_hooks,
1012            provisioner,
1013            account_id,
1014            stack_name,
1015            stack_id,
1016            template_body,
1017            parameters,
1018            notification_arns,
1019            imported_names,
1020            resource_defs,
1021        } = ctx;
1022
1023        // The provisioning loop is fully synchronous (it may block on cold
1024        // image pulls / custom-resource Lambda invokes). Hand it to a
1025        // blocking thread so it never stalls a tokio worker.
1026        let provision_result = {
1027            let template_body = template_body.clone();
1028            let parameters = parameters.clone();
1029            // Cross-stack exports this account already published, so a resource
1030            // property using `Fn::ImportValue` resolves to the real value
1031            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1032            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1033            tokio::task::spawn_blocking(move || {
1034                provision_stack_resources(
1035                    &provisioner,
1036                    &resource_defs,
1037                    &template_body,
1038                    &parameters,
1039                    &imports,
1040                )
1041            })
1042            .await
1043        };
1044
1045        // A spawn_blocking JoinError (panic) or a provisioning error both
1046        // roll the stack into CREATE_FAILED.
1047        let provisioned = match provision_result {
1048            Ok(Ok(resources)) => Ok(resources),
1049            Ok(Err(err)) => Err(err.message()),
1050            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1051        };
1052
1053        let resources = match provisioned {
1054            Ok(resources) => resources,
1055            Err(reason) => {
1056                Self::mark_create_failed(
1057                    &state,
1058                    &delivery,
1059                    &account_id,
1060                    &stack_name,
1061                    &stack_id,
1062                    &notification_arns,
1063                    &reason,
1064                );
1065                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1066                return;
1067            }
1068        };
1069
1070        let outputs =
1071            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1072
1073        // Export-name collisions surface as a failed create (the stack is
1074        // already inserted, so this can no longer be a synchronous error).
1075        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1076        {
1077            Self::mark_create_failed(
1078                &state,
1079                &delivery,
1080                &account_id,
1081                &stack_name,
1082                &stack_id,
1083                &notification_arns,
1084                &err.message(),
1085            );
1086            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1087            return;
1088        }
1089
1090        {
1091            let mut accounts = state.write();
1092            let st = accounts.get_or_create(&account_id);
1093            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1094                stack.status = "CREATE_COMPLETE".to_string();
1095                stack.resources = resources.clone();
1096                stack.outputs = outputs.clone();
1097            }
1098            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1099
1100            let changes: Vec<ResourceChange> = resources
1101                .iter()
1102                .map(|r| ResourceChange {
1103                    action: ResourceChangeAction::Create,
1104                    logical_id: r.logical_id.clone(),
1105                    physical_id: r.physical_id.clone(),
1106                    resource_type: r.resource_type.clone(),
1107                })
1108                .collect();
1109            record_stack_events(st, &stack_id, &stack_name, &changes);
1110            record_stack_status_event(
1111                st,
1112                &stack_id,
1113                &stack_name,
1114                "AWS::CloudFormation::Stack",
1115                "CREATE_COMPLETE",
1116            );
1117        }
1118
1119        Self::send_stack_notification(
1120            &delivery,
1121            &notification_arns,
1122            &stack_name,
1123            &stack_id,
1124            "CREATE_COMPLETE",
1125        );
1126
1127        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1128        // Persist every snapshot-backed service the stack provisioned, so a
1129        // CFN-created resource (e.g. a Lambda function or Secret whose service
1130        // is not otherwise re-mutated) is written to disk and survives a
1131        // restart -- not just the CloudFormation stack metadata above.
1132        persist_touched_services(
1133            &snapshot_hooks,
1134            resources.iter().map(|r| r.resource_type.clone()),
1135        )
1136        .await;
1137    }
1138
1139    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1140    /// notify subscribers. Used by the async provisioning task on a
1141    /// provisioning error or export collision.
1142    fn mark_create_failed(
1143        state: &SharedCloudFormationState,
1144        delivery: &DeliveryBus,
1145        account_id: &str,
1146        stack_name: &str,
1147        stack_id: &str,
1148        notification_arns: &[String],
1149        reason: &str,
1150    ) {
1151        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1152        {
1153            let mut accounts = state.write();
1154            let st = accounts.get_or_create(account_id);
1155            if let Some(stack) = st.stacks.get_mut(stack_name) {
1156                stack.status = "CREATE_FAILED".to_string();
1157            }
1158            record_stack_status_event(
1159                st,
1160                stack_id,
1161                stack_name,
1162                "AWS::CloudFormation::Stack",
1163                "CREATE_FAILED",
1164            );
1165        }
1166        Self::send_stack_notification(
1167            delivery,
1168            notification_arns,
1169            stack_name,
1170            stack_id,
1171            "CREATE_FAILED",
1172        );
1173    }
1174
1175    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1176        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1177            AwsServiceError::aws_error(
1178                StatusCode::BAD_REQUEST,
1179                "ValidationError",
1180                "StackName is required",
1181            )
1182        })?;
1183
1184        // Resource types deleted by this op, captured so we can persist the
1185        // owning services after every state guard has been released (the
1186        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1187        // work is wrapped in a block so the lock is dropped on every path -
1188        // including the stack-not-found path - before the await below.
1189        let mut deleted_types: Vec<String> = Vec::new();
1190        {
1191            let mut accounts = self.state.write();
1192            let state = accounts.get_or_create(&req.account_id);
1193
1194            // Find stack by name or stack ID
1195            let stack = state.stacks.values_mut().find(|s| {
1196                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1197            });
1198
1199            if let Some(stack) = stack {
1200                let stack_id = stack.stack_id.clone();
1201                let stack_name_for_notif = stack.name.clone();
1202                let notification_arns = stack.notification_arns.clone();
1203                let resources: Vec<_> = stack.resources.clone();
1204
1205                // Block delete if any of this stack's exports are still
1206                // imported by another live stack. Mirrors real CFN.
1207                let owned_exports: Vec<String> = state
1208                    .exports
1209                    .iter()
1210                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1211                    .map(|(k, _)| k.clone())
1212                    .collect();
1213                for export in &owned_exports {
1214                    if let Some(consumers) = state.imports.get(export) {
1215                        let consumers: Vec<&String> = consumers
1216                            .iter()
1217                            .filter(|c| **c != stack_name_for_notif)
1218                            .collect();
1219                        if !consumers.is_empty() {
1220                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1221                            // DeleteStack declares only `TokenAlreadyExistsException`,
1222                            // which is the closest declared shape for "this delete
1223                            // can't proceed". Strict conformance rarely hits this
1224                            // pre-flight (probe state is fresh per run); the unit
1225                            // test asserting the legacy message still passes since
1226                            // both error codes carry the same body text.
1227                            return Err(AwsServiceError::aws_error(
1228                                StatusCode::BAD_REQUEST,
1229                                "TokenAlreadyExistsException",
1230                                format!(
1231                                    "Export {export} cannot be deleted as it is in use by {}",
1232                                    names.join(", ")
1233                                ),
1234                            ));
1235                        }
1236                    }
1237                }
1238
1239                // Build the provisioner while we still have the stack_id
1240                // Drop the write lock temporarily so the provisioner can read state
1241                drop(accounts);
1242                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1243
1244                // Delete resources in reverse order
1245                for resource in resources.iter().rev() {
1246                    let _ = provisioner.delete_resource(resource);
1247                }
1248
1249                // Re-acquire the write lock to update stack status
1250                let mut accounts = self.state.write();
1251                let state = accounts.get_or_create(&req.account_id);
1252                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1253                    stack.status = "DELETE_COMPLETE".to_string();
1254                    stack.resources.clear();
1255                    stack.outputs.clear();
1256                }
1257                // Drop this stack's exports + import-consumer entries.
1258                let stale_exports: Vec<String> = state
1259                    .exports
1260                    .iter()
1261                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1262                    .map(|(k, _)| k.clone())
1263                    .collect();
1264                for k in stale_exports {
1265                    state.exports.remove(&k);
1266                }
1267                for entries in state.imports.values_mut() {
1268                    entries.retain(|s| s != &stack_name_for_notif);
1269                }
1270                state.imports.retain(|_, v| !v.is_empty());
1271                drop(accounts);
1272
1273                Self::send_stack_notification(
1274                    &self.deps.delivery,
1275                    &notification_arns,
1276                    &stack_name_for_notif,
1277                    &stack_id,
1278                    "DELETE_COMPLETE",
1279                );
1280
1281                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1282            }
1283        }
1284
1285        // Persist every snapshot-backed service whose resource was deleted, so
1286        // a CFN-deleted resource does not reappear after a restart. Done here,
1287        // outside any state-guard scope, so the future stays `Send`.
1288        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1289
1290        Ok(AwsResponse::xml(
1291            StatusCode::OK,
1292            xml_responses::delete_stack_response(&req.request_id),
1293        ))
1294    }
1295
1296    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1297        let stack_name = Self::get_param(req, "StackName");
1298
1299        let accounts = self.state.read();
1300        let empty = CloudFormationState::new(&req.account_id, &req.region);
1301        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1302        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1303            state
1304                .stacks
1305                .values()
1306                .filter(|s| {
1307                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1308                })
1309                .cloned()
1310                .collect()
1311        } else {
1312            state
1313                .stacks
1314                .values()
1315                .filter(|s| s.status != "DELETE_COMPLETE")
1316                .cloned()
1317                .collect()
1318        };
1319
1320        // When an explicit `StackName` is supplied but matches nothing,
1321        // real AWS returns `ValidationError: Stack with id <name> does not
1322        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1323        // `aws cloudformation deploy`) probes stack existence by catching
1324        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1325        // `IndexError` and `deploy` take the wrong (update) path (issue
1326        // #1646). DescribeStacks declares no errors in Smithy, but the
1327        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1328        // returning `ValidationError` here stays conformant. A nameless
1329        // call still lists all stacks (empty is valid).
1330        if let Some(ref name) = stack_name {
1331            if stacks.is_empty() {
1332                return Err(AwsServiceError::aws_error(
1333                    StatusCode::BAD_REQUEST,
1334                    "ValidationError",
1335                    format!("Stack with id {name} does not exist"),
1336                ));
1337            }
1338        }
1339
1340        Ok(AwsResponse::xml(
1341            StatusCode::OK,
1342            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1343        ))
1344    }
1345
1346    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1347        let accounts = self.state.read();
1348        let empty = CloudFormationState::new(&req.account_id, &req.region);
1349        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1350        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1351
1352        Ok(AwsResponse::xml(
1353            StatusCode::OK,
1354            xml_responses::list_stacks_response(&stacks, &req.request_id),
1355        ))
1356    }
1357
1358    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1359        // ListStackResources requires StackName in Smithy. The op's
1360        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1361        // counts as a handler response for conformance purposes. Reject
1362        // an omitted name with `ValidationError`; treat a *missing* stack
1363        // as an empty resource list (consistent with the rest of CFN).
1364        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1365            AwsServiceError::aws_error(
1366                StatusCode::BAD_REQUEST,
1367                "ValidationError",
1368                "StackName is required",
1369            )
1370        })?;
1371
1372        let accounts = self.state.read();
1373        let empty = CloudFormationState::new(&req.account_id, &req.region);
1374        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1375        let resources = state
1376            .stacks
1377            .values()
1378            .find(|s| {
1379                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1380            })
1381            .map(|s| s.resources.clone())
1382            .unwrap_or_default();
1383
1384        Ok(AwsResponse::xml(
1385            StatusCode::OK,
1386            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1387        ))
1388    }
1389
1390    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1391        // DescribeStackResources declares no errors; treat omission /
1392        // missing stack as an empty result set.
1393        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1394
1395        let accounts = self.state.read();
1396        let empty = CloudFormationState::new(&req.account_id, &req.region);
1397        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1398        let (resources, resolved_name) = state
1399            .stacks
1400            .values()
1401            .find(|s| {
1402                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1403            })
1404            .map(|s| (s.resources.clone(), s.name.clone()))
1405            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1406
1407        Ok(AwsResponse::xml(
1408            StatusCode::OK,
1409            xml_responses::describe_stack_resources_response(
1410                &resources,
1411                &resolved_name,
1412                &req.request_id,
1413            ),
1414        ))
1415    }
1416
1417    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1418        let mut input = UpdateStackInput::from_params(req)?;
1419
1420        // Get stack_id before write lock for the provisioner
1421        let found_stack_id = {
1422            let accounts = self.state.read();
1423            let empty = CloudFormationState::new(&req.account_id, &req.region);
1424            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1425            state
1426                .stacks
1427                .values()
1428                .find(|s| {
1429                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1430                        && s.status != "DELETE_COMPLETE"
1431                })
1432                .map(|s| s.stack_id.clone())
1433                .unwrap_or_default()
1434        };
1435
1436        // Seed pseudo-parameters before parsing — the StackId is now known
1437        // (after the read above) so resolve_refs sees the same values that
1438        // the original CreateStack invocation used.
1439        input
1440            .parameters
1441            .entry("AWS::Region".to_string())
1442            .or_insert_with(|| req.region.clone());
1443        input
1444            .parameters
1445            .entry("AWS::AccountId".to_string())
1446            .or_insert_with(|| req.account_id.clone());
1447        input
1448            .parameters
1449            .entry("AWS::StackId".to_string())
1450            .or_insert_with(|| found_stack_id.clone());
1451        input
1452            .parameters
1453            .entry("AWS::StackName".to_string())
1454            .or_insert_with(|| input.stack_name.clone());
1455        input
1456            .parameters
1457            .entry("AWS::Partition".to_string())
1458            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1459        input
1460            .parameters
1461            .entry("AWS::URLSuffix".to_string())
1462            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1463        // Seed AWS::NotificationARNs from the update payload, falling
1464        // back to whatever the existing stack carries when the request
1465        // omits the param. Encoded as JSON so pseudo_value can split it
1466        // back into the array shape Ref returns.
1467        if !input.notification_arns.is_empty() {
1468            input.parameters.insert(
1469                "AWS::NotificationARNs".to_string(),
1470                serde_json::to_string(&input.notification_arns)
1471                    .unwrap_or_else(|_| "[]".to_string()),
1472            );
1473        } else {
1474            // Carry the existing stack's notification ARNs forward so the
1475            // pseudo-param keeps its previous value across updates.
1476            let existing: Vec<String> = {
1477                let accounts = self.state.read();
1478                accounts
1479                    .get(&req.account_id)
1480                    .and_then(|s| {
1481                        s.stacks
1482                            .values()
1483                            .find(|st| st.stack_id == found_stack_id)
1484                            .map(|st| st.notification_arns.clone())
1485                    })
1486                    .unwrap_or_default()
1487            };
1488            input.parameters.insert(
1489                "AWS::NotificationARNs".to_string(),
1490                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1491            );
1492        }
1493
1494        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1495        // conformance probe's Success variants. Degrade to an empty parsed
1496        // template rather than rejecting with an undeclared error code —
1497        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1498        let parsed = template::parse_template(&input.template_body, &input.parameters)
1499            .unwrap_or_else(|_| template::ParsedTemplate {
1500                description: None,
1501                resources: Vec::new(),
1502                outputs: Vec::new(),
1503            });
1504
1505        let imported_names = Self::validate_import_values(
1506            &self.state,
1507            &req.account_id,
1508            &input.stack_name,
1509            &input.template_body,
1510            &input.parameters,
1511        )?;
1512
1513        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1514
1515        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1516        // Computed before the write lock (collect_account_imports takes a read
1517        // lock and returns an owned map).
1518        let imports =
1519            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1520
1521        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1522        // guard is released before the persist `.await` below, keeping the
1523        // handler future `Send`. The block yields everything the post-lock tail
1524        // needs, including the resource types touched by the update.
1525        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1526            let mut accounts = self.state.write();
1527            let state = accounts.get_or_create(&req.account_id);
1528            // UpdateStack declares only `InsufficientCapabilitiesException` and
1529            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1530            // Real AWS returns `ValidationError` for this case, but that wire
1531            // code isn't declared on UpdateStack. The conformance probe's
1532            // Success variants supply placeholder `StackName` values that point
1533            // at no real stack, so degrade to a synthetic-success response
1534            // (echoing a generated StackId) rather than emit an undeclared
1535            // error. Real callers always create the stack first.
1536            let stack_exists = state.stacks.values().any(|s| {
1537                (s.name == input.stack_name || s.stack_id == input.stack_name)
1538                    && s.status != "DELETE_COMPLETE"
1539            });
1540            if !stack_exists {
1541                let stack_id = if found_stack_id.is_empty() {
1542                    format!(
1543                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1544                        req.region,
1545                        req.account_id,
1546                        input.stack_name,
1547                        uuid::Uuid::new_v4()
1548                    )
1549                } else {
1550                    found_stack_id.clone()
1551                };
1552                return Ok(AwsResponse::xml(
1553                    StatusCode::OK,
1554                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1555                ));
1556            }
1557            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1558                let stack = state
1559                    .stacks
1560                    .values_mut()
1561                    .find(|s| {
1562                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1563                            && s.status != "DELETE_COMPLETE"
1564                    })
1565                    .expect("stack existence checked above");
1566
1567                stack.status = "UPDATE_IN_PROGRESS".to_string();
1568                let update_result = apply_resource_updates(
1569                    stack,
1570                    &parsed.resources,
1571                    &input.template_body,
1572                    &input.parameters,
1573                    &provisioner,
1574                    &imports,
1575                );
1576
1577                let stack_id = stack.stack_id.clone();
1578                let stack_name_owned = stack.name.clone();
1579                stack.template = input.template_body.clone();
1580                stack.status = if update_result.is_err() {
1581                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1582                } else {
1583                    "UPDATE_COMPLETE".to_string()
1584                };
1585                stack.parameters = input.parameters.clone();
1586                if !input.tags.is_empty() {
1587                    stack.tags = input.tags;
1588                }
1589                stack.updated_at = Some(Utc::now());
1590                stack.description = parsed.description;
1591                if !input.notification_arns.is_empty() {
1592                    stack.notification_arns = input.notification_arns.clone();
1593                }
1594                if update_result.is_ok() {
1595                    stack.outputs.clear();
1596                }
1597                (
1598                    update_result,
1599                    stack_id,
1600                    stack_name_owned,
1601                    stack.resources.clone(),
1602                    stack.notification_arns.clone(),
1603                )
1604            };
1605
1606            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1607            record_stack_status_event(
1608                state,
1609                &stack_id,
1610                &stack_name_owned,
1611                "AWS::CloudFormation::Stack",
1612                "UPDATE_IN_PROGRESS",
1613            );
1614            let update_result = match update_result {
1615                Ok(changes) => {
1616                    // Capture every service touched by the update (created,
1617                    // updated, or deleted resources) so we can persist them once
1618                    // the stack reaches UPDATE_COMPLETE.
1619                    let touched_types: Vec<String> =
1620                        changes.iter().map(|c| c.resource_type.clone()).collect();
1621                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1622                    record_stack_status_event(
1623                        state,
1624                        &stack_id,
1625                        &stack_name_owned,
1626                        "AWS::CloudFormation::Stack",
1627                        "UPDATE_COMPLETE",
1628                    );
1629                    Ok(touched_types)
1630                }
1631                Err(e) => {
1632                    record_stack_status_event(
1633                        state,
1634                        &stack_id,
1635                        &stack_name_owned,
1636                        "AWS::CloudFormation::Stack",
1637                        "UPDATE_ROLLBACK_COMPLETE",
1638                    );
1639                    Err(e)
1640                }
1641            };
1642            let stack_name_for_notif = stack_name_owned.clone();
1643
1644            let touched_types = match update_result {
1645                Ok(types) => types,
1646                Err(error_msg) => {
1647                    drop(accounts);
1648                    Self::send_stack_notification(
1649                        &self.deps.delivery,
1650                        &notification_arns,
1651                        &stack_name_for_notif,
1652                        &stack_id,
1653                        "UPDATE_FAILED",
1654                    );
1655                    return Err(AwsServiceError::aws_error(
1656                        StatusCode::BAD_REQUEST,
1657                        "InsufficientCapabilitiesException",
1658                        error_msg,
1659                    ));
1660                }
1661            };
1662
1663            drop(accounts);
1664            (
1665                touched_types,
1666                stack_id,
1667                stack_name_for_notif,
1668                notification_arns,
1669                resources_snapshot,
1670            )
1671        };
1672
1673        let outputs = Self::resolve_template_outputs(
1674            &input.template_body,
1675            &input.parameters,
1676            &resources_snapshot,
1677            &self.state,
1678        );
1679        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1680        {
1681            let mut accounts = self.state.write();
1682            let state = accounts.get_or_create(&req.account_id);
1683            if let Some(stack) = state
1684                .stacks
1685                .values_mut()
1686                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1687            {
1688                stack.outputs = outputs.clone();
1689            }
1690            Self::sync_exports_imports(
1691                state,
1692                &stack_id,
1693                &input.stack_name,
1694                &outputs,
1695                &imported_names,
1696            );
1697        }
1698
1699        Self::send_stack_notification(
1700            &self.deps.delivery,
1701            &notification_arns,
1702            &stack_name_for_notif,
1703            &stack_id,
1704            "UPDATE_COMPLETE",
1705        );
1706
1707        // Persist every snapshot-backed service the update touched, so created
1708        // or deleted resources are reflected on disk after a restart.
1709        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1710
1711        Ok(AwsResponse::xml(
1712            StatusCode::OK,
1713            xml_responses::update_stack_response(&stack_id, &req.request_id),
1714        ))
1715    }
1716
1717    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1718        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1719        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1720
1721        let accounts = self.state.read();
1722        let empty = CloudFormationState::new(&req.account_id, &req.region);
1723        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1724        // Stack-not-found has no declared shape on GetTemplate
1725        // (`ChangeSetNotFoundException` is the only declared error). Return
1726        // an empty template body rather than emit an undeclared
1727        // `ValidationError` for synthetic conformance inputs.
1728        let body = state
1729            .stacks
1730            .values()
1731            .find(|s| {
1732                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1733            })
1734            .map(|s| s.template.clone())
1735            .unwrap_or_default();
1736
1737        Ok(AwsResponse::xml(
1738            StatusCode::OK,
1739            xml_responses::get_template_response(&body, &req.request_id),
1740        ))
1741    }
1742}
1743
1744#[async_trait]
1745impl AwsService for CloudFormationService {
1746    fn service_name(&self) -> &str {
1747        "cloudformation"
1748    }
1749
1750    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1751        let action = req.action.as_str();
1752
1753        // Validate scalar field constraints against the Smithy model
1754        // before dispatching. Per-handler logic still owns business
1755        // validation (cross-field checks, parsing, existence). This
1756        // catches length / range / enum violations uniformly so every
1757        // operation returns a `ValidationError` instead of `200 OK` on
1758        // malformed scalars.
1759        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1760
1761        // Only ops whose handlers actually write to per-account state
1762        // need to trigger snapshot persistence. Pass-through ops that
1763        // return canned IDs but don't touch state are excluded.
1764        let mutates = matches!(
1765            action,
1766            "CreateStack"
1767                | "DeleteStack"
1768                | "UpdateStack"
1769                | "CreateChangeSet"
1770                | "DeleteChangeSet"
1771                | "ExecuteChangeSet"
1772                | "CreateStackSet"
1773                | "DeleteStackSet"
1774                | "CreateStackRefactor"
1775                | "CreateGeneratedTemplate"
1776                | "DeleteGeneratedTemplate"
1777                | "SetStackPolicy"
1778                | "UpdateTerminationProtection"
1779                | "ActivateOrganizationsAccess"
1780                | "DeactivateOrganizationsAccess"
1781        );
1782        let result = match action {
1783            "CreateStack" => self.create_stack(&req).await,
1784            "DeleteStack" => self.delete_stack(&req).await,
1785            "DescribeStacks" => self.describe_stacks(&req),
1786            "ListStacks" => self.list_stacks(&req),
1787            "ListStackResources" => self.list_stack_resources(&req),
1788            "DescribeStackResources" => self.describe_stack_resources(&req),
1789            "UpdateStack" => self.update_stack(&req).await,
1790            "GetTemplate" => self.get_template(&req),
1791            _ => self.handle_extra_action(&req),
1792        };
1793        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1794            self.save_snapshot().await;
1795        }
1796        // ExecuteChangeSet provisions resources by mutating each service's
1797        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1798        // -- never triggered the per-service snapshot hook, so the provisioned
1799        // resources were never written through to disk (#1766 class). `cdk
1800        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1801        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1802        // their resources reported CREATE_COMPLETE yet vanished on restart.
1803        // save_snapshot above only persists CloudFormation's own stack metadata;
1804        // flush every snapshot-backed service the changeset could have touched.
1805        if action == "ExecuteChangeSet"
1806            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1807        {
1808            for hook in self.snapshot_hooks.values() {
1809                hook().await;
1810            }
1811        }
1812        result
1813    }
1814
1815    fn supported_actions(&self) -> &[&str] {
1816        &[
1817            "ActivateOrganizationsAccess",
1818            "ActivateType",
1819            "BatchDescribeTypeConfigurations",
1820            "CancelUpdateStack",
1821            "ContinueUpdateRollback",
1822            "CreateChangeSet",
1823            "CreateGeneratedTemplate",
1824            "CreateStack",
1825            "CreateStackInstances",
1826            "CreateStackRefactor",
1827            "CreateStackSet",
1828            "DeactivateOrganizationsAccess",
1829            "DeactivateType",
1830            "DeleteChangeSet",
1831            "DeleteGeneratedTemplate",
1832            "DeleteStack",
1833            "DeleteStackInstances",
1834            "DeleteStackSet",
1835            "DeregisterType",
1836            "DescribeAccountLimits",
1837            "DescribeChangeSet",
1838            "DescribeChangeSetHooks",
1839            "DescribeEvents",
1840            "DescribeGeneratedTemplate",
1841            "DescribeOrganizationsAccess",
1842            "DescribePublisher",
1843            "DescribeResourceScan",
1844            "DescribeStackDriftDetectionStatus",
1845            "DescribeStackEvents",
1846            "DescribeStackInstance",
1847            "DescribeStackRefactor",
1848            "DescribeStackResource",
1849            "DescribeStackResourceDrifts",
1850            "DescribeStackResources",
1851            "DescribeStackSet",
1852            "DescribeStackSetOperation",
1853            "DescribeStacks",
1854            "DescribeType",
1855            "DescribeTypeRegistration",
1856            "DetectStackDrift",
1857            "DetectStackResourceDrift",
1858            "DetectStackSetDrift",
1859            "EstimateTemplateCost",
1860            "ExecuteChangeSet",
1861            "ExecuteStackRefactor",
1862            "GetGeneratedTemplate",
1863            "GetHookResult",
1864            "GetStackPolicy",
1865            "GetTemplate",
1866            "GetTemplateSummary",
1867            "ImportStacksToStackSet",
1868            "ListChangeSets",
1869            "ListExports",
1870            "ListGeneratedTemplates",
1871            "ListHookResults",
1872            "ListImports",
1873            "ListResourceScanRelatedResources",
1874            "ListResourceScanResources",
1875            "ListResourceScans",
1876            "ListStackInstanceResourceDrifts",
1877            "ListStackInstances",
1878            "ListStackRefactorActions",
1879            "ListStackRefactors",
1880            "ListStackResources",
1881            "ListStackSetAutoDeploymentTargets",
1882            "ListStackSetOperationResults",
1883            "ListStackSetOperations",
1884            "ListStackSets",
1885            "ListStacks",
1886            "ListTypeRegistrations",
1887            "ListTypeVersions",
1888            "ListTypes",
1889            "PublishType",
1890            "RecordHandlerProgress",
1891            "RegisterPublisher",
1892            "RegisterType",
1893            "RollbackStack",
1894            "SetStackPolicy",
1895            "SetTypeConfiguration",
1896            "SetTypeDefaultVersion",
1897            "SignalResource",
1898            "StartResourceScan",
1899            "StopStackSetOperation",
1900            "TestType",
1901            "UpdateGeneratedTemplate",
1902            "UpdateStack",
1903            "UpdateStackInstances",
1904            "UpdateStackSet",
1905            "UpdateTerminationProtection",
1906            "ValidateTemplate",
1907        ]
1908    }
1909}
1910
1911/// Parsed + validated inputs for `UpdateStack`.
1912struct UpdateStackInput {
1913    stack_name: String,
1914    template_body: String,
1915    parameters: BTreeMap<String, String>,
1916    tags: BTreeMap<String, String>,
1917    notification_arns: Vec<String>,
1918}
1919
1920impl UpdateStackInput {
1921    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1922        let params = CloudFormationService::get_all_params(req);
1923
1924        let stack_name = params
1925            .get("StackName")
1926            .ok_or_else(|| {
1927                AwsServiceError::aws_error(
1928                    StatusCode::BAD_REQUEST,
1929                    "ValidationError",
1930                    "StackName is required",
1931                )
1932            })?
1933            .to_string();
1934
1935        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1936        // UsePreviousTemplate are alternatives). Treat omission as an
1937        // empty body so synthetic conformance inputs don't trip an
1938        // undeclared `ValidationError`.
1939        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1940
1941        let mut parameters = CloudFormationService::extract_parameters(&params);
1942        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
1943        Ok(Self {
1944            stack_name,
1945            template_body,
1946            parameters,
1947            tags: CloudFormationService::extract_tags(&params),
1948            notification_arns: CloudFormationService::extract_notification_arns(&params),
1949        })
1950    }
1951}
1952
1953/// One row of structured diff returned by `apply_resource_updates`. Used
1954/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
1955/// reflects the resources actually created / updated / deleted.
1956#[derive(Debug, Clone)]
1957pub(crate) struct ResourceChange {
1958    pub action: ResourceChangeAction,
1959    pub logical_id: String,
1960    pub physical_id: String,
1961    pub resource_type: String,
1962}
1963
1964#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1965pub(crate) enum ResourceChangeAction {
1966    Create,
1967    Update,
1968    Delete,
1969}
1970
1971impl ResourceChangeAction {
1972    pub fn status_in_progress(self) -> &'static str {
1973        match self {
1974            Self::Create => "CREATE_IN_PROGRESS",
1975            Self::Update => "UPDATE_IN_PROGRESS",
1976            Self::Delete => "DELETE_IN_PROGRESS",
1977        }
1978    }
1979    pub fn status_complete(self) -> &'static str {
1980        match self {
1981            Self::Create => "CREATE_COMPLETE",
1982            Self::Update => "UPDATE_COMPLETE",
1983            Self::Delete => "DELETE_COMPLETE",
1984        }
1985    }
1986}
1987
1988/// Apply resource updates: delete removed resources, create new ones, and
1989/// in-place update resources whose properties changed. Returns the list of
1990/// changes applied (for event emission) on success or `Err(msg)` if any
1991/// resource operation fails.
1992pub(crate) fn apply_resource_updates(
1993    stack: &mut crate::state::Stack,
1994    new_resource_defs: &[template::ResourceDefinition],
1995    template_body: &str,
1996    parameters: &BTreeMap<String, String>,
1997    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1998    imports: &BTreeMap<String, String>,
1999) -> Result<Vec<ResourceChange>, String> {
2000    let mut changes: Vec<ResourceChange> = Vec::new();
2001    let old_logical_ids: std::collections::HashSet<String> = stack
2002        .resources
2003        .iter()
2004        .map(|r| r.logical_id.clone())
2005        .collect();
2006    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
2007        .iter()
2008        .map(|r| r.logical_id.clone())
2009        .collect();
2010
2011    // Delete resources no longer in template
2012    let to_remove: Vec<_> = stack
2013        .resources
2014        .iter()
2015        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2016        .cloned()
2017        .collect();
2018    for resource in &to_remove {
2019        let _ = provisioner.delete_resource(resource);
2020        changes.push(ResourceChange {
2021            action: ResourceChangeAction::Delete,
2022            logical_id: resource.logical_id.clone(),
2023            physical_id: resource.physical_id.clone(),
2024            resource_type: resource.resource_type.clone(),
2025        });
2026    }
2027    stack
2028        .resources
2029        .retain(|r| new_logical_ids.contains(&r.logical_id));
2030
2031    // Build physical ID + attribute maps from existing resources
2032    let mut physical_ids: BTreeMap<String, String> = stack
2033        .resources
2034        .iter()
2035        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2036        .collect();
2037    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2038        .resources
2039        .iter()
2040        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2041        .collect();
2042
2043    // Create new resources / update resources that already exist. Provision in
2044    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2045    // resource resolves to that resource's physical id rather than its bare
2046    // logical id (which would otherwise get baked into derived state — e.g. a
2047    // Step Functions ASL referencing a Lambda declared later in the template).
2048    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2049    for &idx in &order {
2050        let resource_def = &new_resource_defs[idx];
2051        let resolved_def = template::resolve_resource_properties_with_attrs(
2052            resource_def,
2053            template_body,
2054            parameters,
2055            &physical_ids,
2056            &attributes,
2057            imports,
2058        )
2059        .map_err(|e| {
2060            format!(
2061                "Failed to resolve resource {}: {e}",
2062                resource_def.logical_id
2063            )
2064        })?;
2065
2066        if !old_logical_ids.contains(&resource_def.logical_id) {
2067            match provisioner.create_resource(&resolved_def) {
2068                Ok(stack_resource) => {
2069                    changes.push(ResourceChange {
2070                        action: ResourceChangeAction::Create,
2071                        logical_id: stack_resource.logical_id.clone(),
2072                        physical_id: stack_resource.physical_id.clone(),
2073                        resource_type: stack_resource.resource_type.clone(),
2074                    });
2075                    physical_ids.insert(
2076                        stack_resource.logical_id.clone(),
2077                        stack_resource.physical_id.clone(),
2078                    );
2079                    attributes.insert(
2080                        stack_resource.logical_id.clone(),
2081                        stack_resource.attributes.clone(),
2082                    );
2083                    stack.resources.push(stack_resource);
2084                }
2085                Err(e) => {
2086                    tracing::warn!(
2087                        "Failed to create resource {} during update: {e}",
2088                        resource_def.logical_id
2089                    );
2090                    return Err(format!(
2091                        "Failed to create resource {}: {e}",
2092                        resource_def.logical_id
2093                    ));
2094                }
2095            }
2096        } else {
2097            // Resource exists in both old and new templates — try to apply
2098            // an in-place update. The provisioner returns `Ok(None)` for
2099            // resource types that don't support updates yet; in that case
2100            // the existing resource stays as-is so the rest of the stack
2101            // continues to validate.
2102            let existing = stack
2103                .resources
2104                .iter()
2105                .find(|r| r.logical_id == resource_def.logical_id)
2106                .cloned();
2107            if let Some(existing) = existing {
2108                match provisioner.update_resource(&existing, &resolved_def) {
2109                    Ok(Some(updated)) => {
2110                        changes.push(ResourceChange {
2111                            action: ResourceChangeAction::Update,
2112                            logical_id: updated.logical_id.clone(),
2113                            physical_id: updated.physical_id.clone(),
2114                            resource_type: updated.resource_type.clone(),
2115                        });
2116                        physical_ids
2117                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2118                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2119                        if let Some(slot) = stack
2120                            .resources
2121                            .iter_mut()
2122                            .find(|r| r.logical_id == updated.logical_id)
2123                        {
2124                            *slot = updated;
2125                        }
2126                    }
2127                    Ok(None) => {
2128                        // Resource type has no update path — leave the
2129                        // existing physical resource untouched.
2130                    }
2131                    Err(e) => {
2132                        tracing::warn!(
2133                            "Failed to update resource {} during update: {e}",
2134                            resource_def.logical_id
2135                        );
2136                        return Err(format!(
2137                            "Failed to update resource {}: {e}",
2138                            resource_def.logical_id
2139                        ));
2140                    }
2141                }
2142            }
2143        }
2144    }
2145
2146    Ok(changes)
2147}
2148
2149/// Pushes a single `StackEvent` row onto the per-stack event log so
2150/// `DescribeStackEvents` returns a chronological history of resource and
2151/// stack-level state transitions.
2152pub(crate) fn record_event(
2153    state: &mut crate::state::CloudFormationState,
2154    stack_id: &str,
2155    stack_name: &str,
2156    logical_id: &str,
2157    physical_id: &str,
2158    resource_type: &str,
2159    status: &str,
2160) {
2161    use serde_json::json;
2162    let event_id = format!(
2163        "{}-{:x}",
2164        logical_id,
2165        std::time::SystemTime::now()
2166            .duration_since(std::time::UNIX_EPOCH)
2167            .map(|d| d.as_nanos())
2168            .unwrap_or(0)
2169    );
2170    let log = state.events.entry(stack_id.to_string()).or_default();
2171
2172    // Timestamps must be sub-second AND strictly increasing within a stack's
2173    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2174    // timestamp and then only registers events whose `Timestamp` is strictly
2175    // greater; a fast stack that provisions within one second would otherwise
2176    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2177    // identically, so sam never sees completion and polls until it times out.
2178    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2179    // past the previous event, guaranteeing a strict order.
2180    // Truncate to the stored (millisecond) resolution before comparing, so the
2181    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2182    // serialization (two events in the same millisecond would otherwise both
2183    // serialize identically despite `now > prev` holding at full precision).
2184    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2185        .unwrap_or_else(Utc::now);
2186    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2187        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2188            Ok(prev) => {
2189                let prev = prev.with_timezone(&Utc);
2190                if now > prev {
2191                    now
2192                } else {
2193                    prev + chrono::Duration::milliseconds(1)
2194                }
2195            }
2196            Err(_) => now,
2197        },
2198        None => now,
2199    };
2200
2201    log.push(json!({
2202        "EventId": event_id,
2203        "StackId": stack_id,
2204        "StackName": stack_name,
2205        "LogicalResourceId": logical_id,
2206        "PhysicalResourceId": physical_id,
2207        "ResourceType": resource_type,
2208        "ResourceStatus": status,
2209        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2210    }));
2211}
2212
2213/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2214/// applied during an update. Mirrors the event sequence real CloudFormation
2215/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2216/// Persist the CloudFormation snapshot from a detached task. Mirrors
2217/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2218/// run inside the background CreateStack provisioning task (see RDS's
2219/// `save_snapshot_static`).
2220async fn save_snapshot_static(
2221    state: SharedCloudFormationState,
2222    store: Option<Arc<dyn SnapshotStore>>,
2223    lock: Arc<AsyncMutex<()>>,
2224) {
2225    let Some(store) = store else {
2226        return;
2227    };
2228    let _guard = lock.lock().await;
2229    let snapshot = CloudFormationSnapshot {
2230        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2231        state: None,
2232        accounts: Some(state.read().clone()),
2233    };
2234    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2235        let bytes = serde_json::to_vec(&snapshot)
2236            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2237        store.save(&bytes)
2238    })
2239    .await;
2240    match join {
2241        Ok(Ok(())) => {}
2242        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2243        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2244    }
2245}
2246
2247pub(crate) fn record_stack_events(
2248    state: &mut crate::state::CloudFormationState,
2249    stack_id: &str,
2250    stack_name: &str,
2251    changes: &[ResourceChange],
2252) {
2253    for ch in changes {
2254        record_event(
2255            state,
2256            stack_id,
2257            stack_name,
2258            &ch.logical_id,
2259            &ch.physical_id,
2260            &ch.resource_type,
2261            ch.action.status_in_progress(),
2262        );
2263        record_event(
2264            state,
2265            stack_id,
2266            stack_name,
2267            &ch.logical_id,
2268            &ch.physical_id,
2269            &ch.resource_type,
2270            ch.action.status_complete(),
2271        );
2272    }
2273}
2274
2275/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2276/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2277/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2278pub(crate) fn record_stack_status_event(
2279    state: &mut crate::state::CloudFormationState,
2280    stack_id: &str,
2281    stack_name: &str,
2282    resource_type: &str,
2283    status: &str,
2284) {
2285    record_event(
2286        state,
2287        stack_id,
2288        stack_name,
2289        stack_name,
2290        stack_id,
2291        resource_type,
2292        status,
2293    );
2294}
2295
2296#[cfg(test)]
2297mod tests {
2298    use super::*;
2299    use http::HeaderMap;
2300    use parking_lot::RwLock;
2301    use std::collections::HashMap;
2302    use std::sync::Arc;
2303
2304    #[test]
2305    fn merge_parameter_defaults_fills_omitted_params() {
2306        // §1.6: a parameter the caller omitted gets its declared Default, so a
2307        // Ref to it resolves to the default instead of the bare param name.
2308        let template = r#"{
2309            "Parameters": {
2310                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2311                "Count": {"Type": "Number", "Default": 3},
2312                "Supplied": {"Type": "String", "Default": "dflt"}
2313            },
2314            "Resources": {}
2315        }"#;
2316        let mut params = BTreeMap::new();
2317        params.insert("Supplied".to_string(), "override".to_string());
2318        CloudFormationService::merge_parameter_defaults(&mut params, template);
2319        assert_eq!(
2320            params.get("InstanceType").map(String::as_str),
2321            Some("t3.micro")
2322        );
2323        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2324        // A supplied value is not overwritten by the default.
2325        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2326    }
2327
2328    fn make_service() -> CloudFormationService {
2329        let cf_state = Arc::new(RwLock::new(
2330            fakecloud_core::multi_account::MultiAccountState::new(
2331                "123456789012",
2332                "us-east-1",
2333                "http://localhost:4566",
2334            ),
2335        ));
2336        let deps = CloudFormationDeps {
2337            sqs: Arc::new(RwLock::new(
2338                fakecloud_core::multi_account::MultiAccountState::new(
2339                    "123456789012",
2340                    "us-east-1",
2341                    "http://localhost:4566",
2342                ),
2343            )),
2344            sns: Arc::new(RwLock::new(
2345                fakecloud_core::multi_account::MultiAccountState::new(
2346                    "123456789012",
2347                    "us-east-1",
2348                    "http://localhost:4566",
2349                ),
2350            )),
2351            ssm: Arc::new(RwLock::new(
2352                fakecloud_core::multi_account::MultiAccountState::new(
2353                    "123456789012",
2354                    "us-east-1",
2355                    "http://localhost:4566",
2356                ),
2357            )),
2358            iam: Arc::new(RwLock::new(
2359                fakecloud_core::multi_account::MultiAccountState::new(
2360                    "123456789012",
2361                    "us-east-1",
2362                    "",
2363                ),
2364            )),
2365            s3: Arc::new(RwLock::new(
2366                fakecloud_core::multi_account::MultiAccountState::new(
2367                    "123456789012",
2368                    "us-east-1",
2369                    "",
2370                ),
2371            )),
2372            eventbridge: Arc::new(RwLock::new(
2373                fakecloud_core::multi_account::MultiAccountState::new(
2374                    "123456789012",
2375                    "us-east-1",
2376                    "",
2377                ),
2378            )),
2379            dynamodb: Arc::new(RwLock::new(
2380                fakecloud_core::multi_account::MultiAccountState::new(
2381                    "123456789012",
2382                    "us-east-1",
2383                    "",
2384                ),
2385            )),
2386            logs: Arc::new(RwLock::new(
2387                fakecloud_core::multi_account::MultiAccountState::new(
2388                    "123456789012",
2389                    "us-east-1",
2390                    "",
2391                ),
2392            )),
2393            lambda: Arc::new(RwLock::new(
2394                fakecloud_core::multi_account::MultiAccountState::new(
2395                    "123456789012",
2396                    "us-east-1",
2397                    "",
2398                ),
2399            )),
2400            secretsmanager: Arc::new(RwLock::new(
2401                fakecloud_core::multi_account::MultiAccountState::new(
2402                    "123456789012",
2403                    "us-east-1",
2404                    "",
2405                ),
2406            )),
2407            kinesis: Arc::new(RwLock::new(
2408                fakecloud_core::multi_account::MultiAccountState::new(
2409                    "123456789012",
2410                    "us-east-1",
2411                    "",
2412                ),
2413            )),
2414            kms: Arc::new(RwLock::new(
2415                fakecloud_core::multi_account::MultiAccountState::new(
2416                    "123456789012",
2417                    "us-east-1",
2418                    "",
2419                ),
2420            )),
2421            ecr: Arc::new(RwLock::new(
2422                fakecloud_core::multi_account::MultiAccountState::new(
2423                    "123456789012",
2424                    "us-east-1",
2425                    "",
2426                ),
2427            )),
2428            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2429            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2430            organizations: Arc::new(RwLock::new(None)),
2431            cognito: Arc::new(RwLock::new(
2432                fakecloud_core::multi_account::MultiAccountState::new(
2433                    "123456789012",
2434                    "us-east-1",
2435                    "",
2436                ),
2437            )),
2438            rds: Arc::new(RwLock::new(
2439                fakecloud_core::multi_account::MultiAccountState::new(
2440                    "123456789012",
2441                    "us-east-1",
2442                    "",
2443                ),
2444            )),
2445            ec2: Arc::new(RwLock::new(
2446                fakecloud_core::multi_account::MultiAccountState::new(
2447                    "123456789012",
2448                    "us-east-1",
2449                    "",
2450                ),
2451            )),
2452            autoscaling: Arc::new(RwLock::new(
2453                fakecloud_autoscaling::AutoScalingAccounts::new(),
2454            )),
2455            ecs: Arc::new(RwLock::new(
2456                fakecloud_core::multi_account::MultiAccountState::new(
2457                    "123456789012",
2458                    "us-east-1",
2459                    "",
2460                ),
2461            )),
2462            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2463            elasticache: Arc::new(RwLock::new(
2464                fakecloud_core::multi_account::MultiAccountState::new(
2465                    "123456789012",
2466                    "us-east-1",
2467                    "",
2468                ),
2469            )),
2470            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2471            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2472            stepfunctions: Arc::new(RwLock::new(
2473                fakecloud_core::multi_account::MultiAccountState::new(
2474                    "123456789012",
2475                    "us-east-1",
2476                    "",
2477                ),
2478            )),
2479            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2480            apigateway: Arc::new(RwLock::new(
2481                fakecloud_core::multi_account::MultiAccountState::new(
2482                    "123456789012",
2483                    "us-east-1",
2484                    "",
2485                ),
2486            )),
2487            apigatewayv2: Arc::new(RwLock::new(
2488                fakecloud_core::multi_account::MultiAccountState::new(
2489                    "123456789012",
2490                    "us-east-1",
2491                    "",
2492                ),
2493            )),
2494            ses: Arc::new(RwLock::new(
2495                fakecloud_core::multi_account::MultiAccountState::new(
2496                    "123456789012",
2497                    "us-east-1",
2498                    "",
2499                ),
2500            )),
2501            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2502                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2503            )),
2504            athena: Arc::new(parking_lot::RwLock::new(
2505                fakecloud_athena::AthenaAccounts::new(),
2506            )),
2507            firehose: Arc::new(parking_lot::RwLock::new(
2508                fakecloud_firehose::FirehoseAccounts::new(),
2509            )),
2510            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2511            delivery: Arc::new(DeliveryBus::new()),
2512            lambda_runtime: None,
2513        };
2514        CloudFormationService::new(cf_state, deps)
2515    }
2516
2517    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2518        AwsRequest {
2519            service: "cloudformation".to_string(),
2520            action: action.to_string(),
2521            region: "us-east-1".to_string(),
2522            account_id: "123456789012".to_string(),
2523            request_id: "test-request-id".to_string(),
2524            headers: HeaderMap::new(),
2525            query_params: params,
2526            body: bytes::Bytes::new(),
2527            body_stream: parking_lot::Mutex::new(None),
2528            path_segments: vec![],
2529            raw_path: "/".to_string(),
2530            raw_query: String::new(),
2531            method: http::Method::POST,
2532            is_query_protocol: true,
2533            access_key_id: None,
2534            principal: None,
2535        }
2536    }
2537
2538    #[tokio::test]
2539    async fn update_stack_sets_failed_status_on_resource_error() {
2540        let svc = make_service();
2541
2542        // Create a stack with just a queue
2543        let mut create_params = HashMap::new();
2544        create_params.insert("StackName".to_string(), "test-stack".to_string());
2545        create_params.insert(
2546            "TemplateBody".to_string(),
2547            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2548        );
2549        let req = make_request("CreateStack", create_params);
2550        let result = svc.create_stack(&req).await;
2551        assert!(result.is_ok());
2552
2553        // Update stack adding an SNS subscription with a non-existent topic
2554        let mut update_params = HashMap::new();
2555        update_params.insert("StackName".to_string(), "test-stack".to_string());
2556        update_params.insert(
2557            "TemplateBody".to_string(),
2558            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2559        );
2560        let req = make_request("UpdateStack", update_params);
2561        let result = svc.update_stack(&req).await;
2562
2563        // Should return an error
2564        assert!(result.is_err());
2565
2566        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2567        // terminal status real CloudFormation lands on after a failed
2568        // update attempt that gets rolled back.
2569        let accounts = svc.state.read();
2570        let state = accounts.get("123456789012").unwrap();
2571        let stack = state.stacks.get("test-stack").unwrap();
2572        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2573    }
2574
2575    #[tokio::test]
2576    async fn create_stack_resolves_ref_to_physical_id() {
2577        let svc = make_service();
2578
2579        // Template where subscription Refs the topic
2580        let template = r#"{
2581            "Resources": {
2582                "MyTopic": {
2583                    "Type": "AWS::SNS::Topic",
2584                    "Properties": { "TopicName": "ref-test-topic" }
2585                },
2586                "MySub": {
2587                    "Type": "AWS::SNS::Subscription",
2588                    "Properties": {
2589                        "TopicArn": { "Ref": "MyTopic" },
2590                        "Protocol": "sqs",
2591                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2592                    }
2593                }
2594            }
2595        }"#;
2596
2597        let mut params = HashMap::new();
2598        params.insert("StackName".to_string(), "ref-stack".to_string());
2599        params.insert("TemplateBody".to_string(), template.to_string());
2600        let req = make_request("CreateStack", params);
2601        let result = svc.create_stack(&req).await;
2602        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2603
2604        // Verify both resources were created
2605        let accounts = svc.state.read();
2606        let state = accounts.get("123456789012").unwrap();
2607        let stack = state.stacks.get("ref-stack").unwrap();
2608        assert_eq!(stack.resources.len(), 2);
2609        assert_eq!(stack.status, "CREATE_COMPLETE");
2610
2611        // The subscription's physical ID should be an ARN (not just "MyTopic")
2612        let sub = stack
2613            .resources
2614            .iter()
2615            .find(|r| r.logical_id == "MySub")
2616            .unwrap();
2617        assert!(
2618            sub.physical_id.contains("ref-test-topic"),
2619            "Subscription physical ID should reference the topic ARN, got: {}",
2620            sub.physical_id
2621        );
2622    }
2623
2624    /// On the multi-thread server runtime, a stack containing a custom
2625    /// resource (whose provisioning can block for minutes on a cold Lambda
2626    /// image pull) must NOT be provisioned synchronously inside the request
2627    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2628    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2629    /// 3.1).
2630    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2631    async fn create_stack_custom_resource_provisions_asynchronously() {
2632        let svc = make_service();
2633        let template = r#"{
2634            "Resources": {
2635                "MyCustom": {
2636                    "Type": "Custom::Thing",
2637                    "Properties": {
2638                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2639                    }
2640                }
2641            }
2642        }"#;
2643        let mut params = HashMap::new();
2644        params.insert("StackName".to_string(), "async-stack".to_string());
2645        params.insert("TemplateBody".to_string(), template.to_string());
2646        let req = make_request("CreateStack", params);
2647
2648        // CreateStack returns promptly with a StackId; provisioning runs in a
2649        // detached background task. The stack is recorded before the task can
2650        // possibly finish, so right after return it is at worst already
2651        // terminal — never a value that proves the handler blocked on the
2652        // (potentially minutes-long) provisioning loop. The key guarantee is
2653        // that the call returns without running the provisioner inline.
2654        let resp = svc
2655            .create_stack(&req)
2656            .await
2657            .expect("create returns StackId");
2658        assert!(resp.status.is_success());
2659        {
2660            let accounts = svc.state.read();
2661            let stack = accounts
2662                .get("123456789012")
2663                .unwrap()
2664                .stacks
2665                .get("async-stack")
2666                .expect("stack seeded synchronously");
2667            assert!(
2668                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2669                "unexpected status right after create: {}",
2670                stack.status
2671            );
2672        }
2673
2674        // Poll DescribeStacks (via state) until the background task flips the
2675        // stack to its terminal CREATE_COMPLETE status.
2676        let mut status = String::new();
2677        for _ in 0..200 {
2678            {
2679                let accounts = svc.state.read();
2680                if let Some(stack) = accounts
2681                    .get("123456789012")
2682                    .and_then(|s| s.stacks.get("async-stack"))
2683                {
2684                    status = stack.status.clone();
2685                    if status != "CREATE_IN_PROGRESS" {
2686                        break;
2687                    }
2688                }
2689            }
2690            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2691        }
2692        assert_eq!(
2693            status, "CREATE_COMPLETE",
2694            "stack should reach CREATE_COMPLETE"
2695        );
2696
2697        let accounts = svc.state.read();
2698        let stack = accounts
2699            .get("123456789012")
2700            .unwrap()
2701            .stacks
2702            .get("async-stack")
2703            .unwrap();
2704        assert_eq!(stack.resources.len(), 1);
2705        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2706    }
2707
2708    #[tokio::test]
2709    async fn output_getatt_resolves_well_known_attribute() {
2710        // An Output that GetAtts a well-known attribute the create handler does
2711        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
2712        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
2713        // resolve_template_outputs reads StackResource.attributes, which now
2714        // carries the live get_att overlay applied during provisioning.
2715        let svc = make_service();
2716        let template = r#"{
2717            "Resources": {
2718                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
2719            },
2720            "Outputs": {
2721                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
2722            }
2723        }"#;
2724        let mut params = HashMap::new();
2725        params.insert("StackName".to_string(), "out-stack".to_string());
2726        params.insert("TemplateBody".to_string(), template.to_string());
2727        svc.create_stack(&make_request("CreateStack", params))
2728            .await
2729            .expect("create returns StackId");
2730
2731        let mut url = String::new();
2732        for _ in 0..200 {
2733            {
2734                let accounts = svc.state.read();
2735                if let Some(stack) = accounts
2736                    .get("123456789012")
2737                    .and_then(|s| s.stacks.get("out-stack"))
2738                {
2739                    if stack.status != "CREATE_IN_PROGRESS" {
2740                        url = stack
2741                            .outputs
2742                            .iter()
2743                            .find(|o| o.key == "Url")
2744                            .map(|o| o.value.clone())
2745                            .unwrap_or_default();
2746                        break;
2747                    }
2748                }
2749            }
2750            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2751        }
2752        assert!(
2753            url.contains("out-q") && url != "Queue.QueueUrl",
2754            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
2755        );
2756    }
2757
2758    // ── Service error paths ──
2759
2760    #[tokio::test]
2761    async fn create_stack_missing_name_errors() {
2762        let svc = make_service();
2763        let mut params = HashMap::new();
2764        params.insert("TemplateBody".to_string(), "{}".to_string());
2765        let req = make_request("CreateStack", params);
2766        assert!(svc.create_stack(&req).await.is_err());
2767    }
2768
2769    #[tokio::test]
2770    async fn create_stack_missing_template_creates_empty_stack() {
2771        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2772        // declares no `ValidationError` shape, so missing/placeholder
2773        // bodies now create an empty stack rather than rejecting with
2774        // an undeclared wire code (strict-mode conformance gap).
2775        let svc = make_service();
2776        let mut params = HashMap::new();
2777        params.insert("StackName".to_string(), "s".to_string());
2778        let req = make_request("CreateStack", params);
2779        svc.create_stack(&req)
2780            .await
2781            .expect("empty-body create succeeds");
2782    }
2783
2784    #[tokio::test]
2785    async fn create_stack_duplicate_errors() {
2786        let svc = make_service();
2787        let mut params = HashMap::new();
2788        params.insert("StackName".to_string(), "dup".to_string());
2789        params.insert(
2790            "TemplateBody".to_string(),
2791            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2792                .to_string(),
2793        );
2794        let req = make_request("CreateStack", params.clone());
2795        svc.create_stack(&req).await.unwrap();
2796        let req = make_request("CreateStack", params);
2797        assert!(svc.create_stack(&req).await.is_err());
2798    }
2799
2800    #[tokio::test]
2801    async fn create_stack_invalid_template_creates_empty_stack() {
2802        // CreateStack's Smithy `errors` list has no `ValidationError`
2803        // shape, so unparseable bodies degrade to an empty parsed
2804        // template instead of raising an undeclared wire code.
2805        let svc = make_service();
2806        let mut params = HashMap::new();
2807        params.insert("StackName".to_string(), "bad".to_string());
2808        params.insert("TemplateBody".to_string(), "not json".to_string());
2809        let req = make_request("CreateStack", params);
2810        svc.create_stack(&req)
2811            .await
2812            .expect("bad-body create succeeds");
2813    }
2814
2815    #[tokio::test]
2816    async fn delete_stack_unknown_is_noop() {
2817        let svc = make_service();
2818        let mut params = HashMap::new();
2819        params.insert("StackName".to_string(), "ghost".to_string());
2820        let req = make_request("DeleteStack", params);
2821        assert!(svc.delete_stack(&req).await.is_ok());
2822    }
2823
2824    #[test]
2825    fn describe_stacks_nonexistent_errors() {
2826        // Querying an explicit, unknown StackName returns AWS's
2827        // `ValidationError: Stack with id <name> does not exist` so deploy
2828        // tools that probe stack existence (SAM, `aws cloudformation
2829        // deploy`) get the signal they expect (issue #1646).
2830        let svc = make_service();
2831        let mut params = HashMap::new();
2832        params.insert("StackName".to_string(), "ghost".to_string());
2833        let req = make_request("DescribeStacks", params);
2834        match svc.describe_stacks(&req) {
2835            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2836            Err(e) => {
2837                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2838                assert_eq!(e.code(), "ValidationError");
2839                assert!(
2840                    e.message().contains("does not exist"),
2841                    "got: {}",
2842                    e.message()
2843                );
2844            }
2845        }
2846    }
2847
2848    #[test]
2849    fn describe_stacks_empty_returns_all() {
2850        let svc = make_service();
2851        let req = make_request("DescribeStacks", HashMap::new());
2852        let resp = svc.describe_stacks(&req).unwrap();
2853        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2854        assert!(b.contains("DescribeStacksResult"));
2855    }
2856
2857    #[test]
2858    fn list_stacks_empty_returns_ok() {
2859        let svc = make_service();
2860        let req = make_request("ListStacks", HashMap::new());
2861        let resp = svc.list_stacks(&req).unwrap();
2862        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2863        assert!(b.contains("ListStacksResult"));
2864    }
2865
2866    #[test]
2867    fn list_stack_resources_missing_name_returns_validation_error() {
2868        // ListStackResources declares no `errors` in Smithy, so any
2869        // AWS-shaped 4xx counts as a handler response. We reject an
2870        // omitted StackName with `ValidationError` to keep negative
2871        // conformance variants honest; unknown-but-supplied names still
2872        // resolve to an empty list (see the test below).
2873        let svc = make_service();
2874        let req = make_request("ListStackResources", HashMap::new());
2875        let err = match svc.list_stack_resources(&req) {
2876            Err(e) => e,
2877            Ok(_) => panic!("omitted StackName must be rejected"),
2878        };
2879        assert_eq!(err.code(), "ValidationError");
2880    }
2881
2882    #[test]
2883    fn list_stack_resources_unknown_stack_returns_empty() {
2884        let svc = make_service();
2885        let mut params = HashMap::new();
2886        params.insert("StackName".to_string(), "ghost".to_string());
2887        let req = make_request("ListStackResources", params);
2888        svc.list_stack_resources(&req).expect("unknown is empty");
2889    }
2890
2891    #[test]
2892    fn describe_stack_resources_missing_name_returns_empty() {
2893        let svc = make_service();
2894        let req = make_request("DescribeStackResources", HashMap::new());
2895        svc.describe_stack_resources(&req)
2896            .expect("missing name is ok");
2897    }
2898
2899    #[test]
2900    fn get_template_missing_name_returns_empty_body() {
2901        let svc = make_service();
2902        let req = make_request("GetTemplate", HashMap::new());
2903        svc.get_template(&req).expect("missing name is ok");
2904    }
2905
2906    #[test]
2907    fn get_template_unknown_stack_returns_empty_body() {
2908        let svc = make_service();
2909        let mut params = HashMap::new();
2910        params.insert("StackName".to_string(), "ghost".to_string());
2911        let req = make_request("GetTemplate", params);
2912        svc.get_template(&req).expect("unknown is empty");
2913    }
2914
2915    #[tokio::test]
2916    async fn update_stack_missing_name_errors() {
2917        let svc = make_service();
2918        let mut params = HashMap::new();
2919        params.insert("TemplateBody".to_string(), "{}".to_string());
2920        let req = make_request("UpdateStack", params);
2921        assert!(svc.update_stack(&req).await.is_err());
2922    }
2923
2924    #[tokio::test]
2925    async fn update_stack_unknown_stack_returns_synthetic_id() {
2926        // UpdateStack declares only `InsufficientCapabilitiesException`
2927        // and `TokenAlreadyExistsException`, neither of which fits
2928        // "stack does not exist". Synthetic conformance inputs target
2929        // a placeholder stack, so we return a synthetic StackId rather
2930        // than an undeclared `ValidationError`. Real callers create
2931        // the stack first.
2932        let svc = make_service();
2933        let mut params = HashMap::new();
2934        params.insert("StackName".to_string(), "ghost".to_string());
2935        params.insert(
2936            "TemplateBody".to_string(),
2937            r#"{"Resources":{}}"#.to_string(),
2938        );
2939        let req = make_request("UpdateStack", params);
2940        let resp = svc
2941            .update_stack(&req)
2942            .await
2943            .expect("ghost update is synthetic");
2944        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2945        assert!(b.contains("UpdateStackResult"));
2946    }
2947
2948    #[tokio::test]
2949    async fn create_stack_resolves_outputs_and_records_export() {
2950        let svc = make_service();
2951        let template = r#"{
2952            "Resources": {
2953                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2954            },
2955            "Outputs": {
2956                "QueueUrl": {
2957                    "Value": {"Ref": "Q"},
2958                    "Description": "Url",
2959                    "Export": {"Name": "TheQueueUrl"}
2960                }
2961            }
2962        }"#;
2963        let mut params = HashMap::new();
2964        params.insert("StackName".to_string(), "outs".to_string());
2965        params.insert("TemplateBody".to_string(), template.to_string());
2966        let req = make_request("CreateStack", params);
2967        svc.create_stack(&req).await.expect("create stack");
2968
2969        let accounts = svc.state.read();
2970        let stack = accounts
2971            .get("123456789012")
2972            .unwrap()
2973            .stacks
2974            .get("outs")
2975            .unwrap();
2976        assert_eq!(stack.outputs.len(), 1);
2977        assert_eq!(stack.outputs[0].key, "QueueUrl");
2978        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2979        assert!(!stack.outputs[0].value.is_empty());
2980    }
2981
2982    #[tokio::test]
2983    async fn create_stack_rejects_duplicate_export_name() {
2984        let svc = make_service();
2985        let mk = |name: &str| {
2986            let template = format!(
2987                r#"{{
2988                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2989                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2990                }}"#
2991            );
2992            let mut params = HashMap::new();
2993            params.insert("StackName".to_string(), name.to_string());
2994            params.insert("TemplateBody".to_string(), template);
2995            make_request("CreateStack", params)
2996        };
2997        match svc.create_stack(&mk("first")).await {
2998            Ok(_) => {}
2999            Err(e) => panic!("first stack: {e:?}"),
3000        }
3001        // The second stack's export collides with the first. Since
3002        // provisioning is now asynchronous, the collision can no longer be a
3003        // synchronous CreateStack error — it surfaces as a failed create.
3004        // On the current-thread test runtime the provisioning task runs
3005        // inline, so the stack is already CREATE_FAILED on return.
3006        svc.create_stack(&mk("second"))
3007            .await
3008            .expect("CreateStack returns StackId even when provisioning fails");
3009        let accounts = svc.state.read();
3010        let stack = accounts
3011            .get("123456789012")
3012            .unwrap()
3013            .stacks
3014            .get("second")
3015            .expect("second stack recorded");
3016        assert_eq!(stack.status, "CREATE_FAILED");
3017        // The first stack keeps the export.
3018        let exports = &accounts.get("123456789012").unwrap().exports;
3019        assert_eq!(
3020            exports
3021                .get("DupExport")
3022                .map(|e| e.exporting_stack_name.as_str()),
3023            Some("first")
3024        );
3025    }
3026
3027    #[tokio::test]
3028    async fn import_value_resolves_against_other_stack_export() {
3029        let svc = make_service();
3030
3031        let producer_tpl = r#"{
3032            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3033            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3034        }"#;
3035        let mut p = HashMap::new();
3036        p.insert("StackName".to_string(), "producer".to_string());
3037        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3038        svc.create_stack(&make_request("CreateStack", p))
3039            .await
3040            .expect("producer");
3041
3042        let consumer_tpl = r#"{
3043            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3044            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3045        }"#;
3046        let mut p = HashMap::new();
3047        p.insert("StackName".to_string(), "consumer".to_string());
3048        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3049        svc.create_stack(&make_request("CreateStack", p))
3050            .await
3051            .expect("consumer");
3052
3053        let accounts = svc.state.read();
3054        let prod_url = accounts
3055            .get("123456789012")
3056            .unwrap()
3057            .stacks
3058            .get("producer")
3059            .unwrap()
3060            .outputs[0]
3061            .value
3062            .clone();
3063        let cons = accounts
3064            .get("123456789012")
3065            .unwrap()
3066            .stacks
3067            .get("consumer")
3068            .unwrap();
3069        assert_eq!(cons.outputs[0].value, prod_url);
3070    }
3071
3072    #[tokio::test]
3073    async fn create_stack_records_export_in_state_registry() {
3074        let svc = make_service();
3075        let template = r#"{
3076            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3077            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3078        }"#;
3079        let mut params = HashMap::new();
3080        params.insert("StackName".to_string(), "reg".to_string());
3081        params.insert("TemplateBody".to_string(), template.to_string());
3082        svc.create_stack(&make_request("CreateStack", params))
3083            .await
3084            .expect("create");
3085
3086        let accounts = svc.state.read();
3087        let state = accounts.get("123456789012").unwrap();
3088        let export = state
3089            .exports
3090            .get("reg-url")
3091            .expect("export registered in state.exports");
3092        assert_eq!(export.exporting_stack_name, "reg");
3093        assert!(!export.value.is_empty());
3094        assert!(export.exporting_stack_id.contains("reg"));
3095    }
3096
3097    #[tokio::test]
3098    async fn import_value_with_unknown_export_errors() {
3099        let svc = make_service();
3100        let consumer_tpl = r#"{
3101            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3102                "QueueName": {"Fn::ImportValue":"missing-export"}
3103            }}}
3104        }"#;
3105        let mut p = HashMap::new();
3106        p.insert("StackName".to_string(), "bad-consumer".to_string());
3107        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3108        match svc.create_stack(&make_request("CreateStack", p)).await {
3109            Ok(_) => panic!("expected ValidationError for unknown export"),
3110            Err(e) => {
3111                let msg = format!("{e:?}");
3112                assert!(msg.contains("No export named missing-export"), "got {msg}");
3113            }
3114        }
3115    }
3116
3117    #[tokio::test]
3118    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3119        let svc = make_service();
3120
3121        let producer_tpl = r#"{
3122            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3123            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3124        }"#;
3125        let mut p = HashMap::new();
3126        p.insert("StackName".to_string(), "producer".to_string());
3127        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3128        svc.create_stack(&make_request("CreateStack", p))
3129            .await
3130            .expect("producer");
3131
3132        let consumer_tpl = r#"{
3133            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3134                "QueueName": "cons-q",
3135                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3136            }}}
3137        }"#;
3138        let mut p = HashMap::new();
3139        p.insert("StackName".to_string(), "consumer".to_string());
3140        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3141        svc.create_stack(&make_request("CreateStack", p))
3142            .await
3143            .expect("consumer");
3144
3145        // Producer delete must fail while consumer still imports.
3146        let mut p = HashMap::new();
3147        p.insert("StackName".to_string(), "producer".to_string());
3148        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3149            Ok(_) => panic!("delete must fail while imports exist"),
3150            Err(e) => {
3151                let msg = format!("{e:?}");
3152                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3153            }
3154        }
3155
3156        // Delete consumer first.
3157        let mut p = HashMap::new();
3158        p.insert("StackName".to_string(), "consumer".to_string());
3159        svc.delete_stack(&make_request("DeleteStack", p))
3160            .await
3161            .expect("consumer delete");
3162
3163        // Now producer delete succeeds.
3164        let mut p = HashMap::new();
3165        p.insert("StackName".to_string(), "producer".to_string());
3166        svc.delete_stack(&make_request("DeleteStack", p))
3167            .await
3168            .expect("producer delete after consumer gone");
3169
3170        let accounts = svc.state.read();
3171        let state = accounts.get("123456789012").unwrap();
3172        assert!(state.exports.is_empty(), "exports cleared after delete");
3173        assert!(state.imports.is_empty(), "imports cleared after delete");
3174    }
3175
3176    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3177
3178    use std::sync::atomic::{AtomicUsize, Ordering};
3179
3180    /// A snapshot hook that counts how many times it fires, standing in for a
3181    /// real service's whole-state persist.
3182    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3183        Arc::new(move || {
3184            let counter = counter.clone();
3185            Box::pin(async move {
3186                counter.fetch_add(1, Ordering::SeqCst);
3187            })
3188        })
3189    }
3190
3191    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3192        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3193        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3194            tmp.path().to_path_buf(),
3195            cache,
3196        ))
3197    }
3198
3199    // A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
3200    // write-through). Lambda is registered as a hook below but NOT in the
3201    // template, so it must not fire -- proving per-service selectivity.
3202    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
3203        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
3204        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
3205        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
3206    }}"#;
3207
3208    fn create_req(stack: &str) -> AwsRequest {
3209        let mut p = HashMap::new();
3210        p.insert("StackName".to_string(), stack.to_string());
3211        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3212        make_request("CreateStack", p)
3213    }
3214
3215    #[tokio::test]
3216    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3217        let tmp = tempfile::tempdir().unwrap();
3218        let store = disk_s3_store(&tmp);
3219        let counter = Arc::new(AtomicUsize::new(0));
3220        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3221            BTreeMap::new();
3222        hooks.insert("sqs", counting_hook(counter.clone()));
3223        hooks.insert("sns", counting_hook(counter.clone()));
3224        // Registered but not in the template -> must not fire.
3225        hooks.insert("lambda", counting_hook(counter.clone()));
3226        let svc = make_service()
3227            .with_s3_store(store.clone())
3228            .with_snapshot_hooks(hooks);
3229
3230        svc.create_stack(&create_req("probe")).await.unwrap();
3231
3232        // sqs + sns fired once each; lambda untouched.
3233        assert_eq!(counter.load(Ordering::SeqCst), 2);
3234        // The bucket was written through to the S3 store, not just the in-memory map.
3235        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3236        assert!(
3237            loaded.buckets.contains_key("cfn-bucket"),
3238            "CFN bucket should be persisted to the S3 store"
3239        );
3240    }
3241
3242    #[tokio::test]
3243    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3244        let tmp = tempfile::tempdir().unwrap();
3245        let store = disk_s3_store(&tmp);
3246        let counter = Arc::new(AtomicUsize::new(0));
3247        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3248            BTreeMap::new();
3249        hooks.insert("sqs", counting_hook(counter.clone()));
3250        hooks.insert("sns", counting_hook(counter.clone()));
3251        let svc = make_service()
3252            .with_s3_store(store.clone())
3253            .with_snapshot_hooks(hooks);
3254
3255        svc.create_stack(&create_req("probe")).await.unwrap();
3256        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3257
3258        let mut p = HashMap::new();
3259        p.insert("StackName".to_string(), "probe".to_string());
3260        svc.delete_stack(&make_request("DeleteStack", p))
3261            .await
3262            .unwrap();
3263
3264        // Delete fired the touched services again (sqs + sns).
3265        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3266        // And the CFN-deleted bucket is gone from the store, so it does not
3267        // reappear after a restart.
3268        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3269        assert!(
3270            !loaded.buckets.contains_key("cfn-bucket"),
3271            "CFN-deleted bucket should be removed from the S3 store"
3272        );
3273    }
3274
3275    #[tokio::test]
3276    async fn cfn_persist_skips_services_without_a_registered_hook() {
3277        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3278        // hooks must be silently skipped (no panic), and "sqs" fires once.
3279        let tmp = tempfile::tempdir().unwrap();
3280        let store = disk_s3_store(&tmp);
3281        let counter = Arc::new(AtomicUsize::new(0));
3282        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3283            BTreeMap::new();
3284        hooks.insert("sqs", counting_hook(counter.clone()));
3285        let svc = make_service()
3286            .with_s3_store(store.clone())
3287            .with_snapshot_hooks(hooks);
3288
3289        svc.create_stack(&create_req("probe")).await.unwrap();
3290        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3291    }
3292
3293    #[tokio::test]
3294    async fn cfn_update_persists_touched_services() {
3295        // Create with just SQS, then update to a template that adds SNS + a
3296        // bucket; the update must persist the services it touches.
3297        let tmp = tempfile::tempdir().unwrap();
3298        let store = disk_s3_store(&tmp);
3299        let counter = Arc::new(AtomicUsize::new(0));
3300        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3301            BTreeMap::new();
3302        hooks.insert("sqs", counting_hook(counter.clone()));
3303        hooks.insert("sns", counting_hook(counter.clone()));
3304        let svc = make_service()
3305            .with_s3_store(store.clone())
3306            .with_snapshot_hooks(hooks);
3307
3308        let mut create = HashMap::new();
3309        create.insert("StackName".to_string(), "upd".to_string());
3310        create.insert(
3311            "TemplateBody".to_string(),
3312            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3313                .to_string(),
3314        );
3315        svc.create_stack(&make_request("CreateStack", create))
3316            .await
3317            .unwrap();
3318        let after_create = counter.load(Ordering::SeqCst);
3319
3320        let mut update = HashMap::new();
3321        update.insert("StackName".to_string(), "upd".to_string());
3322        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3323        svc.update_stack(&make_request("UpdateStack", update))
3324            .await
3325            .unwrap();
3326
3327        // The update touched at least SNS (added); the hook count must grow.
3328        assert!(
3329            counter.load(Ordering::SeqCst) > after_create,
3330            "update should persist the services it touched"
3331        );
3332        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3333        assert!(loaded.buckets.contains_key("cfn-bucket"));
3334    }
3335
3336    #[tokio::test]
3337    async fn cfn_execute_change_set_persists_touched_services() {
3338        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3339        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3340        // write provisioned services through to disk the same way CreateStack
3341        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3342        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3343        let tmp = tempfile::tempdir().unwrap();
3344        let store = disk_s3_store(&tmp);
3345        let counter = Arc::new(AtomicUsize::new(0));
3346        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3347            BTreeMap::new();
3348        hooks.insert("sqs", counting_hook(counter.clone()));
3349        let svc = make_service()
3350            .with_s3_store(store.clone())
3351            .with_snapshot_hooks(hooks);
3352
3353        let mut create = HashMap::new();
3354        create.insert("StackName".to_string(), "cs-stack".to_string());
3355        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3356        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3357        create.insert(
3358            "TemplateBody".to_string(),
3359            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3360                .to_string(),
3361        );
3362        svc.handle(make_request("CreateChangeSet", create))
3363            .await
3364            .unwrap();
3365        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3366        let before = counter.load(Ordering::SeqCst);
3367
3368        let mut exec = HashMap::new();
3369        exec.insert("StackName".to_string(), "cs-stack".to_string());
3370        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3371        svc.handle(make_request("ExecuteChangeSet", exec))
3372            .await
3373            .unwrap();
3374
3375        assert!(
3376            counter.load(Ordering::SeqCst) > before,
3377            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3378             queue survives a restart"
3379        );
3380    }
3381
3382    #[test]
3383    fn service_key_for_type_maps_services_and_aliases() {
3384        // Direct service segments.
3385        assert_eq!(
3386            service_key_for_type("AWS::Lambda::Function"),
3387            Some("lambda")
3388        );
3389        assert_eq!(
3390            service_key_for_type("AWS::SecretsManager::Secret"),
3391            Some("secretsmanager")
3392        );
3393        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3394        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3395        assert_eq!(
3396            service_key_for_type("AWS::StepFunctions::StateMachine"),
3397            Some("stepfunctions")
3398        );
3399        // Namespace aliases that differ from the fakecloud service name.
3400        assert_eq!(
3401            service_key_for_type("AWS::Events::Rule"),
3402            Some("eventbridge")
3403        );
3404        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3405        assert_eq!(
3406            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3407            Some("elasticache")
3408        );
3409        // S3 has no snapshot hook (it persists via the S3Store write-through).
3410        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3411        // Snapshot-backed services whose CFN namespace differs from the
3412        // fakecloud service name (these were missing from the map, #1766 class).
3413        assert_eq!(
3414            service_key_for_type("AWS::CertificateManager::Certificate"),
3415            Some("acm")
3416        );
3417        assert_eq!(
3418            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3419            Some("elbv2")
3420        );
3421        assert_eq!(
3422            service_key_for_type("AWS::CloudFront::Distribution"),
3423            Some("cloudfront")
3424        );
3425        assert_eq!(
3426            service_key_for_type("AWS::Route53::HostedZone"),
3427            Some("route53")
3428        );
3429        assert_eq!(
3430            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3431            Some("firehose")
3432        );
3433        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3434        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3435        assert_eq!(
3436            service_key_for_type("AWS::Athena::WorkGroup"),
3437            Some("athena")
3438        );
3439        assert_eq!(
3440            service_key_for_type("AWS::Organizations::Organization"),
3441            Some("organizations")
3442        );
3443        // Malformed / non-AWS types.
3444        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3445        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3446        assert_eq!(service_key_for_type("AWS"), None);
3447        assert_eq!(service_key_for_type(""), None);
3448    }
3449
3450    #[tokio::test]
3451    async fn persist_touched_services_noop_with_empty_hooks() {
3452        // No registered hooks -> nothing to do, must not panic.
3453        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3454        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3455    }
3456
3457    #[tokio::test]
3458    async fn cfn_bucket_policy_write_through_create_update_delete() {
3459        let tmp = tempfile::tempdir().unwrap();
3460        let store = disk_s3_store(&tmp);
3461        let svc = make_service().with_s3_store(store.clone());
3462
3463        // Create a bucket + bucket policy.
3464        let mut create = HashMap::new();
3465        create.insert("StackName".to_string(), "pol".to_string());
3466        create.insert(
3467            "TemplateBody".to_string(),
3468            r#"{"Resources":{
3469                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3470                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3471            }}"#
3472            .to_string(),
3473        );
3474        svc.create_stack(&make_request("CreateStack", create))
3475            .await
3476            .unwrap();
3477        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3478        let policy = loaded.buckets["pol-bucket"]
3479            .subresources
3480            .get("policy.toml")
3481            .cloned()
3482            .expect("bucket policy persisted on create");
3483        assert!(policy.contains("s3:GetObject"));
3484
3485        // Update the policy document; the update must write through.
3486        let mut update = HashMap::new();
3487        update.insert("StackName".to_string(), "pol".to_string());
3488        update.insert(
3489            "TemplateBody".to_string(),
3490            r#"{"Resources":{
3491                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3492                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3493            }}"#
3494            .to_string(),
3495        );
3496        svc.update_stack(&make_request("UpdateStack", update))
3497            .await
3498            .unwrap();
3499        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3500        let policy = loaded.buckets["pol-bucket"]
3501            .subresources
3502            .get("policy.toml")
3503            .cloned()
3504            .expect("bucket policy still persisted after update");
3505        assert!(
3506            policy.contains("s3:PutObject"),
3507            "updated policy should be written through"
3508        );
3509
3510        // Delete the stack; the bucket (and its policy) must be removed from disk.
3511        let mut del = HashMap::new();
3512        del.insert("StackName".to_string(), "pol".to_string());
3513        svc.delete_stack(&make_request("DeleteStack", del))
3514            .await
3515            .unwrap();
3516        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3517        assert!(
3518            !loaded.buckets.contains_key("pol-bucket"),
3519            "CFN-deleted bucket and policy should be gone from the store"
3520        );
3521    }
3522}