Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
29/// Canonical `Fn::GetAtt` attribute names per resource type. Used to
30/// supplement the eagerly-captured `ProvisionResult::attributes` with
31/// live-state lookups via `ResourceProvisioner::get_att`. Resources not
32/// listed here just use whatever the create handler captured.
33fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
34    match resource_type {
35        "AWS::S3::Bucket" => &[
36            "Arn",
37            "DomainName",
38            "RegionalDomainName",
39            "DualStackDomainName",
40            "WebsiteURL",
41        ],
42        "AWS::Lambda::Function" => &["Arn", "FunctionUrl", "Version"],
43        "AWS::IAM::Role" => &["Arn", "RoleId"],
44        "AWS::SQS::Queue" => &["Arn", "QueueName", "QueueUrl"],
45        "AWS::SNS::Topic" => &["TopicArn", "TopicName"],
46        "AWS::DynamoDB::Table" => &["Arn", "StreamArn"],
47        "AWS::KMS::Key" => &["Arn", "KeyId"],
48        "AWS::SecretsManager::Secret" => &["Arn", "Id"],
49        "AWS::CloudFront::Distribution" => &["DomainName", "Id"],
50        "AWS::EC2::VPC" => &["VpcId", "CidrBlock"],
51        "AWS::EC2::Subnet" => &["SubnetId", "AvailabilityZone", "VpcId", "CidrBlock"],
52        "AWS::EC2::SecurityGroup" => &["GroupId", "VpcId"],
53        "AWS::EC2::InternetGateway" => &["InternetGatewayId"],
54        "AWS::EC2::RouteTable" => &["RouteTableId"],
55        _ => &[],
56    }
57}
58
59/// Map a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
60/// snapshot-hook key for its owning service (the keys of
61/// `CloudFormationDeps::snapshot_hooks`). The key is the service segment, with
62/// aliases for the few services whose CloudFormation namespace differs from
63/// their fakecloud service name (e.g. `Events` -> `eventbridge`).
64///
65/// `AWS::S3::*` is intentionally absent: S3 buckets persist through the
66/// `S3Store` write-through path in the provisioner, not a whole-state snapshot
67/// hook. Returns `None` for malformed types and any service whose state is not
68/// snapshot-backed (those CFN resources have no persistence path either way).
69fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
70    let mut parts = resource_type.split("::");
71    let vendor = parts.next()?;
72    let service = parts.next()?;
73    // A real resource type has three segments; a 2-part string (or fewer) is
74    // not one.
75    parts.next()?;
76    if vendor != "AWS" {
77        return None;
78    }
79    Some(match service {
80        "Lambda" => "lambda",
81        "SecretsManager" => "secretsmanager",
82        "SQS" => "sqs",
83        "SNS" => "sns",
84        "DynamoDB" => "dynamodb",
85        "StepFunctions" => "stepfunctions",
86        "Events" => "eventbridge",
87        "SSM" => "ssm",
88        "Logs" => "logs",
89        "KMS" => "kms",
90        "Kinesis" => "kinesis",
91        "SES" => "ses",
92        "Cognito" => "cognito",
93        "RDS" => "rds",
94        "ElastiCache" => "elasticache",
95        "ECR" => "ecr",
96        "ECS" => "ecs",
97        "CloudWatch" => "cloudwatch",
98        "ApiGateway" => "apigateway",
99        "ApiGatewayV2" => "apigatewayv2",
100        "Bedrock" => "bedrock",
101        "Scheduler" => "scheduler",
102        "IAM" => "iam",
103        // Services whose CloudFormation namespace differs from their fakecloud
104        // service name, and which were missing from this table: their
105        // provisioner mutates snapshot-backed state directly, so CFN-created
106        // (and CFN-deleted) resources vanished on restart while their
107        // direct-API equivalents persisted (#1766 class). Each has a registered
108        // snapshot hook in the server.
109        "CertificateManager" => "acm",
110        "ElasticLoadBalancingV2" => "elbv2",
111        "CloudFront" => "cloudfront",
112        "Route53" => "route53",
113        "KinesisFirehose" => "firehose",
114        "Glue" => "glue",
115        "WAFv2" => "wafv2",
116        "Athena" => "athena",
117        "Organizations" => "organizations",
118        // EC2 and ApplicationAutoScaling were wired into the provisioner after
119        // this table was last audited (EC2: 2026-06-25 #1957;
120        // application-autoscaling: persistence sweep), so their CFN-created VPC
121        // / Subnet / SecurityGroup / scalable-target state mutated in memory and
122        // vanished on restart (#1766 class). Both have a registered snapshot
123        // hook in the server.
124        "EC2" => "ec2",
125        "AutoScaling" => "autoscaling",
126        "Batch" => "batch",
127        "ApplicationAutoScaling" => "application-autoscaling",
128        _ => return None,
129    })
130}
131
132/// Persist each distinct snapshot-backed service touched by a stack op, once.
133///
134/// `resource_types` is the set of `resource_type` strings the op created /
135/// updated / deleted. The CloudFormation provisioner mutates services' shared
136/// state directly and never triggers their snapshot path; this writes that
137/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
138/// resource survives a restart. Services with no registered hook (memory mode,
139/// or non-snapshot-backed services) are skipped.
140async fn persist_touched_services<I>(
141    hooks: &BTreeMap<&'static str, SnapshotHook>,
142    resource_types: I,
143) where
144    I: IntoIterator<Item = String>,
145{
146    if hooks.is_empty() {
147        return;
148    }
149    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
150    for ty in resource_types {
151        if let Some(key) = service_key_for_type(&ty) {
152            keys.insert(key);
153        }
154    }
155    for key in keys {
156        if let Some(hook) = hooks.get(key) {
157            hook().await;
158        }
159    }
160}
161
162/// Multi-pass provisioning for all resources in a parsed template.
163///
164/// Resources may `Ref` each other in either direction, and JSON object
165/// iteration order isn't stable, so a single forward pass isn't enough
166/// to resolve them. We loop: each pass tries every pending resource, and
167/// any resource whose `Ref` targets are still unknown just stays pending
168/// for the next pass. When no pass makes progress we report the first
169/// pending failure and rollback.
170pub(crate) fn provision_stack_resources(
171    provisioner: &ResourceProvisioner,
172    resource_defs: &[template::ResourceDefinition],
173    template_body: &str,
174    parameters: &BTreeMap<String, String>,
175    imports: &BTreeMap<String, String>,
176) -> Result<Vec<StackResource>, AwsServiceError> {
177    let mut resources = Vec::new();
178    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
179    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
180    // Seed the work list in dependency order so a referenced resource is
181    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
182    // physical ids. The fixed-point retry loop below still recovers from any
183    // residual ordering the graph can't express.
184    let order = template::dependency_order(template_body, parameters, resource_defs);
185    let mut pending: Vec<&template::ResourceDefinition> =
186        order.iter().map(|&i| &resource_defs[i]).collect();
187    let max_passes = pending.len() + 1;
188
189    for _ in 0..max_passes {
190        if pending.is_empty() {
191            break;
192        }
193        let mut still_pending = Vec::new();
194        let mut made_progress = false;
195
196        for resource_def in pending {
197            let resolved_def = template::resolve_resource_properties_with_attrs(
198                resource_def,
199                template_body,
200                parameters,
201                &physical_ids,
202                &attributes,
203                imports,
204            )
205            .map_err(|e| {
206                // `ValidationError` isn't declared on CreateStack/UpdateStack;
207                // surface template resolve failures through the closest declared
208                // shape (`InsufficientCapabilitiesException`) instead.
209                AwsServiceError::aws_error(
210                    StatusCode::BAD_REQUEST,
211                    "InsufficientCapabilitiesException",
212                    e,
213                )
214            })?;
215
216            match provisioner.create_resource(&resolved_def) {
217                Ok(mut stack_resource) => {
218                    physical_ids.insert(
219                        stack_resource.logical_id.clone(),
220                        stack_resource.physical_id.clone(),
221                    );
222                    // Start with the eagerly-captured attribute set, then
223                    // overlay anything the provisioner can resolve from
224                    // live state (e.g. attributes that depend on side-effects
225                    // recorded after the create handler returned).
226                    let mut attr_map = stack_resource.attributes.clone();
227                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
228                        if attr_map.contains_key(*attr) {
229                            continue;
230                        }
231                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
232                            attr_map.insert((*attr).to_string(), v);
233                        }
234                    }
235                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
236                    // Persist the live-resolved attributes onto the stored
237                    // resource so later readers — notably resolve_template_outputs,
238                    // which reads StackResource.attributes directly without the
239                    // overlay — see the full GetAtt set, not just the eager subset.
240                    stack_resource.attributes = attr_map;
241                    resources.push(stack_resource);
242                    made_progress = true;
243                }
244                Err(_) => still_pending.push(resource_def),
245            }
246        }
247
248        pending = still_pending;
249        if !made_progress && !pending.is_empty() {
250            // No progress — report the first failure and rollback anything
251            // we already created.
252            let resource_def = pending[0];
253            let resolved_def = template::resolve_resource_properties_with_attrs(
254                resource_def,
255                template_body,
256                parameters,
257                &physical_ids,
258                &attributes,
259                imports,
260            )
261            .unwrap_or_else(|_| resource_def.clone());
262            let err = provisioner.create_resource(&resolved_def).unwrap_err();
263            for r in &resources {
264                let _ = provisioner.delete_resource(r);
265            }
266            return Err(AwsServiceError::aws_error(
267                StatusCode::BAD_REQUEST,
268                "ValidationError",
269                format!(
270                    "Failed to create resource {}: {err}",
271                    resource_def.logical_id
272                ),
273            ));
274        }
275    }
276
277    Ok(resources)
278}
279
280/// State references for every service CloudFormation can provision resources in.
281pub struct CloudFormationDeps {
282    pub sqs: SharedSqsState,
283    pub sns: SharedSnsState,
284    pub ssm: SharedSsmState,
285    pub iam: SharedIamState,
286    pub s3: SharedS3State,
287    pub eventbridge: SharedEventBridgeState,
288    pub dynamodb: SharedDynamoDbState,
289    pub logs: SharedLogsState,
290    pub lambda: fakecloud_lambda::SharedLambdaState,
291    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
292    pub kinesis: fakecloud_kinesis::SharedKinesisState,
293    pub kms: fakecloud_kms::SharedKmsState,
294    pub ecr: fakecloud_ecr::SharedEcrState,
295    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
296    pub elbv2: fakecloud_elbv2::SharedElbv2State,
297    pub organizations: fakecloud_organizations::SharedOrganizationsState,
298    pub cognito: fakecloud_cognito::SharedCognitoState,
299    pub rds: fakecloud_rds::SharedRdsState,
300    pub ec2: fakecloud_ec2::SharedEc2State,
301    pub autoscaling: fakecloud_autoscaling::SharedAutoScalingState,
302    pub batch: fakecloud_batch::SharedBatchState,
303    pub ecs: fakecloud_ecs::SharedEcsState,
304    pub acm: fakecloud_acm::SharedAcmState,
305    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
306    pub route53: fakecloud_route53::SharedRoute53State,
307    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
308    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
309    pub wafv2: fakecloud_wafv2::SharedWafv2State,
310    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
311    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
312    pub ses: fakecloud_ses::SharedSesState,
313    pub application_autoscaling:
314        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
315    pub athena: fakecloud_athena::SharedAthenaState,
316    pub firehose: fakecloud_firehose::SharedFirehoseState,
317    pub glue: fakecloud_glue::SharedGlueState,
318    pub delivery: Arc<DeliveryBus>,
319    /// Lambda container runtime, when Docker/Podman is available. Used to
320    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
321    /// in the background so its first Invoke doesn't pay the cold-pull cost
322    /// (the #1539 timeout, through the CloudFormation door). `None` when no
323    /// runtime is configured — provisioning still works, the first Invoke just
324    /// falls back to a cold pull.
325    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
326    /// Container runtimes for the stateful services whose CFN-provisioned
327    /// resources must be backed by REAL containers (not phantom metadata).
328    /// When present, the provisioner inserts the resource record synchronously
329    /// (so `Ref`/`GetAtt` resolve during provisioning) and then backs it with a
330    /// real container in the background — the same container the direct API
331    /// path spawns. `None` (no Docker/Podman, e.g. CI) keeps the metadata-only
332    /// behavior, matching how the direct API degrades without a runtime.
333    pub rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
334    pub ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
335    pub ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
336    pub elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
337}
338
339pub struct CloudFormationService {
340    pub(crate) state: SharedCloudFormationState,
341    pub(crate) deps: CloudFormationDeps,
342    snapshot_store: Option<Arc<dyn SnapshotStore>>,
343    snapshot_lock: Arc<AsyncMutex<()>>,
344    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
345    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
346    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
347    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
348    /// real store via `with_s3_store` once it has been built.
349    s3_store: Arc<dyn S3Store>,
350    /// Whole-state snapshot persist hooks keyed by service name (see
351    /// `service_key_for_type`). After a stack op the handler invokes the hook
352    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
353    /// is written through to disk, the same persistence a direct mutating API
354    /// call would trigger. Empty by default (memory mode, or no services
355    /// wired); the server populates it via `with_snapshot_hooks`.
356    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
357}
358
359/// Everything the async CreateStack provisioning task needs to provision
360/// resources and flip the stack to its terminal status. Bundled into one
361/// owned struct so it can move into a detached `tokio::spawn`.
362struct CreateStackContext {
363    state: SharedCloudFormationState,
364    delivery: Arc<DeliveryBus>,
365    snapshot_store: Option<Arc<dyn SnapshotStore>>,
366    snapshot_lock: Arc<AsyncMutex<()>>,
367    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
368    provisioner: ResourceProvisioner,
369    account_id: String,
370    stack_name: String,
371    stack_id: String,
372    template_body: String,
373    parameters: BTreeMap<String, String>,
374    notification_arns: Vec<String>,
375    imported_names: Vec<String>,
376    resource_defs: Vec<template::ResourceDefinition>,
377}
378
379/// Owned clones of the service-state + container-runtime handles a provisioner
380/// holds, so the spawn / teardown drains can `tokio::spawn` REAL container work
381/// after the provisioner itself has been moved (CreateStack moves it into
382/// `spawn_blocking`) or after the CFN state lock has been dropped (the
383/// changeset/update/delete paths). Centralizes the per-resource-type match that
384/// backs a freshly-provisioned container resource or reaps a deleted one, so the
385/// CreateStack / ExecuteChangeSet / UpdateStack / DeleteStack paths stay in
386/// lockstep (the #2031-#2034 create-side hardening reaching every path).
387pub(crate) struct ContainerBackingHandles {
388    account_id: String,
389    region: String,
390    rds_state: fakecloud_rds::SharedRdsState,
391    rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
392    ec2_state: fakecloud_ec2::SharedEc2State,
393    ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
394    autoscaling_state: fakecloud_autoscaling::SharedAutoScalingState,
395    elasticache_state: fakecloud_elasticache::SharedElastiCacheState,
396    elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
397    ecs_state: fakecloud_ecs::SharedEcsState,
398    ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
399}
400
401impl ContainerBackingHandles {
402    pub(crate) fn from_provisioner(p: &ResourceProvisioner) -> Self {
403        Self {
404            account_id: p.account_id.clone(),
405            region: p.region.clone(),
406            rds_state: p.rds_state.clone(),
407            rds_runtime: p.rds_runtime.clone(),
408            ec2_state: p.ec2_state.clone(),
409            ec2_runtime: p.ec2_runtime.clone(),
410            autoscaling_state: p.autoscaling_state.clone(),
411            elasticache_state: p.elasticache_state.clone(),
412            elasticache_runtime: p.elasticache_runtime.clone(),
413            ecs_state: p.ecs_state.clone(),
414            ecs_runtime: p.ecs_runtime.clone(),
415        }
416    }
417
418    /// Back each freshly-inserted container resource with a REAL container in a
419    /// detached task, so the stack op never blocks on a container boot/pull (the
420    /// #1539/#1730 timeout lesson). Drains the provisioner's queued spawn intents.
421    pub(crate) fn spawn_container_intents(
422        &self,
423        intents: Vec<crate::resource_provisioner::ContainerSpawnIntent>,
424    ) {
425        use crate::resource_provisioner::ContainerSpawnIntent;
426        for intent in intents {
427            match intent {
428                ContainerSpawnIntent::RdsInstance { identifier } => {
429                    if let Some(runtime) = self.rds_runtime.clone() {
430                        let rds_state = self.rds_state.clone();
431                        let account = self.account_id.clone();
432                        let region = self.region.clone();
433                        tokio::spawn(async move {
434                            fakecloud_rds::cfn_provision::cfn_ensure_instance_container(
435                                rds_state, runtime, identifier, account, region,
436                            )
437                            .await;
438                        });
439                    }
440                }
441                ContainerSpawnIntent::AsgInstances { group_name } => {
442                    let asg_state = self.autoscaling_state.clone();
443                    let ec2_state = self.ec2_state.clone();
444                    let ec2_runtime = self.ec2_runtime.clone();
445                    let account = self.account_id.clone();
446                    let region = self.region.clone();
447                    tokio::spawn(async move {
448                        fakecloud_autoscaling::cfn_provision::cfn_reconcile_capacity(
449                            asg_state,
450                            ec2_state,
451                            ec2_runtime,
452                            group_name,
453                            account,
454                            region,
455                        )
456                        .await;
457                    });
458                }
459                ContainerSpawnIntent::Ec2Instance { instance_id } => {
460                    let ec2_state = self.ec2_state.clone();
461                    let ec2_runtime = self.ec2_runtime.clone();
462                    let account = self.account_id.clone();
463                    tokio::spawn(async move {
464                        fakecloud_ec2::cfn_provision::cfn_back_instance(
465                            ec2_state,
466                            ec2_runtime,
467                            account,
468                            instance_id,
469                        )
470                        .await;
471                    });
472                }
473                ContainerSpawnIntent::ElastiCacheCluster { cache_cluster_id } => {
474                    if let Some(runtime) = self.elasticache_runtime.clone() {
475                        let ec_state = self.elasticache_state.clone();
476                        let account = self.account_id.clone();
477                        tokio::spawn(async move {
478                            fakecloud_elasticache::cfn_provision::cfn_ensure_cluster_container(
479                                ec_state,
480                                runtime,
481                                cache_cluster_id,
482                                account,
483                            )
484                            .await;
485                        });
486                    }
487                }
488                ContainerSpawnIntent::ElastiCacheReplicationGroup {
489                    replication_group_id,
490                } => {
491                    if let Some(runtime) = self.elasticache_runtime.clone() {
492                        let ec_state = self.elasticache_state.clone();
493                        let account = self.account_id.clone();
494                        tokio::spawn(async move {
495                            fakecloud_elasticache::cfn_provision::cfn_ensure_replication_group_container(
496                                ec_state,
497                                runtime,
498                                replication_group_id,
499                                account,
500                            )
501                            .await;
502                        });
503                    }
504                }
505                ContainerSpawnIntent::EcsServiceTasks {
506                    cluster_name,
507                    service_name,
508                } => {
509                    if let Some(runtime) = self.ecs_runtime.clone() {
510                        let ecs_state = self.ecs_state.clone();
511                        let account = self.account_id.clone();
512                        tokio::spawn(async move {
513                            fakecloud_ecs::cfn_provision::cfn_launch_service_tasks(
514                                ecs_state,
515                                runtime,
516                                cluster_name,
517                                service_name,
518                                account,
519                            )
520                            .await;
521                        });
522                    }
523                }
524            }
525        }
526    }
527
528    /// Reap the REAL backing container for each deleted container resource in a
529    /// detached task, so a stack delete (or update-removed) never leaks a
530    /// running RDS / ElastiCache / ECS / EC2 container. Drains the provisioner's
531    /// queued teardown intents.
532    pub(crate) fn spawn_teardown_intents(
533        &self,
534        intents: Vec<crate::resource_provisioner::ContainerTeardownIntent>,
535    ) {
536        use crate::resource_provisioner::ContainerTeardownIntent;
537        for intent in intents {
538            match intent {
539                ContainerTeardownIntent::RdsInstance { identifier } => {
540                    if let Some(runtime) = self.rds_runtime.clone() {
541                        let account = self.account_id.clone();
542                        tokio::spawn(async move {
543                            fakecloud_rds::cfn_provision::cfn_teardown_instance_container(
544                                runtime, identifier, account,
545                            )
546                            .await;
547                        });
548                    }
549                }
550                ContainerTeardownIntent::ElastiCacheCluster { cache_cluster_id } => {
551                    if let Some(runtime) = self.elasticache_runtime.clone() {
552                        tokio::spawn(async move {
553                            fakecloud_elasticache::cfn_provision::cfn_teardown_cluster_container(
554                                runtime,
555                                cache_cluster_id,
556                            )
557                            .await;
558                        });
559                    }
560                }
561                ContainerTeardownIntent::ElastiCacheReplicationGroup {
562                    replication_group_id,
563                } => {
564                    if let Some(runtime) = self.elasticache_runtime.clone() {
565                        tokio::spawn(async move {
566                            fakecloud_elasticache::cfn_provision::cfn_teardown_replication_group_container(
567                                runtime,
568                                replication_group_id,
569                            )
570                            .await;
571                        });
572                    }
573                }
574                ContainerTeardownIntent::EcsService {
575                    cluster_name,
576                    service_name,
577                } => {
578                    if let Some(runtime) = self.ecs_runtime.clone() {
579                        let ecs_state = self.ecs_state.clone();
580                        let account = self.account_id.clone();
581                        tokio::spawn(async move {
582                            fakecloud_ecs::cfn_provision::cfn_stop_service_tasks(
583                                ecs_state,
584                                runtime,
585                                cluster_name,
586                                service_name,
587                                account,
588                            )
589                            .await;
590                        });
591                    }
592                }
593                ContainerTeardownIntent::Ec2Instance { instance_id } => {
594                    let ec2_state = self.ec2_state.clone();
595                    let ec2_runtime = self.ec2_runtime.clone();
596                    let account = self.account_id.clone();
597                    let region = self.region.clone();
598                    tokio::spawn(async move {
599                        fakecloud_ec2::cfn_provision::cfn_terminate(
600                            ec2_state,
601                            ec2_runtime,
602                            account,
603                            region,
604                            instance_id,
605                        )
606                        .await;
607                    });
608                }
609                ContainerTeardownIntent::AsgInstances { instance_ids } => {
610                    let asg_state = self.autoscaling_state.clone();
611                    let ec2_state = self.ec2_state.clone();
612                    let ec2_runtime = self.ec2_runtime.clone();
613                    let account = self.account_id.clone();
614                    let region = self.region.clone();
615                    tokio::spawn(async move {
616                        fakecloud_autoscaling::cfn_provision::cfn_terminate_instances(
617                            asg_state,
618                            ec2_state,
619                            ec2_runtime,
620                            instance_ids,
621                            account,
622                            region,
623                        )
624                        .await;
625                    });
626                }
627            }
628        }
629    }
630}
631
632/// Drain a provisioner's queued custom-resource Lambda invokes (`Custom::*`),
633/// running each off the request path in a detached task so a cold image pull
634/// never blocks the client or stalls the CFN state lock. Used by the
635/// changeset/update/delete paths (where `defer_custom_invokes` is set).
636pub(crate) fn spawn_custom_invokes(provisioner: &ResourceProvisioner) {
637    let intents = std::mem::take(&mut *provisioner.pending_custom_invokes.lock());
638    if intents.is_empty() {
639        return;
640    }
641    let delivery = provisioner.delivery.clone();
642    for intent in intents {
643        let delivery = delivery.clone();
644        tokio::spawn(async move {
645            match delivery
646                .invoke_lambda(&intent.service_token, &intent.payload)
647                .await
648            {
649                Some(Ok(_)) => {
650                    tracing::info!(
651                        "Custom resource Lambda {} invoked successfully",
652                        intent.service_token
653                    );
654                }
655                Some(Err(e)) => {
656                    tracing::warn!(
657                        "Custom resource Lambda {} invocation failed: {e}",
658                        intent.service_token
659                    );
660                }
661                None => {}
662            }
663        });
664    }
665}
666
667impl CloudFormationService {
668    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
669        Self {
670            state,
671            deps,
672            snapshot_store: None,
673            snapshot_lock: Arc::new(AsyncMutex::new(())),
674            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
675            snapshot_hooks: BTreeMap::new(),
676        }
677    }
678
679    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
680        self.snapshot_store = Some(store);
681        self
682    }
683
684    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
685    /// written through to disk (see `ResourceProvisioner::s3_store`).
686    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
687        self.s3_store = store;
688        self
689    }
690
691    /// Register the per-service snapshot persist hooks (keyed by service name)
692    /// used to persist every snapshot-backed service a stack op touches.
693    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
694        self.snapshot_hooks = hooks;
695        self
696    }
697
698    async fn save_snapshot(&self) {
699        let Some(store) = self.snapshot_store.clone() else {
700            return;
701        };
702        let _guard = self.snapshot_lock.lock().await;
703        let snapshot = CloudFormationSnapshot {
704            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
705            state: None,
706            accounts: Some(self.state.read().clone()),
707        };
708        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
709            let bytes = serde_json::to_vec(&snapshot)
710                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
711            store.save(&bytes)
712        })
713        .await;
714        match join {
715            Ok(Ok(())) => {}
716            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
717            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
718        }
719    }
720
721    pub(crate) fn provisioner(
722        &self,
723        stack_id: &str,
724        account_id: &str,
725        region: &str,
726    ) -> ResourceProvisioner {
727        ResourceProvisioner {
728            sqs_state: self.deps.sqs.clone(),
729            sns_state: self.deps.sns.clone(),
730            ssm_state: self.deps.ssm.clone(),
731            iam_state: self.deps.iam.clone(),
732            s3_state: self.deps.s3.clone(),
733            eventbridge_state: self.deps.eventbridge.clone(),
734            dynamodb_state: self.deps.dynamodb.clone(),
735            logs_state: self.deps.logs.clone(),
736            lambda_state: self.deps.lambda.clone(),
737            secretsmanager_state: self.deps.secretsmanager.clone(),
738            kinesis_state: self.deps.kinesis.clone(),
739            kms_state: self.deps.kms.clone(),
740            ecr_state: self.deps.ecr.clone(),
741            cloudwatch_state: self.deps.cloudwatch.clone(),
742            elbv2_state: self.deps.elbv2.clone(),
743            organizations_state: self.deps.organizations.clone(),
744            cognito_state: self.deps.cognito.clone(),
745            rds_state: self.deps.rds.clone(),
746            ec2_state: self.deps.ec2.clone(),
747            autoscaling_state: self.deps.autoscaling.clone(),
748            batch_state: self.deps.batch.clone(),
749            ecs_state: self.deps.ecs.clone(),
750            acm_state: self.deps.acm.clone(),
751            elasticache_state: self.deps.elasticache.clone(),
752            route53_state: self.deps.route53.clone(),
753            cloudfront_state: self.deps.cloudfront.clone(),
754            stepfunctions_state: self.deps.stepfunctions.clone(),
755            wafv2_state: self.deps.wafv2.clone(),
756            apigateway_state: self.deps.apigateway.clone(),
757            apigatewayv2_state: self.deps.apigatewayv2.clone(),
758            ses_state: self.deps.ses.clone(),
759            app_autoscaling_state: self.deps.application_autoscaling.clone(),
760            athena_state: self.deps.athena.clone(),
761            firehose_state: self.deps.firehose.clone(),
762            glue_state: self.deps.glue.clone(),
763            cloudformation_state: self.state.clone(),
764            delivery: self.deps.delivery.clone(),
765            lambda_runtime: self.deps.lambda_runtime.clone(),
766            rds_runtime: self.deps.rds_runtime.clone(),
767            ec2_runtime: self.deps.ec2_runtime.clone(),
768            ecs_runtime: self.deps.ecs_runtime.clone(),
769            elasticache_runtime: self.deps.elasticache_runtime.clone(),
770            pending_container_spawns: Arc::new(parking_lot::Mutex::new(Vec::new())),
771            pending_container_teardowns: Arc::new(parking_lot::Mutex::new(Vec::new())),
772            pending_custom_invokes: Arc::new(parking_lot::Mutex::new(Vec::new())),
773            // CreateStack provisions in a detached task, so its synchronous
774            // custom-resource invoke never blocks the client or the state lock.
775            // The changeset/update/delete paths run inside the request, so they
776            // flip this on (see `provisioner_deferred`) to queue the invoke and
777            // drain it off the request path instead.
778            defer_custom_invokes: false,
779            s3_store: self.s3_store.clone(),
780            account_id: account_id.to_string(),
781            region: region.to_string(),
782            stack_id: stack_id.to_string(),
783        }
784    }
785
786    /// Build a provisioner for the changeset/update/delete paths, which run
787    /// inside the request rather than in a detached `CreateStack` task. These
788    /// queue custom-resource Lambda invokes (`defer_custom_invokes`) so a cold
789    /// image pull never blocks the client past its read timeout or stalls every
790    /// other CFN op behind the state write lock; the caller drains and
791    /// `tokio::spawn`s the queued invokes after dropping the lock.
792    pub(crate) fn provisioner_deferred(
793        &self,
794        stack_id: &str,
795        account_id: &str,
796        region: &str,
797    ) -> ResourceProvisioner {
798        ResourceProvisioner {
799            defer_custom_invokes: true,
800            ..self.provisioner(stack_id, account_id, region)
801        }
802    }
803
804    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
805        // Check query params first (for Query protocol)
806        if let Some(v) = req.query_params.get(key) {
807            return Some(v.clone());
808        }
809        // Then check form-encoded body
810        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
811        body_params.get(key).cloned()
812    }
813
814    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
815        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
816        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
817        for (k, v) in body_params {
818            params.entry(k).or_insert(v);
819        }
820        params
821    }
822
823    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
824        let mut tags = BTreeMap::new();
825        for i in 1.. {
826            let key_param = format!("Tags.member.{i}.Key");
827            let value_param = format!("Tags.member.{i}.Value");
828            match (params.get(&key_param), params.get(&value_param)) {
829                (Some(k), Some(v)) => {
830                    tags.insert(k.clone(), v.clone());
831                }
832                _ => break,
833            }
834        }
835        tags
836    }
837
838    pub(crate) fn extract_parameters(
839        params: &BTreeMap<String, String>,
840    ) -> BTreeMap<String, String> {
841        let mut result = BTreeMap::new();
842        for i in 1.. {
843            let key_param = format!("Parameters.member.{i}.ParameterKey");
844            let value_param = format!("Parameters.member.{i}.ParameterValue");
845            match (params.get(&key_param), params.get(&value_param)) {
846                (Some(k), Some(v)) => {
847                    result.insert(k.clone(), v.clone());
848                }
849                _ => break,
850            }
851        }
852        result
853    }
854
855    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
856    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
857    /// parameter baked the bare parameter name instead of its default -- common
858    /// in hand-written CFN and `aws cloudformation deploy` without
859    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
860    pub(crate) fn merge_parameter_defaults(
861        parameters: &mut BTreeMap<String, String>,
862        template_body: &str,
863    ) {
864        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
865            match serde_json::from_str(template_body) {
866                Ok(v) => v,
867                Err(_) => return,
868            }
869        } else {
870            match serde_yaml::from_str(template_body) {
871                Ok(v) => v,
872                Err(_) => return,
873            }
874        };
875        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
876            return;
877        };
878        for (name, spec) in decls {
879            if parameters.contains_key(name) {
880                continue;
881            }
882            if let Some(default) = spec.get("Default") {
883                let s = default
884                    .as_str()
885                    .map(|s| s.to_string())
886                    .unwrap_or_else(|| default.to_string());
887                parameters.insert(name.clone(), s);
888            }
889        }
890    }
891
892    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
893        let mut arns = Vec::new();
894        for i in 1.. {
895            let key = format!("NotificationARNs.member.{i}");
896            match params.get(&key) {
897                Some(arn) => arns.push(arn.clone()),
898                None => break,
899            }
900        }
901        arns
902    }
903
904    fn send_stack_notification(
905        delivery: &DeliveryBus,
906        notification_arns: &[String],
907        stack_name: &str,
908        stack_id: &str,
909        status: &str,
910    ) {
911        if notification_arns.is_empty() {
912            return;
913        }
914        let message = format!(
915            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
916            stack_id,
917            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
918            uuid::Uuid::new_v4(),
919            stack_name,
920            status,
921            stack_name,
922        );
923        for arn in notification_arns {
924            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
925        }
926    }
927
928    /// Build a Fn::ImportValue lookup map from the account-level
929    /// `state.exports` registry. `skip_stack` removes any export owned by
930    /// the named stack — used during update so a stack doesn't import its
931    /// own previous-revision export.
932    pub(crate) fn collect_account_imports(
933        state: &SharedCloudFormationState,
934        account_id: &str,
935        skip_stack: Option<&str>,
936    ) -> BTreeMap<String, String> {
937        let mut imports = BTreeMap::new();
938        let accounts = state.read();
939        let Some(state) = accounts.get(account_id) else {
940            return imports;
941        };
942        for (name, export) in &state.exports {
943            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
944                continue;
945            }
946            imports.insert(name.clone(), export.value.clone());
947        }
948        imports
949    }
950
951    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
952    /// return a `ValidationError` listing any export names that aren't
953    /// known in the account. Mirrors CloudFormation's behavior of
954    /// failing the create/update before any resource is provisioned.
955    fn validate_import_values(
956        state: &SharedCloudFormationState,
957        account_id: &str,
958        stack_name: &str,
959        template_body: &str,
960        parameters: &BTreeMap<String, String>,
961    ) -> Result<Vec<String>, AwsServiceError> {
962        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
963            match serde_json::from_str(template_body) {
964                Ok(v) => v,
965                Err(_) => return Ok(Vec::new()),
966            }
967        } else {
968            match serde_yaml::from_str(template_body) {
969                Ok(v) => v,
970                Err(_) => return Ok(Vec::new()),
971            }
972        };
973        let names = template::collect_import_value_names(&value, parameters);
974        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
975        for n in &names {
976            if !known.contains_key(n) {
977                // CreateStack and UpdateStack both declare
978                // `InsufficientCapabilitiesException`; reuse it for the
979                // "missing imported export" pre-flight so the wire code
980                // matches a Smithy-declared shape on both ops.
981                return Err(AwsServiceError::aws_error(
982                    StatusCode::BAD_REQUEST,
983                    "InsufficientCapabilitiesException",
984                    format!("No export named {n} found."),
985                ));
986            }
987        }
988        Ok(names)
989    }
990
991    /// Sync `state.exports` and `state.imports` after a stack create or
992    /// update. Removes any exports / imports the stack used to own and
993    /// re-adds the current-revision set.
994    pub(crate) fn sync_exports_imports(
995        state: &mut CloudFormationState,
996        stack_id: &str,
997        stack_name: &str,
998        outputs: &[state::StackOutput],
999        imported_names: &[String],
1000    ) {
1001        // 1. Drop any prior exports owned by this stack.
1002        let stale_exports: Vec<String> = state
1003            .exports
1004            .iter()
1005            .filter(|(_, e)| e.exporting_stack_name == stack_name)
1006            .map(|(k, _)| k.clone())
1007            .collect();
1008        for k in stale_exports {
1009            state.exports.remove(&k);
1010        }
1011        // 2. Drop any prior imports recorded against this stack.
1012        for entries in state.imports.values_mut() {
1013            entries.retain(|s| s != stack_name);
1014        }
1015        state.imports.retain(|_, v| !v.is_empty());
1016
1017        // 3. Re-register exports.
1018        for o in outputs {
1019            if let Some(export) = &o.export_name {
1020                state.exports.insert(
1021                    export.clone(),
1022                    state::StackExport {
1023                        value: o.value.clone(),
1024                        exporting_stack_id: stack_id.to_string(),
1025                        exporting_stack_name: stack_name.to_string(),
1026                    },
1027                );
1028            }
1029        }
1030        // 4. Re-register imports.
1031        for name in imported_names {
1032            let entry = state.imports.entry(name.clone()).or_default();
1033            if !entry.iter().any(|s| s == stack_name) {
1034                entry.push(stack_name.to_string());
1035            }
1036        }
1037    }
1038
1039    /// Resolve every `Outputs.*` entry in `template_body` after the stack
1040    /// has been provisioned. `resources` is the post-create / post-update
1041    /// vec; we rebuild the physical-id and attribute maps from it before
1042    /// invoking the template parser.
1043    pub(crate) fn resolve_template_outputs(
1044        template_body: &str,
1045        parameters: &BTreeMap<String, String>,
1046        resources: &[StackResource],
1047        state: &SharedCloudFormationState,
1048    ) -> Vec<state::StackOutput> {
1049        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1050            match serde_json::from_str(template_body) {
1051                Ok(v) => v,
1052                Err(_) => return Vec::new(),
1053            }
1054        } else {
1055            match serde_yaml::from_str(template_body) {
1056                Ok(v) => v,
1057                Err(_) => return Vec::new(),
1058            }
1059        };
1060
1061        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
1062            Some(o) => o.clone(),
1063            None => return Vec::new(),
1064        };
1065
1066        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
1067        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
1068        for r in resources {
1069            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
1070            attributes.insert(r.logical_id.clone(), r.attributes.clone());
1071        }
1072
1073        let imports = {
1074            let accounts = state.read();
1075            let mut out = BTreeMap::new();
1076            // Walk every account so cross-stack imports work even if
1077            // future use-cases serve mixed accounts.
1078            for (_account, st) in accounts.iter() {
1079                for (name, export) in &st.exports {
1080                    out.insert(name.clone(), export.value.clone());
1081                }
1082            }
1083            out
1084        };
1085
1086        let parsed = match template::parse_outputs(
1087            &value,
1088            parameters,
1089            &resources_obj,
1090            &physical_ids,
1091            &attributes,
1092            &imports,
1093        ) {
1094            Ok(o) => o,
1095            Err(_) => return Vec::new(),
1096        };
1097
1098        parsed
1099            .into_iter()
1100            .map(|o| state::StackOutput {
1101                key: o.logical_id,
1102                value: o.value,
1103                description: o.description,
1104                export_name: o.export_name,
1105            })
1106            .collect()
1107    }
1108
1109    /// Reject creates/updates whose outputs would re-export a name that
1110    /// another live stack already exports. Mirrors real CloudFormation.
1111    fn ensure_export_uniqueness(
1112        state: &SharedCloudFormationState,
1113        account_id: &str,
1114        stack_name: &str,
1115        outputs: &[state::StackOutput],
1116    ) -> Result<(), AwsServiceError> {
1117        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
1118        for o in outputs {
1119            if let Some(export) = &o.export_name {
1120                if existing.contains_key(export) {
1121                    // CreateStack/UpdateStack both declare AlreadyExistsException
1122                    // (only) for a name collision; reuse it for duplicate exports
1123                    // so the strict conformance probe sees a declared wire code.
1124                    return Err(AwsServiceError::aws_error(
1125                        StatusCode::BAD_REQUEST,
1126                        "AlreadyExistsException",
1127                        format!("Export with name {export} is already exported by another stack"),
1128                    ));
1129                }
1130            }
1131        }
1132        Ok(())
1133    }
1134
1135    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1136        let params = Self::get_all_params(req);
1137
1138        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
1139        // accepts our `ValidationError` wire code regardless of declared shapes.
1140        let stack_name = params.get("StackName").ok_or_else(|| {
1141            AwsServiceError::aws_error(
1142                StatusCode::BAD_REQUEST,
1143                "ValidationError",
1144                "StackName is required",
1145            )
1146        })?;
1147
1148        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
1149        // Accept an empty body and parse it as an empty template so the probe's
1150        // Success variants with `TemplateBody="test"` still land on the happy path.
1151        let empty = String::new();
1152        let template_body = params.get("TemplateBody").unwrap_or(&empty);
1153
1154        // Check if stack already exists and is not deleted
1155        {
1156            let accounts = self.state.read();
1157            let empty = CloudFormationState::new(&req.account_id, &req.region);
1158            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1159            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
1160                if existing.status != "DELETE_COMPLETE" {
1161                    return Err(AwsServiceError::aws_error(
1162                        StatusCode::BAD_REQUEST,
1163                        "AlreadyExistsException",
1164                        format!("Stack [{stack_name}] already exists"),
1165                    ));
1166                }
1167            }
1168        }
1169
1170        let tags = Self::extract_tags(&params);
1171        let mut parameters = Self::extract_parameters(&params);
1172        Self::merge_parameter_defaults(&mut parameters, template_body);
1173        let notification_arns = Self::extract_notification_arns(&params);
1174
1175        // Seed AWS::* pseudo-parameters with stack-context values so
1176        // resolve_refs can substitute them into resource properties.
1177        let stack_id = format!(
1178            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1179            req.region,
1180            req.account_id,
1181            stack_name,
1182            uuid::Uuid::new_v4()
1183        );
1184        parameters
1185            .entry("AWS::Region".to_string())
1186            .or_insert_with(|| req.region.clone());
1187        parameters
1188            .entry("AWS::AccountId".to_string())
1189            .or_insert_with(|| req.account_id.clone());
1190        parameters
1191            .entry("AWS::StackId".to_string())
1192            .or_insert_with(|| stack_id.clone());
1193        parameters
1194            .entry("AWS::StackName".to_string())
1195            .or_insert_with(|| stack_name.clone());
1196        parameters
1197            .entry("AWS::Partition".to_string())
1198            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1199        parameters
1200            .entry("AWS::URLSuffix".to_string())
1201            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1202        // NotificationARNs is array-typed; pseudo_value parses it back
1203        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
1204        // returns the request's actual list (or an empty array).
1205        parameters.insert(
1206            "AWS::NotificationARNs".to_string(),
1207            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
1208        );
1209
1210        // First pass: parse to get resource definitions (without physical ID
1211        // resolution). Synthetic conformance inputs frequently arrive with a
1212        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
1213        // template rather than rejecting with an undeclared error code.
1214        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
1215            template::ParsedTemplate {
1216                description: None,
1217                resources: Vec::new(),
1218                outputs: Vec::new(),
1219            }
1220        });
1221
1222        // Refuse if any Fn::ImportValue references an unknown export. CFN
1223        // checks this before provisioning; we mirror that so callers get
1224        // a clean error instead of half-built resources.
1225        let imported_names = Self::validate_import_values(
1226            &self.state,
1227            &req.account_id,
1228            stack_name,
1229            template_body,
1230            &parameters,
1231        )?;
1232
1233        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
1234        // Real CloudFormation returns the StackId immediately and provisions
1235        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
1236        // stack reaches a terminal status. Up-front, synchronous-by-nature
1237        // validation (template parse, duplicate stack, missing imports) has
1238        // already happened above and still surfaces as a synchronous error.
1239        {
1240            let mut accounts = self.state.write();
1241            let state = accounts.get_or_create(&req.account_id);
1242            state.stacks.insert(
1243                stack_name.clone(),
1244                Stack {
1245                    name: stack_name.clone(),
1246                    stack_id: stack_id.clone(),
1247                    template: template_body.clone(),
1248                    status: "CREATE_IN_PROGRESS".to_string(),
1249                    resources: Vec::new(),
1250                    parameters: parameters.clone(),
1251                    tags: tags.clone(),
1252                    created_at: Utc::now(),
1253                    updated_at: None,
1254                    description: parsed.description.clone(),
1255                    notification_arns: notification_arns.clone(),
1256                    outputs: Vec::new(),
1257                },
1258            );
1259            record_stack_status_event(
1260                state,
1261                &stack_id,
1262                stack_name,
1263                "AWS::CloudFormation::Stack",
1264                "CREATE_IN_PROGRESS",
1265            );
1266        }
1267
1268        let ctx = CreateStackContext {
1269            state: self.state.clone(),
1270            delivery: self.deps.delivery.clone(),
1271            snapshot_store: self.snapshot_store.clone(),
1272            snapshot_lock: self.snapshot_lock.clone(),
1273            snapshot_hooks: self.snapshot_hooks.clone(),
1274            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
1275            account_id: req.account_id.clone(),
1276            stack_name: stack_name.clone(),
1277            stack_id: stack_id.clone(),
1278            template_body: template_body.clone(),
1279            parameters,
1280            notification_arns,
1281            imported_names,
1282            resource_defs: parsed.resources,
1283        };
1284
1285        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
1286        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
1287        // which can trigger a cold container image pull lasting minutes — far
1288        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
1289        // StackId in <1s and provisions asynchronously, so when the template
1290        // contains a custom resource we do the same: spawn a detached task and
1291        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
1292        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
1293        //
1294        // Every other resource type provisions in-memory in microseconds, so
1295        // we keep provisioning inline for them: CreateStack returns with the
1296        // stack already CREATE_COMPLETE, matching long-standing client
1297        // expectations that the stack's resources/outputs are queryable as
1298        // soon as CreateStack returns. The async branch only triggers on a
1299        // multi-thread runtime (the server); unit tests run current-thread.
1300        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
1301            r.resource_type.starts_with("Custom::")
1302                || r.resource_type == "AWS::CloudFormation::CustomResource"
1303        });
1304        let multi_thread = matches!(
1305            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
1306            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
1307        );
1308        if has_custom_resource && multi_thread {
1309            // Emit the IN_PROGRESS lifecycle notification only on the async
1310            // path — AWS sends it before the terminal CREATE_COMPLETE. The
1311            // inline path provisions instantly and sends only CREATE_COMPLETE,
1312            // preserving the single-notification contract callers depend on.
1313            Self::send_stack_notification(
1314                &self.deps.delivery,
1315                &ctx.notification_arns,
1316                stack_name,
1317                &stack_id,
1318                "CREATE_IN_PROGRESS",
1319            );
1320            tokio::spawn(async move {
1321                Self::finish_create_stack(ctx).await;
1322            });
1323        } else {
1324            Self::finish_create_stack(ctx).await;
1325        }
1326
1327        Ok(AwsResponse::xml(
1328            StatusCode::OK,
1329            xml_responses::create_stack_response(&stack_id, &req.request_id),
1330        ))
1331    }
1332
1333    /// Runs the resource provisioning loop for a CreateStack and flips the
1334    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
1335    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
1336    /// task (the multi-thread server). Persists the final state via the same
1337    /// snapshot mechanism the request handler uses.
1338    async fn finish_create_stack(ctx: CreateStackContext) {
1339        let CreateStackContext {
1340            state,
1341            delivery,
1342            snapshot_store,
1343            snapshot_lock,
1344            snapshot_hooks,
1345            provisioner,
1346            account_id,
1347            stack_name,
1348            stack_id,
1349            template_body,
1350            parameters,
1351            notification_arns,
1352            imported_names,
1353            resource_defs,
1354        } = ctx;
1355
1356        // Capture the container-spawn handles before the provisioner is moved
1357        // into spawn_blocking, so the post-provision drain (below) can back
1358        // freshly-inserted container resources with REAL containers.
1359        let container_spawns = provisioner.pending_container_spawns.clone();
1360        let backing_handles = ContainerBackingHandles::from_provisioner(&provisioner);
1361
1362        // The provisioning loop is fully synchronous (it may block on cold
1363        // image pulls / custom-resource Lambda invokes). Hand it to a
1364        // blocking thread so it never stalls a tokio worker.
1365        let provision_result = {
1366            let template_body = template_body.clone();
1367            let parameters = parameters.clone();
1368            // Cross-stack exports this account already published, so a resource
1369            // property using `Fn::ImportValue` resolves to the real value
1370            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1371            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1372            tokio::task::spawn_blocking(move || {
1373                provision_stack_resources(
1374                    &provisioner,
1375                    &resource_defs,
1376                    &template_body,
1377                    &parameters,
1378                    &imports,
1379                )
1380            })
1381            .await
1382        };
1383
1384        // A spawn_blocking JoinError (panic) or a provisioning error both
1385        // roll the stack into CREATE_FAILED.
1386        let provisioned = match provision_result {
1387            Ok(Ok(resources)) => Ok(resources),
1388            Ok(Err(err)) => Err(err.message()),
1389            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1390        };
1391
1392        let resources = match provisioned {
1393            Ok(resources) => resources,
1394            Err(reason) => {
1395                Self::mark_create_failed(
1396                    &state,
1397                    &delivery,
1398                    &account_id,
1399                    &stack_name,
1400                    &stack_id,
1401                    &notification_arns,
1402                    &reason,
1403                );
1404                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1405                return;
1406            }
1407        };
1408
1409        // Provisioning succeeded: back any container-backed resources (RDS
1410        // instances, ...) with REAL containers. Each spawn is detached so
1411        // CreateStack never blocks on a container boot/pull — the record is
1412        // already inserted as "creating" and flips to "available" when the
1413        // container is up (the #1539/#1730 timeout lesson).
1414        backing_handles.spawn_container_intents(std::mem::take(&mut *container_spawns.lock()));
1415
1416        let outputs =
1417            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1418
1419        // Export-name collisions surface as a failed create (the stack is
1420        // already inserted, so this can no longer be a synchronous error).
1421        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1422        {
1423            Self::mark_create_failed(
1424                &state,
1425                &delivery,
1426                &account_id,
1427                &stack_name,
1428                &stack_id,
1429                &notification_arns,
1430                &err.message(),
1431            );
1432            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1433            return;
1434        }
1435
1436        {
1437            let mut accounts = state.write();
1438            let st = accounts.get_or_create(&account_id);
1439            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1440                stack.status = "CREATE_COMPLETE".to_string();
1441                stack.resources = resources.clone();
1442                stack.outputs = outputs.clone();
1443            }
1444            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1445
1446            let changes: Vec<ResourceChange> = resources
1447                .iter()
1448                .map(|r| ResourceChange {
1449                    action: ResourceChangeAction::Create,
1450                    logical_id: r.logical_id.clone(),
1451                    physical_id: r.physical_id.clone(),
1452                    resource_type: r.resource_type.clone(),
1453                })
1454                .collect();
1455            record_stack_events(st, &stack_id, &stack_name, &changes);
1456            record_stack_status_event(
1457                st,
1458                &stack_id,
1459                &stack_name,
1460                "AWS::CloudFormation::Stack",
1461                "CREATE_COMPLETE",
1462            );
1463        }
1464
1465        Self::send_stack_notification(
1466            &delivery,
1467            &notification_arns,
1468            &stack_name,
1469            &stack_id,
1470            "CREATE_COMPLETE",
1471        );
1472
1473        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1474        // Persist every snapshot-backed service the stack provisioned, so a
1475        // CFN-created resource (e.g. a Lambda function or Secret whose service
1476        // is not otherwise re-mutated) is written to disk and survives a
1477        // restart -- not just the CloudFormation stack metadata above.
1478        persist_touched_services(
1479            &snapshot_hooks,
1480            resources.iter().map(|r| r.resource_type.clone()),
1481        )
1482        .await;
1483    }
1484
1485    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1486    /// notify subscribers. Used by the async provisioning task on a
1487    /// provisioning error or export collision.
1488    fn mark_create_failed(
1489        state: &SharedCloudFormationState,
1490        delivery: &DeliveryBus,
1491        account_id: &str,
1492        stack_name: &str,
1493        stack_id: &str,
1494        notification_arns: &[String],
1495        reason: &str,
1496    ) {
1497        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1498        {
1499            let mut accounts = state.write();
1500            let st = accounts.get_or_create(account_id);
1501            if let Some(stack) = st.stacks.get_mut(stack_name) {
1502                stack.status = "CREATE_FAILED".to_string();
1503            }
1504            record_stack_status_event(
1505                st,
1506                stack_id,
1507                stack_name,
1508                "AWS::CloudFormation::Stack",
1509                "CREATE_FAILED",
1510            );
1511        }
1512        Self::send_stack_notification(
1513            delivery,
1514            notification_arns,
1515            stack_name,
1516            stack_id,
1517            "CREATE_FAILED",
1518        );
1519    }
1520
1521    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1522        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1523            AwsServiceError::aws_error(
1524                StatusCode::BAD_REQUEST,
1525                "ValidationError",
1526                "StackName is required",
1527            )
1528        })?;
1529
1530        // Resource types deleted by this op, captured so we can persist the
1531        // owning services after every state guard has been released (the
1532        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1533        // work is wrapped in a block so the lock is dropped on every path -
1534        // including the stack-not-found path - before the await below.
1535        let mut deleted_types: Vec<String> = Vec::new();
1536        {
1537            let mut accounts = self.state.write();
1538            let state = accounts.get_or_create(&req.account_id);
1539
1540            // Find stack by name or stack ID
1541            let stack = state.stacks.values_mut().find(|s| {
1542                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1543            });
1544
1545            if let Some(stack) = stack {
1546                let stack_id = stack.stack_id.clone();
1547                let stack_name_for_notif = stack.name.clone();
1548                let notification_arns = stack.notification_arns.clone();
1549                let resources: Vec<_> = stack.resources.clone();
1550
1551                // Block delete if any of this stack's exports are still
1552                // imported by another live stack. Mirrors real CFN.
1553                let owned_exports: Vec<String> = state
1554                    .exports
1555                    .iter()
1556                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1557                    .map(|(k, _)| k.clone())
1558                    .collect();
1559                for export in &owned_exports {
1560                    if let Some(consumers) = state.imports.get(export) {
1561                        let consumers: Vec<&String> = consumers
1562                            .iter()
1563                            .filter(|c| **c != stack_name_for_notif)
1564                            .collect();
1565                        if !consumers.is_empty() {
1566                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1567                            // DeleteStack declares only `TokenAlreadyExistsException`,
1568                            // which is the closest declared shape for "this delete
1569                            // can't proceed". Strict conformance rarely hits this
1570                            // pre-flight (probe state is fresh per run); the unit
1571                            // test asserting the legacy message still passes since
1572                            // both error codes carry the same body text.
1573                            return Err(AwsServiceError::aws_error(
1574                                StatusCode::BAD_REQUEST,
1575                                "TokenAlreadyExistsException",
1576                                format!(
1577                                    "Export {export} cannot be deleted as it is in use by {}",
1578                                    names.join(", ")
1579                                ),
1580                            ));
1581                        }
1582                    }
1583                }
1584
1585                // Build the provisioner while we still have the stack_id
1586                // Drop the write lock temporarily so the provisioner can read state
1587                drop(accounts);
1588                // `provisioner_deferred`: queue any custom-resource Delete Lambda
1589                // invokes so a cold image pull never blocks the client past its
1590                // read timeout (drained off the request path below).
1591                let provisioner =
1592                    self.provisioner_deferred(&stack_id, &req.account_id, &req.region);
1593
1594                // Delete resources in reverse order
1595                for resource in resources.iter().rev() {
1596                    let _ = provisioner.delete_resource(resource);
1597                }
1598
1599                // Reap the REAL backing containers for the deleted container
1600                // resources (RDS / ElastiCache / ECS / ASG-EC2) off the request
1601                // path, so a stack delete does not leak running containers (the
1602                // create-side #2031-#2034 hardening reaching the delete path).
1603                // Each delete_resource above only removed the in-memory record.
1604                ContainerBackingHandles::from_provisioner(&provisioner).spawn_teardown_intents(
1605                    std::mem::take(&mut *provisioner.pending_container_teardowns.lock()),
1606                );
1607                spawn_custom_invokes(&provisioner);
1608
1609                // Re-acquire the write lock to update stack status
1610                let mut accounts = self.state.write();
1611                let state = accounts.get_or_create(&req.account_id);
1612                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1613                    stack.status = "DELETE_COMPLETE".to_string();
1614                    stack.resources.clear();
1615                    stack.outputs.clear();
1616                }
1617                // Drop this stack's exports + import-consumer entries.
1618                let stale_exports: Vec<String> = state
1619                    .exports
1620                    .iter()
1621                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1622                    .map(|(k, _)| k.clone())
1623                    .collect();
1624                for k in stale_exports {
1625                    state.exports.remove(&k);
1626                }
1627                for entries in state.imports.values_mut() {
1628                    entries.retain(|s| s != &stack_name_for_notif);
1629                }
1630                state.imports.retain(|_, v| !v.is_empty());
1631                drop(accounts);
1632
1633                Self::send_stack_notification(
1634                    &self.deps.delivery,
1635                    &notification_arns,
1636                    &stack_name_for_notif,
1637                    &stack_id,
1638                    "DELETE_COMPLETE",
1639                );
1640
1641                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1642            }
1643        }
1644
1645        // Persist every snapshot-backed service whose resource was deleted, so
1646        // a CFN-deleted resource does not reappear after a restart. Done here,
1647        // outside any state-guard scope, so the future stays `Send`.
1648        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1649
1650        Ok(AwsResponse::xml(
1651            StatusCode::OK,
1652            xml_responses::delete_stack_response(&req.request_id),
1653        ))
1654    }
1655
1656    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1657        let stack_name = Self::get_param(req, "StackName");
1658
1659        let accounts = self.state.read();
1660        let empty = CloudFormationState::new(&req.account_id, &req.region);
1661        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1662        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1663            state
1664                .stacks
1665                .values()
1666                .filter(|s| {
1667                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1668                })
1669                .cloned()
1670                .collect()
1671        } else {
1672            state
1673                .stacks
1674                .values()
1675                .filter(|s| s.status != "DELETE_COMPLETE")
1676                .cloned()
1677                .collect()
1678        };
1679
1680        // When an explicit `StackName` is supplied but matches nothing,
1681        // real AWS returns `ValidationError: Stack with id <name> does not
1682        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1683        // `aws cloudformation deploy`) probes stack existence by catching
1684        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1685        // `IndexError` and `deploy` take the wrong (update) path (issue
1686        // #1646). DescribeStacks declares no errors in Smithy, but the
1687        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1688        // returning `ValidationError` here stays conformant. A nameless
1689        // call still lists all stacks (empty is valid).
1690        if let Some(ref name) = stack_name {
1691            if stacks.is_empty() {
1692                return Err(AwsServiceError::aws_error(
1693                    StatusCode::BAD_REQUEST,
1694                    "ValidationError",
1695                    format!("Stack with id {name} does not exist"),
1696                ));
1697            }
1698        }
1699
1700        Ok(AwsResponse::xml(
1701            StatusCode::OK,
1702            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1703        ))
1704    }
1705
1706    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1707        let accounts = self.state.read();
1708        let empty = CloudFormationState::new(&req.account_id, &req.region);
1709        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1710        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1711
1712        Ok(AwsResponse::xml(
1713            StatusCode::OK,
1714            xml_responses::list_stacks_response(&stacks, &req.request_id),
1715        ))
1716    }
1717
1718    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1719        // ListStackResources requires StackName in Smithy. The op's
1720        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1721        // counts as a handler response for conformance purposes. Reject
1722        // an omitted name with `ValidationError`; treat a *missing* stack
1723        // as an empty resource list (consistent with the rest of CFN).
1724        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1725            AwsServiceError::aws_error(
1726                StatusCode::BAD_REQUEST,
1727                "ValidationError",
1728                "StackName is required",
1729            )
1730        })?;
1731
1732        let accounts = self.state.read();
1733        let empty = CloudFormationState::new(&req.account_id, &req.region);
1734        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1735        let resources = state
1736            .stacks
1737            .values()
1738            .find(|s| {
1739                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1740            })
1741            .map(|s| s.resources.clone())
1742            .unwrap_or_default();
1743
1744        Ok(AwsResponse::xml(
1745            StatusCode::OK,
1746            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1747        ))
1748    }
1749
1750    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1751        // DescribeStackResources declares no errors; treat omission /
1752        // missing stack as an empty result set.
1753        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1754
1755        let accounts = self.state.read();
1756        let empty = CloudFormationState::new(&req.account_id, &req.region);
1757        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1758        let (resources, resolved_name) = state
1759            .stacks
1760            .values()
1761            .find(|s| {
1762                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1763            })
1764            .map(|s| (s.resources.clone(), s.name.clone()))
1765            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1766
1767        Ok(AwsResponse::xml(
1768            StatusCode::OK,
1769            xml_responses::describe_stack_resources_response(
1770                &resources,
1771                &resolved_name,
1772                &req.request_id,
1773            ),
1774        ))
1775    }
1776
1777    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1778        let mut input = UpdateStackInput::from_params(req)?;
1779
1780        // Get stack_id before write lock for the provisioner
1781        let found_stack_id = {
1782            let accounts = self.state.read();
1783            let empty = CloudFormationState::new(&req.account_id, &req.region);
1784            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1785            state
1786                .stacks
1787                .values()
1788                .find(|s| {
1789                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1790                        && s.status != "DELETE_COMPLETE"
1791                })
1792                .map(|s| s.stack_id.clone())
1793                .unwrap_or_default()
1794        };
1795
1796        // Seed pseudo-parameters before parsing — the StackId is now known
1797        // (after the read above) so resolve_refs sees the same values that
1798        // the original CreateStack invocation used.
1799        input
1800            .parameters
1801            .entry("AWS::Region".to_string())
1802            .or_insert_with(|| req.region.clone());
1803        input
1804            .parameters
1805            .entry("AWS::AccountId".to_string())
1806            .or_insert_with(|| req.account_id.clone());
1807        input
1808            .parameters
1809            .entry("AWS::StackId".to_string())
1810            .or_insert_with(|| found_stack_id.clone());
1811        input
1812            .parameters
1813            .entry("AWS::StackName".to_string())
1814            .or_insert_with(|| input.stack_name.clone());
1815        input
1816            .parameters
1817            .entry("AWS::Partition".to_string())
1818            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1819        input
1820            .parameters
1821            .entry("AWS::URLSuffix".to_string())
1822            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1823        // Seed AWS::NotificationARNs from the update payload, falling
1824        // back to whatever the existing stack carries when the request
1825        // omits the param. Encoded as JSON so pseudo_value can split it
1826        // back into the array shape Ref returns.
1827        if !input.notification_arns.is_empty() {
1828            input.parameters.insert(
1829                "AWS::NotificationARNs".to_string(),
1830                serde_json::to_string(&input.notification_arns)
1831                    .unwrap_or_else(|_| "[]".to_string()),
1832            );
1833        } else {
1834            // Carry the existing stack's notification ARNs forward so the
1835            // pseudo-param keeps its previous value across updates.
1836            let existing: Vec<String> = {
1837                let accounts = self.state.read();
1838                accounts
1839                    .get(&req.account_id)
1840                    .and_then(|s| {
1841                        s.stacks
1842                            .values()
1843                            .find(|st| st.stack_id == found_stack_id)
1844                            .map(|st| st.notification_arns.clone())
1845                    })
1846                    .unwrap_or_default()
1847            };
1848            input.parameters.insert(
1849                "AWS::NotificationARNs".to_string(),
1850                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1851            );
1852        }
1853
1854        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1855        // conformance probe's Success variants. Degrade to an empty parsed
1856        // template rather than rejecting with an undeclared error code —
1857        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1858        let parsed = template::parse_template(&input.template_body, &input.parameters)
1859            .unwrap_or_else(|_| template::ParsedTemplate {
1860                description: None,
1861                resources: Vec::new(),
1862                outputs: Vec::new(),
1863            });
1864
1865        let imported_names = Self::validate_import_values(
1866            &self.state,
1867            &req.account_id,
1868            &input.stack_name,
1869            &input.template_body,
1870            &input.parameters,
1871        )?;
1872
1873        // `provisioner_deferred`: apply_resource_updates runs inside the state
1874        // write lock below; a synchronous custom-resource Lambda invoke there
1875        // would stall every other CFN op behind the lock and could block the
1876        // client past its read timeout. Queue those invokes and drain them off
1877        // the request path after the lock is dropped.
1878        let provisioner = self.provisioner_deferred(&found_stack_id, &req.account_id, &req.region);
1879
1880        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1881        // Computed before the write lock (collect_account_imports takes a read
1882        // lock and returns an owned map).
1883        let imports =
1884            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1885
1886        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1887        // guard is released before the persist `.await` below, keeping the
1888        // handler future `Send`. The block yields everything the post-lock tail
1889        // needs, including the resource types touched by the update.
1890        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1891            let mut accounts = self.state.write();
1892            let state = accounts.get_or_create(&req.account_id);
1893            // UpdateStack declares only `InsufficientCapabilitiesException` and
1894            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1895            // Real AWS returns `ValidationError` for this case, but that wire
1896            // code isn't declared on UpdateStack. The conformance probe's
1897            // Success variants supply placeholder `StackName` values that point
1898            // at no real stack, so degrade to a synthetic-success response
1899            // (echoing a generated StackId) rather than emit an undeclared
1900            // error. Real callers always create the stack first.
1901            let stack_exists = state.stacks.values().any(|s| {
1902                (s.name == input.stack_name || s.stack_id == input.stack_name)
1903                    && s.status != "DELETE_COMPLETE"
1904            });
1905            if !stack_exists {
1906                let stack_id = if found_stack_id.is_empty() {
1907                    format!(
1908                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1909                        req.region,
1910                        req.account_id,
1911                        input.stack_name,
1912                        uuid::Uuid::new_v4()
1913                    )
1914                } else {
1915                    found_stack_id.clone()
1916                };
1917                return Ok(AwsResponse::xml(
1918                    StatusCode::OK,
1919                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1920                ));
1921            }
1922            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1923                let stack = state
1924                    .stacks
1925                    .values_mut()
1926                    .find(|s| {
1927                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1928                            && s.status != "DELETE_COMPLETE"
1929                    })
1930                    .expect("stack existence checked above");
1931
1932                stack.status = "UPDATE_IN_PROGRESS".to_string();
1933                let update_result = apply_resource_updates(
1934                    stack,
1935                    &parsed.resources,
1936                    &input.template_body,
1937                    &input.parameters,
1938                    &provisioner,
1939                    &imports,
1940                );
1941
1942                let stack_id = stack.stack_id.clone();
1943                let stack_name_owned = stack.name.clone();
1944                stack.template = input.template_body.clone();
1945                stack.status = if update_result.is_err() {
1946                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1947                } else {
1948                    "UPDATE_COMPLETE".to_string()
1949                };
1950                stack.parameters = input.parameters.clone();
1951                if !input.tags.is_empty() {
1952                    stack.tags = input.tags;
1953                }
1954                stack.updated_at = Some(Utc::now());
1955                stack.description = parsed.description;
1956                if !input.notification_arns.is_empty() {
1957                    stack.notification_arns = input.notification_arns.clone();
1958                }
1959                if update_result.is_ok() {
1960                    stack.outputs.clear();
1961                }
1962                (
1963                    update_result,
1964                    stack_id,
1965                    stack_name_owned,
1966                    stack.resources.clone(),
1967                    stack.notification_arns.clone(),
1968                )
1969            };
1970
1971            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1972            record_stack_status_event(
1973                state,
1974                &stack_id,
1975                &stack_name_owned,
1976                "AWS::CloudFormation::Stack",
1977                "UPDATE_IN_PROGRESS",
1978            );
1979            let update_result = match update_result {
1980                Ok(changes) => {
1981                    // Capture every service touched by the update (created,
1982                    // updated, or deleted resources) so we can persist them once
1983                    // the stack reaches UPDATE_COMPLETE.
1984                    let touched_types: Vec<String> =
1985                        changes.iter().map(|c| c.resource_type.clone()).collect();
1986                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1987                    record_stack_status_event(
1988                        state,
1989                        &stack_id,
1990                        &stack_name_owned,
1991                        "AWS::CloudFormation::Stack",
1992                        "UPDATE_COMPLETE",
1993                    );
1994                    Ok(touched_types)
1995                }
1996                Err(e) => {
1997                    record_stack_status_event(
1998                        state,
1999                        &stack_id,
2000                        &stack_name_owned,
2001                        "AWS::CloudFormation::Stack",
2002                        "UPDATE_ROLLBACK_COMPLETE",
2003                    );
2004                    Err(e)
2005                }
2006            };
2007            let stack_name_for_notif = stack_name_owned.clone();
2008
2009            let touched_types = match update_result {
2010                Ok(types) => types,
2011                Err(error_msg) => {
2012                    drop(accounts);
2013                    Self::send_stack_notification(
2014                        &self.deps.delivery,
2015                        &notification_arns,
2016                        &stack_name_for_notif,
2017                        &stack_id,
2018                        "UPDATE_FAILED",
2019                    );
2020                    return Err(AwsServiceError::aws_error(
2021                        StatusCode::BAD_REQUEST,
2022                        "InsufficientCapabilitiesException",
2023                        error_msg,
2024                    ));
2025                }
2026            };
2027
2028            drop(accounts);
2029            (
2030                touched_types,
2031                stack_id,
2032                stack_name_for_notif,
2033                notification_arns,
2034                resources_snapshot,
2035            )
2036        };
2037
2038        // The update succeeded (failures returned above). Back any newly-added
2039        // container resources with REAL containers and reap any removed ones,
2040        // both off the request path (the #2031-#2034 create-side hardening
2041        // reaching the update path), then run any deferred custom-resource
2042        // invokes.
2043        {
2044            let handles = ContainerBackingHandles::from_provisioner(&provisioner);
2045            handles.spawn_container_intents(std::mem::take(
2046                &mut *provisioner.pending_container_spawns.lock(),
2047            ));
2048            handles.spawn_teardown_intents(std::mem::take(
2049                &mut *provisioner.pending_container_teardowns.lock(),
2050            ));
2051            spawn_custom_invokes(&provisioner);
2052        }
2053
2054        let outputs = Self::resolve_template_outputs(
2055            &input.template_body,
2056            &input.parameters,
2057            &resources_snapshot,
2058            &self.state,
2059        );
2060        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
2061        {
2062            let mut accounts = self.state.write();
2063            let state = accounts.get_or_create(&req.account_id);
2064            if let Some(stack) = state
2065                .stacks
2066                .values_mut()
2067                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
2068            {
2069                stack.outputs = outputs.clone();
2070            }
2071            Self::sync_exports_imports(
2072                state,
2073                &stack_id,
2074                &input.stack_name,
2075                &outputs,
2076                &imported_names,
2077            );
2078        }
2079
2080        Self::send_stack_notification(
2081            &self.deps.delivery,
2082            &notification_arns,
2083            &stack_name_for_notif,
2084            &stack_id,
2085            "UPDATE_COMPLETE",
2086        );
2087
2088        // Persist every snapshot-backed service the update touched, so created
2089        // or deleted resources are reflected on disk after a restart.
2090        persist_touched_services(&self.snapshot_hooks, touched_types).await;
2091
2092        Ok(AwsResponse::xml(
2093            StatusCode::OK,
2094            xml_responses::update_stack_response(&stack_id, &req.request_id),
2095        ))
2096    }
2097
2098    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2099        // GetTemplate has no `@required` members in Smithy; tolerate omission.
2100        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
2101
2102        let accounts = self.state.read();
2103        let empty = CloudFormationState::new(&req.account_id, &req.region);
2104        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2105        // Stack-not-found has no declared shape on GetTemplate
2106        // (`ChangeSetNotFoundException` is the only declared error). Return
2107        // an empty template body rather than emit an undeclared
2108        // `ValidationError` for synthetic conformance inputs.
2109        let body = state
2110            .stacks
2111            .values()
2112            .find(|s| {
2113                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
2114            })
2115            .map(|s| s.template.clone())
2116            .unwrap_or_default();
2117
2118        Ok(AwsResponse::xml(
2119            StatusCode::OK,
2120            xml_responses::get_template_response(&body, &req.request_id),
2121        ))
2122    }
2123}
2124
2125#[async_trait]
2126impl AwsService for CloudFormationService {
2127    fn service_name(&self) -> &str {
2128        "cloudformation"
2129    }
2130
2131    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2132        let action = req.action.as_str();
2133
2134        // Validate scalar field constraints against the Smithy model
2135        // before dispatching. Per-handler logic still owns business
2136        // validation (cross-field checks, parsing, existence). This
2137        // catches length / range / enum violations uniformly so every
2138        // operation returns a `ValidationError` instead of `200 OK` on
2139        // malformed scalars.
2140        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
2141
2142        // Only ops whose handlers actually write to per-account state
2143        // need to trigger snapshot persistence. Pass-through ops that
2144        // return canned IDs but don't touch state are excluded.
2145        let mutates = matches!(
2146            action,
2147            "CreateStack"
2148                | "DeleteStack"
2149                | "UpdateStack"
2150                | "CreateChangeSet"
2151                | "DeleteChangeSet"
2152                | "ExecuteChangeSet"
2153                | "CreateStackSet"
2154                | "DeleteStackSet"
2155                | "CreateStackRefactor"
2156                | "CreateGeneratedTemplate"
2157                | "DeleteGeneratedTemplate"
2158                | "SetStackPolicy"
2159                | "UpdateTerminationProtection"
2160                | "ActivateOrganizationsAccess"
2161                | "DeactivateOrganizationsAccess"
2162        );
2163        let result = match action {
2164            "CreateStack" => self.create_stack(&req).await,
2165            "DeleteStack" => self.delete_stack(&req).await,
2166            "DescribeStacks" => self.describe_stacks(&req),
2167            "ListStacks" => self.list_stacks(&req),
2168            "ListStackResources" => self.list_stack_resources(&req),
2169            "DescribeStackResources" => self.describe_stack_resources(&req),
2170            "UpdateStack" => self.update_stack(&req).await,
2171            "GetTemplate" => self.get_template(&req),
2172            _ => self.handle_extra_action(&req),
2173        };
2174        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
2175            self.save_snapshot().await;
2176        }
2177        // ExecuteChangeSet provisions resources by mutating each service's
2178        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
2179        // -- never triggered the per-service snapshot hook, so the provisioned
2180        // resources were never written through to disk (#1766 class). `cdk
2181        // deploy`, `aws cloudformation deploy`, and SAM all provision via
2182        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
2183        // their resources reported CREATE_COMPLETE yet vanished on restart.
2184        // save_snapshot above only persists CloudFormation's own stack metadata;
2185        // flush every snapshot-backed service the changeset could have touched.
2186        if action == "ExecuteChangeSet"
2187            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
2188        {
2189            for hook in self.snapshot_hooks.values() {
2190                hook().await;
2191            }
2192        }
2193        result
2194    }
2195
2196    fn supported_actions(&self) -> &[&str] {
2197        &[
2198            "ActivateOrganizationsAccess",
2199            "ActivateType",
2200            "BatchDescribeTypeConfigurations",
2201            "CancelUpdateStack",
2202            "ContinueUpdateRollback",
2203            "CreateChangeSet",
2204            "CreateGeneratedTemplate",
2205            "CreateStack",
2206            "CreateStackInstances",
2207            "CreateStackRefactor",
2208            "CreateStackSet",
2209            "DeactivateOrganizationsAccess",
2210            "DeactivateType",
2211            "DeleteChangeSet",
2212            "DeleteGeneratedTemplate",
2213            "DeleteStack",
2214            "DeleteStackInstances",
2215            "DeleteStackSet",
2216            "DeregisterType",
2217            "DescribeAccountLimits",
2218            "DescribeChangeSet",
2219            "DescribeChangeSetHooks",
2220            "DescribeEvents",
2221            "DescribeGeneratedTemplate",
2222            "DescribeOrganizationsAccess",
2223            "DescribePublisher",
2224            "DescribeResourceScan",
2225            "DescribeStackDriftDetectionStatus",
2226            "DescribeStackEvents",
2227            "DescribeStackInstance",
2228            "DescribeStackRefactor",
2229            "DescribeStackResource",
2230            "DescribeStackResourceDrifts",
2231            "DescribeStackResources",
2232            "DescribeStackSet",
2233            "DescribeStackSetOperation",
2234            "DescribeStacks",
2235            "DescribeType",
2236            "DescribeTypeRegistration",
2237            "DetectStackDrift",
2238            "DetectStackResourceDrift",
2239            "DetectStackSetDrift",
2240            "EstimateTemplateCost",
2241            "ExecuteChangeSet",
2242            "ExecuteStackRefactor",
2243            "GetGeneratedTemplate",
2244            "GetHookResult",
2245            "GetStackPolicy",
2246            "GetTemplate",
2247            "GetTemplateSummary",
2248            "ImportStacksToStackSet",
2249            "ListChangeSets",
2250            "ListExports",
2251            "ListGeneratedTemplates",
2252            "ListHookResults",
2253            "ListImports",
2254            "ListResourceScanRelatedResources",
2255            "ListResourceScanResources",
2256            "ListResourceScans",
2257            "ListStackInstanceResourceDrifts",
2258            "ListStackInstances",
2259            "ListStackRefactorActions",
2260            "ListStackRefactors",
2261            "ListStackResources",
2262            "ListStackSetAutoDeploymentTargets",
2263            "ListStackSetOperationResults",
2264            "ListStackSetOperations",
2265            "ListStackSets",
2266            "ListStacks",
2267            "ListTypeRegistrations",
2268            "ListTypeVersions",
2269            "ListTypes",
2270            "PublishType",
2271            "RecordHandlerProgress",
2272            "RegisterPublisher",
2273            "RegisterType",
2274            "RollbackStack",
2275            "SetStackPolicy",
2276            "SetTypeConfiguration",
2277            "SetTypeDefaultVersion",
2278            "SignalResource",
2279            "StartResourceScan",
2280            "StopStackSetOperation",
2281            "TestType",
2282            "UpdateGeneratedTemplate",
2283            "UpdateStack",
2284            "UpdateStackInstances",
2285            "UpdateStackSet",
2286            "UpdateTerminationProtection",
2287            "ValidateTemplate",
2288        ]
2289    }
2290}
2291
2292/// Parsed + validated inputs for `UpdateStack`.
2293struct UpdateStackInput {
2294    stack_name: String,
2295    template_body: String,
2296    parameters: BTreeMap<String, String>,
2297    tags: BTreeMap<String, String>,
2298    notification_arns: Vec<String>,
2299}
2300
2301impl UpdateStackInput {
2302    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
2303        let params = CloudFormationService::get_all_params(req);
2304
2305        let stack_name = params
2306            .get("StackName")
2307            .ok_or_else(|| {
2308                AwsServiceError::aws_error(
2309                    StatusCode::BAD_REQUEST,
2310                    "ValidationError",
2311                    "StackName is required",
2312                )
2313            })?
2314            .to_string();
2315
2316        // TemplateBody isn't `@required` in Smithy (TemplateURL +
2317        // UsePreviousTemplate are alternatives). Treat omission as an
2318        // empty body so synthetic conformance inputs don't trip an
2319        // undeclared `ValidationError`.
2320        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
2321
2322        let mut parameters = CloudFormationService::extract_parameters(&params);
2323        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
2324        Ok(Self {
2325            stack_name,
2326            template_body,
2327            parameters,
2328            tags: CloudFormationService::extract_tags(&params),
2329            notification_arns: CloudFormationService::extract_notification_arns(&params),
2330        })
2331    }
2332}
2333
2334/// One row of structured diff returned by `apply_resource_updates`. Used
2335/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
2336/// reflects the resources actually created / updated / deleted.
2337#[derive(Debug, Clone)]
2338pub(crate) struct ResourceChange {
2339    pub action: ResourceChangeAction,
2340    pub logical_id: String,
2341    pub physical_id: String,
2342    pub resource_type: String,
2343}
2344
2345#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2346pub(crate) enum ResourceChangeAction {
2347    Create,
2348    Update,
2349    Delete,
2350}
2351
2352impl ResourceChangeAction {
2353    pub fn status_in_progress(self) -> &'static str {
2354        match self {
2355            Self::Create => "CREATE_IN_PROGRESS",
2356            Self::Update => "UPDATE_IN_PROGRESS",
2357            Self::Delete => "DELETE_IN_PROGRESS",
2358        }
2359    }
2360    pub fn status_complete(self) -> &'static str {
2361        match self {
2362            Self::Create => "CREATE_COMPLETE",
2363            Self::Update => "UPDATE_COMPLETE",
2364            Self::Delete => "DELETE_COMPLETE",
2365        }
2366    }
2367}
2368
2369/// Apply resource updates: delete removed resources, create new ones, and
2370/// in-place update resources whose properties changed. Returns the list of
2371/// changes applied (for event emission) on success or `Err(msg)` if any
2372/// resource operation fails.
2373pub(crate) fn apply_resource_updates(
2374    stack: &mut crate::state::Stack,
2375    new_resource_defs: &[template::ResourceDefinition],
2376    template_body: &str,
2377    parameters: &BTreeMap<String, String>,
2378    provisioner: &crate::resource_provisioner::ResourceProvisioner,
2379    imports: &BTreeMap<String, String>,
2380) -> Result<Vec<ResourceChange>, String> {
2381    let mut changes: Vec<ResourceChange> = Vec::new();
2382    let old_logical_ids: std::collections::HashSet<String> = stack
2383        .resources
2384        .iter()
2385        .map(|r| r.logical_id.clone())
2386        .collect();
2387    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
2388        .iter()
2389        .map(|r| r.logical_id.clone())
2390        .collect();
2391
2392    // Delete resources no longer in template
2393    let to_remove: Vec<_> = stack
2394        .resources
2395        .iter()
2396        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2397        .cloned()
2398        .collect();
2399    for resource in &to_remove {
2400        let _ = provisioner.delete_resource(resource);
2401        changes.push(ResourceChange {
2402            action: ResourceChangeAction::Delete,
2403            logical_id: resource.logical_id.clone(),
2404            physical_id: resource.physical_id.clone(),
2405            resource_type: resource.resource_type.clone(),
2406        });
2407    }
2408    stack
2409        .resources
2410        .retain(|r| new_logical_ids.contains(&r.logical_id));
2411
2412    // Build physical ID + attribute maps from existing resources
2413    let mut physical_ids: BTreeMap<String, String> = stack
2414        .resources
2415        .iter()
2416        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2417        .collect();
2418    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2419        .resources
2420        .iter()
2421        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2422        .collect();
2423
2424    // Create new resources / update resources that already exist. Provision in
2425    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2426    // resource resolves to that resource's physical id rather than its bare
2427    // logical id (which would otherwise get baked into derived state — e.g. a
2428    // Step Functions ASL referencing a Lambda declared later in the template).
2429    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2430    for &idx in &order {
2431        let resource_def = &new_resource_defs[idx];
2432        let resolved_def = template::resolve_resource_properties_with_attrs(
2433            resource_def,
2434            template_body,
2435            parameters,
2436            &physical_ids,
2437            &attributes,
2438            imports,
2439        )
2440        .map_err(|e| {
2441            format!(
2442                "Failed to resolve resource {}: {e}",
2443                resource_def.logical_id
2444            )
2445        })?;
2446
2447        if !old_logical_ids.contains(&resource_def.logical_id) {
2448            match provisioner.create_resource(&resolved_def) {
2449                Ok(stack_resource) => {
2450                    changes.push(ResourceChange {
2451                        action: ResourceChangeAction::Create,
2452                        logical_id: stack_resource.logical_id.clone(),
2453                        physical_id: stack_resource.physical_id.clone(),
2454                        resource_type: stack_resource.resource_type.clone(),
2455                    });
2456                    physical_ids.insert(
2457                        stack_resource.logical_id.clone(),
2458                        stack_resource.physical_id.clone(),
2459                    );
2460                    attributes.insert(
2461                        stack_resource.logical_id.clone(),
2462                        stack_resource.attributes.clone(),
2463                    );
2464                    stack.resources.push(stack_resource);
2465                }
2466                Err(e) => {
2467                    tracing::warn!(
2468                        "Failed to create resource {} during update: {e}",
2469                        resource_def.logical_id
2470                    );
2471                    return Err(format!(
2472                        "Failed to create resource {}: {e}",
2473                        resource_def.logical_id
2474                    ));
2475                }
2476            }
2477        } else {
2478            // Resource exists in both old and new templates — try to apply
2479            // an in-place update. The provisioner returns `Ok(None)` for
2480            // resource types that don't support updates yet; in that case
2481            // the existing resource stays as-is so the rest of the stack
2482            // continues to validate.
2483            let existing = stack
2484                .resources
2485                .iter()
2486                .find(|r| r.logical_id == resource_def.logical_id)
2487                .cloned();
2488            if let Some(existing) = existing {
2489                match provisioner.update_resource(&existing, &resolved_def) {
2490                    Ok(Some(updated)) => {
2491                        changes.push(ResourceChange {
2492                            action: ResourceChangeAction::Update,
2493                            logical_id: updated.logical_id.clone(),
2494                            physical_id: updated.physical_id.clone(),
2495                            resource_type: updated.resource_type.clone(),
2496                        });
2497                        physical_ids
2498                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2499                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2500                        if let Some(slot) = stack
2501                            .resources
2502                            .iter_mut()
2503                            .find(|r| r.logical_id == updated.logical_id)
2504                        {
2505                            *slot = updated;
2506                        }
2507                    }
2508                    Ok(None) => {
2509                        // Resource type has no update path — leave the
2510                        // existing physical resource untouched.
2511                    }
2512                    Err(e) => {
2513                        tracing::warn!(
2514                            "Failed to update resource {} during update: {e}",
2515                            resource_def.logical_id
2516                        );
2517                        return Err(format!(
2518                            "Failed to update resource {}: {e}",
2519                            resource_def.logical_id
2520                        ));
2521                    }
2522                }
2523            }
2524        }
2525    }
2526
2527    Ok(changes)
2528}
2529
2530/// Pushes a single `StackEvent` row onto the per-stack event log so
2531/// `DescribeStackEvents` returns a chronological history of resource and
2532/// stack-level state transitions.
2533pub(crate) fn record_event(
2534    state: &mut crate::state::CloudFormationState,
2535    stack_id: &str,
2536    stack_name: &str,
2537    logical_id: &str,
2538    physical_id: &str,
2539    resource_type: &str,
2540    status: &str,
2541) {
2542    use serde_json::json;
2543    let event_id = format!(
2544        "{}-{:x}",
2545        logical_id,
2546        std::time::SystemTime::now()
2547            .duration_since(std::time::UNIX_EPOCH)
2548            .map(|d| d.as_nanos())
2549            .unwrap_or(0)
2550    );
2551    let log = state.events.entry(stack_id.to_string()).or_default();
2552
2553    // Timestamps must be sub-second AND strictly increasing within a stack's
2554    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2555    // timestamp and then only registers events whose `Timestamp` is strictly
2556    // greater; a fast stack that provisions within one second would otherwise
2557    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2558    // identically, so sam never sees completion and polls until it times out.
2559    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2560    // past the previous event, guaranteeing a strict order.
2561    // Truncate to the stored (millisecond) resolution before comparing, so the
2562    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2563    // serialization (two events in the same millisecond would otherwise both
2564    // serialize identically despite `now > prev` holding at full precision).
2565    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2566        .unwrap_or_else(Utc::now);
2567    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2568        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2569            Ok(prev) => {
2570                let prev = prev.with_timezone(&Utc);
2571                if now > prev {
2572                    now
2573                } else {
2574                    prev + chrono::Duration::milliseconds(1)
2575                }
2576            }
2577            Err(_) => now,
2578        },
2579        None => now,
2580    };
2581
2582    log.push(json!({
2583        "EventId": event_id,
2584        "StackId": stack_id,
2585        "StackName": stack_name,
2586        "LogicalResourceId": logical_id,
2587        "PhysicalResourceId": physical_id,
2588        "ResourceType": resource_type,
2589        "ResourceStatus": status,
2590        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2591    }));
2592}
2593
2594/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2595/// applied during an update. Mirrors the event sequence real CloudFormation
2596/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2597/// Persist the CloudFormation snapshot from a detached task. Mirrors
2598/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2599/// run inside the background CreateStack provisioning task (see RDS's
2600/// `save_snapshot_static`).
2601async fn save_snapshot_static(
2602    state: SharedCloudFormationState,
2603    store: Option<Arc<dyn SnapshotStore>>,
2604    lock: Arc<AsyncMutex<()>>,
2605) {
2606    let Some(store) = store else {
2607        return;
2608    };
2609    let _guard = lock.lock().await;
2610    let snapshot = CloudFormationSnapshot {
2611        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2612        state: None,
2613        accounts: Some(state.read().clone()),
2614    };
2615    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2616        let bytes = serde_json::to_vec(&snapshot)
2617            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2618        store.save(&bytes)
2619    })
2620    .await;
2621    match join {
2622        Ok(Ok(())) => {}
2623        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2624        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2625    }
2626}
2627
2628pub(crate) fn record_stack_events(
2629    state: &mut crate::state::CloudFormationState,
2630    stack_id: &str,
2631    stack_name: &str,
2632    changes: &[ResourceChange],
2633) {
2634    for ch in changes {
2635        record_event(
2636            state,
2637            stack_id,
2638            stack_name,
2639            &ch.logical_id,
2640            &ch.physical_id,
2641            &ch.resource_type,
2642            ch.action.status_in_progress(),
2643        );
2644        record_event(
2645            state,
2646            stack_id,
2647            stack_name,
2648            &ch.logical_id,
2649            &ch.physical_id,
2650            &ch.resource_type,
2651            ch.action.status_complete(),
2652        );
2653    }
2654}
2655
2656/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2657/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2658/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2659pub(crate) fn record_stack_status_event(
2660    state: &mut crate::state::CloudFormationState,
2661    stack_id: &str,
2662    stack_name: &str,
2663    resource_type: &str,
2664    status: &str,
2665) {
2666    record_event(
2667        state,
2668        stack_id,
2669        stack_name,
2670        stack_name,
2671        stack_id,
2672        resource_type,
2673        status,
2674    );
2675}
2676
2677#[cfg(test)]
2678mod tests {
2679    use super::*;
2680    use http::HeaderMap;
2681    use parking_lot::RwLock;
2682    use std::collections::HashMap;
2683    use std::sync::Arc;
2684
2685    #[test]
2686    fn merge_parameter_defaults_fills_omitted_params() {
2687        // §1.6: a parameter the caller omitted gets its declared Default, so a
2688        // Ref to it resolves to the default instead of the bare param name.
2689        let template = r#"{
2690            "Parameters": {
2691                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2692                "Count": {"Type": "Number", "Default": 3},
2693                "Supplied": {"Type": "String", "Default": "dflt"}
2694            },
2695            "Resources": {}
2696        }"#;
2697        let mut params = BTreeMap::new();
2698        params.insert("Supplied".to_string(), "override".to_string());
2699        CloudFormationService::merge_parameter_defaults(&mut params, template);
2700        assert_eq!(
2701            params.get("InstanceType").map(String::as_str),
2702            Some("t3.micro")
2703        );
2704        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2705        // A supplied value is not overwritten by the default.
2706        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2707    }
2708
2709    fn make_service() -> CloudFormationService {
2710        let cf_state = Arc::new(RwLock::new(
2711            fakecloud_core::multi_account::MultiAccountState::new(
2712                "123456789012",
2713                "us-east-1",
2714                "http://localhost:4566",
2715            ),
2716        ));
2717        let deps = CloudFormationDeps {
2718            sqs: Arc::new(RwLock::new(
2719                fakecloud_core::multi_account::MultiAccountState::new(
2720                    "123456789012",
2721                    "us-east-1",
2722                    "http://localhost:4566",
2723                ),
2724            )),
2725            sns: Arc::new(RwLock::new(
2726                fakecloud_core::multi_account::MultiAccountState::new(
2727                    "123456789012",
2728                    "us-east-1",
2729                    "http://localhost:4566",
2730                ),
2731            )),
2732            ssm: Arc::new(RwLock::new(
2733                fakecloud_core::multi_account::MultiAccountState::new(
2734                    "123456789012",
2735                    "us-east-1",
2736                    "http://localhost:4566",
2737                ),
2738            )),
2739            iam: Arc::new(RwLock::new(
2740                fakecloud_core::multi_account::MultiAccountState::new(
2741                    "123456789012",
2742                    "us-east-1",
2743                    "",
2744                ),
2745            )),
2746            s3: Arc::new(RwLock::new(
2747                fakecloud_core::multi_account::MultiAccountState::new(
2748                    "123456789012",
2749                    "us-east-1",
2750                    "",
2751                ),
2752            )),
2753            eventbridge: Arc::new(RwLock::new(
2754                fakecloud_core::multi_account::MultiAccountState::new(
2755                    "123456789012",
2756                    "us-east-1",
2757                    "",
2758                ),
2759            )),
2760            dynamodb: Arc::new(RwLock::new(
2761                fakecloud_core::multi_account::MultiAccountState::new(
2762                    "123456789012",
2763                    "us-east-1",
2764                    "",
2765                ),
2766            )),
2767            logs: Arc::new(RwLock::new(
2768                fakecloud_core::multi_account::MultiAccountState::new(
2769                    "123456789012",
2770                    "us-east-1",
2771                    "",
2772                ),
2773            )),
2774            lambda: Arc::new(RwLock::new(
2775                fakecloud_core::multi_account::MultiAccountState::new(
2776                    "123456789012",
2777                    "us-east-1",
2778                    "",
2779                ),
2780            )),
2781            secretsmanager: Arc::new(RwLock::new(
2782                fakecloud_core::multi_account::MultiAccountState::new(
2783                    "123456789012",
2784                    "us-east-1",
2785                    "",
2786                ),
2787            )),
2788            kinesis: Arc::new(RwLock::new(
2789                fakecloud_core::multi_account::MultiAccountState::new(
2790                    "123456789012",
2791                    "us-east-1",
2792                    "",
2793                ),
2794            )),
2795            kms: Arc::new(RwLock::new(
2796                fakecloud_core::multi_account::MultiAccountState::new(
2797                    "123456789012",
2798                    "us-east-1",
2799                    "",
2800                ),
2801            )),
2802            ecr: Arc::new(RwLock::new(
2803                fakecloud_core::multi_account::MultiAccountState::new(
2804                    "123456789012",
2805                    "us-east-1",
2806                    "",
2807                ),
2808            )),
2809            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2810            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2811            organizations: Arc::new(RwLock::new(None)),
2812            cognito: Arc::new(RwLock::new(
2813                fakecloud_core::multi_account::MultiAccountState::new(
2814                    "123456789012",
2815                    "us-east-1",
2816                    "",
2817                ),
2818            )),
2819            rds: Arc::new(RwLock::new(
2820                fakecloud_core::multi_account::MultiAccountState::new(
2821                    "123456789012",
2822                    "us-east-1",
2823                    "",
2824                ),
2825            )),
2826            ec2: Arc::new(RwLock::new(
2827                fakecloud_core::multi_account::MultiAccountState::new(
2828                    "123456789012",
2829                    "us-east-1",
2830                    "",
2831                ),
2832            )),
2833            autoscaling: Arc::new(RwLock::new(
2834                fakecloud_autoscaling::AutoScalingAccounts::new(),
2835            )),
2836            batch: Arc::new(RwLock::new(fakecloud_batch::BatchAccounts::new())),
2837            ecs: Arc::new(RwLock::new(
2838                fakecloud_core::multi_account::MultiAccountState::new(
2839                    "123456789012",
2840                    "us-east-1",
2841                    "",
2842                ),
2843            )),
2844            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2845            elasticache: Arc::new(RwLock::new(
2846                fakecloud_core::multi_account::MultiAccountState::new(
2847                    "123456789012",
2848                    "us-east-1",
2849                    "",
2850                ),
2851            )),
2852            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2853            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2854            stepfunctions: Arc::new(RwLock::new(
2855                fakecloud_core::multi_account::MultiAccountState::new(
2856                    "123456789012",
2857                    "us-east-1",
2858                    "",
2859                ),
2860            )),
2861            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2862            apigateway: Arc::new(RwLock::new(
2863                fakecloud_core::multi_account::MultiAccountState::new(
2864                    "123456789012",
2865                    "us-east-1",
2866                    "",
2867                ),
2868            )),
2869            apigatewayv2: Arc::new(RwLock::new(
2870                fakecloud_core::multi_account::MultiAccountState::new(
2871                    "123456789012",
2872                    "us-east-1",
2873                    "",
2874                ),
2875            )),
2876            ses: Arc::new(RwLock::new(
2877                fakecloud_core::multi_account::MultiAccountState::new(
2878                    "123456789012",
2879                    "us-east-1",
2880                    "",
2881                ),
2882            )),
2883            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2884                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2885            )),
2886            athena: Arc::new(parking_lot::RwLock::new(
2887                fakecloud_athena::AthenaAccounts::new(),
2888            )),
2889            firehose: Arc::new(parking_lot::RwLock::new(
2890                fakecloud_firehose::FirehoseAccounts::new(),
2891            )),
2892            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2893            delivery: Arc::new(DeliveryBus::new()),
2894            lambda_runtime: None,
2895            rds_runtime: None,
2896            ec2_runtime: None,
2897            ecs_runtime: None,
2898            elasticache_runtime: None,
2899        };
2900        CloudFormationService::new(cf_state, deps)
2901    }
2902
2903    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2904        AwsRequest {
2905            service: "cloudformation".to_string(),
2906            action: action.to_string(),
2907            region: "us-east-1".to_string(),
2908            account_id: "123456789012".to_string(),
2909            request_id: "test-request-id".to_string(),
2910            headers: HeaderMap::new(),
2911            query_params: params,
2912            body: bytes::Bytes::new(),
2913            body_stream: parking_lot::Mutex::new(None),
2914            path_segments: vec![],
2915            raw_path: "/".to_string(),
2916            raw_query: String::new(),
2917            method: http::Method::POST,
2918            is_query_protocol: true,
2919            access_key_id: None,
2920            principal: None,
2921        }
2922    }
2923
2924    #[tokio::test]
2925    async fn update_stack_sets_failed_status_on_resource_error() {
2926        let svc = make_service();
2927
2928        // Create a stack with just a queue
2929        let mut create_params = HashMap::new();
2930        create_params.insert("StackName".to_string(), "test-stack".to_string());
2931        create_params.insert(
2932            "TemplateBody".to_string(),
2933            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2934        );
2935        let req = make_request("CreateStack", create_params);
2936        let result = svc.create_stack(&req).await;
2937        assert!(result.is_ok());
2938
2939        // Update stack adding an SNS subscription with a non-existent topic
2940        let mut update_params = HashMap::new();
2941        update_params.insert("StackName".to_string(), "test-stack".to_string());
2942        update_params.insert(
2943            "TemplateBody".to_string(),
2944            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2945        );
2946        let req = make_request("UpdateStack", update_params);
2947        let result = svc.update_stack(&req).await;
2948
2949        // Should return an error
2950        assert!(result.is_err());
2951
2952        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2953        // terminal status real CloudFormation lands on after a failed
2954        // update attempt that gets rolled back.
2955        let accounts = svc.state.read();
2956        let state = accounts.get("123456789012").unwrap();
2957        let stack = state.stacks.get("test-stack").unwrap();
2958        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2959    }
2960
2961    #[tokio::test]
2962    async fn create_stack_resolves_ref_to_physical_id() {
2963        let svc = make_service();
2964
2965        // Template where subscription Refs the topic
2966        let template = r#"{
2967            "Resources": {
2968                "MyTopic": {
2969                    "Type": "AWS::SNS::Topic",
2970                    "Properties": { "TopicName": "ref-test-topic" }
2971                },
2972                "MySub": {
2973                    "Type": "AWS::SNS::Subscription",
2974                    "Properties": {
2975                        "TopicArn": { "Ref": "MyTopic" },
2976                        "Protocol": "sqs",
2977                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2978                    }
2979                }
2980            }
2981        }"#;
2982
2983        let mut params = HashMap::new();
2984        params.insert("StackName".to_string(), "ref-stack".to_string());
2985        params.insert("TemplateBody".to_string(), template.to_string());
2986        let req = make_request("CreateStack", params);
2987        let result = svc.create_stack(&req).await;
2988        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2989
2990        // Verify both resources were created
2991        let accounts = svc.state.read();
2992        let state = accounts.get("123456789012").unwrap();
2993        let stack = state.stacks.get("ref-stack").unwrap();
2994        assert_eq!(stack.resources.len(), 2);
2995        assert_eq!(stack.status, "CREATE_COMPLETE");
2996
2997        // The subscription's physical ID should be an ARN (not just "MyTopic")
2998        let sub = stack
2999            .resources
3000            .iter()
3001            .find(|r| r.logical_id == "MySub")
3002            .unwrap();
3003        assert!(
3004            sub.physical_id.contains("ref-test-topic"),
3005            "Subscription physical ID should reference the topic ARN, got: {}",
3006            sub.physical_id
3007        );
3008    }
3009
3010    /// On the multi-thread server runtime, a stack containing a custom
3011    /// resource (whose provisioning can block for minutes on a cold Lambda
3012    /// image pull) must NOT be provisioned synchronously inside the request
3013    /// handler — CreateStack returns the StackId immediately and DescribeStacks
3014    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
3015    /// 3.1).
3016    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3017    async fn create_stack_custom_resource_provisions_asynchronously() {
3018        let svc = make_service();
3019        let template = r#"{
3020            "Resources": {
3021                "MyCustom": {
3022                    "Type": "Custom::Thing",
3023                    "Properties": {
3024                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
3025                    }
3026                }
3027            }
3028        }"#;
3029        let mut params = HashMap::new();
3030        params.insert("StackName".to_string(), "async-stack".to_string());
3031        params.insert("TemplateBody".to_string(), template.to_string());
3032        let req = make_request("CreateStack", params);
3033
3034        // CreateStack returns promptly with a StackId; provisioning runs in a
3035        // detached background task. The stack is recorded before the task can
3036        // possibly finish, so right after return it is at worst already
3037        // terminal — never a value that proves the handler blocked on the
3038        // (potentially minutes-long) provisioning loop. The key guarantee is
3039        // that the call returns without running the provisioner inline.
3040        let resp = svc
3041            .create_stack(&req)
3042            .await
3043            .expect("create returns StackId");
3044        assert!(resp.status.is_success());
3045        {
3046            let accounts = svc.state.read();
3047            let stack = accounts
3048                .get("123456789012")
3049                .unwrap()
3050                .stacks
3051                .get("async-stack")
3052                .expect("stack seeded synchronously");
3053            assert!(
3054                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
3055                "unexpected status right after create: {}",
3056                stack.status
3057            );
3058        }
3059
3060        // Poll DescribeStacks (via state) until the background task flips the
3061        // stack to its terminal CREATE_COMPLETE status.
3062        let mut status = String::new();
3063        for _ in 0..200 {
3064            {
3065                let accounts = svc.state.read();
3066                if let Some(stack) = accounts
3067                    .get("123456789012")
3068                    .and_then(|s| s.stacks.get("async-stack"))
3069                {
3070                    status = stack.status.clone();
3071                    if status != "CREATE_IN_PROGRESS" {
3072                        break;
3073                    }
3074                }
3075            }
3076            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3077        }
3078        assert_eq!(
3079            status, "CREATE_COMPLETE",
3080            "stack should reach CREATE_COMPLETE"
3081        );
3082
3083        let accounts = svc.state.read();
3084        let stack = accounts
3085            .get("123456789012")
3086            .unwrap()
3087            .stacks
3088            .get("async-stack")
3089            .unwrap();
3090        assert_eq!(stack.resources.len(), 1);
3091        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
3092    }
3093
3094    #[tokio::test]
3095    async fn output_getatt_resolves_well_known_attribute() {
3096        // An Output that GetAtts a well-known attribute the create handler does
3097        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
3098        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
3099        // resolve_template_outputs reads StackResource.attributes, which now
3100        // carries the live get_att overlay applied during provisioning.
3101        let svc = make_service();
3102        let template = r#"{
3103            "Resources": {
3104                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
3105            },
3106            "Outputs": {
3107                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
3108            }
3109        }"#;
3110        let mut params = HashMap::new();
3111        params.insert("StackName".to_string(), "out-stack".to_string());
3112        params.insert("TemplateBody".to_string(), template.to_string());
3113        svc.create_stack(&make_request("CreateStack", params))
3114            .await
3115            .expect("create returns StackId");
3116
3117        let mut url = String::new();
3118        for _ in 0..200 {
3119            {
3120                let accounts = svc.state.read();
3121                if let Some(stack) = accounts
3122                    .get("123456789012")
3123                    .and_then(|s| s.stacks.get("out-stack"))
3124                {
3125                    if stack.status != "CREATE_IN_PROGRESS" {
3126                        url = stack
3127                            .outputs
3128                            .iter()
3129                            .find(|o| o.key == "Url")
3130                            .map(|o| o.value.clone())
3131                            .unwrap_or_default();
3132                        break;
3133                    }
3134                }
3135            }
3136            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3137        }
3138        assert!(
3139            url.contains("out-q") && url != "Queue.QueueUrl",
3140            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
3141        );
3142    }
3143
3144    // ── Service error paths ──
3145
3146    #[tokio::test]
3147    async fn create_stack_missing_name_errors() {
3148        let svc = make_service();
3149        let mut params = HashMap::new();
3150        params.insert("TemplateBody".to_string(), "{}".to_string());
3151        let req = make_request("CreateStack", params);
3152        assert!(svc.create_stack(&req).await.is_err());
3153    }
3154
3155    #[tokio::test]
3156    async fn create_stack_missing_template_creates_empty_stack() {
3157        // `TemplateBody` isn't `@required` in Smithy and CreateStack
3158        // declares no `ValidationError` shape, so missing/placeholder
3159        // bodies now create an empty stack rather than rejecting with
3160        // an undeclared wire code (strict-mode conformance gap).
3161        let svc = make_service();
3162        let mut params = HashMap::new();
3163        params.insert("StackName".to_string(), "s".to_string());
3164        let req = make_request("CreateStack", params);
3165        svc.create_stack(&req)
3166            .await
3167            .expect("empty-body create succeeds");
3168    }
3169
3170    #[tokio::test]
3171    async fn create_stack_duplicate_errors() {
3172        let svc = make_service();
3173        let mut params = HashMap::new();
3174        params.insert("StackName".to_string(), "dup".to_string());
3175        params.insert(
3176            "TemplateBody".to_string(),
3177            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
3178                .to_string(),
3179        );
3180        let req = make_request("CreateStack", params.clone());
3181        svc.create_stack(&req).await.unwrap();
3182        let req = make_request("CreateStack", params);
3183        assert!(svc.create_stack(&req).await.is_err());
3184    }
3185
3186    #[tokio::test]
3187    async fn create_stack_invalid_template_creates_empty_stack() {
3188        // CreateStack's Smithy `errors` list has no `ValidationError`
3189        // shape, so unparseable bodies degrade to an empty parsed
3190        // template instead of raising an undeclared wire code.
3191        let svc = make_service();
3192        let mut params = HashMap::new();
3193        params.insert("StackName".to_string(), "bad".to_string());
3194        params.insert("TemplateBody".to_string(), "not json".to_string());
3195        let req = make_request("CreateStack", params);
3196        svc.create_stack(&req)
3197            .await
3198            .expect("bad-body create succeeds");
3199    }
3200
3201    #[tokio::test]
3202    async fn delete_stack_unknown_is_noop() {
3203        let svc = make_service();
3204        let mut params = HashMap::new();
3205        params.insert("StackName".to_string(), "ghost".to_string());
3206        let req = make_request("DeleteStack", params);
3207        assert!(svc.delete_stack(&req).await.is_ok());
3208    }
3209
3210    #[test]
3211    fn describe_stacks_nonexistent_errors() {
3212        // Querying an explicit, unknown StackName returns AWS's
3213        // `ValidationError: Stack with id <name> does not exist` so deploy
3214        // tools that probe stack existence (SAM, `aws cloudformation
3215        // deploy`) get the signal they expect (issue #1646).
3216        let svc = make_service();
3217        let mut params = HashMap::new();
3218        params.insert("StackName".to_string(), "ghost".to_string());
3219        let req = make_request("DescribeStacks", params);
3220        match svc.describe_stacks(&req) {
3221            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
3222            Err(e) => {
3223                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
3224                assert_eq!(e.code(), "ValidationError");
3225                assert!(
3226                    e.message().contains("does not exist"),
3227                    "got: {}",
3228                    e.message()
3229                );
3230            }
3231        }
3232    }
3233
3234    #[test]
3235    fn describe_stacks_empty_returns_all() {
3236        let svc = make_service();
3237        let req = make_request("DescribeStacks", HashMap::new());
3238        let resp = svc.describe_stacks(&req).unwrap();
3239        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3240        assert!(b.contains("DescribeStacksResult"));
3241    }
3242
3243    #[test]
3244    fn list_stacks_empty_returns_ok() {
3245        let svc = make_service();
3246        let req = make_request("ListStacks", HashMap::new());
3247        let resp = svc.list_stacks(&req).unwrap();
3248        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3249        assert!(b.contains("ListStacksResult"));
3250    }
3251
3252    #[test]
3253    fn list_stack_resources_missing_name_returns_validation_error() {
3254        // ListStackResources declares no `errors` in Smithy, so any
3255        // AWS-shaped 4xx counts as a handler response. We reject an
3256        // omitted StackName with `ValidationError` to keep negative
3257        // conformance variants honest; unknown-but-supplied names still
3258        // resolve to an empty list (see the test below).
3259        let svc = make_service();
3260        let req = make_request("ListStackResources", HashMap::new());
3261        let err = match svc.list_stack_resources(&req) {
3262            Err(e) => e,
3263            Ok(_) => panic!("omitted StackName must be rejected"),
3264        };
3265        assert_eq!(err.code(), "ValidationError");
3266    }
3267
3268    #[test]
3269    fn list_stack_resources_unknown_stack_returns_empty() {
3270        let svc = make_service();
3271        let mut params = HashMap::new();
3272        params.insert("StackName".to_string(), "ghost".to_string());
3273        let req = make_request("ListStackResources", params);
3274        svc.list_stack_resources(&req).expect("unknown is empty");
3275    }
3276
3277    #[test]
3278    fn describe_stack_resources_missing_name_returns_empty() {
3279        let svc = make_service();
3280        let req = make_request("DescribeStackResources", HashMap::new());
3281        svc.describe_stack_resources(&req)
3282            .expect("missing name is ok");
3283    }
3284
3285    #[test]
3286    fn get_template_missing_name_returns_empty_body() {
3287        let svc = make_service();
3288        let req = make_request("GetTemplate", HashMap::new());
3289        svc.get_template(&req).expect("missing name is ok");
3290    }
3291
3292    #[test]
3293    fn get_template_unknown_stack_returns_empty_body() {
3294        let svc = make_service();
3295        let mut params = HashMap::new();
3296        params.insert("StackName".to_string(), "ghost".to_string());
3297        let req = make_request("GetTemplate", params);
3298        svc.get_template(&req).expect("unknown is empty");
3299    }
3300
3301    #[tokio::test]
3302    async fn update_stack_missing_name_errors() {
3303        let svc = make_service();
3304        let mut params = HashMap::new();
3305        params.insert("TemplateBody".to_string(), "{}".to_string());
3306        let req = make_request("UpdateStack", params);
3307        assert!(svc.update_stack(&req).await.is_err());
3308    }
3309
3310    #[tokio::test]
3311    async fn update_stack_unknown_stack_returns_synthetic_id() {
3312        // UpdateStack declares only `InsufficientCapabilitiesException`
3313        // and `TokenAlreadyExistsException`, neither of which fits
3314        // "stack does not exist". Synthetic conformance inputs target
3315        // a placeholder stack, so we return a synthetic StackId rather
3316        // than an undeclared `ValidationError`. Real callers create
3317        // the stack first.
3318        let svc = make_service();
3319        let mut params = HashMap::new();
3320        params.insert("StackName".to_string(), "ghost".to_string());
3321        params.insert(
3322            "TemplateBody".to_string(),
3323            r#"{"Resources":{}}"#.to_string(),
3324        );
3325        let req = make_request("UpdateStack", params);
3326        let resp = svc
3327            .update_stack(&req)
3328            .await
3329            .expect("ghost update is synthetic");
3330        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3331        assert!(b.contains("UpdateStackResult"));
3332    }
3333
3334    #[tokio::test]
3335    async fn create_stack_resolves_outputs_and_records_export() {
3336        let svc = make_service();
3337        let template = r#"{
3338            "Resources": {
3339                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
3340            },
3341            "Outputs": {
3342                "QueueUrl": {
3343                    "Value": {"Ref": "Q"},
3344                    "Description": "Url",
3345                    "Export": {"Name": "TheQueueUrl"}
3346                }
3347            }
3348        }"#;
3349        let mut params = HashMap::new();
3350        params.insert("StackName".to_string(), "outs".to_string());
3351        params.insert("TemplateBody".to_string(), template.to_string());
3352        let req = make_request("CreateStack", params);
3353        svc.create_stack(&req).await.expect("create stack");
3354
3355        let accounts = svc.state.read();
3356        let stack = accounts
3357            .get("123456789012")
3358            .unwrap()
3359            .stacks
3360            .get("outs")
3361            .unwrap();
3362        assert_eq!(stack.outputs.len(), 1);
3363        assert_eq!(stack.outputs[0].key, "QueueUrl");
3364        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
3365        assert!(!stack.outputs[0].value.is_empty());
3366    }
3367
3368    #[tokio::test]
3369    async fn create_stack_rejects_duplicate_export_name() {
3370        let svc = make_service();
3371        let mk = |name: &str| {
3372            let template = format!(
3373                r#"{{
3374                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
3375                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
3376                }}"#
3377            );
3378            let mut params = HashMap::new();
3379            params.insert("StackName".to_string(), name.to_string());
3380            params.insert("TemplateBody".to_string(), template);
3381            make_request("CreateStack", params)
3382        };
3383        match svc.create_stack(&mk("first")).await {
3384            Ok(_) => {}
3385            Err(e) => panic!("first stack: {e:?}"),
3386        }
3387        // The second stack's export collides with the first. Since
3388        // provisioning is now asynchronous, the collision can no longer be a
3389        // synchronous CreateStack error — it surfaces as a failed create.
3390        // On the current-thread test runtime the provisioning task runs
3391        // inline, so the stack is already CREATE_FAILED on return.
3392        svc.create_stack(&mk("second"))
3393            .await
3394            .expect("CreateStack returns StackId even when provisioning fails");
3395        let accounts = svc.state.read();
3396        let stack = accounts
3397            .get("123456789012")
3398            .unwrap()
3399            .stacks
3400            .get("second")
3401            .expect("second stack recorded");
3402        assert_eq!(stack.status, "CREATE_FAILED");
3403        // The first stack keeps the export.
3404        let exports = &accounts.get("123456789012").unwrap().exports;
3405        assert_eq!(
3406            exports
3407                .get("DupExport")
3408                .map(|e| e.exporting_stack_name.as_str()),
3409            Some("first")
3410        );
3411    }
3412
3413    #[tokio::test]
3414    async fn import_value_resolves_against_other_stack_export() {
3415        let svc = make_service();
3416
3417        let producer_tpl = r#"{
3418            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3419            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3420        }"#;
3421        let mut p = HashMap::new();
3422        p.insert("StackName".to_string(), "producer".to_string());
3423        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3424        svc.create_stack(&make_request("CreateStack", p))
3425            .await
3426            .expect("producer");
3427
3428        let consumer_tpl = r#"{
3429            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3430            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3431        }"#;
3432        let mut p = HashMap::new();
3433        p.insert("StackName".to_string(), "consumer".to_string());
3434        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3435        svc.create_stack(&make_request("CreateStack", p))
3436            .await
3437            .expect("consumer");
3438
3439        let accounts = svc.state.read();
3440        let prod_url = accounts
3441            .get("123456789012")
3442            .unwrap()
3443            .stacks
3444            .get("producer")
3445            .unwrap()
3446            .outputs[0]
3447            .value
3448            .clone();
3449        let cons = accounts
3450            .get("123456789012")
3451            .unwrap()
3452            .stacks
3453            .get("consumer")
3454            .unwrap();
3455        assert_eq!(cons.outputs[0].value, prod_url);
3456    }
3457
3458    #[tokio::test]
3459    async fn create_stack_records_export_in_state_registry() {
3460        let svc = make_service();
3461        let template = r#"{
3462            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3463            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3464        }"#;
3465        let mut params = HashMap::new();
3466        params.insert("StackName".to_string(), "reg".to_string());
3467        params.insert("TemplateBody".to_string(), template.to_string());
3468        svc.create_stack(&make_request("CreateStack", params))
3469            .await
3470            .expect("create");
3471
3472        let accounts = svc.state.read();
3473        let state = accounts.get("123456789012").unwrap();
3474        let export = state
3475            .exports
3476            .get("reg-url")
3477            .expect("export registered in state.exports");
3478        assert_eq!(export.exporting_stack_name, "reg");
3479        assert!(!export.value.is_empty());
3480        assert!(export.exporting_stack_id.contains("reg"));
3481    }
3482
3483    #[tokio::test]
3484    async fn import_value_with_unknown_export_errors() {
3485        let svc = make_service();
3486        let consumer_tpl = r#"{
3487            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3488                "QueueName": {"Fn::ImportValue":"missing-export"}
3489            }}}
3490        }"#;
3491        let mut p = HashMap::new();
3492        p.insert("StackName".to_string(), "bad-consumer".to_string());
3493        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3494        match svc.create_stack(&make_request("CreateStack", p)).await {
3495            Ok(_) => panic!("expected ValidationError for unknown export"),
3496            Err(e) => {
3497                let msg = format!("{e:?}");
3498                assert!(msg.contains("No export named missing-export"), "got {msg}");
3499            }
3500        }
3501    }
3502
3503    #[tokio::test]
3504    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3505        let svc = make_service();
3506
3507        let producer_tpl = r#"{
3508            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3509            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3510        }"#;
3511        let mut p = HashMap::new();
3512        p.insert("StackName".to_string(), "producer".to_string());
3513        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3514        svc.create_stack(&make_request("CreateStack", p))
3515            .await
3516            .expect("producer");
3517
3518        let consumer_tpl = r#"{
3519            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3520                "QueueName": "cons-q",
3521                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3522            }}}
3523        }"#;
3524        let mut p = HashMap::new();
3525        p.insert("StackName".to_string(), "consumer".to_string());
3526        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3527        svc.create_stack(&make_request("CreateStack", p))
3528            .await
3529            .expect("consumer");
3530
3531        // Producer delete must fail while consumer still imports.
3532        let mut p = HashMap::new();
3533        p.insert("StackName".to_string(), "producer".to_string());
3534        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3535            Ok(_) => panic!("delete must fail while imports exist"),
3536            Err(e) => {
3537                let msg = format!("{e:?}");
3538                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3539            }
3540        }
3541
3542        // Delete consumer first.
3543        let mut p = HashMap::new();
3544        p.insert("StackName".to_string(), "consumer".to_string());
3545        svc.delete_stack(&make_request("DeleteStack", p))
3546            .await
3547            .expect("consumer delete");
3548
3549        // Now producer delete succeeds.
3550        let mut p = HashMap::new();
3551        p.insert("StackName".to_string(), "producer".to_string());
3552        svc.delete_stack(&make_request("DeleteStack", p))
3553            .await
3554            .expect("producer delete after consumer gone");
3555
3556        let accounts = svc.state.read();
3557        let state = accounts.get("123456789012").unwrap();
3558        assert!(state.exports.is_empty(), "exports cleared after delete");
3559        assert!(state.imports.is_empty(), "imports cleared after delete");
3560    }
3561
3562    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3563
3564    use std::sync::atomic::{AtomicUsize, Ordering};
3565
3566    /// A snapshot hook that counts how many times it fires, standing in for a
3567    /// real service's whole-state persist.
3568    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3569        Arc::new(move || {
3570            let counter = counter.clone();
3571            Box::pin(async move {
3572                counter.fetch_add(1, Ordering::SeqCst);
3573            })
3574        })
3575    }
3576
3577    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3578        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3579        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3580            tmp.path().to_path_buf(),
3581            cache,
3582        ))
3583    }
3584
3585    // A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
3586    // write-through). Lambda is registered as a hook below but NOT in the
3587    // template, so it must not fire -- proving per-service selectivity.
3588    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
3589        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
3590        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
3591        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
3592    }}"#;
3593
3594    fn create_req(stack: &str) -> AwsRequest {
3595        let mut p = HashMap::new();
3596        p.insert("StackName".to_string(), stack.to_string());
3597        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3598        make_request("CreateStack", p)
3599    }
3600
3601    #[tokio::test]
3602    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3603        let tmp = tempfile::tempdir().unwrap();
3604        let store = disk_s3_store(&tmp);
3605        let counter = Arc::new(AtomicUsize::new(0));
3606        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3607            BTreeMap::new();
3608        hooks.insert("sqs", counting_hook(counter.clone()));
3609        hooks.insert("sns", counting_hook(counter.clone()));
3610        // Registered but not in the template -> must not fire.
3611        hooks.insert("lambda", counting_hook(counter.clone()));
3612        let svc = make_service()
3613            .with_s3_store(store.clone())
3614            .with_snapshot_hooks(hooks);
3615
3616        svc.create_stack(&create_req("probe")).await.unwrap();
3617
3618        // sqs + sns fired once each; lambda untouched.
3619        assert_eq!(counter.load(Ordering::SeqCst), 2);
3620        // The bucket was written through to the S3 store, not just the in-memory map.
3621        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3622        assert!(
3623            loaded.buckets.contains_key("cfn-bucket"),
3624            "CFN bucket should be persisted to the S3 store"
3625        );
3626    }
3627
3628    #[tokio::test]
3629    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3630        let tmp = tempfile::tempdir().unwrap();
3631        let store = disk_s3_store(&tmp);
3632        let counter = Arc::new(AtomicUsize::new(0));
3633        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3634            BTreeMap::new();
3635        hooks.insert("sqs", counting_hook(counter.clone()));
3636        hooks.insert("sns", counting_hook(counter.clone()));
3637        let svc = make_service()
3638            .with_s3_store(store.clone())
3639            .with_snapshot_hooks(hooks);
3640
3641        svc.create_stack(&create_req("probe")).await.unwrap();
3642        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3643
3644        let mut p = HashMap::new();
3645        p.insert("StackName".to_string(), "probe".to_string());
3646        svc.delete_stack(&make_request("DeleteStack", p))
3647            .await
3648            .unwrap();
3649
3650        // Delete fired the touched services again (sqs + sns).
3651        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3652        // And the CFN-deleted bucket is gone from the store, so it does not
3653        // reappear after a restart.
3654        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3655        assert!(
3656            !loaded.buckets.contains_key("cfn-bucket"),
3657            "CFN-deleted bucket should be removed from the S3 store"
3658        );
3659    }
3660
3661    #[tokio::test]
3662    async fn cfn_persist_skips_services_without_a_registered_hook() {
3663        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3664        // hooks must be silently skipped (no panic), and "sqs" fires once.
3665        let tmp = tempfile::tempdir().unwrap();
3666        let store = disk_s3_store(&tmp);
3667        let counter = Arc::new(AtomicUsize::new(0));
3668        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3669            BTreeMap::new();
3670        hooks.insert("sqs", counting_hook(counter.clone()));
3671        let svc = make_service()
3672            .with_s3_store(store.clone())
3673            .with_snapshot_hooks(hooks);
3674
3675        svc.create_stack(&create_req("probe")).await.unwrap();
3676        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3677    }
3678
3679    #[tokio::test]
3680    async fn cfn_update_persists_touched_services() {
3681        // Create with just SQS, then update to a template that adds SNS + a
3682        // bucket; the update must persist the services it touches.
3683        let tmp = tempfile::tempdir().unwrap();
3684        let store = disk_s3_store(&tmp);
3685        let counter = Arc::new(AtomicUsize::new(0));
3686        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3687            BTreeMap::new();
3688        hooks.insert("sqs", counting_hook(counter.clone()));
3689        hooks.insert("sns", counting_hook(counter.clone()));
3690        let svc = make_service()
3691            .with_s3_store(store.clone())
3692            .with_snapshot_hooks(hooks);
3693
3694        let mut create = HashMap::new();
3695        create.insert("StackName".to_string(), "upd".to_string());
3696        create.insert(
3697            "TemplateBody".to_string(),
3698            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3699                .to_string(),
3700        );
3701        svc.create_stack(&make_request("CreateStack", create))
3702            .await
3703            .unwrap();
3704        let after_create = counter.load(Ordering::SeqCst);
3705
3706        let mut update = HashMap::new();
3707        update.insert("StackName".to_string(), "upd".to_string());
3708        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3709        svc.update_stack(&make_request("UpdateStack", update))
3710            .await
3711            .unwrap();
3712
3713        // The update touched at least SNS (added); the hook count must grow.
3714        assert!(
3715            counter.load(Ordering::SeqCst) > after_create,
3716            "update should persist the services it touched"
3717        );
3718        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3719        assert!(loaded.buckets.contains_key("cfn-bucket"));
3720    }
3721
3722    #[tokio::test]
3723    async fn cfn_execute_change_set_persists_touched_services() {
3724        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3725        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3726        // write provisioned services through to disk the same way CreateStack
3727        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3728        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3729        let tmp = tempfile::tempdir().unwrap();
3730        let store = disk_s3_store(&tmp);
3731        let counter = Arc::new(AtomicUsize::new(0));
3732        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3733            BTreeMap::new();
3734        hooks.insert("sqs", counting_hook(counter.clone()));
3735        let svc = make_service()
3736            .with_s3_store(store.clone())
3737            .with_snapshot_hooks(hooks);
3738
3739        let mut create = HashMap::new();
3740        create.insert("StackName".to_string(), "cs-stack".to_string());
3741        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3742        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3743        create.insert(
3744            "TemplateBody".to_string(),
3745            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3746                .to_string(),
3747        );
3748        svc.handle(make_request("CreateChangeSet", create))
3749            .await
3750            .unwrap();
3751        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3752        let before = counter.load(Ordering::SeqCst);
3753
3754        let mut exec = HashMap::new();
3755        exec.insert("StackName".to_string(), "cs-stack".to_string());
3756        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3757        svc.handle(make_request("ExecuteChangeSet", exec))
3758            .await
3759            .unwrap();
3760
3761        assert!(
3762            counter.load(Ordering::SeqCst) > before,
3763            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3764             queue survives a restart"
3765        );
3766    }
3767
3768    #[test]
3769    fn service_key_for_type_maps_services_and_aliases() {
3770        // Direct service segments.
3771        assert_eq!(
3772            service_key_for_type("AWS::Lambda::Function"),
3773            Some("lambda")
3774        );
3775        assert_eq!(
3776            service_key_for_type("AWS::SecretsManager::Secret"),
3777            Some("secretsmanager")
3778        );
3779        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3780        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3781        assert_eq!(
3782            service_key_for_type("AWS::StepFunctions::StateMachine"),
3783            Some("stepfunctions")
3784        );
3785        // Namespace aliases that differ from the fakecloud service name.
3786        assert_eq!(
3787            service_key_for_type("AWS::Events::Rule"),
3788            Some("eventbridge")
3789        );
3790        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3791        assert_eq!(
3792            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3793            Some("elasticache")
3794        );
3795        // S3 has no snapshot hook (it persists via the S3Store write-through).
3796        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3797        // Snapshot-backed services whose CFN namespace differs from the
3798        // fakecloud service name (these were missing from the map, #1766 class).
3799        assert_eq!(
3800            service_key_for_type("AWS::CertificateManager::Certificate"),
3801            Some("acm")
3802        );
3803        assert_eq!(
3804            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3805            Some("elbv2")
3806        );
3807        assert_eq!(
3808            service_key_for_type("AWS::CloudFront::Distribution"),
3809            Some("cloudfront")
3810        );
3811        assert_eq!(
3812            service_key_for_type("AWS::Route53::HostedZone"),
3813            Some("route53")
3814        );
3815        assert_eq!(
3816            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3817            Some("firehose")
3818        );
3819        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3820        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3821        assert_eq!(
3822            service_key_for_type("AWS::Athena::WorkGroup"),
3823            Some("athena")
3824        );
3825        assert_eq!(
3826            service_key_for_type("AWS::Organizations::Organization"),
3827            Some("organizations")
3828        );
3829        // Malformed / non-AWS types.
3830        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3831        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3832        assert_eq!(service_key_for_type("AWS"), None);
3833        assert_eq!(service_key_for_type(""), None);
3834    }
3835
3836    #[tokio::test]
3837    async fn persist_touched_services_noop_with_empty_hooks() {
3838        // No registered hooks -> nothing to do, must not panic.
3839        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3840        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3841    }
3842
3843    #[tokio::test]
3844    async fn cfn_bucket_policy_write_through_create_update_delete() {
3845        let tmp = tempfile::tempdir().unwrap();
3846        let store = disk_s3_store(&tmp);
3847        let svc = make_service().with_s3_store(store.clone());
3848
3849        // Create a bucket + bucket policy.
3850        let mut create = HashMap::new();
3851        create.insert("StackName".to_string(), "pol".to_string());
3852        create.insert(
3853            "TemplateBody".to_string(),
3854            r#"{"Resources":{
3855                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3856                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3857            }}"#
3858            .to_string(),
3859        );
3860        svc.create_stack(&make_request("CreateStack", create))
3861            .await
3862            .unwrap();
3863        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3864        let policy = loaded.buckets["pol-bucket"]
3865            .subresources
3866            .get("policy.toml")
3867            .cloned()
3868            .expect("bucket policy persisted on create");
3869        assert!(policy.contains("s3:GetObject"));
3870
3871        // Update the policy document; the update must write through.
3872        let mut update = HashMap::new();
3873        update.insert("StackName".to_string(), "pol".to_string());
3874        update.insert(
3875            "TemplateBody".to_string(),
3876            r#"{"Resources":{
3877                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3878                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3879            }}"#
3880            .to_string(),
3881        );
3882        svc.update_stack(&make_request("UpdateStack", update))
3883            .await
3884            .unwrap();
3885        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3886        let policy = loaded.buckets["pol-bucket"]
3887            .subresources
3888            .get("policy.toml")
3889            .cloned()
3890            .expect("bucket policy still persisted after update");
3891        assert!(
3892            policy.contains("s3:PutObject"),
3893            "updated policy should be written through"
3894        );
3895
3896        // Delete the stack; the bucket (and its policy) must be removed from disk.
3897        let mut del = HashMap::new();
3898        del.insert("StackName".to_string(), "pol".to_string());
3899        svc.delete_stack(&make_request("DeleteStack", del))
3900            .await
3901            .unwrap();
3902        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3903        assert!(
3904            !loaded.buckets.contains_key("pol-bucket"),
3905            "CFN-deleted bucket and policy should be gone from the store"
3906        );
3907    }
3908}