Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
29/// Canonical `Fn::GetAtt` attribute names per resource type. Used to
30/// supplement the eagerly-captured `ProvisionResult::attributes` with
31/// live-state lookups via `ResourceProvisioner::get_att`. Resources not
32/// listed here just use whatever the create handler captured.
33fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
34    match resource_type {
35        "AWS::S3::Bucket" => &[
36            "Arn",
37            "DomainName",
38            "RegionalDomainName",
39            "DualStackDomainName",
40            "WebsiteURL",
41        ],
42        "AWS::Lambda::Function" => &["Arn", "FunctionUrl", "Version"],
43        "AWS::IAM::Role" => &["Arn", "RoleId"],
44        "AWS::SQS::Queue" => &["Arn", "QueueName", "QueueUrl"],
45        "AWS::SNS::Topic" => &["TopicArn", "TopicName"],
46        "AWS::DynamoDB::Table" => &["Arn", "StreamArn"],
47        "AWS::KMS::Key" => &["Arn", "KeyId"],
48        "AWS::SecretsManager::Secret" => &["Arn", "Id"],
49        "AWS::CloudFront::Distribution" => &["DomainName", "Id"],
50        "AWS::EC2::VPC" => &["VpcId", "CidrBlock"],
51        "AWS::EC2::Subnet" => &["SubnetId", "AvailabilityZone", "VpcId", "CidrBlock"],
52        "AWS::EC2::SecurityGroup" => &["GroupId", "VpcId"],
53        "AWS::EC2::InternetGateway" => &["InternetGatewayId"],
54        "AWS::EC2::RouteTable" => &["RouteTableId"],
55        _ => &[],
56    }
57}
58
59/// Map a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
60/// snapshot-hook key for its owning service (the keys of
61/// `CloudFormationDeps::snapshot_hooks`). The key is the service segment, with
62/// aliases for the few services whose CloudFormation namespace differs from
63/// their fakecloud service name (e.g. `Events` -> `eventbridge`).
64///
65/// `AWS::S3::*` is intentionally absent: S3 buckets persist through the
66/// `S3Store` write-through path in the provisioner, not a whole-state snapshot
67/// hook. Returns `None` for malformed types and any service whose state is not
68/// snapshot-backed (those CFN resources have no persistence path either way).
69fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
70    let mut parts = resource_type.split("::");
71    let vendor = parts.next()?;
72    let service = parts.next()?;
73    // A real resource type has three segments; a 2-part string (or fewer) is
74    // not one.
75    parts.next()?;
76    if vendor != "AWS" {
77        return None;
78    }
79    Some(match service {
80        "Lambda" => "lambda",
81        "SecretsManager" => "secretsmanager",
82        "SQS" => "sqs",
83        "SNS" => "sns",
84        "DynamoDB" => "dynamodb",
85        "StepFunctions" => "stepfunctions",
86        "Events" => "eventbridge",
87        "SSM" => "ssm",
88        "Logs" => "logs",
89        "KMS" => "kms",
90        "Kinesis" => "kinesis",
91        "SES" => "ses",
92        "Cognito" => "cognito",
93        "RDS" => "rds",
94        "ElastiCache" => "elasticache",
95        "ECR" => "ecr",
96        "ECS" => "ecs",
97        "CloudWatch" => "cloudwatch",
98        "ApiGateway" => "apigateway",
99        "ApiGatewayV2" => "apigatewayv2",
100        "Bedrock" => "bedrock",
101        "Scheduler" => "scheduler",
102        "IAM" => "iam",
103        // Services whose CloudFormation namespace differs from their fakecloud
104        // service name, and which were missing from this table: their
105        // provisioner mutates snapshot-backed state directly, so CFN-created
106        // (and CFN-deleted) resources vanished on restart while their
107        // direct-API equivalents persisted (#1766 class). Each has a registered
108        // snapshot hook in the server.
109        "CertificateManager" => "acm",
110        "ElasticLoadBalancingV2" => "elbv2",
111        "CloudFront" => "cloudfront",
112        "Route53" => "route53",
113        "KinesisFirehose" => "firehose",
114        "Glue" => "glue",
115        "WAFv2" => "wafv2",
116        "Athena" => "athena",
117        "Organizations" => "organizations",
118        // EC2 and ApplicationAutoScaling were wired into the provisioner after
119        // this table was last audited (EC2: 2026-06-25 #1957;
120        // application-autoscaling: persistence sweep), so their CFN-created VPC
121        // / Subnet / SecurityGroup / scalable-target state mutated in memory and
122        // vanished on restart (#1766 class). Both have a registered snapshot
123        // hook in the server.
124        "EC2" => "ec2",
125        "ApplicationAutoScaling" => "application-autoscaling",
126        _ => return None,
127    })
128}
129
130/// Persist each distinct snapshot-backed service touched by a stack op, once.
131///
132/// `resource_types` is the set of `resource_type` strings the op created /
133/// updated / deleted. The CloudFormation provisioner mutates services' shared
134/// state directly and never triggers their snapshot path; this writes that
135/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
136/// resource survives a restart. Services with no registered hook (memory mode,
137/// or non-snapshot-backed services) are skipped.
138async fn persist_touched_services<I>(
139    hooks: &BTreeMap<&'static str, SnapshotHook>,
140    resource_types: I,
141) where
142    I: IntoIterator<Item = String>,
143{
144    if hooks.is_empty() {
145        return;
146    }
147    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
148    for ty in resource_types {
149        if let Some(key) = service_key_for_type(&ty) {
150            keys.insert(key);
151        }
152    }
153    for key in keys {
154        if let Some(hook) = hooks.get(key) {
155            hook().await;
156        }
157    }
158}
159
160/// Multi-pass provisioning for all resources in a parsed template.
161///
162/// Resources may `Ref` each other in either direction, and JSON object
163/// iteration order isn't stable, so a single forward pass isn't enough
164/// to resolve them. We loop: each pass tries every pending resource, and
165/// any resource whose `Ref` targets are still unknown just stays pending
166/// for the next pass. When no pass makes progress we report the first
167/// pending failure and rollback.
168pub(crate) fn provision_stack_resources(
169    provisioner: &ResourceProvisioner,
170    resource_defs: &[template::ResourceDefinition],
171    template_body: &str,
172    parameters: &BTreeMap<String, String>,
173    imports: &BTreeMap<String, String>,
174) -> Result<Vec<StackResource>, AwsServiceError> {
175    let mut resources = Vec::new();
176    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
177    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
178    // Seed the work list in dependency order so a referenced resource is
179    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
180    // physical ids. The fixed-point retry loop below still recovers from any
181    // residual ordering the graph can't express.
182    let order = template::dependency_order(template_body, parameters, resource_defs);
183    let mut pending: Vec<&template::ResourceDefinition> =
184        order.iter().map(|&i| &resource_defs[i]).collect();
185    let max_passes = pending.len() + 1;
186
187    for _ in 0..max_passes {
188        if pending.is_empty() {
189            break;
190        }
191        let mut still_pending = Vec::new();
192        let mut made_progress = false;
193
194        for resource_def in pending {
195            let resolved_def = template::resolve_resource_properties_with_attrs(
196                resource_def,
197                template_body,
198                parameters,
199                &physical_ids,
200                &attributes,
201                imports,
202            )
203            .map_err(|e| {
204                // `ValidationError` isn't declared on CreateStack/UpdateStack;
205                // surface template resolve failures through the closest declared
206                // shape (`InsufficientCapabilitiesException`) instead.
207                AwsServiceError::aws_error(
208                    StatusCode::BAD_REQUEST,
209                    "InsufficientCapabilitiesException",
210                    e,
211                )
212            })?;
213
214            match provisioner.create_resource(&resolved_def) {
215                Ok(mut stack_resource) => {
216                    physical_ids.insert(
217                        stack_resource.logical_id.clone(),
218                        stack_resource.physical_id.clone(),
219                    );
220                    // Start with the eagerly-captured attribute set, then
221                    // overlay anything the provisioner can resolve from
222                    // live state (e.g. attributes that depend on side-effects
223                    // recorded after the create handler returned).
224                    let mut attr_map = stack_resource.attributes.clone();
225                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
226                        if attr_map.contains_key(*attr) {
227                            continue;
228                        }
229                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
230                            attr_map.insert((*attr).to_string(), v);
231                        }
232                    }
233                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
234                    // Persist the live-resolved attributes onto the stored
235                    // resource so later readers — notably resolve_template_outputs,
236                    // which reads StackResource.attributes directly without the
237                    // overlay — see the full GetAtt set, not just the eager subset.
238                    stack_resource.attributes = attr_map;
239                    resources.push(stack_resource);
240                    made_progress = true;
241                }
242                Err(_) => still_pending.push(resource_def),
243            }
244        }
245
246        pending = still_pending;
247        if !made_progress && !pending.is_empty() {
248            // No progress — report the first failure and rollback anything
249            // we already created.
250            let resource_def = pending[0];
251            let resolved_def = template::resolve_resource_properties_with_attrs(
252                resource_def,
253                template_body,
254                parameters,
255                &physical_ids,
256                &attributes,
257                imports,
258            )
259            .unwrap_or_else(|_| resource_def.clone());
260            let err = provisioner.create_resource(&resolved_def).unwrap_err();
261            for r in &resources {
262                let _ = provisioner.delete_resource(r);
263            }
264            return Err(AwsServiceError::aws_error(
265                StatusCode::BAD_REQUEST,
266                "ValidationError",
267                format!(
268                    "Failed to create resource {}: {err}",
269                    resource_def.logical_id
270                ),
271            ));
272        }
273    }
274
275    Ok(resources)
276}
277
278/// State references for every service CloudFormation can provision resources in.
279pub struct CloudFormationDeps {
280    pub sqs: SharedSqsState,
281    pub sns: SharedSnsState,
282    pub ssm: SharedSsmState,
283    pub iam: SharedIamState,
284    pub s3: SharedS3State,
285    pub eventbridge: SharedEventBridgeState,
286    pub dynamodb: SharedDynamoDbState,
287    pub logs: SharedLogsState,
288    pub lambda: fakecloud_lambda::SharedLambdaState,
289    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
290    pub kinesis: fakecloud_kinesis::SharedKinesisState,
291    pub kms: fakecloud_kms::SharedKmsState,
292    pub ecr: fakecloud_ecr::SharedEcrState,
293    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
294    pub elbv2: fakecloud_elbv2::SharedElbv2State,
295    pub organizations: fakecloud_organizations::SharedOrganizationsState,
296    pub cognito: fakecloud_cognito::SharedCognitoState,
297    pub rds: fakecloud_rds::SharedRdsState,
298    pub ec2: fakecloud_ec2::SharedEc2State,
299    pub ecs: fakecloud_ecs::SharedEcsState,
300    pub acm: fakecloud_acm::SharedAcmState,
301    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
302    pub route53: fakecloud_route53::SharedRoute53State,
303    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
304    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
305    pub wafv2: fakecloud_wafv2::SharedWafv2State,
306    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
307    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
308    pub ses: fakecloud_ses::SharedSesState,
309    pub application_autoscaling:
310        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
311    pub athena: fakecloud_athena::SharedAthenaState,
312    pub firehose: fakecloud_firehose::SharedFirehoseState,
313    pub glue: fakecloud_glue::SharedGlueState,
314    pub delivery: Arc<DeliveryBus>,
315    /// Lambda container runtime, when Docker/Podman is available. Used to
316    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
317    /// in the background so its first Invoke doesn't pay the cold-pull cost
318    /// (the #1539 timeout, through the CloudFormation door). `None` when no
319    /// runtime is configured — provisioning still works, the first Invoke just
320    /// falls back to a cold pull.
321    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
322}
323
324pub struct CloudFormationService {
325    pub(crate) state: SharedCloudFormationState,
326    pub(crate) deps: CloudFormationDeps,
327    snapshot_store: Option<Arc<dyn SnapshotStore>>,
328    snapshot_lock: Arc<AsyncMutex<()>>,
329    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
330    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
331    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
332    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
333    /// real store via `with_s3_store` once it has been built.
334    s3_store: Arc<dyn S3Store>,
335    /// Whole-state snapshot persist hooks keyed by service name (see
336    /// `service_key_for_type`). After a stack op the handler invokes the hook
337    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
338    /// is written through to disk, the same persistence a direct mutating API
339    /// call would trigger. Empty by default (memory mode, or no services
340    /// wired); the server populates it via `with_snapshot_hooks`.
341    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
342}
343
344/// Everything the async CreateStack provisioning task needs to provision
345/// resources and flip the stack to its terminal status. Bundled into one
346/// owned struct so it can move into a detached `tokio::spawn`.
347struct CreateStackContext {
348    state: SharedCloudFormationState,
349    delivery: Arc<DeliveryBus>,
350    snapshot_store: Option<Arc<dyn SnapshotStore>>,
351    snapshot_lock: Arc<AsyncMutex<()>>,
352    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
353    provisioner: ResourceProvisioner,
354    account_id: String,
355    stack_name: String,
356    stack_id: String,
357    template_body: String,
358    parameters: BTreeMap<String, String>,
359    notification_arns: Vec<String>,
360    imported_names: Vec<String>,
361    resource_defs: Vec<template::ResourceDefinition>,
362}
363
364impl CloudFormationService {
365    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
366        Self {
367            state,
368            deps,
369            snapshot_store: None,
370            snapshot_lock: Arc::new(AsyncMutex::new(())),
371            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
372            snapshot_hooks: BTreeMap::new(),
373        }
374    }
375
376    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
377        self.snapshot_store = Some(store);
378        self
379    }
380
381    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
382    /// written through to disk (see `ResourceProvisioner::s3_store`).
383    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
384        self.s3_store = store;
385        self
386    }
387
388    /// Register the per-service snapshot persist hooks (keyed by service name)
389    /// used to persist every snapshot-backed service a stack op touches.
390    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
391        self.snapshot_hooks = hooks;
392        self
393    }
394
395    async fn save_snapshot(&self) {
396        let Some(store) = self.snapshot_store.clone() else {
397            return;
398        };
399        let _guard = self.snapshot_lock.lock().await;
400        let snapshot = CloudFormationSnapshot {
401            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
402            state: None,
403            accounts: Some(self.state.read().clone()),
404        };
405        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
406            let bytes = serde_json::to_vec(&snapshot)
407                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
408            store.save(&bytes)
409        })
410        .await;
411        match join {
412            Ok(Ok(())) => {}
413            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
414            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
415        }
416    }
417
418    pub(crate) fn provisioner(
419        &self,
420        stack_id: &str,
421        account_id: &str,
422        region: &str,
423    ) -> ResourceProvisioner {
424        ResourceProvisioner {
425            sqs_state: self.deps.sqs.clone(),
426            sns_state: self.deps.sns.clone(),
427            ssm_state: self.deps.ssm.clone(),
428            iam_state: self.deps.iam.clone(),
429            s3_state: self.deps.s3.clone(),
430            eventbridge_state: self.deps.eventbridge.clone(),
431            dynamodb_state: self.deps.dynamodb.clone(),
432            logs_state: self.deps.logs.clone(),
433            lambda_state: self.deps.lambda.clone(),
434            secretsmanager_state: self.deps.secretsmanager.clone(),
435            kinesis_state: self.deps.kinesis.clone(),
436            kms_state: self.deps.kms.clone(),
437            ecr_state: self.deps.ecr.clone(),
438            cloudwatch_state: self.deps.cloudwatch.clone(),
439            elbv2_state: self.deps.elbv2.clone(),
440            organizations_state: self.deps.organizations.clone(),
441            cognito_state: self.deps.cognito.clone(),
442            rds_state: self.deps.rds.clone(),
443            ec2_state: self.deps.ec2.clone(),
444            ecs_state: self.deps.ecs.clone(),
445            acm_state: self.deps.acm.clone(),
446            elasticache_state: self.deps.elasticache.clone(),
447            route53_state: self.deps.route53.clone(),
448            cloudfront_state: self.deps.cloudfront.clone(),
449            stepfunctions_state: self.deps.stepfunctions.clone(),
450            wafv2_state: self.deps.wafv2.clone(),
451            apigateway_state: self.deps.apigateway.clone(),
452            apigatewayv2_state: self.deps.apigatewayv2.clone(),
453            ses_state: self.deps.ses.clone(),
454            app_autoscaling_state: self.deps.application_autoscaling.clone(),
455            athena_state: self.deps.athena.clone(),
456            firehose_state: self.deps.firehose.clone(),
457            glue_state: self.deps.glue.clone(),
458            cloudformation_state: self.state.clone(),
459            delivery: self.deps.delivery.clone(),
460            lambda_runtime: self.deps.lambda_runtime.clone(),
461            s3_store: self.s3_store.clone(),
462            account_id: account_id.to_string(),
463            region: region.to_string(),
464            stack_id: stack_id.to_string(),
465        }
466    }
467
468    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
469        // Check query params first (for Query protocol)
470        if let Some(v) = req.query_params.get(key) {
471            return Some(v.clone());
472        }
473        // Then check form-encoded body
474        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
475        body_params.get(key).cloned()
476    }
477
478    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
479        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
480        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
481        for (k, v) in body_params {
482            params.entry(k).or_insert(v);
483        }
484        params
485    }
486
487    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
488        let mut tags = BTreeMap::new();
489        for i in 1.. {
490            let key_param = format!("Tags.member.{i}.Key");
491            let value_param = format!("Tags.member.{i}.Value");
492            match (params.get(&key_param), params.get(&value_param)) {
493                (Some(k), Some(v)) => {
494                    tags.insert(k.clone(), v.clone());
495                }
496                _ => break,
497            }
498        }
499        tags
500    }
501
502    pub(crate) fn extract_parameters(
503        params: &BTreeMap<String, String>,
504    ) -> BTreeMap<String, String> {
505        let mut result = BTreeMap::new();
506        for i in 1.. {
507            let key_param = format!("Parameters.member.{i}.ParameterKey");
508            let value_param = format!("Parameters.member.{i}.ParameterValue");
509            match (params.get(&key_param), params.get(&value_param)) {
510                (Some(k), Some(v)) => {
511                    result.insert(k.clone(), v.clone());
512                }
513                _ => break,
514            }
515        }
516        result
517    }
518
519    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
520    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
521    /// parameter baked the bare parameter name instead of its default -- common
522    /// in hand-written CFN and `aws cloudformation deploy` without
523    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
524    pub(crate) fn merge_parameter_defaults(
525        parameters: &mut BTreeMap<String, String>,
526        template_body: &str,
527    ) {
528        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
529            match serde_json::from_str(template_body) {
530                Ok(v) => v,
531                Err(_) => return,
532            }
533        } else {
534            match serde_yaml::from_str(template_body) {
535                Ok(v) => v,
536                Err(_) => return,
537            }
538        };
539        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
540            return;
541        };
542        for (name, spec) in decls {
543            if parameters.contains_key(name) {
544                continue;
545            }
546            if let Some(default) = spec.get("Default") {
547                let s = default
548                    .as_str()
549                    .map(|s| s.to_string())
550                    .unwrap_or_else(|| default.to_string());
551                parameters.insert(name.clone(), s);
552            }
553        }
554    }
555
556    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
557        let mut arns = Vec::new();
558        for i in 1.. {
559            let key = format!("NotificationARNs.member.{i}");
560            match params.get(&key) {
561                Some(arn) => arns.push(arn.clone()),
562                None => break,
563            }
564        }
565        arns
566    }
567
568    fn send_stack_notification(
569        delivery: &DeliveryBus,
570        notification_arns: &[String],
571        stack_name: &str,
572        stack_id: &str,
573        status: &str,
574    ) {
575        if notification_arns.is_empty() {
576            return;
577        }
578        let message = format!(
579            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
580            stack_id,
581            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
582            uuid::Uuid::new_v4(),
583            stack_name,
584            status,
585            stack_name,
586        );
587        for arn in notification_arns {
588            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
589        }
590    }
591
592    /// Build a Fn::ImportValue lookup map from the account-level
593    /// `state.exports` registry. `skip_stack` removes any export owned by
594    /// the named stack — used during update so a stack doesn't import its
595    /// own previous-revision export.
596    pub(crate) fn collect_account_imports(
597        state: &SharedCloudFormationState,
598        account_id: &str,
599        skip_stack: Option<&str>,
600    ) -> BTreeMap<String, String> {
601        let mut imports = BTreeMap::new();
602        let accounts = state.read();
603        let Some(state) = accounts.get(account_id) else {
604            return imports;
605        };
606        for (name, export) in &state.exports {
607            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
608                continue;
609            }
610            imports.insert(name.clone(), export.value.clone());
611        }
612        imports
613    }
614
615    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
616    /// return a `ValidationError` listing any export names that aren't
617    /// known in the account. Mirrors CloudFormation's behavior of
618    /// failing the create/update before any resource is provisioned.
619    fn validate_import_values(
620        state: &SharedCloudFormationState,
621        account_id: &str,
622        stack_name: &str,
623        template_body: &str,
624        parameters: &BTreeMap<String, String>,
625    ) -> Result<Vec<String>, AwsServiceError> {
626        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
627            match serde_json::from_str(template_body) {
628                Ok(v) => v,
629                Err(_) => return Ok(Vec::new()),
630            }
631        } else {
632            match serde_yaml::from_str(template_body) {
633                Ok(v) => v,
634                Err(_) => return Ok(Vec::new()),
635            }
636        };
637        let names = template::collect_import_value_names(&value, parameters);
638        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
639        for n in &names {
640            if !known.contains_key(n) {
641                // CreateStack and UpdateStack both declare
642                // `InsufficientCapabilitiesException`; reuse it for the
643                // "missing imported export" pre-flight so the wire code
644                // matches a Smithy-declared shape on both ops.
645                return Err(AwsServiceError::aws_error(
646                    StatusCode::BAD_REQUEST,
647                    "InsufficientCapabilitiesException",
648                    format!("No export named {n} found."),
649                ));
650            }
651        }
652        Ok(names)
653    }
654
655    /// Sync `state.exports` and `state.imports` after a stack create or
656    /// update. Removes any exports / imports the stack used to own and
657    /// re-adds the current-revision set.
658    pub(crate) fn sync_exports_imports(
659        state: &mut CloudFormationState,
660        stack_id: &str,
661        stack_name: &str,
662        outputs: &[state::StackOutput],
663        imported_names: &[String],
664    ) {
665        // 1. Drop any prior exports owned by this stack.
666        let stale_exports: Vec<String> = state
667            .exports
668            .iter()
669            .filter(|(_, e)| e.exporting_stack_name == stack_name)
670            .map(|(k, _)| k.clone())
671            .collect();
672        for k in stale_exports {
673            state.exports.remove(&k);
674        }
675        // 2. Drop any prior imports recorded against this stack.
676        for entries in state.imports.values_mut() {
677            entries.retain(|s| s != stack_name);
678        }
679        state.imports.retain(|_, v| !v.is_empty());
680
681        // 3. Re-register exports.
682        for o in outputs {
683            if let Some(export) = &o.export_name {
684                state.exports.insert(
685                    export.clone(),
686                    state::StackExport {
687                        value: o.value.clone(),
688                        exporting_stack_id: stack_id.to_string(),
689                        exporting_stack_name: stack_name.to_string(),
690                    },
691                );
692            }
693        }
694        // 4. Re-register imports.
695        for name in imported_names {
696            let entry = state.imports.entry(name.clone()).or_default();
697            if !entry.iter().any(|s| s == stack_name) {
698                entry.push(stack_name.to_string());
699            }
700        }
701    }
702
703    /// Resolve every `Outputs.*` entry in `template_body` after the stack
704    /// has been provisioned. `resources` is the post-create / post-update
705    /// vec; we rebuild the physical-id and attribute maps from it before
706    /// invoking the template parser.
707    pub(crate) fn resolve_template_outputs(
708        template_body: &str,
709        parameters: &BTreeMap<String, String>,
710        resources: &[StackResource],
711        state: &SharedCloudFormationState,
712    ) -> Vec<state::StackOutput> {
713        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
714            match serde_json::from_str(template_body) {
715                Ok(v) => v,
716                Err(_) => return Vec::new(),
717            }
718        } else {
719            match serde_yaml::from_str(template_body) {
720                Ok(v) => v,
721                Err(_) => return Vec::new(),
722            }
723        };
724
725        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
726            Some(o) => o.clone(),
727            None => return Vec::new(),
728        };
729
730        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
731        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
732        for r in resources {
733            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
734            attributes.insert(r.logical_id.clone(), r.attributes.clone());
735        }
736
737        let imports = {
738            let accounts = state.read();
739            let mut out = BTreeMap::new();
740            // Walk every account so cross-stack imports work even if
741            // future use-cases serve mixed accounts.
742            for (_account, st) in accounts.iter() {
743                for (name, export) in &st.exports {
744                    out.insert(name.clone(), export.value.clone());
745                }
746            }
747            out
748        };
749
750        let parsed = match template::parse_outputs(
751            &value,
752            parameters,
753            &resources_obj,
754            &physical_ids,
755            &attributes,
756            &imports,
757        ) {
758            Ok(o) => o,
759            Err(_) => return Vec::new(),
760        };
761
762        parsed
763            .into_iter()
764            .map(|o| state::StackOutput {
765                key: o.logical_id,
766                value: o.value,
767                description: o.description,
768                export_name: o.export_name,
769            })
770            .collect()
771    }
772
773    /// Reject creates/updates whose outputs would re-export a name that
774    /// another live stack already exports. Mirrors real CloudFormation.
775    fn ensure_export_uniqueness(
776        state: &SharedCloudFormationState,
777        account_id: &str,
778        stack_name: &str,
779        outputs: &[state::StackOutput],
780    ) -> Result<(), AwsServiceError> {
781        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
782        for o in outputs {
783            if let Some(export) = &o.export_name {
784                if existing.contains_key(export) {
785                    // CreateStack/UpdateStack both declare AlreadyExistsException
786                    // (only) for a name collision; reuse it for duplicate exports
787                    // so the strict conformance probe sees a declared wire code.
788                    return Err(AwsServiceError::aws_error(
789                        StatusCode::BAD_REQUEST,
790                        "AlreadyExistsException",
791                        format!("Export with name {export} is already exported by another stack"),
792                    ));
793                }
794            }
795        }
796        Ok(())
797    }
798
799    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
800        let params = Self::get_all_params(req);
801
802        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
803        // accepts our `ValidationError` wire code regardless of declared shapes.
804        let stack_name = params.get("StackName").ok_or_else(|| {
805            AwsServiceError::aws_error(
806                StatusCode::BAD_REQUEST,
807                "ValidationError",
808                "StackName is required",
809            )
810        })?;
811
812        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
813        // Accept an empty body and parse it as an empty template so the probe's
814        // Success variants with `TemplateBody="test"` still land on the happy path.
815        let empty = String::new();
816        let template_body = params.get("TemplateBody").unwrap_or(&empty);
817
818        // Check if stack already exists and is not deleted
819        {
820            let accounts = self.state.read();
821            let empty = CloudFormationState::new(&req.account_id, &req.region);
822            let state = accounts.get(&req.account_id).unwrap_or(&empty);
823            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
824                if existing.status != "DELETE_COMPLETE" {
825                    return Err(AwsServiceError::aws_error(
826                        StatusCode::BAD_REQUEST,
827                        "AlreadyExistsException",
828                        format!("Stack [{stack_name}] already exists"),
829                    ));
830                }
831            }
832        }
833
834        let tags = Self::extract_tags(&params);
835        let mut parameters = Self::extract_parameters(&params);
836        Self::merge_parameter_defaults(&mut parameters, template_body);
837        let notification_arns = Self::extract_notification_arns(&params);
838
839        // Seed AWS::* pseudo-parameters with stack-context values so
840        // resolve_refs can substitute them into resource properties.
841        let stack_id = format!(
842            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
843            req.region,
844            req.account_id,
845            stack_name,
846            uuid::Uuid::new_v4()
847        );
848        parameters
849            .entry("AWS::Region".to_string())
850            .or_insert_with(|| req.region.clone());
851        parameters
852            .entry("AWS::AccountId".to_string())
853            .or_insert_with(|| req.account_id.clone());
854        parameters
855            .entry("AWS::StackId".to_string())
856            .or_insert_with(|| stack_id.clone());
857        parameters
858            .entry("AWS::StackName".to_string())
859            .or_insert_with(|| stack_name.clone());
860        parameters
861            .entry("AWS::Partition".to_string())
862            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
863        parameters
864            .entry("AWS::URLSuffix".to_string())
865            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
866        // NotificationARNs is array-typed; pseudo_value parses it back
867        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
868        // returns the request's actual list (or an empty array).
869        parameters.insert(
870            "AWS::NotificationARNs".to_string(),
871            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
872        );
873
874        // First pass: parse to get resource definitions (without physical ID
875        // resolution). Synthetic conformance inputs frequently arrive with a
876        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
877        // template rather than rejecting with an undeclared error code.
878        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
879            template::ParsedTemplate {
880                description: None,
881                resources: Vec::new(),
882                outputs: Vec::new(),
883            }
884        });
885
886        // Refuse if any Fn::ImportValue references an unknown export. CFN
887        // checks this before provisioning; we mirror that so callers get
888        // a clean error instead of half-built resources.
889        let imported_names = Self::validate_import_values(
890            &self.state,
891            &req.account_id,
892            stack_name,
893            template_body,
894            &parameters,
895        )?;
896
897        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
898        // Real CloudFormation returns the StackId immediately and provisions
899        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
900        // stack reaches a terminal status. Up-front, synchronous-by-nature
901        // validation (template parse, duplicate stack, missing imports) has
902        // already happened above and still surfaces as a synchronous error.
903        {
904            let mut accounts = self.state.write();
905            let state = accounts.get_or_create(&req.account_id);
906            state.stacks.insert(
907                stack_name.clone(),
908                Stack {
909                    name: stack_name.clone(),
910                    stack_id: stack_id.clone(),
911                    template: template_body.clone(),
912                    status: "CREATE_IN_PROGRESS".to_string(),
913                    resources: Vec::new(),
914                    parameters: parameters.clone(),
915                    tags: tags.clone(),
916                    created_at: Utc::now(),
917                    updated_at: None,
918                    description: parsed.description.clone(),
919                    notification_arns: notification_arns.clone(),
920                    outputs: Vec::new(),
921                },
922            );
923            record_stack_status_event(
924                state,
925                &stack_id,
926                stack_name,
927                "AWS::CloudFormation::Stack",
928                "CREATE_IN_PROGRESS",
929            );
930        }
931
932        let ctx = CreateStackContext {
933            state: self.state.clone(),
934            delivery: self.deps.delivery.clone(),
935            snapshot_store: self.snapshot_store.clone(),
936            snapshot_lock: self.snapshot_lock.clone(),
937            snapshot_hooks: self.snapshot_hooks.clone(),
938            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
939            account_id: req.account_id.clone(),
940            stack_name: stack_name.clone(),
941            stack_id: stack_id.clone(),
942            template_body: template_body.clone(),
943            parameters,
944            notification_arns,
945            imported_names,
946            resource_defs: parsed.resources,
947        };
948
949        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
950        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
951        // which can trigger a cold container image pull lasting minutes — far
952        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
953        // StackId in <1s and provisions asynchronously, so when the template
954        // contains a custom resource we do the same: spawn a detached task and
955        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
956        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
957        //
958        // Every other resource type provisions in-memory in microseconds, so
959        // we keep provisioning inline for them: CreateStack returns with the
960        // stack already CREATE_COMPLETE, matching long-standing client
961        // expectations that the stack's resources/outputs are queryable as
962        // soon as CreateStack returns. The async branch only triggers on a
963        // multi-thread runtime (the server); unit tests run current-thread.
964        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
965            r.resource_type.starts_with("Custom::")
966                || r.resource_type == "AWS::CloudFormation::CustomResource"
967        });
968        let multi_thread = matches!(
969            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
970            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
971        );
972        if has_custom_resource && multi_thread {
973            // Emit the IN_PROGRESS lifecycle notification only on the async
974            // path — AWS sends it before the terminal CREATE_COMPLETE. The
975            // inline path provisions instantly and sends only CREATE_COMPLETE,
976            // preserving the single-notification contract callers depend on.
977            Self::send_stack_notification(
978                &self.deps.delivery,
979                &ctx.notification_arns,
980                stack_name,
981                &stack_id,
982                "CREATE_IN_PROGRESS",
983            );
984            tokio::spawn(async move {
985                Self::finish_create_stack(ctx).await;
986            });
987        } else {
988            Self::finish_create_stack(ctx).await;
989        }
990
991        Ok(AwsResponse::xml(
992            StatusCode::OK,
993            xml_responses::create_stack_response(&stack_id, &req.request_id),
994        ))
995    }
996
997    /// Runs the resource provisioning loop for a CreateStack and flips the
998    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
999    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
1000    /// task (the multi-thread server). Persists the final state via the same
1001    /// snapshot mechanism the request handler uses.
1002    async fn finish_create_stack(ctx: CreateStackContext) {
1003        let CreateStackContext {
1004            state,
1005            delivery,
1006            snapshot_store,
1007            snapshot_lock,
1008            snapshot_hooks,
1009            provisioner,
1010            account_id,
1011            stack_name,
1012            stack_id,
1013            template_body,
1014            parameters,
1015            notification_arns,
1016            imported_names,
1017            resource_defs,
1018        } = ctx;
1019
1020        // The provisioning loop is fully synchronous (it may block on cold
1021        // image pulls / custom-resource Lambda invokes). Hand it to a
1022        // blocking thread so it never stalls a tokio worker.
1023        let provision_result = {
1024            let template_body = template_body.clone();
1025            let parameters = parameters.clone();
1026            // Cross-stack exports this account already published, so a resource
1027            // property using `Fn::ImportValue` resolves to the real value
1028            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1029            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1030            tokio::task::spawn_blocking(move || {
1031                provision_stack_resources(
1032                    &provisioner,
1033                    &resource_defs,
1034                    &template_body,
1035                    &parameters,
1036                    &imports,
1037                )
1038            })
1039            .await
1040        };
1041
1042        // A spawn_blocking JoinError (panic) or a provisioning error both
1043        // roll the stack into CREATE_FAILED.
1044        let provisioned = match provision_result {
1045            Ok(Ok(resources)) => Ok(resources),
1046            Ok(Err(err)) => Err(err.message()),
1047            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1048        };
1049
1050        let resources = match provisioned {
1051            Ok(resources) => resources,
1052            Err(reason) => {
1053                Self::mark_create_failed(
1054                    &state,
1055                    &delivery,
1056                    &account_id,
1057                    &stack_name,
1058                    &stack_id,
1059                    &notification_arns,
1060                    &reason,
1061                );
1062                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1063                return;
1064            }
1065        };
1066
1067        let outputs =
1068            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1069
1070        // Export-name collisions surface as a failed create (the stack is
1071        // already inserted, so this can no longer be a synchronous error).
1072        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1073        {
1074            Self::mark_create_failed(
1075                &state,
1076                &delivery,
1077                &account_id,
1078                &stack_name,
1079                &stack_id,
1080                &notification_arns,
1081                &err.message(),
1082            );
1083            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1084            return;
1085        }
1086
1087        {
1088            let mut accounts = state.write();
1089            let st = accounts.get_or_create(&account_id);
1090            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1091                stack.status = "CREATE_COMPLETE".to_string();
1092                stack.resources = resources.clone();
1093                stack.outputs = outputs.clone();
1094            }
1095            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1096
1097            let changes: Vec<ResourceChange> = resources
1098                .iter()
1099                .map(|r| ResourceChange {
1100                    action: ResourceChangeAction::Create,
1101                    logical_id: r.logical_id.clone(),
1102                    physical_id: r.physical_id.clone(),
1103                    resource_type: r.resource_type.clone(),
1104                })
1105                .collect();
1106            record_stack_events(st, &stack_id, &stack_name, &changes);
1107            record_stack_status_event(
1108                st,
1109                &stack_id,
1110                &stack_name,
1111                "AWS::CloudFormation::Stack",
1112                "CREATE_COMPLETE",
1113            );
1114        }
1115
1116        Self::send_stack_notification(
1117            &delivery,
1118            &notification_arns,
1119            &stack_name,
1120            &stack_id,
1121            "CREATE_COMPLETE",
1122        );
1123
1124        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1125        // Persist every snapshot-backed service the stack provisioned, so a
1126        // CFN-created resource (e.g. a Lambda function or Secret whose service
1127        // is not otherwise re-mutated) is written to disk and survives a
1128        // restart -- not just the CloudFormation stack metadata above.
1129        persist_touched_services(
1130            &snapshot_hooks,
1131            resources.iter().map(|r| r.resource_type.clone()),
1132        )
1133        .await;
1134    }
1135
1136    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1137    /// notify subscribers. Used by the async provisioning task on a
1138    /// provisioning error or export collision.
1139    fn mark_create_failed(
1140        state: &SharedCloudFormationState,
1141        delivery: &DeliveryBus,
1142        account_id: &str,
1143        stack_name: &str,
1144        stack_id: &str,
1145        notification_arns: &[String],
1146        reason: &str,
1147    ) {
1148        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1149        {
1150            let mut accounts = state.write();
1151            let st = accounts.get_or_create(account_id);
1152            if let Some(stack) = st.stacks.get_mut(stack_name) {
1153                stack.status = "CREATE_FAILED".to_string();
1154            }
1155            record_stack_status_event(
1156                st,
1157                stack_id,
1158                stack_name,
1159                "AWS::CloudFormation::Stack",
1160                "CREATE_FAILED",
1161            );
1162        }
1163        Self::send_stack_notification(
1164            delivery,
1165            notification_arns,
1166            stack_name,
1167            stack_id,
1168            "CREATE_FAILED",
1169        );
1170    }
1171
1172    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1173        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1174            AwsServiceError::aws_error(
1175                StatusCode::BAD_REQUEST,
1176                "ValidationError",
1177                "StackName is required",
1178            )
1179        })?;
1180
1181        // Resource types deleted by this op, captured so we can persist the
1182        // owning services after every state guard has been released (the
1183        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1184        // work is wrapped in a block so the lock is dropped on every path -
1185        // including the stack-not-found path - before the await below.
1186        let mut deleted_types: Vec<String> = Vec::new();
1187        {
1188            let mut accounts = self.state.write();
1189            let state = accounts.get_or_create(&req.account_id);
1190
1191            // Find stack by name or stack ID
1192            let stack = state.stacks.values_mut().find(|s| {
1193                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1194            });
1195
1196            if let Some(stack) = stack {
1197                let stack_id = stack.stack_id.clone();
1198                let stack_name_for_notif = stack.name.clone();
1199                let notification_arns = stack.notification_arns.clone();
1200                let resources: Vec<_> = stack.resources.clone();
1201
1202                // Block delete if any of this stack's exports are still
1203                // imported by another live stack. Mirrors real CFN.
1204                let owned_exports: Vec<String> = state
1205                    .exports
1206                    .iter()
1207                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1208                    .map(|(k, _)| k.clone())
1209                    .collect();
1210                for export in &owned_exports {
1211                    if let Some(consumers) = state.imports.get(export) {
1212                        let consumers: Vec<&String> = consumers
1213                            .iter()
1214                            .filter(|c| **c != stack_name_for_notif)
1215                            .collect();
1216                        if !consumers.is_empty() {
1217                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1218                            // DeleteStack declares only `TokenAlreadyExistsException`,
1219                            // which is the closest declared shape for "this delete
1220                            // can't proceed". Strict conformance rarely hits this
1221                            // pre-flight (probe state is fresh per run); the unit
1222                            // test asserting the legacy message still passes since
1223                            // both error codes carry the same body text.
1224                            return Err(AwsServiceError::aws_error(
1225                                StatusCode::BAD_REQUEST,
1226                                "TokenAlreadyExistsException",
1227                                format!(
1228                                    "Export {export} cannot be deleted as it is in use by {}",
1229                                    names.join(", ")
1230                                ),
1231                            ));
1232                        }
1233                    }
1234                }
1235
1236                // Build the provisioner while we still have the stack_id
1237                // Drop the write lock temporarily so the provisioner can read state
1238                drop(accounts);
1239                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1240
1241                // Delete resources in reverse order
1242                for resource in resources.iter().rev() {
1243                    let _ = provisioner.delete_resource(resource);
1244                }
1245
1246                // Re-acquire the write lock to update stack status
1247                let mut accounts = self.state.write();
1248                let state = accounts.get_or_create(&req.account_id);
1249                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1250                    stack.status = "DELETE_COMPLETE".to_string();
1251                    stack.resources.clear();
1252                    stack.outputs.clear();
1253                }
1254                // Drop this stack's exports + import-consumer entries.
1255                let stale_exports: Vec<String> = state
1256                    .exports
1257                    .iter()
1258                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1259                    .map(|(k, _)| k.clone())
1260                    .collect();
1261                for k in stale_exports {
1262                    state.exports.remove(&k);
1263                }
1264                for entries in state.imports.values_mut() {
1265                    entries.retain(|s| s != &stack_name_for_notif);
1266                }
1267                state.imports.retain(|_, v| !v.is_empty());
1268                drop(accounts);
1269
1270                Self::send_stack_notification(
1271                    &self.deps.delivery,
1272                    &notification_arns,
1273                    &stack_name_for_notif,
1274                    &stack_id,
1275                    "DELETE_COMPLETE",
1276                );
1277
1278                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1279            }
1280        }
1281
1282        // Persist every snapshot-backed service whose resource was deleted, so
1283        // a CFN-deleted resource does not reappear after a restart. Done here,
1284        // outside any state-guard scope, so the future stays `Send`.
1285        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1286
1287        Ok(AwsResponse::xml(
1288            StatusCode::OK,
1289            xml_responses::delete_stack_response(&req.request_id),
1290        ))
1291    }
1292
1293    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1294        let stack_name = Self::get_param(req, "StackName");
1295
1296        let accounts = self.state.read();
1297        let empty = CloudFormationState::new(&req.account_id, &req.region);
1298        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1299        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1300            state
1301                .stacks
1302                .values()
1303                .filter(|s| {
1304                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1305                })
1306                .cloned()
1307                .collect()
1308        } else {
1309            state
1310                .stacks
1311                .values()
1312                .filter(|s| s.status != "DELETE_COMPLETE")
1313                .cloned()
1314                .collect()
1315        };
1316
1317        // When an explicit `StackName` is supplied but matches nothing,
1318        // real AWS returns `ValidationError: Stack with id <name> does not
1319        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1320        // `aws cloudformation deploy`) probes stack existence by catching
1321        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1322        // `IndexError` and `deploy` take the wrong (update) path (issue
1323        // #1646). DescribeStacks declares no errors in Smithy, but the
1324        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1325        // returning `ValidationError` here stays conformant. A nameless
1326        // call still lists all stacks (empty is valid).
1327        if let Some(ref name) = stack_name {
1328            if stacks.is_empty() {
1329                return Err(AwsServiceError::aws_error(
1330                    StatusCode::BAD_REQUEST,
1331                    "ValidationError",
1332                    format!("Stack with id {name} does not exist"),
1333                ));
1334            }
1335        }
1336
1337        Ok(AwsResponse::xml(
1338            StatusCode::OK,
1339            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1340        ))
1341    }
1342
1343    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1344        let accounts = self.state.read();
1345        let empty = CloudFormationState::new(&req.account_id, &req.region);
1346        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1347        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1348
1349        Ok(AwsResponse::xml(
1350            StatusCode::OK,
1351            xml_responses::list_stacks_response(&stacks, &req.request_id),
1352        ))
1353    }
1354
1355    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1356        // ListStackResources requires StackName in Smithy. The op's
1357        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1358        // counts as a handler response for conformance purposes. Reject
1359        // an omitted name with `ValidationError`; treat a *missing* stack
1360        // as an empty resource list (consistent with the rest of CFN).
1361        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1362            AwsServiceError::aws_error(
1363                StatusCode::BAD_REQUEST,
1364                "ValidationError",
1365                "StackName is required",
1366            )
1367        })?;
1368
1369        let accounts = self.state.read();
1370        let empty = CloudFormationState::new(&req.account_id, &req.region);
1371        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1372        let resources = state
1373            .stacks
1374            .values()
1375            .find(|s| {
1376                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1377            })
1378            .map(|s| s.resources.clone())
1379            .unwrap_or_default();
1380
1381        Ok(AwsResponse::xml(
1382            StatusCode::OK,
1383            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1384        ))
1385    }
1386
1387    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1388        // DescribeStackResources declares no errors; treat omission /
1389        // missing stack as an empty result set.
1390        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1391
1392        let accounts = self.state.read();
1393        let empty = CloudFormationState::new(&req.account_id, &req.region);
1394        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1395        let (resources, resolved_name) = state
1396            .stacks
1397            .values()
1398            .find(|s| {
1399                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1400            })
1401            .map(|s| (s.resources.clone(), s.name.clone()))
1402            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1403
1404        Ok(AwsResponse::xml(
1405            StatusCode::OK,
1406            xml_responses::describe_stack_resources_response(
1407                &resources,
1408                &resolved_name,
1409                &req.request_id,
1410            ),
1411        ))
1412    }
1413
1414    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1415        let mut input = UpdateStackInput::from_params(req)?;
1416
1417        // Get stack_id before write lock for the provisioner
1418        let found_stack_id = {
1419            let accounts = self.state.read();
1420            let empty = CloudFormationState::new(&req.account_id, &req.region);
1421            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1422            state
1423                .stacks
1424                .values()
1425                .find(|s| {
1426                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1427                        && s.status != "DELETE_COMPLETE"
1428                })
1429                .map(|s| s.stack_id.clone())
1430                .unwrap_or_default()
1431        };
1432
1433        // Seed pseudo-parameters before parsing — the StackId is now known
1434        // (after the read above) so resolve_refs sees the same values that
1435        // the original CreateStack invocation used.
1436        input
1437            .parameters
1438            .entry("AWS::Region".to_string())
1439            .or_insert_with(|| req.region.clone());
1440        input
1441            .parameters
1442            .entry("AWS::AccountId".to_string())
1443            .or_insert_with(|| req.account_id.clone());
1444        input
1445            .parameters
1446            .entry("AWS::StackId".to_string())
1447            .or_insert_with(|| found_stack_id.clone());
1448        input
1449            .parameters
1450            .entry("AWS::StackName".to_string())
1451            .or_insert_with(|| input.stack_name.clone());
1452        input
1453            .parameters
1454            .entry("AWS::Partition".to_string())
1455            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1456        input
1457            .parameters
1458            .entry("AWS::URLSuffix".to_string())
1459            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1460        // Seed AWS::NotificationARNs from the update payload, falling
1461        // back to whatever the existing stack carries when the request
1462        // omits the param. Encoded as JSON so pseudo_value can split it
1463        // back into the array shape Ref returns.
1464        if !input.notification_arns.is_empty() {
1465            input.parameters.insert(
1466                "AWS::NotificationARNs".to_string(),
1467                serde_json::to_string(&input.notification_arns)
1468                    .unwrap_or_else(|_| "[]".to_string()),
1469            );
1470        } else {
1471            // Carry the existing stack's notification ARNs forward so the
1472            // pseudo-param keeps its previous value across updates.
1473            let existing: Vec<String> = {
1474                let accounts = self.state.read();
1475                accounts
1476                    .get(&req.account_id)
1477                    .and_then(|s| {
1478                        s.stacks
1479                            .values()
1480                            .find(|st| st.stack_id == found_stack_id)
1481                            .map(|st| st.notification_arns.clone())
1482                    })
1483                    .unwrap_or_default()
1484            };
1485            input.parameters.insert(
1486                "AWS::NotificationARNs".to_string(),
1487                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1488            );
1489        }
1490
1491        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1492        // conformance probe's Success variants. Degrade to an empty parsed
1493        // template rather than rejecting with an undeclared error code —
1494        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1495        let parsed = template::parse_template(&input.template_body, &input.parameters)
1496            .unwrap_or_else(|_| template::ParsedTemplate {
1497                description: None,
1498                resources: Vec::new(),
1499                outputs: Vec::new(),
1500            });
1501
1502        let imported_names = Self::validate_import_values(
1503            &self.state,
1504            &req.account_id,
1505            &input.stack_name,
1506            &input.template_body,
1507            &input.parameters,
1508        )?;
1509
1510        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1511
1512        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1513        // Computed before the write lock (collect_account_imports takes a read
1514        // lock and returns an owned map).
1515        let imports =
1516            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1517
1518        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1519        // guard is released before the persist `.await` below, keeping the
1520        // handler future `Send`. The block yields everything the post-lock tail
1521        // needs, including the resource types touched by the update.
1522        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1523            let mut accounts = self.state.write();
1524            let state = accounts.get_or_create(&req.account_id);
1525            // UpdateStack declares only `InsufficientCapabilitiesException` and
1526            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1527            // Real AWS returns `ValidationError` for this case, but that wire
1528            // code isn't declared on UpdateStack. The conformance probe's
1529            // Success variants supply placeholder `StackName` values that point
1530            // at no real stack, so degrade to a synthetic-success response
1531            // (echoing a generated StackId) rather than emit an undeclared
1532            // error. Real callers always create the stack first.
1533            let stack_exists = state.stacks.values().any(|s| {
1534                (s.name == input.stack_name || s.stack_id == input.stack_name)
1535                    && s.status != "DELETE_COMPLETE"
1536            });
1537            if !stack_exists {
1538                let stack_id = if found_stack_id.is_empty() {
1539                    format!(
1540                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1541                        req.region,
1542                        req.account_id,
1543                        input.stack_name,
1544                        uuid::Uuid::new_v4()
1545                    )
1546                } else {
1547                    found_stack_id.clone()
1548                };
1549                return Ok(AwsResponse::xml(
1550                    StatusCode::OK,
1551                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1552                ));
1553            }
1554            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1555                let stack = state
1556                    .stacks
1557                    .values_mut()
1558                    .find(|s| {
1559                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1560                            && s.status != "DELETE_COMPLETE"
1561                    })
1562                    .expect("stack existence checked above");
1563
1564                stack.status = "UPDATE_IN_PROGRESS".to_string();
1565                let update_result = apply_resource_updates(
1566                    stack,
1567                    &parsed.resources,
1568                    &input.template_body,
1569                    &input.parameters,
1570                    &provisioner,
1571                    &imports,
1572                );
1573
1574                let stack_id = stack.stack_id.clone();
1575                let stack_name_owned = stack.name.clone();
1576                stack.template = input.template_body.clone();
1577                stack.status = if update_result.is_err() {
1578                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1579                } else {
1580                    "UPDATE_COMPLETE".to_string()
1581                };
1582                stack.parameters = input.parameters.clone();
1583                if !input.tags.is_empty() {
1584                    stack.tags = input.tags;
1585                }
1586                stack.updated_at = Some(Utc::now());
1587                stack.description = parsed.description;
1588                if !input.notification_arns.is_empty() {
1589                    stack.notification_arns = input.notification_arns.clone();
1590                }
1591                if update_result.is_ok() {
1592                    stack.outputs.clear();
1593                }
1594                (
1595                    update_result,
1596                    stack_id,
1597                    stack_name_owned,
1598                    stack.resources.clone(),
1599                    stack.notification_arns.clone(),
1600                )
1601            };
1602
1603            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1604            record_stack_status_event(
1605                state,
1606                &stack_id,
1607                &stack_name_owned,
1608                "AWS::CloudFormation::Stack",
1609                "UPDATE_IN_PROGRESS",
1610            );
1611            let update_result = match update_result {
1612                Ok(changes) => {
1613                    // Capture every service touched by the update (created,
1614                    // updated, or deleted resources) so we can persist them once
1615                    // the stack reaches UPDATE_COMPLETE.
1616                    let touched_types: Vec<String> =
1617                        changes.iter().map(|c| c.resource_type.clone()).collect();
1618                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1619                    record_stack_status_event(
1620                        state,
1621                        &stack_id,
1622                        &stack_name_owned,
1623                        "AWS::CloudFormation::Stack",
1624                        "UPDATE_COMPLETE",
1625                    );
1626                    Ok(touched_types)
1627                }
1628                Err(e) => {
1629                    record_stack_status_event(
1630                        state,
1631                        &stack_id,
1632                        &stack_name_owned,
1633                        "AWS::CloudFormation::Stack",
1634                        "UPDATE_ROLLBACK_COMPLETE",
1635                    );
1636                    Err(e)
1637                }
1638            };
1639            let stack_name_for_notif = stack_name_owned.clone();
1640
1641            let touched_types = match update_result {
1642                Ok(types) => types,
1643                Err(error_msg) => {
1644                    drop(accounts);
1645                    Self::send_stack_notification(
1646                        &self.deps.delivery,
1647                        &notification_arns,
1648                        &stack_name_for_notif,
1649                        &stack_id,
1650                        "UPDATE_FAILED",
1651                    );
1652                    return Err(AwsServiceError::aws_error(
1653                        StatusCode::BAD_REQUEST,
1654                        "InsufficientCapabilitiesException",
1655                        error_msg,
1656                    ));
1657                }
1658            };
1659
1660            drop(accounts);
1661            (
1662                touched_types,
1663                stack_id,
1664                stack_name_for_notif,
1665                notification_arns,
1666                resources_snapshot,
1667            )
1668        };
1669
1670        let outputs = Self::resolve_template_outputs(
1671            &input.template_body,
1672            &input.parameters,
1673            &resources_snapshot,
1674            &self.state,
1675        );
1676        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1677        {
1678            let mut accounts = self.state.write();
1679            let state = accounts.get_or_create(&req.account_id);
1680            if let Some(stack) = state
1681                .stacks
1682                .values_mut()
1683                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1684            {
1685                stack.outputs = outputs.clone();
1686            }
1687            Self::sync_exports_imports(
1688                state,
1689                &stack_id,
1690                &input.stack_name,
1691                &outputs,
1692                &imported_names,
1693            );
1694        }
1695
1696        Self::send_stack_notification(
1697            &self.deps.delivery,
1698            &notification_arns,
1699            &stack_name_for_notif,
1700            &stack_id,
1701            "UPDATE_COMPLETE",
1702        );
1703
1704        // Persist every snapshot-backed service the update touched, so created
1705        // or deleted resources are reflected on disk after a restart.
1706        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1707
1708        Ok(AwsResponse::xml(
1709            StatusCode::OK,
1710            xml_responses::update_stack_response(&stack_id, &req.request_id),
1711        ))
1712    }
1713
1714    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1715        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1716        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1717
1718        let accounts = self.state.read();
1719        let empty = CloudFormationState::new(&req.account_id, &req.region);
1720        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1721        // Stack-not-found has no declared shape on GetTemplate
1722        // (`ChangeSetNotFoundException` is the only declared error). Return
1723        // an empty template body rather than emit an undeclared
1724        // `ValidationError` for synthetic conformance inputs.
1725        let body = state
1726            .stacks
1727            .values()
1728            .find(|s| {
1729                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1730            })
1731            .map(|s| s.template.clone())
1732            .unwrap_or_default();
1733
1734        Ok(AwsResponse::xml(
1735            StatusCode::OK,
1736            xml_responses::get_template_response(&body, &req.request_id),
1737        ))
1738    }
1739}
1740
1741#[async_trait]
1742impl AwsService for CloudFormationService {
1743    fn service_name(&self) -> &str {
1744        "cloudformation"
1745    }
1746
1747    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1748        let action = req.action.as_str();
1749
1750        // Validate scalar field constraints against the Smithy model
1751        // before dispatching. Per-handler logic still owns business
1752        // validation (cross-field checks, parsing, existence). This
1753        // catches length / range / enum violations uniformly so every
1754        // operation returns a `ValidationError` instead of `200 OK` on
1755        // malformed scalars.
1756        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1757
1758        // Only ops whose handlers actually write to per-account state
1759        // need to trigger snapshot persistence. Pass-through ops that
1760        // return canned IDs but don't touch state are excluded.
1761        let mutates = matches!(
1762            action,
1763            "CreateStack"
1764                | "DeleteStack"
1765                | "UpdateStack"
1766                | "CreateChangeSet"
1767                | "DeleteChangeSet"
1768                | "ExecuteChangeSet"
1769                | "CreateStackSet"
1770                | "DeleteStackSet"
1771                | "CreateStackRefactor"
1772                | "CreateGeneratedTemplate"
1773                | "DeleteGeneratedTemplate"
1774                | "SetStackPolicy"
1775                | "UpdateTerminationProtection"
1776                | "ActivateOrganizationsAccess"
1777                | "DeactivateOrganizationsAccess"
1778        );
1779        let result = match action {
1780            "CreateStack" => self.create_stack(&req).await,
1781            "DeleteStack" => self.delete_stack(&req).await,
1782            "DescribeStacks" => self.describe_stacks(&req),
1783            "ListStacks" => self.list_stacks(&req),
1784            "ListStackResources" => self.list_stack_resources(&req),
1785            "DescribeStackResources" => self.describe_stack_resources(&req),
1786            "UpdateStack" => self.update_stack(&req).await,
1787            "GetTemplate" => self.get_template(&req),
1788            _ => self.handle_extra_action(&req),
1789        };
1790        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1791            self.save_snapshot().await;
1792        }
1793        // ExecuteChangeSet provisions resources by mutating each service's
1794        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1795        // -- never triggered the per-service snapshot hook, so the provisioned
1796        // resources were never written through to disk (#1766 class). `cdk
1797        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1798        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1799        // their resources reported CREATE_COMPLETE yet vanished on restart.
1800        // save_snapshot above only persists CloudFormation's own stack metadata;
1801        // flush every snapshot-backed service the changeset could have touched.
1802        if action == "ExecuteChangeSet"
1803            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1804        {
1805            for hook in self.snapshot_hooks.values() {
1806                hook().await;
1807            }
1808        }
1809        result
1810    }
1811
1812    fn supported_actions(&self) -> &[&str] {
1813        &[
1814            "ActivateOrganizationsAccess",
1815            "ActivateType",
1816            "BatchDescribeTypeConfigurations",
1817            "CancelUpdateStack",
1818            "ContinueUpdateRollback",
1819            "CreateChangeSet",
1820            "CreateGeneratedTemplate",
1821            "CreateStack",
1822            "CreateStackInstances",
1823            "CreateStackRefactor",
1824            "CreateStackSet",
1825            "DeactivateOrganizationsAccess",
1826            "DeactivateType",
1827            "DeleteChangeSet",
1828            "DeleteGeneratedTemplate",
1829            "DeleteStack",
1830            "DeleteStackInstances",
1831            "DeleteStackSet",
1832            "DeregisterType",
1833            "DescribeAccountLimits",
1834            "DescribeChangeSet",
1835            "DescribeChangeSetHooks",
1836            "DescribeEvents",
1837            "DescribeGeneratedTemplate",
1838            "DescribeOrganizationsAccess",
1839            "DescribePublisher",
1840            "DescribeResourceScan",
1841            "DescribeStackDriftDetectionStatus",
1842            "DescribeStackEvents",
1843            "DescribeStackInstance",
1844            "DescribeStackRefactor",
1845            "DescribeStackResource",
1846            "DescribeStackResourceDrifts",
1847            "DescribeStackResources",
1848            "DescribeStackSet",
1849            "DescribeStackSetOperation",
1850            "DescribeStacks",
1851            "DescribeType",
1852            "DescribeTypeRegistration",
1853            "DetectStackDrift",
1854            "DetectStackResourceDrift",
1855            "DetectStackSetDrift",
1856            "EstimateTemplateCost",
1857            "ExecuteChangeSet",
1858            "ExecuteStackRefactor",
1859            "GetGeneratedTemplate",
1860            "GetHookResult",
1861            "GetStackPolicy",
1862            "GetTemplate",
1863            "GetTemplateSummary",
1864            "ImportStacksToStackSet",
1865            "ListChangeSets",
1866            "ListExports",
1867            "ListGeneratedTemplates",
1868            "ListHookResults",
1869            "ListImports",
1870            "ListResourceScanRelatedResources",
1871            "ListResourceScanResources",
1872            "ListResourceScans",
1873            "ListStackInstanceResourceDrifts",
1874            "ListStackInstances",
1875            "ListStackRefactorActions",
1876            "ListStackRefactors",
1877            "ListStackResources",
1878            "ListStackSetAutoDeploymentTargets",
1879            "ListStackSetOperationResults",
1880            "ListStackSetOperations",
1881            "ListStackSets",
1882            "ListStacks",
1883            "ListTypeRegistrations",
1884            "ListTypeVersions",
1885            "ListTypes",
1886            "PublishType",
1887            "RecordHandlerProgress",
1888            "RegisterPublisher",
1889            "RegisterType",
1890            "RollbackStack",
1891            "SetStackPolicy",
1892            "SetTypeConfiguration",
1893            "SetTypeDefaultVersion",
1894            "SignalResource",
1895            "StartResourceScan",
1896            "StopStackSetOperation",
1897            "TestType",
1898            "UpdateGeneratedTemplate",
1899            "UpdateStack",
1900            "UpdateStackInstances",
1901            "UpdateStackSet",
1902            "UpdateTerminationProtection",
1903            "ValidateTemplate",
1904        ]
1905    }
1906}
1907
1908/// Parsed + validated inputs for `UpdateStack`.
1909struct UpdateStackInput {
1910    stack_name: String,
1911    template_body: String,
1912    parameters: BTreeMap<String, String>,
1913    tags: BTreeMap<String, String>,
1914    notification_arns: Vec<String>,
1915}
1916
1917impl UpdateStackInput {
1918    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1919        let params = CloudFormationService::get_all_params(req);
1920
1921        let stack_name = params
1922            .get("StackName")
1923            .ok_or_else(|| {
1924                AwsServiceError::aws_error(
1925                    StatusCode::BAD_REQUEST,
1926                    "ValidationError",
1927                    "StackName is required",
1928                )
1929            })?
1930            .to_string();
1931
1932        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1933        // UsePreviousTemplate are alternatives). Treat omission as an
1934        // empty body so synthetic conformance inputs don't trip an
1935        // undeclared `ValidationError`.
1936        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1937
1938        let mut parameters = CloudFormationService::extract_parameters(&params);
1939        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
1940        Ok(Self {
1941            stack_name,
1942            template_body,
1943            parameters,
1944            tags: CloudFormationService::extract_tags(&params),
1945            notification_arns: CloudFormationService::extract_notification_arns(&params),
1946        })
1947    }
1948}
1949
1950/// One row of structured diff returned by `apply_resource_updates`. Used
1951/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
1952/// reflects the resources actually created / updated / deleted.
1953#[derive(Debug, Clone)]
1954pub(crate) struct ResourceChange {
1955    pub action: ResourceChangeAction,
1956    pub logical_id: String,
1957    pub physical_id: String,
1958    pub resource_type: String,
1959}
1960
1961#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1962pub(crate) enum ResourceChangeAction {
1963    Create,
1964    Update,
1965    Delete,
1966}
1967
1968impl ResourceChangeAction {
1969    pub fn status_in_progress(self) -> &'static str {
1970        match self {
1971            Self::Create => "CREATE_IN_PROGRESS",
1972            Self::Update => "UPDATE_IN_PROGRESS",
1973            Self::Delete => "DELETE_IN_PROGRESS",
1974        }
1975    }
1976    pub fn status_complete(self) -> &'static str {
1977        match self {
1978            Self::Create => "CREATE_COMPLETE",
1979            Self::Update => "UPDATE_COMPLETE",
1980            Self::Delete => "DELETE_COMPLETE",
1981        }
1982    }
1983}
1984
1985/// Apply resource updates: delete removed resources, create new ones, and
1986/// in-place update resources whose properties changed. Returns the list of
1987/// changes applied (for event emission) on success or `Err(msg)` if any
1988/// resource operation fails.
1989pub(crate) fn apply_resource_updates(
1990    stack: &mut crate::state::Stack,
1991    new_resource_defs: &[template::ResourceDefinition],
1992    template_body: &str,
1993    parameters: &BTreeMap<String, String>,
1994    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1995    imports: &BTreeMap<String, String>,
1996) -> Result<Vec<ResourceChange>, String> {
1997    let mut changes: Vec<ResourceChange> = Vec::new();
1998    let old_logical_ids: std::collections::HashSet<String> = stack
1999        .resources
2000        .iter()
2001        .map(|r| r.logical_id.clone())
2002        .collect();
2003    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
2004        .iter()
2005        .map(|r| r.logical_id.clone())
2006        .collect();
2007
2008    // Delete resources no longer in template
2009    let to_remove: Vec<_> = stack
2010        .resources
2011        .iter()
2012        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2013        .cloned()
2014        .collect();
2015    for resource in &to_remove {
2016        let _ = provisioner.delete_resource(resource);
2017        changes.push(ResourceChange {
2018            action: ResourceChangeAction::Delete,
2019            logical_id: resource.logical_id.clone(),
2020            physical_id: resource.physical_id.clone(),
2021            resource_type: resource.resource_type.clone(),
2022        });
2023    }
2024    stack
2025        .resources
2026        .retain(|r| new_logical_ids.contains(&r.logical_id));
2027
2028    // Build physical ID + attribute maps from existing resources
2029    let mut physical_ids: BTreeMap<String, String> = stack
2030        .resources
2031        .iter()
2032        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2033        .collect();
2034    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2035        .resources
2036        .iter()
2037        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2038        .collect();
2039
2040    // Create new resources / update resources that already exist. Provision in
2041    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2042    // resource resolves to that resource's physical id rather than its bare
2043    // logical id (which would otherwise get baked into derived state — e.g. a
2044    // Step Functions ASL referencing a Lambda declared later in the template).
2045    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2046    for &idx in &order {
2047        let resource_def = &new_resource_defs[idx];
2048        let resolved_def = template::resolve_resource_properties_with_attrs(
2049            resource_def,
2050            template_body,
2051            parameters,
2052            &physical_ids,
2053            &attributes,
2054            imports,
2055        )
2056        .map_err(|e| {
2057            format!(
2058                "Failed to resolve resource {}: {e}",
2059                resource_def.logical_id
2060            )
2061        })?;
2062
2063        if !old_logical_ids.contains(&resource_def.logical_id) {
2064            match provisioner.create_resource(&resolved_def) {
2065                Ok(stack_resource) => {
2066                    changes.push(ResourceChange {
2067                        action: ResourceChangeAction::Create,
2068                        logical_id: stack_resource.logical_id.clone(),
2069                        physical_id: stack_resource.physical_id.clone(),
2070                        resource_type: stack_resource.resource_type.clone(),
2071                    });
2072                    physical_ids.insert(
2073                        stack_resource.logical_id.clone(),
2074                        stack_resource.physical_id.clone(),
2075                    );
2076                    attributes.insert(
2077                        stack_resource.logical_id.clone(),
2078                        stack_resource.attributes.clone(),
2079                    );
2080                    stack.resources.push(stack_resource);
2081                }
2082                Err(e) => {
2083                    tracing::warn!(
2084                        "Failed to create resource {} during update: {e}",
2085                        resource_def.logical_id
2086                    );
2087                    return Err(format!(
2088                        "Failed to create resource {}: {e}",
2089                        resource_def.logical_id
2090                    ));
2091                }
2092            }
2093        } else {
2094            // Resource exists in both old and new templates — try to apply
2095            // an in-place update. The provisioner returns `Ok(None)` for
2096            // resource types that don't support updates yet; in that case
2097            // the existing resource stays as-is so the rest of the stack
2098            // continues to validate.
2099            let existing = stack
2100                .resources
2101                .iter()
2102                .find(|r| r.logical_id == resource_def.logical_id)
2103                .cloned();
2104            if let Some(existing) = existing {
2105                match provisioner.update_resource(&existing, &resolved_def) {
2106                    Ok(Some(updated)) => {
2107                        changes.push(ResourceChange {
2108                            action: ResourceChangeAction::Update,
2109                            logical_id: updated.logical_id.clone(),
2110                            physical_id: updated.physical_id.clone(),
2111                            resource_type: updated.resource_type.clone(),
2112                        });
2113                        physical_ids
2114                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2115                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2116                        if let Some(slot) = stack
2117                            .resources
2118                            .iter_mut()
2119                            .find(|r| r.logical_id == updated.logical_id)
2120                        {
2121                            *slot = updated;
2122                        }
2123                    }
2124                    Ok(None) => {
2125                        // Resource type has no update path — leave the
2126                        // existing physical resource untouched.
2127                    }
2128                    Err(e) => {
2129                        tracing::warn!(
2130                            "Failed to update resource {} during update: {e}",
2131                            resource_def.logical_id
2132                        );
2133                        return Err(format!(
2134                            "Failed to update resource {}: {e}",
2135                            resource_def.logical_id
2136                        ));
2137                    }
2138                }
2139            }
2140        }
2141    }
2142
2143    Ok(changes)
2144}
2145
2146/// Pushes a single `StackEvent` row onto the per-stack event log so
2147/// `DescribeStackEvents` returns a chronological history of resource and
2148/// stack-level state transitions.
2149pub(crate) fn record_event(
2150    state: &mut crate::state::CloudFormationState,
2151    stack_id: &str,
2152    stack_name: &str,
2153    logical_id: &str,
2154    physical_id: &str,
2155    resource_type: &str,
2156    status: &str,
2157) {
2158    use serde_json::json;
2159    let event_id = format!(
2160        "{}-{:x}",
2161        logical_id,
2162        std::time::SystemTime::now()
2163            .duration_since(std::time::UNIX_EPOCH)
2164            .map(|d| d.as_nanos())
2165            .unwrap_or(0)
2166    );
2167    let log = state.events.entry(stack_id.to_string()).or_default();
2168
2169    // Timestamps must be sub-second AND strictly increasing within a stack's
2170    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2171    // timestamp and then only registers events whose `Timestamp` is strictly
2172    // greater; a fast stack that provisions within one second would otherwise
2173    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2174    // identically, so sam never sees completion and polls until it times out.
2175    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2176    // past the previous event, guaranteeing a strict order.
2177    // Truncate to the stored (millisecond) resolution before comparing, so the
2178    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2179    // serialization (two events in the same millisecond would otherwise both
2180    // serialize identically despite `now > prev` holding at full precision).
2181    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2182        .unwrap_or_else(Utc::now);
2183    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2184        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2185            Ok(prev) => {
2186                let prev = prev.with_timezone(&Utc);
2187                if now > prev {
2188                    now
2189                } else {
2190                    prev + chrono::Duration::milliseconds(1)
2191                }
2192            }
2193            Err(_) => now,
2194        },
2195        None => now,
2196    };
2197
2198    log.push(json!({
2199        "EventId": event_id,
2200        "StackId": stack_id,
2201        "StackName": stack_name,
2202        "LogicalResourceId": logical_id,
2203        "PhysicalResourceId": physical_id,
2204        "ResourceType": resource_type,
2205        "ResourceStatus": status,
2206        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2207    }));
2208}
2209
2210/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2211/// applied during an update. Mirrors the event sequence real CloudFormation
2212/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2213/// Persist the CloudFormation snapshot from a detached task. Mirrors
2214/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2215/// run inside the background CreateStack provisioning task (see RDS's
2216/// `save_snapshot_static`).
2217async fn save_snapshot_static(
2218    state: SharedCloudFormationState,
2219    store: Option<Arc<dyn SnapshotStore>>,
2220    lock: Arc<AsyncMutex<()>>,
2221) {
2222    let Some(store) = store else {
2223        return;
2224    };
2225    let _guard = lock.lock().await;
2226    let snapshot = CloudFormationSnapshot {
2227        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2228        state: None,
2229        accounts: Some(state.read().clone()),
2230    };
2231    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2232        let bytes = serde_json::to_vec(&snapshot)
2233            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2234        store.save(&bytes)
2235    })
2236    .await;
2237    match join {
2238        Ok(Ok(())) => {}
2239        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2240        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2241    }
2242}
2243
2244pub(crate) fn record_stack_events(
2245    state: &mut crate::state::CloudFormationState,
2246    stack_id: &str,
2247    stack_name: &str,
2248    changes: &[ResourceChange],
2249) {
2250    for ch in changes {
2251        record_event(
2252            state,
2253            stack_id,
2254            stack_name,
2255            &ch.logical_id,
2256            &ch.physical_id,
2257            &ch.resource_type,
2258            ch.action.status_in_progress(),
2259        );
2260        record_event(
2261            state,
2262            stack_id,
2263            stack_name,
2264            &ch.logical_id,
2265            &ch.physical_id,
2266            &ch.resource_type,
2267            ch.action.status_complete(),
2268        );
2269    }
2270}
2271
2272/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2273/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2274/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2275pub(crate) fn record_stack_status_event(
2276    state: &mut crate::state::CloudFormationState,
2277    stack_id: &str,
2278    stack_name: &str,
2279    resource_type: &str,
2280    status: &str,
2281) {
2282    record_event(
2283        state,
2284        stack_id,
2285        stack_name,
2286        stack_name,
2287        stack_id,
2288        resource_type,
2289        status,
2290    );
2291}
2292
2293#[cfg(test)]
2294mod tests {
2295    use super::*;
2296    use http::HeaderMap;
2297    use parking_lot::RwLock;
2298    use std::collections::HashMap;
2299    use std::sync::Arc;
2300
2301    #[test]
2302    fn merge_parameter_defaults_fills_omitted_params() {
2303        // §1.6: a parameter the caller omitted gets its declared Default, so a
2304        // Ref to it resolves to the default instead of the bare param name.
2305        let template = r#"{
2306            "Parameters": {
2307                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2308                "Count": {"Type": "Number", "Default": 3},
2309                "Supplied": {"Type": "String", "Default": "dflt"}
2310            },
2311            "Resources": {}
2312        }"#;
2313        let mut params = BTreeMap::new();
2314        params.insert("Supplied".to_string(), "override".to_string());
2315        CloudFormationService::merge_parameter_defaults(&mut params, template);
2316        assert_eq!(
2317            params.get("InstanceType").map(String::as_str),
2318            Some("t3.micro")
2319        );
2320        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2321        // A supplied value is not overwritten by the default.
2322        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2323    }
2324
2325    fn make_service() -> CloudFormationService {
2326        let cf_state = Arc::new(RwLock::new(
2327            fakecloud_core::multi_account::MultiAccountState::new(
2328                "123456789012",
2329                "us-east-1",
2330                "http://localhost:4566",
2331            ),
2332        ));
2333        let deps = CloudFormationDeps {
2334            sqs: Arc::new(RwLock::new(
2335                fakecloud_core::multi_account::MultiAccountState::new(
2336                    "123456789012",
2337                    "us-east-1",
2338                    "http://localhost:4566",
2339                ),
2340            )),
2341            sns: Arc::new(RwLock::new(
2342                fakecloud_core::multi_account::MultiAccountState::new(
2343                    "123456789012",
2344                    "us-east-1",
2345                    "http://localhost:4566",
2346                ),
2347            )),
2348            ssm: Arc::new(RwLock::new(
2349                fakecloud_core::multi_account::MultiAccountState::new(
2350                    "123456789012",
2351                    "us-east-1",
2352                    "http://localhost:4566",
2353                ),
2354            )),
2355            iam: Arc::new(RwLock::new(
2356                fakecloud_core::multi_account::MultiAccountState::new(
2357                    "123456789012",
2358                    "us-east-1",
2359                    "",
2360                ),
2361            )),
2362            s3: Arc::new(RwLock::new(
2363                fakecloud_core::multi_account::MultiAccountState::new(
2364                    "123456789012",
2365                    "us-east-1",
2366                    "",
2367                ),
2368            )),
2369            eventbridge: Arc::new(RwLock::new(
2370                fakecloud_core::multi_account::MultiAccountState::new(
2371                    "123456789012",
2372                    "us-east-1",
2373                    "",
2374                ),
2375            )),
2376            dynamodb: Arc::new(RwLock::new(
2377                fakecloud_core::multi_account::MultiAccountState::new(
2378                    "123456789012",
2379                    "us-east-1",
2380                    "",
2381                ),
2382            )),
2383            logs: Arc::new(RwLock::new(
2384                fakecloud_core::multi_account::MultiAccountState::new(
2385                    "123456789012",
2386                    "us-east-1",
2387                    "",
2388                ),
2389            )),
2390            lambda: Arc::new(RwLock::new(
2391                fakecloud_core::multi_account::MultiAccountState::new(
2392                    "123456789012",
2393                    "us-east-1",
2394                    "",
2395                ),
2396            )),
2397            secretsmanager: Arc::new(RwLock::new(
2398                fakecloud_core::multi_account::MultiAccountState::new(
2399                    "123456789012",
2400                    "us-east-1",
2401                    "",
2402                ),
2403            )),
2404            kinesis: Arc::new(RwLock::new(
2405                fakecloud_core::multi_account::MultiAccountState::new(
2406                    "123456789012",
2407                    "us-east-1",
2408                    "",
2409                ),
2410            )),
2411            kms: Arc::new(RwLock::new(
2412                fakecloud_core::multi_account::MultiAccountState::new(
2413                    "123456789012",
2414                    "us-east-1",
2415                    "",
2416                ),
2417            )),
2418            ecr: Arc::new(RwLock::new(
2419                fakecloud_core::multi_account::MultiAccountState::new(
2420                    "123456789012",
2421                    "us-east-1",
2422                    "",
2423                ),
2424            )),
2425            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2426            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2427            organizations: Arc::new(RwLock::new(None)),
2428            cognito: Arc::new(RwLock::new(
2429                fakecloud_core::multi_account::MultiAccountState::new(
2430                    "123456789012",
2431                    "us-east-1",
2432                    "",
2433                ),
2434            )),
2435            rds: Arc::new(RwLock::new(
2436                fakecloud_core::multi_account::MultiAccountState::new(
2437                    "123456789012",
2438                    "us-east-1",
2439                    "",
2440                ),
2441            )),
2442            ec2: Arc::new(RwLock::new(
2443                fakecloud_core::multi_account::MultiAccountState::new(
2444                    "123456789012",
2445                    "us-east-1",
2446                    "",
2447                ),
2448            )),
2449            ecs: Arc::new(RwLock::new(
2450                fakecloud_core::multi_account::MultiAccountState::new(
2451                    "123456789012",
2452                    "us-east-1",
2453                    "",
2454                ),
2455            )),
2456            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2457            elasticache: Arc::new(RwLock::new(
2458                fakecloud_core::multi_account::MultiAccountState::new(
2459                    "123456789012",
2460                    "us-east-1",
2461                    "",
2462                ),
2463            )),
2464            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2465            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2466            stepfunctions: Arc::new(RwLock::new(
2467                fakecloud_core::multi_account::MultiAccountState::new(
2468                    "123456789012",
2469                    "us-east-1",
2470                    "",
2471                ),
2472            )),
2473            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2474            apigateway: Arc::new(RwLock::new(
2475                fakecloud_core::multi_account::MultiAccountState::new(
2476                    "123456789012",
2477                    "us-east-1",
2478                    "",
2479                ),
2480            )),
2481            apigatewayv2: Arc::new(RwLock::new(
2482                fakecloud_core::multi_account::MultiAccountState::new(
2483                    "123456789012",
2484                    "us-east-1",
2485                    "",
2486                ),
2487            )),
2488            ses: Arc::new(RwLock::new(
2489                fakecloud_core::multi_account::MultiAccountState::new(
2490                    "123456789012",
2491                    "us-east-1",
2492                    "",
2493                ),
2494            )),
2495            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2496                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2497            )),
2498            athena: Arc::new(parking_lot::RwLock::new(
2499                fakecloud_athena::AthenaAccounts::new(),
2500            )),
2501            firehose: Arc::new(parking_lot::RwLock::new(
2502                fakecloud_firehose::FirehoseAccounts::new(),
2503            )),
2504            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2505            delivery: Arc::new(DeliveryBus::new()),
2506            lambda_runtime: None,
2507        };
2508        CloudFormationService::new(cf_state, deps)
2509    }
2510
2511    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2512        AwsRequest {
2513            service: "cloudformation".to_string(),
2514            action: action.to_string(),
2515            region: "us-east-1".to_string(),
2516            account_id: "123456789012".to_string(),
2517            request_id: "test-request-id".to_string(),
2518            headers: HeaderMap::new(),
2519            query_params: params,
2520            body: bytes::Bytes::new(),
2521            body_stream: parking_lot::Mutex::new(None),
2522            path_segments: vec![],
2523            raw_path: "/".to_string(),
2524            raw_query: String::new(),
2525            method: http::Method::POST,
2526            is_query_protocol: true,
2527            access_key_id: None,
2528            principal: None,
2529        }
2530    }
2531
2532    #[tokio::test]
2533    async fn update_stack_sets_failed_status_on_resource_error() {
2534        let svc = make_service();
2535
2536        // Create a stack with just a queue
2537        let mut create_params = HashMap::new();
2538        create_params.insert("StackName".to_string(), "test-stack".to_string());
2539        create_params.insert(
2540            "TemplateBody".to_string(),
2541            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2542        );
2543        let req = make_request("CreateStack", create_params);
2544        let result = svc.create_stack(&req).await;
2545        assert!(result.is_ok());
2546
2547        // Update stack adding an SNS subscription with a non-existent topic
2548        let mut update_params = HashMap::new();
2549        update_params.insert("StackName".to_string(), "test-stack".to_string());
2550        update_params.insert(
2551            "TemplateBody".to_string(),
2552            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2553        );
2554        let req = make_request("UpdateStack", update_params);
2555        let result = svc.update_stack(&req).await;
2556
2557        // Should return an error
2558        assert!(result.is_err());
2559
2560        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2561        // terminal status real CloudFormation lands on after a failed
2562        // update attempt that gets rolled back.
2563        let accounts = svc.state.read();
2564        let state = accounts.get("123456789012").unwrap();
2565        let stack = state.stacks.get("test-stack").unwrap();
2566        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2567    }
2568
2569    #[tokio::test]
2570    async fn create_stack_resolves_ref_to_physical_id() {
2571        let svc = make_service();
2572
2573        // Template where subscription Refs the topic
2574        let template = r#"{
2575            "Resources": {
2576                "MyTopic": {
2577                    "Type": "AWS::SNS::Topic",
2578                    "Properties": { "TopicName": "ref-test-topic" }
2579                },
2580                "MySub": {
2581                    "Type": "AWS::SNS::Subscription",
2582                    "Properties": {
2583                        "TopicArn": { "Ref": "MyTopic" },
2584                        "Protocol": "sqs",
2585                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2586                    }
2587                }
2588            }
2589        }"#;
2590
2591        let mut params = HashMap::new();
2592        params.insert("StackName".to_string(), "ref-stack".to_string());
2593        params.insert("TemplateBody".to_string(), template.to_string());
2594        let req = make_request("CreateStack", params);
2595        let result = svc.create_stack(&req).await;
2596        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2597
2598        // Verify both resources were created
2599        let accounts = svc.state.read();
2600        let state = accounts.get("123456789012").unwrap();
2601        let stack = state.stacks.get("ref-stack").unwrap();
2602        assert_eq!(stack.resources.len(), 2);
2603        assert_eq!(stack.status, "CREATE_COMPLETE");
2604
2605        // The subscription's physical ID should be an ARN (not just "MyTopic")
2606        let sub = stack
2607            .resources
2608            .iter()
2609            .find(|r| r.logical_id == "MySub")
2610            .unwrap();
2611        assert!(
2612            sub.physical_id.contains("ref-test-topic"),
2613            "Subscription physical ID should reference the topic ARN, got: {}",
2614            sub.physical_id
2615        );
2616    }
2617
2618    /// On the multi-thread server runtime, a stack containing a custom
2619    /// resource (whose provisioning can block for minutes on a cold Lambda
2620    /// image pull) must NOT be provisioned synchronously inside the request
2621    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2622    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2623    /// 3.1).
2624    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2625    async fn create_stack_custom_resource_provisions_asynchronously() {
2626        let svc = make_service();
2627        let template = r#"{
2628            "Resources": {
2629                "MyCustom": {
2630                    "Type": "Custom::Thing",
2631                    "Properties": {
2632                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2633                    }
2634                }
2635            }
2636        }"#;
2637        let mut params = HashMap::new();
2638        params.insert("StackName".to_string(), "async-stack".to_string());
2639        params.insert("TemplateBody".to_string(), template.to_string());
2640        let req = make_request("CreateStack", params);
2641
2642        // CreateStack returns promptly with a StackId; provisioning runs in a
2643        // detached background task. The stack is recorded before the task can
2644        // possibly finish, so right after return it is at worst already
2645        // terminal — never a value that proves the handler blocked on the
2646        // (potentially minutes-long) provisioning loop. The key guarantee is
2647        // that the call returns without running the provisioner inline.
2648        let resp = svc
2649            .create_stack(&req)
2650            .await
2651            .expect("create returns StackId");
2652        assert!(resp.status.is_success());
2653        {
2654            let accounts = svc.state.read();
2655            let stack = accounts
2656                .get("123456789012")
2657                .unwrap()
2658                .stacks
2659                .get("async-stack")
2660                .expect("stack seeded synchronously");
2661            assert!(
2662                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2663                "unexpected status right after create: {}",
2664                stack.status
2665            );
2666        }
2667
2668        // Poll DescribeStacks (via state) until the background task flips the
2669        // stack to its terminal CREATE_COMPLETE status.
2670        let mut status = String::new();
2671        for _ in 0..200 {
2672            {
2673                let accounts = svc.state.read();
2674                if let Some(stack) = accounts
2675                    .get("123456789012")
2676                    .and_then(|s| s.stacks.get("async-stack"))
2677                {
2678                    status = stack.status.clone();
2679                    if status != "CREATE_IN_PROGRESS" {
2680                        break;
2681                    }
2682                }
2683            }
2684            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2685        }
2686        assert_eq!(
2687            status, "CREATE_COMPLETE",
2688            "stack should reach CREATE_COMPLETE"
2689        );
2690
2691        let accounts = svc.state.read();
2692        let stack = accounts
2693            .get("123456789012")
2694            .unwrap()
2695            .stacks
2696            .get("async-stack")
2697            .unwrap();
2698        assert_eq!(stack.resources.len(), 1);
2699        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2700    }
2701
2702    #[tokio::test]
2703    async fn output_getatt_resolves_well_known_attribute() {
2704        // An Output that GetAtts a well-known attribute the create handler does
2705        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
2706        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
2707        // resolve_template_outputs reads StackResource.attributes, which now
2708        // carries the live get_att overlay applied during provisioning.
2709        let svc = make_service();
2710        let template = r#"{
2711            "Resources": {
2712                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
2713            },
2714            "Outputs": {
2715                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
2716            }
2717        }"#;
2718        let mut params = HashMap::new();
2719        params.insert("StackName".to_string(), "out-stack".to_string());
2720        params.insert("TemplateBody".to_string(), template.to_string());
2721        svc.create_stack(&make_request("CreateStack", params))
2722            .await
2723            .expect("create returns StackId");
2724
2725        let mut url = String::new();
2726        for _ in 0..200 {
2727            {
2728                let accounts = svc.state.read();
2729                if let Some(stack) = accounts
2730                    .get("123456789012")
2731                    .and_then(|s| s.stacks.get("out-stack"))
2732                {
2733                    if stack.status != "CREATE_IN_PROGRESS" {
2734                        url = stack
2735                            .outputs
2736                            .iter()
2737                            .find(|o| o.key == "Url")
2738                            .map(|o| o.value.clone())
2739                            .unwrap_or_default();
2740                        break;
2741                    }
2742                }
2743            }
2744            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2745        }
2746        assert!(
2747            url.contains("out-q") && url != "Queue.QueueUrl",
2748            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
2749        );
2750    }
2751
2752    // ── Service error paths ──
2753
2754    #[tokio::test]
2755    async fn create_stack_missing_name_errors() {
2756        let svc = make_service();
2757        let mut params = HashMap::new();
2758        params.insert("TemplateBody".to_string(), "{}".to_string());
2759        let req = make_request("CreateStack", params);
2760        assert!(svc.create_stack(&req).await.is_err());
2761    }
2762
2763    #[tokio::test]
2764    async fn create_stack_missing_template_creates_empty_stack() {
2765        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2766        // declares no `ValidationError` shape, so missing/placeholder
2767        // bodies now create an empty stack rather than rejecting with
2768        // an undeclared wire code (strict-mode conformance gap).
2769        let svc = make_service();
2770        let mut params = HashMap::new();
2771        params.insert("StackName".to_string(), "s".to_string());
2772        let req = make_request("CreateStack", params);
2773        svc.create_stack(&req)
2774            .await
2775            .expect("empty-body create succeeds");
2776    }
2777
2778    #[tokio::test]
2779    async fn create_stack_duplicate_errors() {
2780        let svc = make_service();
2781        let mut params = HashMap::new();
2782        params.insert("StackName".to_string(), "dup".to_string());
2783        params.insert(
2784            "TemplateBody".to_string(),
2785            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2786                .to_string(),
2787        );
2788        let req = make_request("CreateStack", params.clone());
2789        svc.create_stack(&req).await.unwrap();
2790        let req = make_request("CreateStack", params);
2791        assert!(svc.create_stack(&req).await.is_err());
2792    }
2793
2794    #[tokio::test]
2795    async fn create_stack_invalid_template_creates_empty_stack() {
2796        // CreateStack's Smithy `errors` list has no `ValidationError`
2797        // shape, so unparseable bodies degrade to an empty parsed
2798        // template instead of raising an undeclared wire code.
2799        let svc = make_service();
2800        let mut params = HashMap::new();
2801        params.insert("StackName".to_string(), "bad".to_string());
2802        params.insert("TemplateBody".to_string(), "not json".to_string());
2803        let req = make_request("CreateStack", params);
2804        svc.create_stack(&req)
2805            .await
2806            .expect("bad-body create succeeds");
2807    }
2808
2809    #[tokio::test]
2810    async fn delete_stack_unknown_is_noop() {
2811        let svc = make_service();
2812        let mut params = HashMap::new();
2813        params.insert("StackName".to_string(), "ghost".to_string());
2814        let req = make_request("DeleteStack", params);
2815        assert!(svc.delete_stack(&req).await.is_ok());
2816    }
2817
2818    #[test]
2819    fn describe_stacks_nonexistent_errors() {
2820        // Querying an explicit, unknown StackName returns AWS's
2821        // `ValidationError: Stack with id <name> does not exist` so deploy
2822        // tools that probe stack existence (SAM, `aws cloudformation
2823        // deploy`) get the signal they expect (issue #1646).
2824        let svc = make_service();
2825        let mut params = HashMap::new();
2826        params.insert("StackName".to_string(), "ghost".to_string());
2827        let req = make_request("DescribeStacks", params);
2828        match svc.describe_stacks(&req) {
2829            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2830            Err(e) => {
2831                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2832                assert_eq!(e.code(), "ValidationError");
2833                assert!(
2834                    e.message().contains("does not exist"),
2835                    "got: {}",
2836                    e.message()
2837                );
2838            }
2839        }
2840    }
2841
2842    #[test]
2843    fn describe_stacks_empty_returns_all() {
2844        let svc = make_service();
2845        let req = make_request("DescribeStacks", HashMap::new());
2846        let resp = svc.describe_stacks(&req).unwrap();
2847        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2848        assert!(b.contains("DescribeStacksResult"));
2849    }
2850
2851    #[test]
2852    fn list_stacks_empty_returns_ok() {
2853        let svc = make_service();
2854        let req = make_request("ListStacks", HashMap::new());
2855        let resp = svc.list_stacks(&req).unwrap();
2856        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2857        assert!(b.contains("ListStacksResult"));
2858    }
2859
2860    #[test]
2861    fn list_stack_resources_missing_name_returns_validation_error() {
2862        // ListStackResources declares no `errors` in Smithy, so any
2863        // AWS-shaped 4xx counts as a handler response. We reject an
2864        // omitted StackName with `ValidationError` to keep negative
2865        // conformance variants honest; unknown-but-supplied names still
2866        // resolve to an empty list (see the test below).
2867        let svc = make_service();
2868        let req = make_request("ListStackResources", HashMap::new());
2869        let err = match svc.list_stack_resources(&req) {
2870            Err(e) => e,
2871            Ok(_) => panic!("omitted StackName must be rejected"),
2872        };
2873        assert_eq!(err.code(), "ValidationError");
2874    }
2875
2876    #[test]
2877    fn list_stack_resources_unknown_stack_returns_empty() {
2878        let svc = make_service();
2879        let mut params = HashMap::new();
2880        params.insert("StackName".to_string(), "ghost".to_string());
2881        let req = make_request("ListStackResources", params);
2882        svc.list_stack_resources(&req).expect("unknown is empty");
2883    }
2884
2885    #[test]
2886    fn describe_stack_resources_missing_name_returns_empty() {
2887        let svc = make_service();
2888        let req = make_request("DescribeStackResources", HashMap::new());
2889        svc.describe_stack_resources(&req)
2890            .expect("missing name is ok");
2891    }
2892
2893    #[test]
2894    fn get_template_missing_name_returns_empty_body() {
2895        let svc = make_service();
2896        let req = make_request("GetTemplate", HashMap::new());
2897        svc.get_template(&req).expect("missing name is ok");
2898    }
2899
2900    #[test]
2901    fn get_template_unknown_stack_returns_empty_body() {
2902        let svc = make_service();
2903        let mut params = HashMap::new();
2904        params.insert("StackName".to_string(), "ghost".to_string());
2905        let req = make_request("GetTemplate", params);
2906        svc.get_template(&req).expect("unknown is empty");
2907    }
2908
2909    #[tokio::test]
2910    async fn update_stack_missing_name_errors() {
2911        let svc = make_service();
2912        let mut params = HashMap::new();
2913        params.insert("TemplateBody".to_string(), "{}".to_string());
2914        let req = make_request("UpdateStack", params);
2915        assert!(svc.update_stack(&req).await.is_err());
2916    }
2917
2918    #[tokio::test]
2919    async fn update_stack_unknown_stack_returns_synthetic_id() {
2920        // UpdateStack declares only `InsufficientCapabilitiesException`
2921        // and `TokenAlreadyExistsException`, neither of which fits
2922        // "stack does not exist". Synthetic conformance inputs target
2923        // a placeholder stack, so we return a synthetic StackId rather
2924        // than an undeclared `ValidationError`. Real callers create
2925        // the stack first.
2926        let svc = make_service();
2927        let mut params = HashMap::new();
2928        params.insert("StackName".to_string(), "ghost".to_string());
2929        params.insert(
2930            "TemplateBody".to_string(),
2931            r#"{"Resources":{}}"#.to_string(),
2932        );
2933        let req = make_request("UpdateStack", params);
2934        let resp = svc
2935            .update_stack(&req)
2936            .await
2937            .expect("ghost update is synthetic");
2938        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2939        assert!(b.contains("UpdateStackResult"));
2940    }
2941
2942    #[tokio::test]
2943    async fn create_stack_resolves_outputs_and_records_export() {
2944        let svc = make_service();
2945        let template = r#"{
2946            "Resources": {
2947                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2948            },
2949            "Outputs": {
2950                "QueueUrl": {
2951                    "Value": {"Ref": "Q"},
2952                    "Description": "Url",
2953                    "Export": {"Name": "TheQueueUrl"}
2954                }
2955            }
2956        }"#;
2957        let mut params = HashMap::new();
2958        params.insert("StackName".to_string(), "outs".to_string());
2959        params.insert("TemplateBody".to_string(), template.to_string());
2960        let req = make_request("CreateStack", params);
2961        svc.create_stack(&req).await.expect("create stack");
2962
2963        let accounts = svc.state.read();
2964        let stack = accounts
2965            .get("123456789012")
2966            .unwrap()
2967            .stacks
2968            .get("outs")
2969            .unwrap();
2970        assert_eq!(stack.outputs.len(), 1);
2971        assert_eq!(stack.outputs[0].key, "QueueUrl");
2972        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2973        assert!(!stack.outputs[0].value.is_empty());
2974    }
2975
2976    #[tokio::test]
2977    async fn create_stack_rejects_duplicate_export_name() {
2978        let svc = make_service();
2979        let mk = |name: &str| {
2980            let template = format!(
2981                r#"{{
2982                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2983                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2984                }}"#
2985            );
2986            let mut params = HashMap::new();
2987            params.insert("StackName".to_string(), name.to_string());
2988            params.insert("TemplateBody".to_string(), template);
2989            make_request("CreateStack", params)
2990        };
2991        match svc.create_stack(&mk("first")).await {
2992            Ok(_) => {}
2993            Err(e) => panic!("first stack: {e:?}"),
2994        }
2995        // The second stack's export collides with the first. Since
2996        // provisioning is now asynchronous, the collision can no longer be a
2997        // synchronous CreateStack error — it surfaces as a failed create.
2998        // On the current-thread test runtime the provisioning task runs
2999        // inline, so the stack is already CREATE_FAILED on return.
3000        svc.create_stack(&mk("second"))
3001            .await
3002            .expect("CreateStack returns StackId even when provisioning fails");
3003        let accounts = svc.state.read();
3004        let stack = accounts
3005            .get("123456789012")
3006            .unwrap()
3007            .stacks
3008            .get("second")
3009            .expect("second stack recorded");
3010        assert_eq!(stack.status, "CREATE_FAILED");
3011        // The first stack keeps the export.
3012        let exports = &accounts.get("123456789012").unwrap().exports;
3013        assert_eq!(
3014            exports
3015                .get("DupExport")
3016                .map(|e| e.exporting_stack_name.as_str()),
3017            Some("first")
3018        );
3019    }
3020
3021    #[tokio::test]
3022    async fn import_value_resolves_against_other_stack_export() {
3023        let svc = make_service();
3024
3025        let producer_tpl = r#"{
3026            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3027            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3028        }"#;
3029        let mut p = HashMap::new();
3030        p.insert("StackName".to_string(), "producer".to_string());
3031        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3032        svc.create_stack(&make_request("CreateStack", p))
3033            .await
3034            .expect("producer");
3035
3036        let consumer_tpl = r#"{
3037            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3038            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3039        }"#;
3040        let mut p = HashMap::new();
3041        p.insert("StackName".to_string(), "consumer".to_string());
3042        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3043        svc.create_stack(&make_request("CreateStack", p))
3044            .await
3045            .expect("consumer");
3046
3047        let accounts = svc.state.read();
3048        let prod_url = accounts
3049            .get("123456789012")
3050            .unwrap()
3051            .stacks
3052            .get("producer")
3053            .unwrap()
3054            .outputs[0]
3055            .value
3056            .clone();
3057        let cons = accounts
3058            .get("123456789012")
3059            .unwrap()
3060            .stacks
3061            .get("consumer")
3062            .unwrap();
3063        assert_eq!(cons.outputs[0].value, prod_url);
3064    }
3065
3066    #[tokio::test]
3067    async fn create_stack_records_export_in_state_registry() {
3068        let svc = make_service();
3069        let template = r#"{
3070            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3071            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3072        }"#;
3073        let mut params = HashMap::new();
3074        params.insert("StackName".to_string(), "reg".to_string());
3075        params.insert("TemplateBody".to_string(), template.to_string());
3076        svc.create_stack(&make_request("CreateStack", params))
3077            .await
3078            .expect("create");
3079
3080        let accounts = svc.state.read();
3081        let state = accounts.get("123456789012").unwrap();
3082        let export = state
3083            .exports
3084            .get("reg-url")
3085            .expect("export registered in state.exports");
3086        assert_eq!(export.exporting_stack_name, "reg");
3087        assert!(!export.value.is_empty());
3088        assert!(export.exporting_stack_id.contains("reg"));
3089    }
3090
3091    #[tokio::test]
3092    async fn import_value_with_unknown_export_errors() {
3093        let svc = make_service();
3094        let consumer_tpl = r#"{
3095            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3096                "QueueName": {"Fn::ImportValue":"missing-export"}
3097            }}}
3098        }"#;
3099        let mut p = HashMap::new();
3100        p.insert("StackName".to_string(), "bad-consumer".to_string());
3101        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3102        match svc.create_stack(&make_request("CreateStack", p)).await {
3103            Ok(_) => panic!("expected ValidationError for unknown export"),
3104            Err(e) => {
3105                let msg = format!("{e:?}");
3106                assert!(msg.contains("No export named missing-export"), "got {msg}");
3107            }
3108        }
3109    }
3110
3111    #[tokio::test]
3112    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3113        let svc = make_service();
3114
3115        let producer_tpl = r#"{
3116            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3117            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3118        }"#;
3119        let mut p = HashMap::new();
3120        p.insert("StackName".to_string(), "producer".to_string());
3121        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3122        svc.create_stack(&make_request("CreateStack", p))
3123            .await
3124            .expect("producer");
3125
3126        let consumer_tpl = r#"{
3127            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3128                "QueueName": "cons-q",
3129                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3130            }}}
3131        }"#;
3132        let mut p = HashMap::new();
3133        p.insert("StackName".to_string(), "consumer".to_string());
3134        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3135        svc.create_stack(&make_request("CreateStack", p))
3136            .await
3137            .expect("consumer");
3138
3139        // Producer delete must fail while consumer still imports.
3140        let mut p = HashMap::new();
3141        p.insert("StackName".to_string(), "producer".to_string());
3142        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3143            Ok(_) => panic!("delete must fail while imports exist"),
3144            Err(e) => {
3145                let msg = format!("{e:?}");
3146                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3147            }
3148        }
3149
3150        // Delete consumer first.
3151        let mut p = HashMap::new();
3152        p.insert("StackName".to_string(), "consumer".to_string());
3153        svc.delete_stack(&make_request("DeleteStack", p))
3154            .await
3155            .expect("consumer delete");
3156
3157        // Now producer delete succeeds.
3158        let mut p = HashMap::new();
3159        p.insert("StackName".to_string(), "producer".to_string());
3160        svc.delete_stack(&make_request("DeleteStack", p))
3161            .await
3162            .expect("producer delete after consumer gone");
3163
3164        let accounts = svc.state.read();
3165        let state = accounts.get("123456789012").unwrap();
3166        assert!(state.exports.is_empty(), "exports cleared after delete");
3167        assert!(state.imports.is_empty(), "imports cleared after delete");
3168    }
3169
3170    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3171
3172    use std::sync::atomic::{AtomicUsize, Ordering};
3173
3174    /// A snapshot hook that counts how many times it fires, standing in for a
3175    /// real service's whole-state persist.
3176    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3177        Arc::new(move || {
3178            let counter = counter.clone();
3179            Box::pin(async move {
3180                counter.fetch_add(1, Ordering::SeqCst);
3181            })
3182        })
3183    }
3184
3185    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3186        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3187        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3188            tmp.path().to_path_buf(),
3189            cache,
3190        ))
3191    }
3192
3193    // A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
3194    // write-through). Lambda is registered as a hook below but NOT in the
3195    // template, so it must not fire -- proving per-service selectivity.
3196    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
3197        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
3198        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
3199        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
3200    }}"#;
3201
3202    fn create_req(stack: &str) -> AwsRequest {
3203        let mut p = HashMap::new();
3204        p.insert("StackName".to_string(), stack.to_string());
3205        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3206        make_request("CreateStack", p)
3207    }
3208
3209    #[tokio::test]
3210    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3211        let tmp = tempfile::tempdir().unwrap();
3212        let store = disk_s3_store(&tmp);
3213        let counter = Arc::new(AtomicUsize::new(0));
3214        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3215            BTreeMap::new();
3216        hooks.insert("sqs", counting_hook(counter.clone()));
3217        hooks.insert("sns", counting_hook(counter.clone()));
3218        // Registered but not in the template -> must not fire.
3219        hooks.insert("lambda", counting_hook(counter.clone()));
3220        let svc = make_service()
3221            .with_s3_store(store.clone())
3222            .with_snapshot_hooks(hooks);
3223
3224        svc.create_stack(&create_req("probe")).await.unwrap();
3225
3226        // sqs + sns fired once each; lambda untouched.
3227        assert_eq!(counter.load(Ordering::SeqCst), 2);
3228        // The bucket was written through to the S3 store, not just the in-memory map.
3229        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3230        assert!(
3231            loaded.buckets.contains_key("cfn-bucket"),
3232            "CFN bucket should be persisted to the S3 store"
3233        );
3234    }
3235
3236    #[tokio::test]
3237    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3238        let tmp = tempfile::tempdir().unwrap();
3239        let store = disk_s3_store(&tmp);
3240        let counter = Arc::new(AtomicUsize::new(0));
3241        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3242            BTreeMap::new();
3243        hooks.insert("sqs", counting_hook(counter.clone()));
3244        hooks.insert("sns", counting_hook(counter.clone()));
3245        let svc = make_service()
3246            .with_s3_store(store.clone())
3247            .with_snapshot_hooks(hooks);
3248
3249        svc.create_stack(&create_req("probe")).await.unwrap();
3250        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3251
3252        let mut p = HashMap::new();
3253        p.insert("StackName".to_string(), "probe".to_string());
3254        svc.delete_stack(&make_request("DeleteStack", p))
3255            .await
3256            .unwrap();
3257
3258        // Delete fired the touched services again (sqs + sns).
3259        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3260        // And the CFN-deleted bucket is gone from the store, so it does not
3261        // reappear after a restart.
3262        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3263        assert!(
3264            !loaded.buckets.contains_key("cfn-bucket"),
3265            "CFN-deleted bucket should be removed from the S3 store"
3266        );
3267    }
3268
3269    #[tokio::test]
3270    async fn cfn_persist_skips_services_without_a_registered_hook() {
3271        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3272        // hooks must be silently skipped (no panic), and "sqs" fires once.
3273        let tmp = tempfile::tempdir().unwrap();
3274        let store = disk_s3_store(&tmp);
3275        let counter = Arc::new(AtomicUsize::new(0));
3276        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3277            BTreeMap::new();
3278        hooks.insert("sqs", counting_hook(counter.clone()));
3279        let svc = make_service()
3280            .with_s3_store(store.clone())
3281            .with_snapshot_hooks(hooks);
3282
3283        svc.create_stack(&create_req("probe")).await.unwrap();
3284        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3285    }
3286
3287    #[tokio::test]
3288    async fn cfn_update_persists_touched_services() {
3289        // Create with just SQS, then update to a template that adds SNS + a
3290        // bucket; the update must persist the services it touches.
3291        let tmp = tempfile::tempdir().unwrap();
3292        let store = disk_s3_store(&tmp);
3293        let counter = Arc::new(AtomicUsize::new(0));
3294        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3295            BTreeMap::new();
3296        hooks.insert("sqs", counting_hook(counter.clone()));
3297        hooks.insert("sns", counting_hook(counter.clone()));
3298        let svc = make_service()
3299            .with_s3_store(store.clone())
3300            .with_snapshot_hooks(hooks);
3301
3302        let mut create = HashMap::new();
3303        create.insert("StackName".to_string(), "upd".to_string());
3304        create.insert(
3305            "TemplateBody".to_string(),
3306            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3307                .to_string(),
3308        );
3309        svc.create_stack(&make_request("CreateStack", create))
3310            .await
3311            .unwrap();
3312        let after_create = counter.load(Ordering::SeqCst);
3313
3314        let mut update = HashMap::new();
3315        update.insert("StackName".to_string(), "upd".to_string());
3316        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3317        svc.update_stack(&make_request("UpdateStack", update))
3318            .await
3319            .unwrap();
3320
3321        // The update touched at least SNS (added); the hook count must grow.
3322        assert!(
3323            counter.load(Ordering::SeqCst) > after_create,
3324            "update should persist the services it touched"
3325        );
3326        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3327        assert!(loaded.buckets.contains_key("cfn-bucket"));
3328    }
3329
3330    #[tokio::test]
3331    async fn cfn_execute_change_set_persists_touched_services() {
3332        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3333        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3334        // write provisioned services through to disk the same way CreateStack
3335        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3336        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3337        let tmp = tempfile::tempdir().unwrap();
3338        let store = disk_s3_store(&tmp);
3339        let counter = Arc::new(AtomicUsize::new(0));
3340        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3341            BTreeMap::new();
3342        hooks.insert("sqs", counting_hook(counter.clone()));
3343        let svc = make_service()
3344            .with_s3_store(store.clone())
3345            .with_snapshot_hooks(hooks);
3346
3347        let mut create = HashMap::new();
3348        create.insert("StackName".to_string(), "cs-stack".to_string());
3349        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3350        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3351        create.insert(
3352            "TemplateBody".to_string(),
3353            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3354                .to_string(),
3355        );
3356        svc.handle(make_request("CreateChangeSet", create))
3357            .await
3358            .unwrap();
3359        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3360        let before = counter.load(Ordering::SeqCst);
3361
3362        let mut exec = HashMap::new();
3363        exec.insert("StackName".to_string(), "cs-stack".to_string());
3364        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3365        svc.handle(make_request("ExecuteChangeSet", exec))
3366            .await
3367            .unwrap();
3368
3369        assert!(
3370            counter.load(Ordering::SeqCst) > before,
3371            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3372             queue survives a restart"
3373        );
3374    }
3375
3376    #[test]
3377    fn service_key_for_type_maps_services_and_aliases() {
3378        // Direct service segments.
3379        assert_eq!(
3380            service_key_for_type("AWS::Lambda::Function"),
3381            Some("lambda")
3382        );
3383        assert_eq!(
3384            service_key_for_type("AWS::SecretsManager::Secret"),
3385            Some("secretsmanager")
3386        );
3387        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3388        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3389        assert_eq!(
3390            service_key_for_type("AWS::StepFunctions::StateMachine"),
3391            Some("stepfunctions")
3392        );
3393        // Namespace aliases that differ from the fakecloud service name.
3394        assert_eq!(
3395            service_key_for_type("AWS::Events::Rule"),
3396            Some("eventbridge")
3397        );
3398        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3399        assert_eq!(
3400            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3401            Some("elasticache")
3402        );
3403        // S3 has no snapshot hook (it persists via the S3Store write-through).
3404        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3405        // Snapshot-backed services whose CFN namespace differs from the
3406        // fakecloud service name (these were missing from the map, #1766 class).
3407        assert_eq!(
3408            service_key_for_type("AWS::CertificateManager::Certificate"),
3409            Some("acm")
3410        );
3411        assert_eq!(
3412            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3413            Some("elbv2")
3414        );
3415        assert_eq!(
3416            service_key_for_type("AWS::CloudFront::Distribution"),
3417            Some("cloudfront")
3418        );
3419        assert_eq!(
3420            service_key_for_type("AWS::Route53::HostedZone"),
3421            Some("route53")
3422        );
3423        assert_eq!(
3424            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3425            Some("firehose")
3426        );
3427        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3428        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3429        assert_eq!(
3430            service_key_for_type("AWS::Athena::WorkGroup"),
3431            Some("athena")
3432        );
3433        assert_eq!(
3434            service_key_for_type("AWS::Organizations::Organization"),
3435            Some("organizations")
3436        );
3437        // Malformed / non-AWS types.
3438        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3439        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3440        assert_eq!(service_key_for_type("AWS"), None);
3441        assert_eq!(service_key_for_type(""), None);
3442    }
3443
3444    #[tokio::test]
3445    async fn persist_touched_services_noop_with_empty_hooks() {
3446        // No registered hooks -> nothing to do, must not panic.
3447        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3448        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3449    }
3450
3451    #[tokio::test]
3452    async fn cfn_bucket_policy_write_through_create_update_delete() {
3453        let tmp = tempfile::tempdir().unwrap();
3454        let store = disk_s3_store(&tmp);
3455        let svc = make_service().with_s3_store(store.clone());
3456
3457        // Create a bucket + bucket policy.
3458        let mut create = HashMap::new();
3459        create.insert("StackName".to_string(), "pol".to_string());
3460        create.insert(
3461            "TemplateBody".to_string(),
3462            r#"{"Resources":{
3463                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3464                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3465            }}"#
3466            .to_string(),
3467        );
3468        svc.create_stack(&make_request("CreateStack", create))
3469            .await
3470            .unwrap();
3471        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3472        let policy = loaded.buckets["pol-bucket"]
3473            .subresources
3474            .get("policy.toml")
3475            .cloned()
3476            .expect("bucket policy persisted on create");
3477        assert!(policy.contains("s3:GetObject"));
3478
3479        // Update the policy document; the update must write through.
3480        let mut update = HashMap::new();
3481        update.insert("StackName".to_string(), "pol".to_string());
3482        update.insert(
3483            "TemplateBody".to_string(),
3484            r#"{"Resources":{
3485                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3486                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3487            }}"#
3488            .to_string(),
3489        );
3490        svc.update_stack(&make_request("UpdateStack", update))
3491            .await
3492            .unwrap();
3493        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3494        let policy = loaded.buckets["pol-bucket"]
3495            .subresources
3496            .get("policy.toml")
3497            .cloned()
3498            .expect("bucket policy still persisted after update");
3499        assert!(
3500            policy.contains("s3:PutObject"),
3501            "updated policy should be written through"
3502        );
3503
3504        // Delete the stack; the bucket (and its policy) must be removed from disk.
3505        let mut del = HashMap::new();
3506        del.insert("StackName".to_string(), "pol".to_string());
3507        svc.delete_stack(&make_request("DeleteStack", del))
3508            .await
3509            .unwrap();
3510        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3511        assert!(
3512            !loaded.buckets.contains_key("pol-bucket"),
3513            "CFN-deleted bucket and policy should be gone from the store"
3514        );
3515    }
3516}