Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
/// Well-known `Fn::GetAtt` attribute names for a CloudFormation resource type.
///
/// These names supplement the eagerly-captured `ProvisionResult::attributes`
/// with live-state lookups through `ResourceProvisioner::get_att`. A resource
/// type without an entry yields an empty slice, so it exposes only whatever
/// its create handler captured.
fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
    // Resource type -> attribute names, listed in lookup order.
    const WELL_KNOWN: &[(&str, &[&str])] = &[
        (
            "AWS::S3::Bucket",
            &[
                "Arn",
                "DomainName",
                "RegionalDomainName",
                "DualStackDomainName",
                "WebsiteURL",
            ],
        ),
        ("AWS::Lambda::Function", &["Arn", "FunctionUrl", "Version"]),
        ("AWS::IAM::Role", &["Arn", "RoleId"]),
        ("AWS::SQS::Queue", &["Arn", "QueueName", "QueueUrl"]),
        ("AWS::SNS::Topic", &["TopicArn", "TopicName"]),
        ("AWS::DynamoDB::Table", &["Arn", "StreamArn"]),
        ("AWS::KMS::Key", &["Arn", "KeyId"]),
        ("AWS::SecretsManager::Secret", &["Arn", "Id"]),
        ("AWS::CloudFront::Distribution", &["DomainName", "Id"]),
        ("AWS::EC2::VPC", &["VpcId", "CidrBlock"]),
        (
            "AWS::EC2::Subnet",
            &["SubnetId", "AvailabilityZone", "VpcId", "CidrBlock"],
        ),
        ("AWS::EC2::SecurityGroup", &["GroupId", "VpcId"]),
        ("AWS::EC2::InternetGateway", &["InternetGatewayId"]),
        ("AWS::EC2::RouteTable", &["RouteTableId"]),
        ("AWS::Pipes::Pipe", &["Arn"]),
    ];

    WELL_KNOWN
        .iter()
        .find(|(ty, _)| *ty == resource_type)
        .map(|(_, attrs)| *attrs)
        .unwrap_or(&[])
}
59
/// Resolve a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
/// snapshot-hook key of the service that owns it (the keys of
/// `CloudFormationDeps::snapshot_hooks`).
///
/// The key is derived from the service segment, with aliases for the services
/// whose CloudFormation namespace differs from their fakecloud service name
/// (e.g. `Events` -> `eventbridge`).
///
/// `AWS::S3::*` is deliberately not listed: S3 buckets persist through the
/// `S3Store` write-through path in the provisioner rather than a whole-state
/// snapshot hook. `None` is returned for types with fewer than three
/// `::`-separated segments, for non-`AWS` vendors, and for any service that is
/// not snapshot-backed (those CFN resources have no persistence path either
/// way).
fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
    // CloudFormation service namespace -> fakecloud snapshot-hook key.
    const SERVICE_KEYS: &[(&str, &str)] = &[
        ("Lambda", "lambda"),
        ("SecretsManager", "secretsmanager"),
        ("SQS", "sqs"),
        ("SNS", "sns"),
        ("DynamoDB", "dynamodb"),
        ("StepFunctions", "stepfunctions"),
        ("Events", "eventbridge"),
        ("SSM", "ssm"),
        ("Logs", "logs"),
        ("KMS", "kms"),
        ("Kinesis", "kinesis"),
        ("SES", "ses"),
        ("Cognito", "cognito"),
        ("RDS", "rds"),
        ("ElastiCache", "elasticache"),
        ("ECR", "ecr"),
        ("ECS", "ecs"),
        ("CloudWatch", "cloudwatch"),
        ("ApiGateway", "apigateway"),
        ("ApiGatewayV2", "apigatewayv2"),
        ("Bedrock", "bedrock"),
        ("Scheduler", "scheduler"),
        ("IAM", "iam"),
        // Namespaces that differ from the fakecloud service name and were once
        // missing from this table. Their provisioner mutates snapshot-backed
        // state directly, so without an entry CFN-created (and CFN-deleted)
        // resources vanished on restart while direct-API equivalents persisted
        // (#1766 class). Each has a registered snapshot hook in the server.
        ("CertificateManager", "acm"),
        ("ElasticLoadBalancingV2", "elbv2"),
        ("CloudFront", "cloudfront"),
        ("Route53", "route53"),
        ("KinesisFirehose", "firehose"),
        ("Glue", "glue"),
        ("WAFv2", "wafv2"),
        ("Athena", "athena"),
        ("Organizations", "organizations"),
        // EC2 and ApplicationAutoScaling were wired into the provisioner after
        // an earlier audit of this table (EC2: 2026-06-25 #1957;
        // application-autoscaling: persistence sweep), so their CFN-created
        // VPC / Subnet / SecurityGroup / scalable-target state lived only in
        // memory and vanished on restart (#1766 class). Both have a registered
        // snapshot hook in the server.
        ("EC2", "ec2"),
        ("AutoScaling", "autoscaling"),
        ("Batch", "batch"),
        ("ApplicationAutoScaling", "application-autoscaling"),
        ("Pipes", "pipes"),
        ("EKS", "eks"),
        ("ServiceDiscovery", "servicediscovery"),
    ];

    let mut segments = resource_type.split("::");
    // A real resource type has a vendor, a service and a resource segment; the
    // vendor must be `AWS`. Anything shorter, or from another vendor, is not
    // snapshot-backed.
    let service = match (segments.next(), segments.next(), segments.next()) {
        (Some("AWS"), Some(service), Some(_)) => service,
        _ => return None,
    };

    SERVICE_KEYS
        .iter()
        .find(|(namespace, _)| *namespace == service)
        .map(|&(_, key)| key)
}
135
136/// Persist each distinct snapshot-backed service touched by a stack op, once.
137///
138/// `resource_types` is the set of `resource_type` strings the op created /
139/// updated / deleted. The CloudFormation provisioner mutates services' shared
140/// state directly and never triggers their snapshot path; this writes that
141/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
142/// resource survives a restart. Services with no registered hook (memory mode,
143/// or non-snapshot-backed services) are skipped.
144async fn persist_touched_services<I>(
145    hooks: &BTreeMap<&'static str, SnapshotHook>,
146    resource_types: I,
147) where
148    I: IntoIterator<Item = String>,
149{
150    if hooks.is_empty() {
151        return;
152    }
153    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
154    for ty in resource_types {
155        if let Some(key) = service_key_for_type(&ty) {
156            keys.insert(key);
157        }
158    }
159    for key in keys {
160        if let Some(hook) = hooks.get(key) {
161            hook().await;
162        }
163    }
164}
165
166/// Multi-pass provisioning for all resources in a parsed template.
167///
168/// Resources may `Ref` each other in either direction, and JSON object
169/// iteration order isn't stable, so a single forward pass isn't enough
170/// to resolve them. We loop: each pass tries every pending resource, and
171/// any resource whose `Ref` targets are still unknown just stays pending
172/// for the next pass. When no pass makes progress we report the first
173/// pending failure and rollback.
174pub(crate) fn provision_stack_resources(
175    provisioner: &ResourceProvisioner,
176    resource_defs: &[template::ResourceDefinition],
177    template_body: &str,
178    parameters: &BTreeMap<String, String>,
179    imports: &BTreeMap<String, String>,
180) -> Result<Vec<StackResource>, AwsServiceError> {
181    let mut resources = Vec::new();
182    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
183    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
184    // Seed the work list in dependency order so a referenced resource is
185    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
186    // physical ids. The fixed-point retry loop below still recovers from any
187    // residual ordering the graph can't express.
188    let order = template::dependency_order(template_body, parameters, resource_defs);
189    let mut pending: Vec<&template::ResourceDefinition> =
190        order.iter().map(|&i| &resource_defs[i]).collect();
191    let max_passes = pending.len() + 1;
192
193    for _ in 0..max_passes {
194        if pending.is_empty() {
195            break;
196        }
197        let mut still_pending = Vec::new();
198        let mut made_progress = false;
199
200        for resource_def in pending {
201            let resolved_def = template::resolve_resource_properties_with_attrs(
202                resource_def,
203                template_body,
204                parameters,
205                &physical_ids,
206                &attributes,
207                imports,
208            )
209            .map_err(|e| {
210                // `ValidationError` isn't declared on CreateStack/UpdateStack;
211                // surface template resolve failures through the closest declared
212                // shape (`InsufficientCapabilitiesException`) instead.
213                AwsServiceError::aws_error(
214                    StatusCode::BAD_REQUEST,
215                    "InsufficientCapabilitiesException",
216                    e,
217                )
218            })?;
219
220            match provisioner.create_resource(&resolved_def) {
221                Ok(mut stack_resource) => {
222                    physical_ids.insert(
223                        stack_resource.logical_id.clone(),
224                        stack_resource.physical_id.clone(),
225                    );
226                    // Start with the eagerly-captured attribute set, then
227                    // overlay anything the provisioner can resolve from
228                    // live state (e.g. attributes that depend on side-effects
229                    // recorded after the create handler returned).
230                    let mut attr_map = stack_resource.attributes.clone();
231                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
232                        if attr_map.contains_key(*attr) {
233                            continue;
234                        }
235                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
236                            attr_map.insert((*attr).to_string(), v);
237                        }
238                    }
239                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
240                    // Persist the live-resolved attributes onto the stored
241                    // resource so later readers — notably resolve_template_outputs,
242                    // which reads StackResource.attributes directly without the
243                    // overlay — see the full GetAtt set, not just the eager subset.
244                    stack_resource.attributes = attr_map;
245                    resources.push(stack_resource);
246                    made_progress = true;
247                }
248                Err(_) => still_pending.push(resource_def),
249            }
250        }
251
252        pending = still_pending;
253        if !made_progress && !pending.is_empty() {
254            // No progress — report the first failure and rollback anything
255            // we already created.
256            let resource_def = pending[0];
257            let resolved_def = template::resolve_resource_properties_with_attrs(
258                resource_def,
259                template_body,
260                parameters,
261                &physical_ids,
262                &attributes,
263                imports,
264            )
265            .unwrap_or_else(|_| resource_def.clone());
266            let err = provisioner.create_resource(&resolved_def).unwrap_err();
267            for r in &resources {
268                let _ = provisioner.delete_resource(r);
269            }
270            return Err(AwsServiceError::aws_error(
271                StatusCode::BAD_REQUEST,
272                "ValidationError",
273                format!(
274                    "Failed to create resource {}: {err}",
275                    resource_def.logical_id
276                ),
277            ));
278        }
279    }
280
281    Ok(resources)
282}
283
/// The result of a Cloud Control API resource provisioning call: the resource's
/// primary identifier plus its `GetAtt`-resolvable attributes. Kept free of the
/// crate-internal `StackResource` type so the `cloudcontrol` crate can consume
/// it directly.
#[derive(Debug, Clone)]
pub struct CloudControlOutcome {
    /// Primary identifier of the provisioned resource.
    pub physical_id: String,
    /// `GetAtt`-resolvable attributes, keyed by attribute name.
    pub attributes: BTreeMap<String, String>,
}
294
295impl CloudControlOutcome {
296    fn from(resource: &StackResource) -> Self {
297        Self {
298            physical_id: resource.physical_id.clone(),
299            attributes: resource.attributes.clone(),
300        }
301    }
302}
303
304/// Rebuild the `StackResource` shape the update/delete handlers expect from the
305/// identity Cloud Control persists between calls.
306fn reconstruct_stack_resource(
307    type_name: &str,
308    physical_id: &str,
309    attributes: &BTreeMap<String, String>,
310) -> StackResource {
311    StackResource {
312        logical_id: "Resource".to_string(),
313        physical_id: physical_id.to_string(),
314        resource_type: type_name.to_string(),
315        status: "CREATE_COMPLETE".to_string(),
316        service_token: None,
317        attributes: attributes.clone(),
318    }
319}
320
/// State references for every service CloudFormation can provision resources
/// in, plus the delivery bus and the optional container runtimes used to back
/// container-based resources.
pub struct CloudFormationDeps {
    pub sqs: SharedSqsState,
    pub sns: SharedSnsState,
    pub ssm: SharedSsmState,
    pub iam: SharedIamState,
    pub s3: SharedS3State,
    pub eventbridge: SharedEventBridgeState,
    pub dynamodb: SharedDynamoDbState,
    pub logs: SharedLogsState,
    pub lambda: fakecloud_lambda::SharedLambdaState,
    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
    pub kinesis: fakecloud_kinesis::SharedKinesisState,
    pub kms: fakecloud_kms::SharedKmsState,
    pub ecr: fakecloud_ecr::SharedEcrState,
    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
    pub elbv2: fakecloud_elbv2::SharedElbv2State,
    pub organizations: fakecloud_organizations::SharedOrganizationsState,
    pub cognito: fakecloud_cognito::SharedCognitoState,
    pub rds: fakecloud_rds::SharedRdsState,
    pub ec2: fakecloud_ec2::SharedEc2State,
    pub autoscaling: fakecloud_autoscaling::SharedAutoScalingState,
    pub batch: fakecloud_batch::SharedBatchState,
    pub pipes: fakecloud_pipes::SharedPipesState,
    pub ecs: fakecloud_ecs::SharedEcsState,
    pub acm: fakecloud_acm::SharedAcmState,
    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
    pub route53: fakecloud_route53::SharedRoute53State,
    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
    pub wafv2: fakecloud_wafv2::SharedWafv2State,
    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
    pub ses: fakecloud_ses::SharedSesState,
    pub application_autoscaling:
        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
    pub athena: fakecloud_athena::SharedAthenaState,
    pub firehose: fakecloud_firehose::SharedFirehoseState,
    pub glue: fakecloud_glue::SharedGlueState,
    pub eks: fakecloud_eks::state::SharedEksState,
    pub servicediscovery: fakecloud_servicediscovery::state::SharedServiceDiscoveryState,
    /// Delivery bus; among other things it carries the Lambda invokes issued
    /// for custom resources.
    pub delivery: Arc<DeliveryBus>,
    /// Lambda container runtime, when Docker/Podman is available. Used to
    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
    /// in the background so its first Invoke doesn't pay the cold-pull cost
    /// (the #1539 timeout, through the CloudFormation door). `None` when no
    /// runtime is configured — provisioning still works, the first Invoke just
    /// falls back to a cold pull.
    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
    /// Container runtimes for the stateful services whose CFN-provisioned
    /// resources must be backed by REAL containers (not phantom metadata).
    /// When present, the provisioner inserts the resource record synchronously
    /// (so `Ref`/`GetAtt` resolve during provisioning) and then backs it with a
    /// real container in the background — the same container the direct API
    /// path spawns. `None` (no Docker/Podman, e.g. CI) keeps the metadata-only
    /// behavior, matching how the direct API degrades without a runtime.
    pub rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
    /// See `rds_runtime`; the same contract applies to EC2.
    pub ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
    /// See `rds_runtime`; the same contract applies to ECS.
    pub ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
    /// See `rds_runtime`; the same contract applies to ElastiCache.
    pub elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
}
381
/// The CloudFormation service: its own shared state, the handles of every
/// service it provisions into, and the persistence wiring (snapshot store,
/// S3 disk store, per-service snapshot hooks).
pub struct CloudFormationService {
    /// Shared CloudFormation state.
    pub(crate) state: SharedCloudFormationState,
    /// State and runtime handles of the services stack resources live in.
    pub(crate) deps: CloudFormationDeps,
    /// Snapshot store for CloudFormation's own state. `None` until one is
    /// supplied through `with_snapshot_store`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    // NOTE(review): presumably serializes snapshot writes so concurrent ops
    // don't interleave — confirm against the save path.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
    /// real store via `with_s3_store` once it has been built.
    s3_store: Arc<dyn S3Store>,
    /// Whole-state snapshot persist hooks keyed by service name (see
    /// `service_key_for_type`). After a stack op the handler invokes the hook
    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
    /// is written through to disk, the same persistence a direct mutating API
    /// call would trigger. Empty by default (memory mode, or no services
    /// wired); the server populates it via `with_snapshot_hooks`.
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
}
401
/// Everything the async CreateStack provisioning task needs to provision
/// resources and flip the stack to its terminal status. Bundled into one
/// owned struct so it can move into a detached `tokio::spawn`.
///
/// Every field is owned (or a cloneable shared handle), so the context does
/// not borrow from the request that created it.
struct CreateStackContext {
    /// Shared CloudFormation state holding the stack record.
    state: SharedCloudFormationState,
    delivery: Arc<DeliveryBus>,
    /// Same role as `CloudFormationService::snapshot_store`; `None` when no
    /// store is wired.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Per-service persist hooks, keyed as in `service_key_for_type`.
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
    /// Provisioner that creates the stack's resources.
    provisioner: ResourceProvisioner,
    account_id: String,
    stack_name: String,
    stack_id: String,
    /// Raw template text the resource definitions were parsed from.
    template_body: String,
    /// Template parameter values, keyed by parameter name.
    parameters: BTreeMap<String, String>,
    // NOTE(review): presumably the SNS topic ARNs that receive stack events —
    // confirm against the task that consumes this context.
    notification_arns: Vec<String>,
    // NOTE(review): presumably the export names this stack imports — confirm
    // against the task that consumes this context.
    imported_names: Vec<String>,
    /// Parsed resource definitions to provision.
    resource_defs: Vec<template::ResourceDefinition>,
}
421
/// Owned clones of the service-state + container-runtime handles a provisioner
/// holds, so the spawn / teardown drains can `tokio::spawn` REAL container work
/// after the provisioner itself has been moved (CreateStack moves it into
/// `spawn_blocking`) or after the CFN state lock has been dropped (the
/// changeset/update/delete paths). Centralizes the per-resource-type match that
/// backs a freshly-provisioned container resource or reaps a deleted one, so the
/// CreateStack / ExecuteChangeSet / UpdateStack / DeleteStack paths stay in
/// lockstep (the #2031-#2034 create-side hardening reaching every path).
pub(crate) struct ContainerBackingHandles {
    /// Account id copied from the provisioner; handed to the spawned tasks.
    account_id: String,
    /// Region copied from the provisioner; handed to the tasks that take one.
    region: String,
    rds_state: fakecloud_rds::SharedRdsState,
    /// RDS work is only spawned when this is `Some`.
    rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
    ec2_state: fakecloud_ec2::SharedEc2State,
    /// Passed through as an `Option` to the EC2 and Auto Scaling helpers, which
    /// are spawned whether or not a runtime is present.
    ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
    /// Auto Scaling has no runtime of its own here; its tasks receive the EC2
    /// state and runtime.
    autoscaling_state: fakecloud_autoscaling::SharedAutoScalingState,
    elasticache_state: fakecloud_elasticache::SharedElastiCacheState,
    /// ElastiCache work is only spawned when this is `Some`.
    elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
    ecs_state: fakecloud_ecs::SharedEcsState,
    /// ECS work is only spawned when this is `Some`.
    ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
}
443
444impl ContainerBackingHandles {
445    pub(crate) fn from_provisioner(p: &ResourceProvisioner) -> Self {
446        Self {
447            account_id: p.account_id.clone(),
448            region: p.region.clone(),
449            rds_state: p.rds_state.clone(),
450            rds_runtime: p.rds_runtime.clone(),
451            ec2_state: p.ec2_state.clone(),
452            ec2_runtime: p.ec2_runtime.clone(),
453            autoscaling_state: p.autoscaling_state.clone(),
454            elasticache_state: p.elasticache_state.clone(),
455            elasticache_runtime: p.elasticache_runtime.clone(),
456            ecs_state: p.ecs_state.clone(),
457            ecs_runtime: p.ecs_runtime.clone(),
458        }
459    }
460
461    /// Back each freshly-inserted container resource with a REAL container in a
462    /// detached task, so the stack op never blocks on a container boot/pull (the
463    /// #1539/#1730 timeout lesson). Drains the provisioner's queued spawn intents.
464    pub(crate) fn spawn_container_intents(
465        &self,
466        intents: Vec<crate::resource_provisioner::ContainerSpawnIntent>,
467    ) {
468        use crate::resource_provisioner::ContainerSpawnIntent;
469        for intent in intents {
470            match intent {
471                ContainerSpawnIntent::RdsInstance { identifier } => {
472                    if let Some(runtime) = self.rds_runtime.clone() {
473                        let rds_state = self.rds_state.clone();
474                        let account = self.account_id.clone();
475                        let region = self.region.clone();
476                        tokio::spawn(async move {
477                            fakecloud_rds::cfn_provision::cfn_ensure_instance_container(
478                                rds_state, runtime, identifier, account, region,
479                            )
480                            .await;
481                        });
482                    }
483                }
484                ContainerSpawnIntent::AsgInstances { group_name } => {
485                    let asg_state = self.autoscaling_state.clone();
486                    let ec2_state = self.ec2_state.clone();
487                    let ec2_runtime = self.ec2_runtime.clone();
488                    let account = self.account_id.clone();
489                    let region = self.region.clone();
490                    tokio::spawn(async move {
491                        fakecloud_autoscaling::cfn_provision::cfn_reconcile_capacity(
492                            asg_state,
493                            ec2_state,
494                            ec2_runtime,
495                            group_name,
496                            account,
497                            region,
498                        )
499                        .await;
500                    });
501                }
502                ContainerSpawnIntent::Ec2Instance { instance_id } => {
503                    let ec2_state = self.ec2_state.clone();
504                    let ec2_runtime = self.ec2_runtime.clone();
505                    let account = self.account_id.clone();
506                    tokio::spawn(async move {
507                        fakecloud_ec2::cfn_provision::cfn_back_instance(
508                            ec2_state,
509                            ec2_runtime,
510                            account,
511                            instance_id,
512                        )
513                        .await;
514                    });
515                }
516                ContainerSpawnIntent::ElastiCacheCluster { cache_cluster_id } => {
517                    if let Some(runtime) = self.elasticache_runtime.clone() {
518                        let ec_state = self.elasticache_state.clone();
519                        let account = self.account_id.clone();
520                        tokio::spawn(async move {
521                            fakecloud_elasticache::cfn_provision::cfn_ensure_cluster_container(
522                                ec_state,
523                                runtime,
524                                cache_cluster_id,
525                                account,
526                            )
527                            .await;
528                        });
529                    }
530                }
531                ContainerSpawnIntent::ElastiCacheReplicationGroup {
532                    replication_group_id,
533                } => {
534                    if let Some(runtime) = self.elasticache_runtime.clone() {
535                        let ec_state = self.elasticache_state.clone();
536                        let account = self.account_id.clone();
537                        tokio::spawn(async move {
538                            fakecloud_elasticache::cfn_provision::cfn_ensure_replication_group_container(
539                                ec_state,
540                                runtime,
541                                replication_group_id,
542                                account,
543                            )
544                            .await;
545                        });
546                    }
547                }
548                ContainerSpawnIntent::EcsServiceTasks {
549                    cluster_name,
550                    service_name,
551                } => {
552                    if let Some(runtime) = self.ecs_runtime.clone() {
553                        let ecs_state = self.ecs_state.clone();
554                        let account = self.account_id.clone();
555                        tokio::spawn(async move {
556                            fakecloud_ecs::cfn_provision::cfn_launch_service_tasks(
557                                ecs_state,
558                                runtime,
559                                cluster_name,
560                                service_name,
561                                account,
562                            )
563                            .await;
564                        });
565                    }
566                }
567            }
568        }
569    }
570
571    /// Reap the REAL backing container for each deleted container resource in a
572    /// detached task, so a stack delete (or update-removed) never leaks a
573    /// running RDS / ElastiCache / ECS / EC2 container. Drains the provisioner's
574    /// queued teardown intents.
575    pub(crate) fn spawn_teardown_intents(
576        &self,
577        intents: Vec<crate::resource_provisioner::ContainerTeardownIntent>,
578    ) {
579        use crate::resource_provisioner::ContainerTeardownIntent;
580        for intent in intents {
581            match intent {
582                ContainerTeardownIntent::RdsInstance { identifier } => {
583                    if let Some(runtime) = self.rds_runtime.clone() {
584                        let account = self.account_id.clone();
585                        tokio::spawn(async move {
586                            fakecloud_rds::cfn_provision::cfn_teardown_instance_container(
587                                runtime, identifier, account,
588                            )
589                            .await;
590                        });
591                    }
592                }
593                ContainerTeardownIntent::ElastiCacheCluster { cache_cluster_id } => {
594                    if let Some(runtime) = self.elasticache_runtime.clone() {
595                        tokio::spawn(async move {
596                            fakecloud_elasticache::cfn_provision::cfn_teardown_cluster_container(
597                                runtime,
598                                cache_cluster_id,
599                            )
600                            .await;
601                        });
602                    }
603                }
604                ContainerTeardownIntent::ElastiCacheReplicationGroup {
605                    replication_group_id,
606                } => {
607                    if let Some(runtime) = self.elasticache_runtime.clone() {
608                        tokio::spawn(async move {
609                            fakecloud_elasticache::cfn_provision::cfn_teardown_replication_group_container(
610                                runtime,
611                                replication_group_id,
612                            )
613                            .await;
614                        });
615                    }
616                }
617                ContainerTeardownIntent::EcsService {
618                    cluster_name,
619                    service_name,
620                } => {
621                    if let Some(runtime) = self.ecs_runtime.clone() {
622                        let ecs_state = self.ecs_state.clone();
623                        let account = self.account_id.clone();
624                        tokio::spawn(async move {
625                            fakecloud_ecs::cfn_provision::cfn_stop_service_tasks(
626                                ecs_state,
627                                runtime,
628                                cluster_name,
629                                service_name,
630                                account,
631                            )
632                            .await;
633                        });
634                    }
635                }
636                ContainerTeardownIntent::Ec2Instance { instance_id } => {
637                    let ec2_state = self.ec2_state.clone();
638                    let ec2_runtime = self.ec2_runtime.clone();
639                    let account = self.account_id.clone();
640                    let region = self.region.clone();
641                    tokio::spawn(async move {
642                        fakecloud_ec2::cfn_provision::cfn_terminate(
643                            ec2_state,
644                            ec2_runtime,
645                            account,
646                            region,
647                            instance_id,
648                        )
649                        .await;
650                    });
651                }
652                ContainerTeardownIntent::AsgInstances { instance_ids } => {
653                    let asg_state = self.autoscaling_state.clone();
654                    let ec2_state = self.ec2_state.clone();
655                    let ec2_runtime = self.ec2_runtime.clone();
656                    let account = self.account_id.clone();
657                    let region = self.region.clone();
658                    tokio::spawn(async move {
659                        fakecloud_autoscaling::cfn_provision::cfn_terminate_instances(
660                            asg_state,
661                            ec2_state,
662                            ec2_runtime,
663                            instance_ids,
664                            account,
665                            region,
666                        )
667                        .await;
668                    });
669                }
670            }
671        }
672    }
673}
674
675/// Drain a provisioner's queued custom-resource Lambda invokes (`Custom::*`),
676/// running each off the request path in a detached task so a cold image pull
677/// never blocks the client or stalls the CFN state lock. Used by the
678/// changeset/update/delete paths (where `defer_custom_invokes` is set).
679pub(crate) fn spawn_custom_invokes(provisioner: &ResourceProvisioner) {
680    let intents = std::mem::take(&mut *provisioner.pending_custom_invokes.lock());
681    if intents.is_empty() {
682        return;
683    }
684    let delivery = provisioner.delivery.clone();
685    for intent in intents {
686        let delivery = delivery.clone();
687        tokio::spawn(async move {
688            match delivery
689                .invoke_lambda(&intent.service_token, &intent.payload)
690                .await
691            {
692                Some(Ok(_)) => {
693                    tracing::info!(
694                        "Custom resource Lambda {} invoked successfully",
695                        intent.service_token
696                    );
697                }
698                Some(Err(e)) => {
699                    tracing::warn!(
700                        "Custom resource Lambda {} invocation failed: {e}",
701                        intent.service_token
702                    );
703                }
704                None => {}
705            }
706        });
707    }
708}
709
710impl CloudFormationService {
711    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
712        Self {
713            state,
714            deps,
715            snapshot_store: None,
716            snapshot_lock: Arc::new(AsyncMutex::new(())),
717            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
718            snapshot_hooks: BTreeMap::new(),
719        }
720    }
721
722    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
723        self.snapshot_store = Some(store);
724        self
725    }
726
727    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
728    /// written through to disk (see `ResourceProvisioner::s3_store`).
729    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
730        self.s3_store = store;
731        self
732    }
733
734    /// Register the per-service snapshot persist hooks (keyed by service name)
735    /// used to persist every snapshot-backed service a stack op touches.
736    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
737        self.snapshot_hooks = hooks;
738        self
739    }
740
741    async fn save_snapshot(&self) {
742        let Some(store) = self.snapshot_store.clone() else {
743            return;
744        };
745        let _guard = self.snapshot_lock.lock().await;
746        let snapshot = CloudFormationSnapshot {
747            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
748            state: None,
749            accounts: Some(self.state.read().clone()),
750        };
751        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
752            let bytes = serde_json::to_vec(&snapshot)
753                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
754            store.save(&bytes)
755        })
756        .await;
757        match join {
758            Ok(Ok(())) => {}
759            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
760            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
761        }
762    }
763
764    pub(crate) fn provisioner(
765        &self,
766        stack_id: &str,
767        account_id: &str,
768        region: &str,
769    ) -> ResourceProvisioner {
770        ResourceProvisioner {
771            sqs_state: self.deps.sqs.clone(),
772            sns_state: self.deps.sns.clone(),
773            ssm_state: self.deps.ssm.clone(),
774            iam_state: self.deps.iam.clone(),
775            s3_state: self.deps.s3.clone(),
776            eventbridge_state: self.deps.eventbridge.clone(),
777            dynamodb_state: self.deps.dynamodb.clone(),
778            logs_state: self.deps.logs.clone(),
779            lambda_state: self.deps.lambda.clone(),
780            secretsmanager_state: self.deps.secretsmanager.clone(),
781            kinesis_state: self.deps.kinesis.clone(),
782            kms_state: self.deps.kms.clone(),
783            ecr_state: self.deps.ecr.clone(),
784            cloudwatch_state: self.deps.cloudwatch.clone(),
785            elbv2_state: self.deps.elbv2.clone(),
786            organizations_state: self.deps.organizations.clone(),
787            cognito_state: self.deps.cognito.clone(),
788            rds_state: self.deps.rds.clone(),
789            ec2_state: self.deps.ec2.clone(),
790            autoscaling_state: self.deps.autoscaling.clone(),
791            batch_state: self.deps.batch.clone(),
792            pipes_state: self.deps.pipes.clone(),
793            ecs_state: self.deps.ecs.clone(),
794            acm_state: self.deps.acm.clone(),
795            elasticache_state: self.deps.elasticache.clone(),
796            route53_state: self.deps.route53.clone(),
797            cloudfront_state: self.deps.cloudfront.clone(),
798            stepfunctions_state: self.deps.stepfunctions.clone(),
799            wafv2_state: self.deps.wafv2.clone(),
800            apigateway_state: self.deps.apigateway.clone(),
801            apigatewayv2_state: self.deps.apigatewayv2.clone(),
802            ses_state: self.deps.ses.clone(),
803            app_autoscaling_state: self.deps.application_autoscaling.clone(),
804            athena_state: self.deps.athena.clone(),
805            firehose_state: self.deps.firehose.clone(),
806            glue_state: self.deps.glue.clone(),
807            eks_state: self.deps.eks.clone(),
808            servicediscovery_state: self.deps.servicediscovery.clone(),
809            cloudformation_state: self.state.clone(),
810            delivery: self.deps.delivery.clone(),
811            lambda_runtime: self.deps.lambda_runtime.clone(),
812            rds_runtime: self.deps.rds_runtime.clone(),
813            ec2_runtime: self.deps.ec2_runtime.clone(),
814            ecs_runtime: self.deps.ecs_runtime.clone(),
815            elasticache_runtime: self.deps.elasticache_runtime.clone(),
816            pending_container_spawns: Arc::new(parking_lot::Mutex::new(Vec::new())),
817            pending_container_teardowns: Arc::new(parking_lot::Mutex::new(Vec::new())),
818            pending_custom_invokes: Arc::new(parking_lot::Mutex::new(Vec::new())),
819            // CreateStack provisions in a detached task, so its synchronous
820            // custom-resource invoke never blocks the client or the state lock.
821            // The changeset/update/delete paths run inside the request, so they
822            // flip this on (see `provisioner_deferred`) to queue the invoke and
823            // drain it off the request path instead.
824            defer_custom_invokes: false,
825            s3_store: self.s3_store.clone(),
826            account_id: account_id.to_string(),
827            region: region.to_string(),
828            stack_id: stack_id.to_string(),
829            // CreateStack + changeset/update/delete accept unmodeled types; only
830            // the Cloud Control bridge flips this on to reject them.
831            strict_unknown_types: false,
832        }
833    }
834
835    // --- Cloud Control API bridge -----------------------------------------
836    //
837    // Cloud Control API (`cloudcontrolapi`) drives the SAME resource
838    // provisioners CloudFormation uses, one resource at a time, with a
839    // caller-supplied `DesiredState` instead of a template. These methods build
840    // a one-shot provisioner (a synthetic stack id per request), run the
841    // relevant create/update/delete handler, and — crucially — drain the
842    // container-spawn / teardown intents the same way `CreateStack`/`DeleteStack`
843    // do, so a CCAPI-provisioned RDS/EC2/ECS/ElastiCache resource is backed by a
844    // REAL container, not just a metadata record.
845
846    /// Provision a single resource of `type_name` with concrete `properties`
847    /// (Cloud Control `DesiredState`). Returns the physical id + `GetAtt`-style
848    /// attributes on success, or the handler error message.
849    pub fn cloudcontrol_create(
850        &self,
851        type_name: &str,
852        properties: serde_json::Value,
853        account_id: &str,
854        region: &str,
855    ) -> Result<CloudControlOutcome, String> {
856        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
857        let mut provisioner = self.provisioner(&stack_id, account_id, region);
858        // Reject a TypeName fakecloud has no provisioner for, so Cloud Control
859        // never records a resource with no backing service state.
860        provisioner.strict_unknown_types = true;
861        let backing = ContainerBackingHandles::from_provisioner(&provisioner);
862        let spawns = provisioner.pending_container_spawns.clone();
863        let def = template::ResourceDefinition {
864            logical_id: "Resource".to_string(),
865            resource_type: type_name.to_string(),
866            properties,
867        };
868        let resource = provisioner.create_resource(&def)?;
869        // Back any container-backed resource with a real container, off the
870        // request path, exactly as CreateStack does.
871        backing.spawn_container_intents(std::mem::take(&mut *spawns.lock()));
872        Ok(CloudControlOutcome::from(&resource))
873    }
874
875    /// Apply a new desired state to an existing resource. `prior_attributes` is
876    /// what the previous create/update returned; it lets us reconstruct the
877    /// backing `StackResource` the update handlers expect without leaking that
878    /// type across the crate boundary.
879    pub fn cloudcontrol_update(
880        &self,
881        type_name: &str,
882        physical_id: &str,
883        prior_attributes: &BTreeMap<String, String>,
884        new_properties: serde_json::Value,
885        account_id: &str,
886        region: &str,
887    ) -> Result<CloudControlOutcome, String> {
888        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
889        let mut provisioner = self.provisioner(&stack_id, account_id, region);
890        provisioner.strict_unknown_types = true;
891        let existing = reconstruct_stack_resource(type_name, physical_id, prior_attributes);
892        let def = template::ResourceDefinition {
893            logical_id: "Resource".to_string(),
894            resource_type: type_name.to_string(),
895            properties: new_properties,
896        };
897        // `None` means the handler treated the change as a no-op (nothing to
898        // mutate); the resource keeps its prior identity + attributes.
899        match provisioner.update_resource(&existing, &def)? {
900            Some(updated) => Ok(CloudControlOutcome::from(&updated)),
901            None => Ok(CloudControlOutcome::from(&existing)),
902        }
903    }
904
905    /// Delete an existing resource, reaping any real backing container.
906    pub fn cloudcontrol_delete(
907        &self,
908        type_name: &str,
909        physical_id: &str,
910        prior_attributes: &BTreeMap<String, String>,
911        account_id: &str,
912        region: &str,
913    ) -> Result<(), String> {
914        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
915        let provisioner = self.provisioner(&stack_id, account_id, region);
916        let teardowns = provisioner.pending_container_teardowns.clone();
917        let existing = reconstruct_stack_resource(type_name, physical_id, prior_attributes);
918        provisioner.delete_resource(&existing)?;
919        ContainerBackingHandles::from_provisioner(&provisioner)
920            .spawn_teardown_intents(std::mem::take(&mut *teardowns.lock()));
921        Ok(())
922    }
923
924    /// Flush the backing service state a Cloud Control op just mutated to disk,
925    /// using the SAME snapshot hooks `CreateStack`/`ExecuteChangeSet` fire. A
926    /// CCAPI create/update/delete goes straight through the provisioner without
927    /// touching the stack handlers, so without this the provisioned SQS queue
928    /// (etc.) would never be persisted and a restart would leave Cloud Control
929    /// listing a resource whose owning service lost its backing state.
930    pub async fn cloudcontrol_persist_type(&self, type_name: &str) {
931        persist_touched_services(&self.snapshot_hooks, [type_name.to_string()]).await;
932    }
933
934    /// Build a provisioner for the changeset/update/delete paths, which run
935    /// inside the request rather than in a detached `CreateStack` task. These
936    /// queue custom-resource Lambda invokes (`defer_custom_invokes`) so a cold
937    /// image pull never blocks the client past its read timeout or stalls every
938    /// other CFN op behind the state write lock; the caller drains and
939    /// `tokio::spawn`s the queued invokes after dropping the lock.
940    pub(crate) fn provisioner_deferred(
941        &self,
942        stack_id: &str,
943        account_id: &str,
944        region: &str,
945    ) -> ResourceProvisioner {
946        ResourceProvisioner {
947            defer_custom_invokes: true,
948            ..self.provisioner(stack_id, account_id, region)
949        }
950    }
951
952    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
953        // Check query params first (for Query protocol)
954        if let Some(v) = req.query_params.get(key) {
955            return Some(v.clone());
956        }
957        // Then check form-encoded body
958        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
959        body_params.get(key).cloned()
960    }
961
962    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
963        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
964        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
965        for (k, v) in body_params {
966            params.entry(k).or_insert(v);
967        }
968        params
969    }
970
971    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
972        let mut tags = BTreeMap::new();
973        for i in 1.. {
974            let key_param = format!("Tags.member.{i}.Key");
975            let value_param = format!("Tags.member.{i}.Value");
976            match (params.get(&key_param), params.get(&value_param)) {
977                (Some(k), Some(v)) => {
978                    tags.insert(k.clone(), v.clone());
979                }
980                _ => break,
981            }
982        }
983        tags
984    }
985
986    pub(crate) fn extract_parameters(
987        params: &BTreeMap<String, String>,
988    ) -> BTreeMap<String, String> {
989        let mut result = BTreeMap::new();
990        for i in 1.. {
991            let key_param = format!("Parameters.member.{i}.ParameterKey");
992            let value_param = format!("Parameters.member.{i}.ParameterValue");
993            match (params.get(&key_param), params.get(&value_param)) {
994                (Some(k), Some(v)) => {
995                    result.insert(k.clone(), v.clone());
996                }
997                _ => break,
998            }
999        }
1000        result
1001    }
1002
1003    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
1004    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
1005    /// parameter baked the bare parameter name instead of its default -- common
1006    /// in hand-written CFN and `aws cloudformation deploy` without
1007    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
1008    pub(crate) fn merge_parameter_defaults(
1009        parameters: &mut BTreeMap<String, String>,
1010        template_body: &str,
1011    ) {
1012        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1013            match serde_json::from_str(template_body) {
1014                Ok(v) => v,
1015                Err(_) => return,
1016            }
1017        } else {
1018            match serde_yaml::from_str(template_body) {
1019                Ok(v) => v,
1020                Err(_) => return,
1021            }
1022        };
1023        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
1024            return;
1025        };
1026        for (name, spec) in decls {
1027            if parameters.contains_key(name) {
1028                continue;
1029            }
1030            if let Some(default) = spec.get("Default") {
1031                let s = default
1032                    .as_str()
1033                    .map(|s| s.to_string())
1034                    .unwrap_or_else(|| default.to_string());
1035                parameters.insert(name.clone(), s);
1036            }
1037        }
1038    }
1039
1040    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
1041        let mut arns = Vec::new();
1042        for i in 1.. {
1043            let key = format!("NotificationARNs.member.{i}");
1044            match params.get(&key) {
1045                Some(arn) => arns.push(arn.clone()),
1046                None => break,
1047            }
1048        }
1049        arns
1050    }
1051
1052    fn send_stack_notification(
1053        delivery: &DeliveryBus,
1054        notification_arns: &[String],
1055        stack_name: &str,
1056        stack_id: &str,
1057        status: &str,
1058    ) {
1059        if notification_arns.is_empty() {
1060            return;
1061        }
1062        let message = format!(
1063            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
1064            stack_id,
1065            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
1066            uuid::Uuid::new_v4(),
1067            stack_name,
1068            status,
1069            stack_name,
1070        );
1071        for arn in notification_arns {
1072            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
1073        }
1074    }
1075
1076    /// Build a Fn::ImportValue lookup map from the account-level
1077    /// `state.exports` registry. `skip_stack` removes any export owned by
1078    /// the named stack — used during update so a stack doesn't import its
1079    /// own previous-revision export.
1080    pub(crate) fn collect_account_imports(
1081        state: &SharedCloudFormationState,
1082        account_id: &str,
1083        skip_stack: Option<&str>,
1084    ) -> BTreeMap<String, String> {
1085        let mut imports = BTreeMap::new();
1086        let accounts = state.read();
1087        let Some(state) = accounts.get(account_id) else {
1088            return imports;
1089        };
1090        for (name, export) in &state.exports {
1091            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
1092                continue;
1093            }
1094            imports.insert(name.clone(), export.value.clone());
1095        }
1096        imports
1097    }
1098
1099    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
1100    /// return a `ValidationError` listing any export names that aren't
1101    /// known in the account. Mirrors CloudFormation's behavior of
1102    /// failing the create/update before any resource is provisioned.
1103    fn validate_import_values(
1104        state: &SharedCloudFormationState,
1105        account_id: &str,
1106        stack_name: &str,
1107        template_body: &str,
1108        parameters: &BTreeMap<String, String>,
1109    ) -> Result<Vec<String>, AwsServiceError> {
1110        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1111            match serde_json::from_str(template_body) {
1112                Ok(v) => v,
1113                Err(_) => return Ok(Vec::new()),
1114            }
1115        } else {
1116            match serde_yaml::from_str(template_body) {
1117                Ok(v) => v,
1118                Err(_) => return Ok(Vec::new()),
1119            }
1120        };
1121        let names = template::collect_import_value_names(&value, parameters);
1122        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
1123        for n in &names {
1124            if !known.contains_key(n) {
1125                // CreateStack and UpdateStack both declare
1126                // `InsufficientCapabilitiesException`; reuse it for the
1127                // "missing imported export" pre-flight so the wire code
1128                // matches a Smithy-declared shape on both ops.
1129                return Err(AwsServiceError::aws_error(
1130                    StatusCode::BAD_REQUEST,
1131                    "InsufficientCapabilitiesException",
1132                    format!("No export named {n} found."),
1133                ));
1134            }
1135        }
1136        Ok(names)
1137    }
1138
1139    /// Sync `state.exports` and `state.imports` after a stack create or
1140    /// update. Removes any exports / imports the stack used to own and
1141    /// re-adds the current-revision set.
1142    pub(crate) fn sync_exports_imports(
1143        state: &mut CloudFormationState,
1144        stack_id: &str,
1145        stack_name: &str,
1146        outputs: &[state::StackOutput],
1147        imported_names: &[String],
1148    ) {
1149        // 1. Drop any prior exports owned by this stack.
1150        let stale_exports: Vec<String> = state
1151            .exports
1152            .iter()
1153            .filter(|(_, e)| e.exporting_stack_name == stack_name)
1154            .map(|(k, _)| k.clone())
1155            .collect();
1156        for k in stale_exports {
1157            state.exports.remove(&k);
1158        }
1159        // 2. Drop any prior imports recorded against this stack.
1160        for entries in state.imports.values_mut() {
1161            entries.retain(|s| s != stack_name);
1162        }
1163        state.imports.retain(|_, v| !v.is_empty());
1164
1165        // 3. Re-register exports.
1166        for o in outputs {
1167            if let Some(export) = &o.export_name {
1168                state.exports.insert(
1169                    export.clone(),
1170                    state::StackExport {
1171                        value: o.value.clone(),
1172                        exporting_stack_id: stack_id.to_string(),
1173                        exporting_stack_name: stack_name.to_string(),
1174                    },
1175                );
1176            }
1177        }
1178        // 4. Re-register imports.
1179        for name in imported_names {
1180            let entry = state.imports.entry(name.clone()).or_default();
1181            if !entry.iter().any(|s| s == stack_name) {
1182                entry.push(stack_name.to_string());
1183            }
1184        }
1185    }
1186
1187    /// Resolve every `Outputs.*` entry in `template_body` after the stack
1188    /// has been provisioned. `resources` is the post-create / post-update
1189    /// vec; we rebuild the physical-id and attribute maps from it before
1190    /// invoking the template parser.
1191    pub(crate) fn resolve_template_outputs(
1192        template_body: &str,
1193        parameters: &BTreeMap<String, String>,
1194        resources: &[StackResource],
1195        state: &SharedCloudFormationState,
1196    ) -> Vec<state::StackOutput> {
1197        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1198            match serde_json::from_str(template_body) {
1199                Ok(v) => v,
1200                Err(_) => return Vec::new(),
1201            }
1202        } else {
1203            match serde_yaml::from_str(template_body) {
1204                Ok(v) => v,
1205                Err(_) => return Vec::new(),
1206            }
1207        };
1208
1209        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
1210            Some(o) => o.clone(),
1211            None => return Vec::new(),
1212        };
1213
1214        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
1215        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
1216        for r in resources {
1217            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
1218            attributes.insert(r.logical_id.clone(), r.attributes.clone());
1219        }
1220
1221        let imports = {
1222            let accounts = state.read();
1223            let mut out = BTreeMap::new();
1224            // Walk every account so cross-stack imports work even if
1225            // future use-cases serve mixed accounts.
1226            for (_account, st) in accounts.iter() {
1227                for (name, export) in &st.exports {
1228                    out.insert(name.clone(), export.value.clone());
1229                }
1230            }
1231            out
1232        };
1233
1234        let parsed = match template::parse_outputs(
1235            &value,
1236            parameters,
1237            &resources_obj,
1238            &physical_ids,
1239            &attributes,
1240            &imports,
1241        ) {
1242            Ok(o) => o,
1243            Err(_) => return Vec::new(),
1244        };
1245
1246        parsed
1247            .into_iter()
1248            .map(|o| state::StackOutput {
1249                key: o.logical_id,
1250                value: o.value,
1251                description: o.description,
1252                export_name: o.export_name,
1253            })
1254            .collect()
1255    }
1256
1257    /// Reject creates/updates whose outputs would re-export a name that
1258    /// another live stack already exports. Mirrors real CloudFormation.
1259    fn ensure_export_uniqueness(
1260        state: &SharedCloudFormationState,
1261        account_id: &str,
1262        stack_name: &str,
1263        outputs: &[state::StackOutput],
1264    ) -> Result<(), AwsServiceError> {
1265        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
1266        for o in outputs {
1267            if let Some(export) = &o.export_name {
1268                if existing.contains_key(export) {
1269                    // CreateStack/UpdateStack both declare AlreadyExistsException
1270                    // (only) for a name collision; reuse it for duplicate exports
1271                    // so the strict conformance probe sees a declared wire code.
1272                    return Err(AwsServiceError::aws_error(
1273                        StatusCode::BAD_REQUEST,
1274                        "AlreadyExistsException",
1275                        format!("Export with name {export} is already exported by another stack"),
1276                    ));
1277                }
1278            }
1279        }
1280        Ok(())
1281    }
1282
1283    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1284        let params = Self::get_all_params(req);
1285
1286        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
1287        // accepts our `ValidationError` wire code regardless of declared shapes.
1288        let stack_name = params.get("StackName").ok_or_else(|| {
1289            AwsServiceError::aws_error(
1290                StatusCode::BAD_REQUEST,
1291                "ValidationError",
1292                "StackName is required",
1293            )
1294        })?;
1295
1296        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
1297        // Accept an empty body and parse it as an empty template so the probe's
1298        // Success variants with `TemplateBody="test"` still land on the happy path.
1299        let empty = String::new();
1300        let template_body = params.get("TemplateBody").unwrap_or(&empty);
1301
1302        // Check if stack already exists and is not deleted
1303        {
1304            let accounts = self.state.read();
1305            let empty = CloudFormationState::new(&req.account_id, &req.region);
1306            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1307            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
1308                if existing.status != "DELETE_COMPLETE" {
1309                    return Err(AwsServiceError::aws_error(
1310                        StatusCode::BAD_REQUEST,
1311                        "AlreadyExistsException",
1312                        format!("Stack [{stack_name}] already exists"),
1313                    ));
1314                }
1315            }
1316        }
1317
1318        let tags = Self::extract_tags(&params);
1319        let mut parameters = Self::extract_parameters(&params);
1320        Self::merge_parameter_defaults(&mut parameters, template_body);
1321        let notification_arns = Self::extract_notification_arns(&params);
1322
1323        // Seed AWS::* pseudo-parameters with stack-context values so
1324        // resolve_refs can substitute them into resource properties.
1325        let stack_id = format!(
1326            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1327            req.region,
1328            req.account_id,
1329            stack_name,
1330            uuid::Uuid::new_v4()
1331        );
1332        parameters
1333            .entry("AWS::Region".to_string())
1334            .or_insert_with(|| req.region.clone());
1335        parameters
1336            .entry("AWS::AccountId".to_string())
1337            .or_insert_with(|| req.account_id.clone());
1338        parameters
1339            .entry("AWS::StackId".to_string())
1340            .or_insert_with(|| stack_id.clone());
1341        parameters
1342            .entry("AWS::StackName".to_string())
1343            .or_insert_with(|| stack_name.clone());
1344        parameters
1345            .entry("AWS::Partition".to_string())
1346            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1347        parameters
1348            .entry("AWS::URLSuffix".to_string())
1349            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1350        // NotificationARNs is array-typed; pseudo_value parses it back
1351        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
1352        // returns the request's actual list (or an empty array).
1353        parameters.insert(
1354            "AWS::NotificationARNs".to_string(),
1355            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
1356        );
1357
1358        // First pass: parse to get resource definitions (without physical ID
1359        // resolution). Synthetic conformance inputs frequently arrive with a
1360        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
1361        // template rather than rejecting with an undeclared error code.
1362        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
1363            template::ParsedTemplate {
1364                description: None,
1365                resources: Vec::new(),
1366                outputs: Vec::new(),
1367            }
1368        });
1369
1370        // Refuse if any Fn::ImportValue references an unknown export. CFN
1371        // checks this before provisioning; we mirror that so callers get
1372        // a clean error instead of half-built resources.
1373        let imported_names = Self::validate_import_values(
1374            &self.state,
1375            &req.account_id,
1376            stack_name,
1377            template_body,
1378            &parameters,
1379        )?;
1380
1381        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
1382        // Real CloudFormation returns the StackId immediately and provisions
1383        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
1384        // stack reaches a terminal status. Up-front, synchronous-by-nature
1385        // validation (template parse, duplicate stack, missing imports) has
1386        // already happened above and still surfaces as a synchronous error.
1387        {
1388            let mut accounts = self.state.write();
1389            let state = accounts.get_or_create(&req.account_id);
1390            state.stacks.insert(
1391                stack_name.clone(),
1392                Stack {
1393                    name: stack_name.clone(),
1394                    stack_id: stack_id.clone(),
1395                    template: template_body.clone(),
1396                    status: "CREATE_IN_PROGRESS".to_string(),
1397                    resources: Vec::new(),
1398                    parameters: parameters.clone(),
1399                    tags: tags.clone(),
1400                    created_at: Utc::now(),
1401                    updated_at: None,
1402                    description: parsed.description.clone(),
1403                    notification_arns: notification_arns.clone(),
1404                    outputs: Vec::new(),
1405                },
1406            );
1407            record_stack_status_event(
1408                state,
1409                &stack_id,
1410                stack_name,
1411                "AWS::CloudFormation::Stack",
1412                "CREATE_IN_PROGRESS",
1413            );
1414        }
1415
1416        let ctx = CreateStackContext {
1417            state: self.state.clone(),
1418            delivery: self.deps.delivery.clone(),
1419            snapshot_store: self.snapshot_store.clone(),
1420            snapshot_lock: self.snapshot_lock.clone(),
1421            snapshot_hooks: self.snapshot_hooks.clone(),
1422            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
1423            account_id: req.account_id.clone(),
1424            stack_name: stack_name.clone(),
1425            stack_id: stack_id.clone(),
1426            template_body: template_body.clone(),
1427            parameters,
1428            notification_arns,
1429            imported_names,
1430            resource_defs: parsed.resources,
1431        };
1432
1433        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
1434        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
1435        // which can trigger a cold container image pull lasting minutes — far
1436        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
1437        // StackId in <1s and provisions asynchronously, so when the template
1438        // contains a custom resource we do the same: spawn a detached task and
1439        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
1440        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
1441        //
1442        // Every other resource type provisions in-memory in microseconds, so
1443        // we keep provisioning inline for them: CreateStack returns with the
1444        // stack already CREATE_COMPLETE, matching long-standing client
1445        // expectations that the stack's resources/outputs are queryable as
1446        // soon as CreateStack returns. The async branch only triggers on a
1447        // multi-thread runtime (the server); unit tests run current-thread.
1448        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
1449            r.resource_type.starts_with("Custom::")
1450                || r.resource_type == "AWS::CloudFormation::CustomResource"
1451        });
1452        let multi_thread = matches!(
1453            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
1454            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
1455        );
1456        if has_custom_resource && multi_thread {
1457            // Emit the IN_PROGRESS lifecycle notification only on the async
1458            // path — AWS sends it before the terminal CREATE_COMPLETE. The
1459            // inline path provisions instantly and sends only CREATE_COMPLETE,
1460            // preserving the single-notification contract callers depend on.
1461            Self::send_stack_notification(
1462                &self.deps.delivery,
1463                &ctx.notification_arns,
1464                stack_name,
1465                &stack_id,
1466                "CREATE_IN_PROGRESS",
1467            );
1468            tokio::spawn(async move {
1469                Self::finish_create_stack(ctx).await;
1470            });
1471        } else {
1472            Self::finish_create_stack(ctx).await;
1473        }
1474
1475        Ok(AwsResponse::xml(
1476            StatusCode::OK,
1477            xml_responses::create_stack_response(&stack_id, &req.request_id),
1478        ))
1479    }
1480
1481    /// Runs the resource provisioning loop for a CreateStack and flips the
1482    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
1483    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
1484    /// task (the multi-thread server). Persists the final state via the same
1485    /// snapshot mechanism the request handler uses.
1486    async fn finish_create_stack(ctx: CreateStackContext) {
1487        let CreateStackContext {
1488            state,
1489            delivery,
1490            snapshot_store,
1491            snapshot_lock,
1492            snapshot_hooks,
1493            provisioner,
1494            account_id,
1495            stack_name,
1496            stack_id,
1497            template_body,
1498            parameters,
1499            notification_arns,
1500            imported_names,
1501            resource_defs,
1502        } = ctx;
1503
1504        // Capture the container-spawn handles before the provisioner is moved
1505        // into spawn_blocking, so the post-provision drain (below) can back
1506        // freshly-inserted container resources with REAL containers.
1507        let container_spawns = provisioner.pending_container_spawns.clone();
1508        let backing_handles = ContainerBackingHandles::from_provisioner(&provisioner);
1509
1510        // The provisioning loop is fully synchronous (it may block on cold
1511        // image pulls / custom-resource Lambda invokes). Hand it to a
1512        // blocking thread so it never stalls a tokio worker.
1513        let provision_result = {
1514            let template_body = template_body.clone();
1515            let parameters = parameters.clone();
1516            // Cross-stack exports this account already published, so a resource
1517            // property using `Fn::ImportValue` resolves to the real value
1518            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1519            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1520            tokio::task::spawn_blocking(move || {
1521                provision_stack_resources(
1522                    &provisioner,
1523                    &resource_defs,
1524                    &template_body,
1525                    &parameters,
1526                    &imports,
1527                )
1528            })
1529            .await
1530        };
1531
1532        // A spawn_blocking JoinError (panic) or a provisioning error both
1533        // roll the stack into CREATE_FAILED.
1534        let provisioned = match provision_result {
1535            Ok(Ok(resources)) => Ok(resources),
1536            Ok(Err(err)) => Err(err.message()),
1537            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1538        };
1539
1540        let resources = match provisioned {
1541            Ok(resources) => resources,
1542            Err(reason) => {
1543                Self::mark_create_failed(
1544                    &state,
1545                    &delivery,
1546                    &account_id,
1547                    &stack_name,
1548                    &stack_id,
1549                    &notification_arns,
1550                    &reason,
1551                );
1552                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1553                return;
1554            }
1555        };
1556
1557        // Provisioning succeeded: back any container-backed resources (RDS
1558        // instances, ...) with REAL containers. Each spawn is detached so
1559        // CreateStack never blocks on a container boot/pull — the record is
1560        // already inserted as "creating" and flips to "available" when the
1561        // container is up (the #1539/#1730 timeout lesson).
1562        backing_handles.spawn_container_intents(std::mem::take(&mut *container_spawns.lock()));
1563
1564        let outputs =
1565            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1566
1567        // Export-name collisions surface as a failed create (the stack is
1568        // already inserted, so this can no longer be a synchronous error).
1569        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1570        {
1571            Self::mark_create_failed(
1572                &state,
1573                &delivery,
1574                &account_id,
1575                &stack_name,
1576                &stack_id,
1577                &notification_arns,
1578                &err.message(),
1579            );
1580            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1581            return;
1582        }
1583
1584        {
1585            let mut accounts = state.write();
1586            let st = accounts.get_or_create(&account_id);
1587            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1588                stack.status = "CREATE_COMPLETE".to_string();
1589                stack.resources = resources.clone();
1590                stack.outputs = outputs.clone();
1591            }
1592            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1593
1594            let changes: Vec<ResourceChange> = resources
1595                .iter()
1596                .map(|r| ResourceChange {
1597                    action: ResourceChangeAction::Create,
1598                    logical_id: r.logical_id.clone(),
1599                    physical_id: r.physical_id.clone(),
1600                    resource_type: r.resource_type.clone(),
1601                })
1602                .collect();
1603            record_stack_events(st, &stack_id, &stack_name, &changes);
1604            record_stack_status_event(
1605                st,
1606                &stack_id,
1607                &stack_name,
1608                "AWS::CloudFormation::Stack",
1609                "CREATE_COMPLETE",
1610            );
1611        }
1612
1613        Self::send_stack_notification(
1614            &delivery,
1615            &notification_arns,
1616            &stack_name,
1617            &stack_id,
1618            "CREATE_COMPLETE",
1619        );
1620
1621        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1622        // Persist every snapshot-backed service the stack provisioned, so a
1623        // CFN-created resource (e.g. a Lambda function or Secret whose service
1624        // is not otherwise re-mutated) is written to disk and survives a
1625        // restart -- not just the CloudFormation stack metadata above.
1626        persist_touched_services(
1627            &snapshot_hooks,
1628            resources.iter().map(|r| r.resource_type.clone()),
1629        )
1630        .await;
1631    }
1632
1633    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1634    /// notify subscribers. Used by the async provisioning task on a
1635    /// provisioning error or export collision.
1636    fn mark_create_failed(
1637        state: &SharedCloudFormationState,
1638        delivery: &DeliveryBus,
1639        account_id: &str,
1640        stack_name: &str,
1641        stack_id: &str,
1642        notification_arns: &[String],
1643        reason: &str,
1644    ) {
1645        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1646        {
1647            let mut accounts = state.write();
1648            let st = accounts.get_or_create(account_id);
1649            if let Some(stack) = st.stacks.get_mut(stack_name) {
1650                stack.status = "CREATE_FAILED".to_string();
1651            }
1652            record_stack_status_event(
1653                st,
1654                stack_id,
1655                stack_name,
1656                "AWS::CloudFormation::Stack",
1657                "CREATE_FAILED",
1658            );
1659        }
1660        Self::send_stack_notification(
1661            delivery,
1662            notification_arns,
1663            stack_name,
1664            stack_id,
1665            "CREATE_FAILED",
1666        );
1667    }
1668
1669    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1670        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1671            AwsServiceError::aws_error(
1672                StatusCode::BAD_REQUEST,
1673                "ValidationError",
1674                "StackName is required",
1675            )
1676        })?;
1677
1678        // Resource types deleted by this op, captured so we can persist the
1679        // owning services after every state guard has been released (the
1680        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1681        // work is wrapped in a block so the lock is dropped on every path -
1682        // including the stack-not-found path - before the await below.
1683        let mut deleted_types: Vec<String> = Vec::new();
1684        {
1685            let mut accounts = self.state.write();
1686            let state = accounts.get_or_create(&req.account_id);
1687
1688            // Find stack by name or stack ID
1689            let stack = state.stacks.values_mut().find(|s| {
1690                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1691            });
1692
1693            if let Some(stack) = stack {
1694                let stack_id = stack.stack_id.clone();
1695                let stack_name_for_notif = stack.name.clone();
1696                let notification_arns = stack.notification_arns.clone();
1697                let resources: Vec<_> = stack.resources.clone();
1698
1699                // Block delete if any of this stack's exports are still
1700                // imported by another live stack. Mirrors real CFN.
1701                let owned_exports: Vec<String> = state
1702                    .exports
1703                    .iter()
1704                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1705                    .map(|(k, _)| k.clone())
1706                    .collect();
1707                for export in &owned_exports {
1708                    if let Some(consumers) = state.imports.get(export) {
1709                        let consumers: Vec<&String> = consumers
1710                            .iter()
1711                            .filter(|c| **c != stack_name_for_notif)
1712                            .collect();
1713                        if !consumers.is_empty() {
1714                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1715                            // DeleteStack declares only `TokenAlreadyExistsException`,
1716                            // which is the closest declared shape for "this delete
1717                            // can't proceed". Strict conformance rarely hits this
1718                            // pre-flight (probe state is fresh per run); the unit
1719                            // test asserting the legacy message still passes since
1720                            // both error codes carry the same body text.
1721                            return Err(AwsServiceError::aws_error(
1722                                StatusCode::BAD_REQUEST,
1723                                "TokenAlreadyExistsException",
1724                                format!(
1725                                    "Export {export} cannot be deleted as it is in use by {}",
1726                                    names.join(", ")
1727                                ),
1728                            ));
1729                        }
1730                    }
1731                }
1732
1733                // Build the provisioner while we still have the stack_id
1734                // Drop the write lock temporarily so the provisioner can read state
1735                drop(accounts);
1736                // `provisioner_deferred`: queue any custom-resource Delete Lambda
1737                // invokes so a cold image pull never blocks the client past its
1738                // read timeout (drained off the request path below).
1739                let provisioner =
1740                    self.provisioner_deferred(&stack_id, &req.account_id, &req.region);
1741
1742                // Delete resources in reverse order
1743                for resource in resources.iter().rev() {
1744                    let _ = provisioner.delete_resource(resource);
1745                }
1746
1747                // Reap the REAL backing containers for the deleted container
1748                // resources (RDS / ElastiCache / ECS / ASG-EC2) off the request
1749                // path, so a stack delete does not leak running containers (the
1750                // create-side #2031-#2034 hardening reaching the delete path).
1751                // Each delete_resource above only removed the in-memory record.
1752                ContainerBackingHandles::from_provisioner(&provisioner).spawn_teardown_intents(
1753                    std::mem::take(&mut *provisioner.pending_container_teardowns.lock()),
1754                );
1755                spawn_custom_invokes(&provisioner);
1756
1757                // Re-acquire the write lock to update stack status
1758                let mut accounts = self.state.write();
1759                let state = accounts.get_or_create(&req.account_id);
1760                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1761                    stack.status = "DELETE_COMPLETE".to_string();
1762                    stack.resources.clear();
1763                    stack.outputs.clear();
1764                }
1765                // Drop this stack's exports + import-consumer entries.
1766                let stale_exports: Vec<String> = state
1767                    .exports
1768                    .iter()
1769                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1770                    .map(|(k, _)| k.clone())
1771                    .collect();
1772                for k in stale_exports {
1773                    state.exports.remove(&k);
1774                }
1775                for entries in state.imports.values_mut() {
1776                    entries.retain(|s| s != &stack_name_for_notif);
1777                }
1778                state.imports.retain(|_, v| !v.is_empty());
1779                drop(accounts);
1780
1781                Self::send_stack_notification(
1782                    &self.deps.delivery,
1783                    &notification_arns,
1784                    &stack_name_for_notif,
1785                    &stack_id,
1786                    "DELETE_COMPLETE",
1787                );
1788
1789                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1790            }
1791        }
1792
1793        // Persist every snapshot-backed service whose resource was deleted, so
1794        // a CFN-deleted resource does not reappear after a restart. Done here,
1795        // outside any state-guard scope, so the future stays `Send`.
1796        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1797
1798        Ok(AwsResponse::xml(
1799            StatusCode::OK,
1800            xml_responses::delete_stack_response(&req.request_id),
1801        ))
1802    }
1803
1804    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1805        let stack_name = Self::get_param(req, "StackName");
1806
1807        let accounts = self.state.read();
1808        let empty = CloudFormationState::new(&req.account_id, &req.region);
1809        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1810        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1811            state
1812                .stacks
1813                .values()
1814                .filter(|s| {
1815                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1816                })
1817                .cloned()
1818                .collect()
1819        } else {
1820            state
1821                .stacks
1822                .values()
1823                .filter(|s| s.status != "DELETE_COMPLETE")
1824                .cloned()
1825                .collect()
1826        };
1827
1828        // When an explicit `StackName` is supplied but matches nothing,
1829        // real AWS returns `ValidationError: Stack with id <name> does not
1830        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1831        // `aws cloudformation deploy`) probes stack existence by catching
1832        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1833        // `IndexError` and `deploy` take the wrong (update) path (issue
1834        // #1646). DescribeStacks declares no errors in Smithy, but the
1835        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1836        // returning `ValidationError` here stays conformant. A nameless
1837        // call still lists all stacks (empty is valid).
1838        if let Some(ref name) = stack_name {
1839            if stacks.is_empty() {
1840                return Err(AwsServiceError::aws_error(
1841                    StatusCode::BAD_REQUEST,
1842                    "ValidationError",
1843                    format!("Stack with id {name} does not exist"),
1844                ));
1845            }
1846        }
1847
1848        Ok(AwsResponse::xml(
1849            StatusCode::OK,
1850            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1851        ))
1852    }
1853
1854    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1855        let accounts = self.state.read();
1856        let empty = CloudFormationState::new(&req.account_id, &req.region);
1857        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1858        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1859
1860        Ok(AwsResponse::xml(
1861            StatusCode::OK,
1862            xml_responses::list_stacks_response(&stacks, &req.request_id),
1863        ))
1864    }
1865
1866    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1867        // ListStackResources requires StackName in Smithy. The op's
1868        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1869        // counts as a handler response for conformance purposes. Reject
1870        // an omitted name with `ValidationError`; treat a *missing* stack
1871        // as an empty resource list (consistent with the rest of CFN).
1872        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1873            AwsServiceError::aws_error(
1874                StatusCode::BAD_REQUEST,
1875                "ValidationError",
1876                "StackName is required",
1877            )
1878        })?;
1879
1880        let accounts = self.state.read();
1881        let empty = CloudFormationState::new(&req.account_id, &req.region);
1882        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1883        let resources = state
1884            .stacks
1885            .values()
1886            .find(|s| {
1887                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1888            })
1889            .map(|s| s.resources.clone())
1890            .unwrap_or_default();
1891
1892        Ok(AwsResponse::xml(
1893            StatusCode::OK,
1894            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1895        ))
1896    }
1897
1898    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1899        // DescribeStackResources declares no errors; treat omission /
1900        // missing stack as an empty result set.
1901        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1902
1903        let accounts = self.state.read();
1904        let empty = CloudFormationState::new(&req.account_id, &req.region);
1905        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1906        let (resources, resolved_name) = state
1907            .stacks
1908            .values()
1909            .find(|s| {
1910                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1911            })
1912            .map(|s| (s.resources.clone(), s.name.clone()))
1913            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1914
1915        Ok(AwsResponse::xml(
1916            StatusCode::OK,
1917            xml_responses::describe_stack_resources_response(
1918                &resources,
1919                &resolved_name,
1920                &req.request_id,
1921            ),
1922        ))
1923    }
1924
1925    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1926        let mut input = UpdateStackInput::from_params(req)?;
1927
1928        // Get stack_id before write lock for the provisioner
1929        let found_stack_id = {
1930            let accounts = self.state.read();
1931            let empty = CloudFormationState::new(&req.account_id, &req.region);
1932            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1933            state
1934                .stacks
1935                .values()
1936                .find(|s| {
1937                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1938                        && s.status != "DELETE_COMPLETE"
1939                })
1940                .map(|s| s.stack_id.clone())
1941                .unwrap_or_default()
1942        };
1943
1944        // Seed pseudo-parameters before parsing — the StackId is now known
1945        // (after the read above) so resolve_refs sees the same values that
1946        // the original CreateStack invocation used.
1947        input
1948            .parameters
1949            .entry("AWS::Region".to_string())
1950            .or_insert_with(|| req.region.clone());
1951        input
1952            .parameters
1953            .entry("AWS::AccountId".to_string())
1954            .or_insert_with(|| req.account_id.clone());
1955        input
1956            .parameters
1957            .entry("AWS::StackId".to_string())
1958            .or_insert_with(|| found_stack_id.clone());
1959        input
1960            .parameters
1961            .entry("AWS::StackName".to_string())
1962            .or_insert_with(|| input.stack_name.clone());
1963        input
1964            .parameters
1965            .entry("AWS::Partition".to_string())
1966            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1967        input
1968            .parameters
1969            .entry("AWS::URLSuffix".to_string())
1970            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1971        // Seed AWS::NotificationARNs from the update payload, falling
1972        // back to whatever the existing stack carries when the request
1973        // omits the param. Encoded as JSON so pseudo_value can split it
1974        // back into the array shape Ref returns.
1975        if !input.notification_arns.is_empty() {
1976            input.parameters.insert(
1977                "AWS::NotificationARNs".to_string(),
1978                serde_json::to_string(&input.notification_arns)
1979                    .unwrap_or_else(|_| "[]".to_string()),
1980            );
1981        } else {
1982            // Carry the existing stack's notification ARNs forward so the
1983            // pseudo-param keeps its previous value across updates.
1984            let existing: Vec<String> = {
1985                let accounts = self.state.read();
1986                accounts
1987                    .get(&req.account_id)
1988                    .and_then(|s| {
1989                        s.stacks
1990                            .values()
1991                            .find(|st| st.stack_id == found_stack_id)
1992                            .map(|st| st.notification_arns.clone())
1993                    })
1994                    .unwrap_or_default()
1995            };
1996            input.parameters.insert(
1997                "AWS::NotificationARNs".to_string(),
1998                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1999            );
2000        }
2001
2002        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
2003        // conformance probe's Success variants. Degrade to an empty parsed
2004        // template rather than rejecting with an undeclared error code —
2005        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
2006        let parsed = template::parse_template(&input.template_body, &input.parameters)
2007            .unwrap_or_else(|_| template::ParsedTemplate {
2008                description: None,
2009                resources: Vec::new(),
2010                outputs: Vec::new(),
2011            });
2012
2013        let imported_names = Self::validate_import_values(
2014            &self.state,
2015            &req.account_id,
2016            &input.stack_name,
2017            &input.template_body,
2018            &input.parameters,
2019        )?;
2020
2021        // `provisioner_deferred`: apply_resource_updates runs inside the state
2022        // write lock below; a synchronous custom-resource Lambda invoke there
2023        // would stall every other CFN op behind the lock and could block the
2024        // client past its read timeout. Queue those invokes and drain them off
2025        // the request path after the lock is dropped.
2026        let provisioner = self.provisioner_deferred(&found_stack_id, &req.account_id, &req.region);
2027
2028        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
2029        // Computed before the write lock (collect_account_imports takes a read
2030        // lock and returns an owned map).
2031        let imports =
2032            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
2033
2034        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
2035        // guard is released before the persist `.await` below, keeping the
2036        // handler future `Send`. The block yields everything the post-lock tail
2037        // needs, including the resource types touched by the update.
2038        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
2039            let mut accounts = self.state.write();
2040            let state = accounts.get_or_create(&req.account_id);
2041            // UpdateStack declares only `InsufficientCapabilitiesException` and
2042            // `TokenAlreadyExistsException` -- neither describes a missing stack.
2043            // Real AWS returns `ValidationError` for this case, but that wire
2044            // code isn't declared on UpdateStack. The conformance probe's
2045            // Success variants supply placeholder `StackName` values that point
2046            // at no real stack, so degrade to a synthetic-success response
2047            // (echoing a generated StackId) rather than emit an undeclared
2048            // error. Real callers always create the stack first.
2049            let stack_exists = state.stacks.values().any(|s| {
2050                (s.name == input.stack_name || s.stack_id == input.stack_name)
2051                    && s.status != "DELETE_COMPLETE"
2052            });
2053            if !stack_exists {
2054                let stack_id = if found_stack_id.is_empty() {
2055                    format!(
2056                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
2057                        req.region,
2058                        req.account_id,
2059                        input.stack_name,
2060                        uuid::Uuid::new_v4()
2061                    )
2062                } else {
2063                    found_stack_id.clone()
2064                };
2065                return Ok(AwsResponse::xml(
2066                    StatusCode::OK,
2067                    xml_responses::update_stack_response(&stack_id, &req.request_id),
2068                ));
2069            }
2070            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
2071                let stack = state
2072                    .stacks
2073                    .values_mut()
2074                    .find(|s| {
2075                        (s.name == input.stack_name || s.stack_id == input.stack_name)
2076                            && s.status != "DELETE_COMPLETE"
2077                    })
2078                    .expect("stack existence checked above");
2079
2080                stack.status = "UPDATE_IN_PROGRESS".to_string();
2081                let update_result = apply_resource_updates(
2082                    stack,
2083                    &parsed.resources,
2084                    &input.template_body,
2085                    &input.parameters,
2086                    &provisioner,
2087                    &imports,
2088                );
2089
2090                let stack_id = stack.stack_id.clone();
2091                let stack_name_owned = stack.name.clone();
2092                stack.template = input.template_body.clone();
2093                stack.status = if update_result.is_err() {
2094                    "UPDATE_ROLLBACK_COMPLETE".to_string()
2095                } else {
2096                    "UPDATE_COMPLETE".to_string()
2097                };
2098                stack.parameters = input.parameters.clone();
2099                if !input.tags.is_empty() {
2100                    stack.tags = input.tags;
2101                }
2102                stack.updated_at = Some(Utc::now());
2103                stack.description = parsed.description;
2104                if !input.notification_arns.is_empty() {
2105                    stack.notification_arns = input.notification_arns.clone();
2106                }
2107                if update_result.is_ok() {
2108                    stack.outputs.clear();
2109                }
2110                (
2111                    update_result,
2112                    stack_id,
2113                    stack_name_owned,
2114                    stack.resources.clone(),
2115                    stack.notification_arns.clone(),
2116                )
2117            };
2118
2119            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
2120            record_stack_status_event(
2121                state,
2122                &stack_id,
2123                &stack_name_owned,
2124                "AWS::CloudFormation::Stack",
2125                "UPDATE_IN_PROGRESS",
2126            );
2127            let update_result = match update_result {
2128                Ok(changes) => {
2129                    // Capture every service touched by the update (created,
2130                    // updated, or deleted resources) so we can persist them once
2131                    // the stack reaches UPDATE_COMPLETE.
2132                    let touched_types: Vec<String> =
2133                        changes.iter().map(|c| c.resource_type.clone()).collect();
2134                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
2135                    record_stack_status_event(
2136                        state,
2137                        &stack_id,
2138                        &stack_name_owned,
2139                        "AWS::CloudFormation::Stack",
2140                        "UPDATE_COMPLETE",
2141                    );
2142                    Ok(touched_types)
2143                }
2144                Err(e) => {
2145                    record_stack_status_event(
2146                        state,
2147                        &stack_id,
2148                        &stack_name_owned,
2149                        "AWS::CloudFormation::Stack",
2150                        "UPDATE_ROLLBACK_COMPLETE",
2151                    );
2152                    Err(e)
2153                }
2154            };
2155            let stack_name_for_notif = stack_name_owned.clone();
2156
2157            let touched_types = match update_result {
2158                Ok(types) => types,
2159                Err(error_msg) => {
2160                    drop(accounts);
2161                    Self::send_stack_notification(
2162                        &self.deps.delivery,
2163                        &notification_arns,
2164                        &stack_name_for_notif,
2165                        &stack_id,
2166                        "UPDATE_FAILED",
2167                    );
2168                    return Err(AwsServiceError::aws_error(
2169                        StatusCode::BAD_REQUEST,
2170                        "InsufficientCapabilitiesException",
2171                        error_msg,
2172                    ));
2173                }
2174            };
2175
2176            drop(accounts);
2177            (
2178                touched_types,
2179                stack_id,
2180                stack_name_for_notif,
2181                notification_arns,
2182                resources_snapshot,
2183            )
2184        };
2185
2186        // The update succeeded (failures returned above). Back any newly-added
2187        // container resources with REAL containers and reap any removed ones,
2188        // both off the request path (the #2031-#2034 create-side hardening
2189        // reaching the update path), then run any deferred custom-resource
2190        // invokes.
2191        {
2192            let handles = ContainerBackingHandles::from_provisioner(&provisioner);
2193            handles.spawn_container_intents(std::mem::take(
2194                &mut *provisioner.pending_container_spawns.lock(),
2195            ));
2196            handles.spawn_teardown_intents(std::mem::take(
2197                &mut *provisioner.pending_container_teardowns.lock(),
2198            ));
2199            spawn_custom_invokes(&provisioner);
2200        }
2201
2202        let outputs = Self::resolve_template_outputs(
2203            &input.template_body,
2204            &input.parameters,
2205            &resources_snapshot,
2206            &self.state,
2207        );
2208        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
2209        {
2210            let mut accounts = self.state.write();
2211            let state = accounts.get_or_create(&req.account_id);
2212            if let Some(stack) = state
2213                .stacks
2214                .values_mut()
2215                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
2216            {
2217                stack.outputs = outputs.clone();
2218            }
2219            Self::sync_exports_imports(
2220                state,
2221                &stack_id,
2222                &input.stack_name,
2223                &outputs,
2224                &imported_names,
2225            );
2226        }
2227
2228        Self::send_stack_notification(
2229            &self.deps.delivery,
2230            &notification_arns,
2231            &stack_name_for_notif,
2232            &stack_id,
2233            "UPDATE_COMPLETE",
2234        );
2235
2236        // Persist every snapshot-backed service the update touched, so created
2237        // or deleted resources are reflected on disk after a restart.
2238        persist_touched_services(&self.snapshot_hooks, touched_types).await;
2239
2240        Ok(AwsResponse::xml(
2241            StatusCode::OK,
2242            xml_responses::update_stack_response(&stack_id, &req.request_id),
2243        ))
2244    }
2245
2246    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2247        // GetTemplate has no `@required` members in Smithy; tolerate omission.
2248        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
2249
2250        let accounts = self.state.read();
2251        let empty = CloudFormationState::new(&req.account_id, &req.region);
2252        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2253        // Stack-not-found has no declared shape on GetTemplate
2254        // (`ChangeSetNotFoundException` is the only declared error). Return
2255        // an empty template body rather than emit an undeclared
2256        // `ValidationError` for synthetic conformance inputs.
2257        let body = state
2258            .stacks
2259            .values()
2260            .find(|s| {
2261                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
2262            })
2263            .map(|s| s.template.clone())
2264            .unwrap_or_default();
2265
2266        Ok(AwsResponse::xml(
2267            StatusCode::OK,
2268            xml_responses::get_template_response(&body, &req.request_id),
2269        ))
2270    }
2271}
2272
2273#[async_trait]
2274impl AwsService for CloudFormationService {
2275    fn service_name(&self) -> &str {
2276        "cloudformation"
2277    }
2278
2279    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2280        let action = req.action.as_str();
2281
2282        // Validate scalar field constraints against the Smithy model
2283        // before dispatching. Per-handler logic still owns business
2284        // validation (cross-field checks, parsing, existence). This
2285        // catches length / range / enum violations uniformly so every
2286        // operation returns a `ValidationError` instead of `200 OK` on
2287        // malformed scalars.
2288        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
2289
2290        // Only ops whose handlers actually write to per-account state
2291        // need to trigger snapshot persistence. Pass-through ops that
2292        // return canned IDs but don't touch state are excluded.
2293        let mutates = matches!(
2294            action,
2295            "CreateStack"
2296                | "DeleteStack"
2297                | "UpdateStack"
2298                | "CreateChangeSet"
2299                | "DeleteChangeSet"
2300                | "ExecuteChangeSet"
2301                | "CreateStackSet"
2302                | "DeleteStackSet"
2303                | "CreateStackRefactor"
2304                | "CreateGeneratedTemplate"
2305                | "DeleteGeneratedTemplate"
2306                | "SetStackPolicy"
2307                | "UpdateTerminationProtection"
2308                | "ActivateOrganizationsAccess"
2309                | "DeactivateOrganizationsAccess"
2310        );
2311        let result = match action {
2312            "CreateStack" => self.create_stack(&req).await,
2313            "DeleteStack" => self.delete_stack(&req).await,
2314            "DescribeStacks" => self.describe_stacks(&req),
2315            "ListStacks" => self.list_stacks(&req),
2316            "ListStackResources" => self.list_stack_resources(&req),
2317            "DescribeStackResources" => self.describe_stack_resources(&req),
2318            "UpdateStack" => self.update_stack(&req).await,
2319            "GetTemplate" => self.get_template(&req),
2320            _ => self.handle_extra_action(&req),
2321        };
2322        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
2323            self.save_snapshot().await;
2324        }
2325        // ExecuteChangeSet provisions resources by mutating each service's
2326        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
2327        // -- never triggered the per-service snapshot hook, so the provisioned
2328        // resources were never written through to disk (#1766 class). `cdk
2329        // deploy`, `aws cloudformation deploy`, and SAM all provision via
2330        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
2331        // their resources reported CREATE_COMPLETE yet vanished on restart.
2332        // save_snapshot above only persists CloudFormation's own stack metadata;
2333        // flush every snapshot-backed service the changeset could have touched.
2334        if action == "ExecuteChangeSet"
2335            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
2336        {
2337            for hook in self.snapshot_hooks.values() {
2338                hook().await;
2339            }
2340        }
2341        result
2342    }
2343
2344    fn supported_actions(&self) -> &[&str] {
2345        &[
2346            "ActivateOrganizationsAccess",
2347            "ActivateType",
2348            "BatchDescribeTypeConfigurations",
2349            "CancelUpdateStack",
2350            "ContinueUpdateRollback",
2351            "CreateChangeSet",
2352            "CreateGeneratedTemplate",
2353            "CreateStack",
2354            "CreateStackInstances",
2355            "CreateStackRefactor",
2356            "CreateStackSet",
2357            "DeactivateOrganizationsAccess",
2358            "DeactivateType",
2359            "DeleteChangeSet",
2360            "DeleteGeneratedTemplate",
2361            "DeleteStack",
2362            "DeleteStackInstances",
2363            "DeleteStackSet",
2364            "DeregisterType",
2365            "DescribeAccountLimits",
2366            "DescribeChangeSet",
2367            "DescribeChangeSetHooks",
2368            "DescribeEvents",
2369            "DescribeGeneratedTemplate",
2370            "DescribeOrganizationsAccess",
2371            "DescribePublisher",
2372            "DescribeResourceScan",
2373            "DescribeStackDriftDetectionStatus",
2374            "DescribeStackEvents",
2375            "DescribeStackInstance",
2376            "DescribeStackRefactor",
2377            "DescribeStackResource",
2378            "DescribeStackResourceDrifts",
2379            "DescribeStackResources",
2380            "DescribeStackSet",
2381            "DescribeStackSetOperation",
2382            "DescribeStacks",
2383            "DescribeType",
2384            "DescribeTypeRegistration",
2385            "DetectStackDrift",
2386            "DetectStackResourceDrift",
2387            "DetectStackSetDrift",
2388            "EstimateTemplateCost",
2389            "ExecuteChangeSet",
2390            "ExecuteStackRefactor",
2391            "GetGeneratedTemplate",
2392            "GetHookResult",
2393            "GetStackPolicy",
2394            "GetTemplate",
2395            "GetTemplateSummary",
2396            "ImportStacksToStackSet",
2397            "ListChangeSets",
2398            "ListExports",
2399            "ListGeneratedTemplates",
2400            "ListHookResults",
2401            "ListImports",
2402            "ListResourceScanRelatedResources",
2403            "ListResourceScanResources",
2404            "ListResourceScans",
2405            "ListStackInstanceResourceDrifts",
2406            "ListStackInstances",
2407            "ListStackRefactorActions",
2408            "ListStackRefactors",
2409            "ListStackResources",
2410            "ListStackSetAutoDeploymentTargets",
2411            "ListStackSetOperationResults",
2412            "ListStackSetOperations",
2413            "ListStackSets",
2414            "ListStacks",
2415            "ListTypeRegistrations",
2416            "ListTypeVersions",
2417            "ListTypes",
2418            "PublishType",
2419            "RecordHandlerProgress",
2420            "RegisterPublisher",
2421            "RegisterType",
2422            "RollbackStack",
2423            "SetStackPolicy",
2424            "SetTypeConfiguration",
2425            "SetTypeDefaultVersion",
2426            "SignalResource",
2427            "StartResourceScan",
2428            "StopStackSetOperation",
2429            "TestType",
2430            "UpdateGeneratedTemplate",
2431            "UpdateStack",
2432            "UpdateStackInstances",
2433            "UpdateStackSet",
2434            "UpdateTerminationProtection",
2435            "ValidateTemplate",
2436        ]
2437    }
2438}
2439
/// Parsed + validated inputs for `UpdateStack`.
///
/// Built from the request parameters by `UpdateStackInput::from_params`.
struct UpdateStackInput {
    /// Value of the `StackName` parameter; the only field whose absence is
    /// rejected during parsing.
    stack_name: String,
    /// Value of the `TemplateBody` parameter, or an empty string when the
    /// caller omitted it.
    template_body: String,
    /// Parameters extracted from the request, after
    /// `merge_parameter_defaults` has been applied with the template body.
    parameters: BTreeMap<String, String>,
    /// Tags extracted from the request parameters.
    tags: BTreeMap<String, String>,
    /// Notification ARNs extracted from the request parameters.
    notification_arns: Vec<String>,
}
2448
2449impl UpdateStackInput {
2450    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
2451        let params = CloudFormationService::get_all_params(req);
2452
2453        let stack_name = params
2454            .get("StackName")
2455            .ok_or_else(|| {
2456                AwsServiceError::aws_error(
2457                    StatusCode::BAD_REQUEST,
2458                    "ValidationError",
2459                    "StackName is required",
2460                )
2461            })?
2462            .to_string();
2463
2464        // TemplateBody isn't `@required` in Smithy (TemplateURL +
2465        // UsePreviousTemplate are alternatives). Treat omission as an
2466        // empty body so synthetic conformance inputs don't trip an
2467        // undeclared `ValidationError`.
2468        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
2469
2470        let mut parameters = CloudFormationService::extract_parameters(&params);
2471        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
2472        Ok(Self {
2473            stack_name,
2474            template_body,
2475            parameters,
2476            tags: CloudFormationService::extract_tags(&params),
2477            notification_arns: CloudFormationService::extract_notification_arns(&params),
2478        })
2479    }
2480}
2481
/// One row of structured diff returned by `apply_resource_updates`. Used
/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
/// reflects the resources actually created / updated / deleted.
///
/// `UpdateStack` consumes the same rows: it passes them to
/// `record_stack_events` and collects their `resource_type` values to decide
/// which services to persist.
#[derive(Debug, Clone)]
pub(crate) struct ResourceChange {
    /// Whether the resource was created, updated in place, or deleted.
    pub action: ResourceChangeAction,
    /// Logical id of the affected resource.
    pub logical_id: String,
    /// Physical id of the affected resource (for deletions, the id the
    /// resource had before removal).
    pub physical_id: String,
    /// Resource type of the affected resource.
    pub resource_type: String,
}
2492
/// Kind of change applied to a single stack resource.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ResourceChangeAction {
    Create,
    Update,
    Delete,
}

impl ResourceChangeAction {
    /// `(in-progress, complete)` status strings for this action, kept in one
    /// table so the two public accessors cannot drift apart.
    fn status_pair(self) -> (&'static str, &'static str) {
        match self {
            Self::Create => ("CREATE_IN_PROGRESS", "CREATE_COMPLETE"),
            Self::Update => ("UPDATE_IN_PROGRESS", "UPDATE_COMPLETE"),
            Self::Delete => ("DELETE_IN_PROGRESS", "DELETE_COMPLETE"),
        }
    }

    /// Resource status reported while the action is running.
    pub fn status_in_progress(self) -> &'static str {
        let (in_progress, _) = self.status_pair();
        in_progress
    }

    /// Resource status reported once the action has finished.
    pub fn status_complete(self) -> &'static str {
        let (_, complete) = self.status_pair();
        complete
    }
}
2516
2517/// Apply resource updates: delete removed resources, create new ones, and
2518/// in-place update resources whose properties changed. Returns the list of
2519/// changes applied (for event emission) on success or `Err(msg)` if any
2520/// resource operation fails.
2521pub(crate) fn apply_resource_updates(
2522    stack: &mut crate::state::Stack,
2523    new_resource_defs: &[template::ResourceDefinition],
2524    template_body: &str,
2525    parameters: &BTreeMap<String, String>,
2526    provisioner: &crate::resource_provisioner::ResourceProvisioner,
2527    imports: &BTreeMap<String, String>,
2528) -> Result<Vec<ResourceChange>, String> {
2529    let mut changes: Vec<ResourceChange> = Vec::new();
2530    let old_logical_ids: std::collections::HashSet<String> = stack
2531        .resources
2532        .iter()
2533        .map(|r| r.logical_id.clone())
2534        .collect();
2535    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
2536        .iter()
2537        .map(|r| r.logical_id.clone())
2538        .collect();
2539
2540    // Delete resources no longer in template
2541    let to_remove: Vec<_> = stack
2542        .resources
2543        .iter()
2544        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2545        .cloned()
2546        .collect();
2547    for resource in &to_remove {
2548        let _ = provisioner.delete_resource(resource);
2549        changes.push(ResourceChange {
2550            action: ResourceChangeAction::Delete,
2551            logical_id: resource.logical_id.clone(),
2552            physical_id: resource.physical_id.clone(),
2553            resource_type: resource.resource_type.clone(),
2554        });
2555    }
2556    stack
2557        .resources
2558        .retain(|r| new_logical_ids.contains(&r.logical_id));
2559
2560    // Build physical ID + attribute maps from existing resources
2561    let mut physical_ids: BTreeMap<String, String> = stack
2562        .resources
2563        .iter()
2564        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2565        .collect();
2566    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2567        .resources
2568        .iter()
2569        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2570        .collect();
2571
2572    // Create new resources / update resources that already exist. Provision in
2573    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2574    // resource resolves to that resource's physical id rather than its bare
2575    // logical id (which would otherwise get baked into derived state — e.g. a
2576    // Step Functions ASL referencing a Lambda declared later in the template).
2577    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2578    for &idx in &order {
2579        let resource_def = &new_resource_defs[idx];
2580        let resolved_def = template::resolve_resource_properties_with_attrs(
2581            resource_def,
2582            template_body,
2583            parameters,
2584            &physical_ids,
2585            &attributes,
2586            imports,
2587        )
2588        .map_err(|e| {
2589            format!(
2590                "Failed to resolve resource {}: {e}",
2591                resource_def.logical_id
2592            )
2593        })?;
2594
2595        if !old_logical_ids.contains(&resource_def.logical_id) {
2596            match provisioner.create_resource(&resolved_def) {
2597                Ok(stack_resource) => {
2598                    changes.push(ResourceChange {
2599                        action: ResourceChangeAction::Create,
2600                        logical_id: stack_resource.logical_id.clone(),
2601                        physical_id: stack_resource.physical_id.clone(),
2602                        resource_type: stack_resource.resource_type.clone(),
2603                    });
2604                    physical_ids.insert(
2605                        stack_resource.logical_id.clone(),
2606                        stack_resource.physical_id.clone(),
2607                    );
2608                    attributes.insert(
2609                        stack_resource.logical_id.clone(),
2610                        stack_resource.attributes.clone(),
2611                    );
2612                    stack.resources.push(stack_resource);
2613                }
2614                Err(e) => {
2615                    tracing::warn!(
2616                        "Failed to create resource {} during update: {e}",
2617                        resource_def.logical_id
2618                    );
2619                    return Err(format!(
2620                        "Failed to create resource {}: {e}",
2621                        resource_def.logical_id
2622                    ));
2623                }
2624            }
2625        } else {
2626            // Resource exists in both old and new templates — try to apply
2627            // an in-place update. The provisioner returns `Ok(None)` for
2628            // resource types that don't support updates yet; in that case
2629            // the existing resource stays as-is so the rest of the stack
2630            // continues to validate.
2631            let existing = stack
2632                .resources
2633                .iter()
2634                .find(|r| r.logical_id == resource_def.logical_id)
2635                .cloned();
2636            if let Some(existing) = existing {
2637                match provisioner.update_resource(&existing, &resolved_def) {
2638                    Ok(Some(updated)) => {
2639                        changes.push(ResourceChange {
2640                            action: ResourceChangeAction::Update,
2641                            logical_id: updated.logical_id.clone(),
2642                            physical_id: updated.physical_id.clone(),
2643                            resource_type: updated.resource_type.clone(),
2644                        });
2645                        physical_ids
2646                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2647                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2648                        if let Some(slot) = stack
2649                            .resources
2650                            .iter_mut()
2651                            .find(|r| r.logical_id == updated.logical_id)
2652                        {
2653                            *slot = updated;
2654                        }
2655                    }
2656                    Ok(None) => {
2657                        // Resource type has no update path — leave the
2658                        // existing physical resource untouched.
2659                    }
2660                    Err(e) => {
2661                        tracing::warn!(
2662                            "Failed to update resource {} during update: {e}",
2663                            resource_def.logical_id
2664                        );
2665                        return Err(format!(
2666                            "Failed to update resource {}: {e}",
2667                            resource_def.logical_id
2668                        ));
2669                    }
2670                }
2671            }
2672        }
2673    }
2674
2675    Ok(changes)
2676}
2677
2678/// Pushes a single `StackEvent` row onto the per-stack event log so
2679/// `DescribeStackEvents` returns a chronological history of resource and
2680/// stack-level state transitions.
2681pub(crate) fn record_event(
2682    state: &mut crate::state::CloudFormationState,
2683    stack_id: &str,
2684    stack_name: &str,
2685    logical_id: &str,
2686    physical_id: &str,
2687    resource_type: &str,
2688    status: &str,
2689) {
2690    use serde_json::json;
2691    let event_id = format!(
2692        "{}-{:x}",
2693        logical_id,
2694        std::time::SystemTime::now()
2695            .duration_since(std::time::UNIX_EPOCH)
2696            .map(|d| d.as_nanos())
2697            .unwrap_or(0)
2698    );
2699    let log = state.events.entry(stack_id.to_string()).or_default();
2700
2701    // Timestamps must be sub-second AND strictly increasing within a stack's
2702    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2703    // timestamp and then only registers events whose `Timestamp` is strictly
2704    // greater; a fast stack that provisions within one second would otherwise
2705    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2706    // identically, so sam never sees completion and polls until it times out.
2707    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2708    // past the previous event, guaranteeing a strict order.
2709    // Truncate to the stored (millisecond) resolution before comparing, so the
2710    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2711    // serialization (two events in the same millisecond would otherwise both
2712    // serialize identically despite `now > prev` holding at full precision).
2713    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2714        .unwrap_or_else(Utc::now);
2715    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2716        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2717            Ok(prev) => {
2718                let prev = prev.with_timezone(&Utc);
2719                if now > prev {
2720                    now
2721                } else {
2722                    prev + chrono::Duration::milliseconds(1)
2723                }
2724            }
2725            Err(_) => now,
2726        },
2727        None => now,
2728    };
2729
2730    log.push(json!({
2731        "EventId": event_id,
2732        "StackId": stack_id,
2733        "StackName": stack_name,
2734        "LogicalResourceId": logical_id,
2735        "PhysicalResourceId": physical_id,
2736        "ResourceType": resource_type,
2737        "ResourceStatus": status,
2738        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2739    }));
2740}
2741
2742/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2743/// applied during an update. Mirrors the event sequence real CloudFormation
2744/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2745/// Persist the CloudFormation snapshot from a detached task. Mirrors
2746/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2747/// run inside the background CreateStack provisioning task (see RDS's
2748/// `save_snapshot_static`).
2749async fn save_snapshot_static(
2750    state: SharedCloudFormationState,
2751    store: Option<Arc<dyn SnapshotStore>>,
2752    lock: Arc<AsyncMutex<()>>,
2753) {
2754    let Some(store) = store else {
2755        return;
2756    };
2757    let _guard = lock.lock().await;
2758    let snapshot = CloudFormationSnapshot {
2759        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2760        state: None,
2761        accounts: Some(state.read().clone()),
2762    };
2763    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2764        let bytes = serde_json::to_vec(&snapshot)
2765            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2766        store.save(&bytes)
2767    })
2768    .await;
2769    match join {
2770        Ok(Ok(())) => {}
2771        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2772        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2773    }
2774}
2775
2776pub(crate) fn record_stack_events(
2777    state: &mut crate::state::CloudFormationState,
2778    stack_id: &str,
2779    stack_name: &str,
2780    changes: &[ResourceChange],
2781) {
2782    for ch in changes {
2783        record_event(
2784            state,
2785            stack_id,
2786            stack_name,
2787            &ch.logical_id,
2788            &ch.physical_id,
2789            &ch.resource_type,
2790            ch.action.status_in_progress(),
2791        );
2792        record_event(
2793            state,
2794            stack_id,
2795            stack_name,
2796            &ch.logical_id,
2797            &ch.physical_id,
2798            &ch.resource_type,
2799            ch.action.status_complete(),
2800        );
2801    }
2802}
2803
2804/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2805/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2806/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2807pub(crate) fn record_stack_status_event(
2808    state: &mut crate::state::CloudFormationState,
2809    stack_id: &str,
2810    stack_name: &str,
2811    resource_type: &str,
2812    status: &str,
2813) {
2814    record_event(
2815        state,
2816        stack_id,
2817        stack_name,
2818        stack_name,
2819        stack_id,
2820        resource_type,
2821        status,
2822    );
2823}
2824
2825#[cfg(test)]
2826mod tests {
2827    use super::*;
2828    use http::HeaderMap;
2829    use parking_lot::RwLock;
2830    use std::collections::HashMap;
2831    use std::sync::Arc;
2832
2833    #[test]
2834    fn merge_parameter_defaults_fills_omitted_params() {
2835        // §1.6: a parameter the caller omitted gets its declared Default, so a
2836        // Ref to it resolves to the default instead of the bare param name.
2837        let template = r#"{
2838            "Parameters": {
2839                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2840                "Count": {"Type": "Number", "Default": 3},
2841                "Supplied": {"Type": "String", "Default": "dflt"}
2842            },
2843            "Resources": {}
2844        }"#;
2845        let mut params = BTreeMap::new();
2846        params.insert("Supplied".to_string(), "override".to_string());
2847        CloudFormationService::merge_parameter_defaults(&mut params, template);
2848        assert_eq!(
2849            params.get("InstanceType").map(String::as_str),
2850            Some("t3.micro")
2851        );
2852        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2853        // A supplied value is not overwritten by the default.
2854        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2855    }
2856
2857    fn make_service() -> CloudFormationService {
2858        let cf_state = Arc::new(RwLock::new(
2859            fakecloud_core::multi_account::MultiAccountState::new(
2860                "123456789012",
2861                "us-east-1",
2862                "http://localhost:4566",
2863            ),
2864        ));
2865        let deps = CloudFormationDeps {
2866            sqs: Arc::new(RwLock::new(
2867                fakecloud_core::multi_account::MultiAccountState::new(
2868                    "123456789012",
2869                    "us-east-1",
2870                    "http://localhost:4566",
2871                ),
2872            )),
2873            sns: Arc::new(RwLock::new(
2874                fakecloud_core::multi_account::MultiAccountState::new(
2875                    "123456789012",
2876                    "us-east-1",
2877                    "http://localhost:4566",
2878                ),
2879            )),
2880            ssm: Arc::new(RwLock::new(
2881                fakecloud_core::multi_account::MultiAccountState::new(
2882                    "123456789012",
2883                    "us-east-1",
2884                    "http://localhost:4566",
2885                ),
2886            )),
2887            iam: Arc::new(RwLock::new(
2888                fakecloud_core::multi_account::MultiAccountState::new(
2889                    "123456789012",
2890                    "us-east-1",
2891                    "",
2892                ),
2893            )),
2894            s3: Arc::new(RwLock::new(
2895                fakecloud_core::multi_account::MultiAccountState::new(
2896                    "123456789012",
2897                    "us-east-1",
2898                    "",
2899                ),
2900            )),
2901            eventbridge: Arc::new(RwLock::new(
2902                fakecloud_core::multi_account::MultiAccountState::new(
2903                    "123456789012",
2904                    "us-east-1",
2905                    "",
2906                ),
2907            )),
2908            dynamodb: Arc::new(RwLock::new(
2909                fakecloud_core::multi_account::MultiAccountState::new(
2910                    "123456789012",
2911                    "us-east-1",
2912                    "",
2913                ),
2914            )),
2915            logs: Arc::new(RwLock::new(
2916                fakecloud_core::multi_account::MultiAccountState::new(
2917                    "123456789012",
2918                    "us-east-1",
2919                    "",
2920                ),
2921            )),
2922            lambda: Arc::new(RwLock::new(
2923                fakecloud_core::multi_account::MultiAccountState::new(
2924                    "123456789012",
2925                    "us-east-1",
2926                    "",
2927                ),
2928            )),
2929            secretsmanager: Arc::new(RwLock::new(
2930                fakecloud_core::multi_account::MultiAccountState::new(
2931                    "123456789012",
2932                    "us-east-1",
2933                    "",
2934                ),
2935            )),
2936            kinesis: Arc::new(RwLock::new(
2937                fakecloud_core::multi_account::MultiAccountState::new(
2938                    "123456789012",
2939                    "us-east-1",
2940                    "",
2941                ),
2942            )),
2943            kms: Arc::new(RwLock::new(
2944                fakecloud_core::multi_account::MultiAccountState::new(
2945                    "123456789012",
2946                    "us-east-1",
2947                    "",
2948                ),
2949            )),
2950            ecr: Arc::new(RwLock::new(
2951                fakecloud_core::multi_account::MultiAccountState::new(
2952                    "123456789012",
2953                    "us-east-1",
2954                    "",
2955                ),
2956            )),
2957            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2958            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2959            organizations: Arc::new(RwLock::new(None)),
2960            cognito: Arc::new(RwLock::new(
2961                fakecloud_core::multi_account::MultiAccountState::new(
2962                    "123456789012",
2963                    "us-east-1",
2964                    "",
2965                ),
2966            )),
2967            rds: Arc::new(RwLock::new(
2968                fakecloud_core::multi_account::MultiAccountState::new(
2969                    "123456789012",
2970                    "us-east-1",
2971                    "",
2972                ),
2973            )),
2974            ec2: Arc::new(RwLock::new(
2975                fakecloud_core::multi_account::MultiAccountState::new(
2976                    "123456789012",
2977                    "us-east-1",
2978                    "",
2979                ),
2980            )),
2981            autoscaling: Arc::new(RwLock::new(
2982                fakecloud_autoscaling::AutoScalingAccounts::new(),
2983            )),
2984            batch: Arc::new(RwLock::new(fakecloud_batch::BatchAccounts::new())),
2985            pipes: Arc::new(RwLock::new(fakecloud_pipes::PipesAccounts::new())),
2986            ecs: Arc::new(RwLock::new(
2987                fakecloud_core::multi_account::MultiAccountState::new(
2988                    "123456789012",
2989                    "us-east-1",
2990                    "",
2991                ),
2992            )),
2993            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2994            elasticache: Arc::new(RwLock::new(
2995                fakecloud_core::multi_account::MultiAccountState::new(
2996                    "123456789012",
2997                    "us-east-1",
2998                    "",
2999                ),
3000            )),
3001            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
3002            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
3003            stepfunctions: Arc::new(RwLock::new(
3004                fakecloud_core::multi_account::MultiAccountState::new(
3005                    "123456789012",
3006                    "us-east-1",
3007                    "",
3008                ),
3009            )),
3010            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
3011            apigateway: Arc::new(RwLock::new(
3012                fakecloud_core::multi_account::MultiAccountState::new(
3013                    "123456789012",
3014                    "us-east-1",
3015                    "",
3016                ),
3017            )),
3018            apigatewayv2: Arc::new(RwLock::new(
3019                fakecloud_core::multi_account::MultiAccountState::new(
3020                    "123456789012",
3021                    "us-east-1",
3022                    "",
3023                ),
3024            )),
3025            ses: Arc::new(RwLock::new(
3026                fakecloud_core::multi_account::MultiAccountState::new(
3027                    "123456789012",
3028                    "us-east-1",
3029                    "",
3030                ),
3031            )),
3032            application_autoscaling: Arc::new(parking_lot::RwLock::new(
3033                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
3034            )),
3035            athena: Arc::new(parking_lot::RwLock::new(
3036                fakecloud_athena::AthenaAccounts::new(),
3037            )),
3038            firehose: Arc::new(parking_lot::RwLock::new(
3039                fakecloud_firehose::FirehoseAccounts::new(),
3040            )),
3041            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
3042            eks: Arc::new(parking_lot::RwLock::new(
3043                fakecloud_core::multi_account::MultiAccountState::new(
3044                    "123456789012",
3045                    "us-east-1",
3046                    "",
3047                ),
3048            )),
3049            servicediscovery: Arc::new(parking_lot::RwLock::new(
3050                fakecloud_core::multi_account::MultiAccountState::new(
3051                    "123456789012",
3052                    "us-east-1",
3053                    "",
3054                ),
3055            )),
3056            delivery: Arc::new(DeliveryBus::new()),
3057            lambda_runtime: None,
3058            rds_runtime: None,
3059            ec2_runtime: None,
3060            ecs_runtime: None,
3061            elasticache_runtime: None,
3062        };
3063        CloudFormationService::new(cf_state, deps)
3064    }
3065
3066    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
3067        AwsRequest {
3068            service: "cloudformation".to_string(),
3069            action: action.to_string(),
3070            region: "us-east-1".to_string(),
3071            account_id: "123456789012".to_string(),
3072            request_id: "test-request-id".to_string(),
3073            headers: HeaderMap::new(),
3074            query_params: params,
3075            body: bytes::Bytes::new(),
3076            body_stream: parking_lot::Mutex::new(None),
3077            path_segments: vec![],
3078            raw_path: "/".to_string(),
3079            raw_query: String::new(),
3080            method: http::Method::POST,
3081            is_query_protocol: true,
3082            access_key_id: None,
3083            principal: None,
3084        }
3085    }
3086
3087    #[tokio::test]
3088    async fn update_stack_sets_failed_status_on_resource_error() {
3089        let svc = make_service();
3090
3091        // Create a stack with just a queue
3092        let mut create_params = HashMap::new();
3093        create_params.insert("StackName".to_string(), "test-stack".to_string());
3094        create_params.insert(
3095            "TemplateBody".to_string(),
3096            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
3097        );
3098        let req = make_request("CreateStack", create_params);
3099        let result = svc.create_stack(&req).await;
3100        assert!(result.is_ok());
3101
3102        // Update stack adding an SNS subscription with a non-existent topic
3103        let mut update_params = HashMap::new();
3104        update_params.insert("StackName".to_string(), "test-stack".to_string());
3105        update_params.insert(
3106            "TemplateBody".to_string(),
3107            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
3108        );
3109        let req = make_request("UpdateStack", update_params);
3110        let result = svc.update_stack(&req).await;
3111
3112        // Should return an error
3113        assert!(result.is_err());
3114
3115        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
3116        // terminal status real CloudFormation lands on after a failed
3117        // update attempt that gets rolled back.
3118        let accounts = svc.state.read();
3119        let state = accounts.get("123456789012").unwrap();
3120        let stack = state.stacks.get("test-stack").unwrap();
3121        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
3122    }
3123
3124    #[tokio::test]
3125    async fn create_stack_resolves_ref_to_physical_id() {
3126        let svc = make_service();
3127
3128        // Template where subscription Refs the topic
3129        let template = r#"{
3130            "Resources": {
3131                "MyTopic": {
3132                    "Type": "AWS::SNS::Topic",
3133                    "Properties": { "TopicName": "ref-test-topic" }
3134                },
3135                "MySub": {
3136                    "Type": "AWS::SNS::Subscription",
3137                    "Properties": {
3138                        "TopicArn": { "Ref": "MyTopic" },
3139                        "Protocol": "sqs",
3140                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
3141                    }
3142                }
3143            }
3144        }"#;
3145
3146        let mut params = HashMap::new();
3147        params.insert("StackName".to_string(), "ref-stack".to_string());
3148        params.insert("TemplateBody".to_string(), template.to_string());
3149        let req = make_request("CreateStack", params);
3150        let result = svc.create_stack(&req).await;
3151        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
3152
3153        // Verify both resources were created
3154        let accounts = svc.state.read();
3155        let state = accounts.get("123456789012").unwrap();
3156        let stack = state.stacks.get("ref-stack").unwrap();
3157        assert_eq!(stack.resources.len(), 2);
3158        assert_eq!(stack.status, "CREATE_COMPLETE");
3159
3160        // The subscription's physical ID should be an ARN (not just "MyTopic")
3161        let sub = stack
3162            .resources
3163            .iter()
3164            .find(|r| r.logical_id == "MySub")
3165            .unwrap();
3166        assert!(
3167            sub.physical_id.contains("ref-test-topic"),
3168            "Subscription physical ID should reference the topic ARN, got: {}",
3169            sub.physical_id
3170        );
3171    }
3172
3173    /// On the multi-thread server runtime, a stack containing a custom
3174    /// resource (whose provisioning can block for minutes on a cold Lambda
3175    /// image pull) must NOT be provisioned synchronously inside the request
3176    /// handler — CreateStack returns the StackId immediately and DescribeStacks
3177    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
3178    /// 3.1).
3179    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3180    async fn create_stack_custom_resource_provisions_asynchronously() {
3181        let svc = make_service();
3182        let template = r#"{
3183            "Resources": {
3184                "MyCustom": {
3185                    "Type": "Custom::Thing",
3186                    "Properties": {
3187                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
3188                    }
3189                }
3190            }
3191        }"#;
3192        let mut params = HashMap::new();
3193        params.insert("StackName".to_string(), "async-stack".to_string());
3194        params.insert("TemplateBody".to_string(), template.to_string());
3195        let req = make_request("CreateStack", params);
3196
3197        // CreateStack returns promptly with a StackId; provisioning runs in a
3198        // detached background task. The stack is recorded before the task can
3199        // possibly finish, so right after return it is at worst already
3200        // terminal — never a value that proves the handler blocked on the
3201        // (potentially minutes-long) provisioning loop. The key guarantee is
3202        // that the call returns without running the provisioner inline.
3203        let resp = svc
3204            .create_stack(&req)
3205            .await
3206            .expect("create returns StackId");
3207        assert!(resp.status.is_success());
3208        {
3209            let accounts = svc.state.read();
3210            let stack = accounts
3211                .get("123456789012")
3212                .unwrap()
3213                .stacks
3214                .get("async-stack")
3215                .expect("stack seeded synchronously");
3216            assert!(
3217                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
3218                "unexpected status right after create: {}",
3219                stack.status
3220            );
3221        }
3222
3223        // Poll DescribeStacks (via state) until the background task flips the
3224        // stack to its terminal CREATE_COMPLETE status.
3225        let mut status = String::new();
3226        for _ in 0..200 {
3227            {
3228                let accounts = svc.state.read();
3229                if let Some(stack) = accounts
3230                    .get("123456789012")
3231                    .and_then(|s| s.stacks.get("async-stack"))
3232                {
3233                    status = stack.status.clone();
3234                    if status != "CREATE_IN_PROGRESS" {
3235                        break;
3236                    }
3237                }
3238            }
3239            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3240        }
3241        assert_eq!(
3242            status, "CREATE_COMPLETE",
3243            "stack should reach CREATE_COMPLETE"
3244        );
3245
3246        let accounts = svc.state.read();
3247        let stack = accounts
3248            .get("123456789012")
3249            .unwrap()
3250            .stacks
3251            .get("async-stack")
3252            .unwrap();
3253        assert_eq!(stack.resources.len(), 1);
3254        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
3255    }
3256
3257    #[tokio::test]
3258    async fn output_getatt_resolves_well_known_attribute() {
3259        // An Output that GetAtts a well-known attribute the create handler does
3260        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
3261        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
3262        // resolve_template_outputs reads StackResource.attributes, which now
3263        // carries the live get_att overlay applied during provisioning.
3264        let svc = make_service();
3265        let template = r#"{
3266            "Resources": {
3267                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
3268            },
3269            "Outputs": {
3270                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
3271            }
3272        }"#;
3273        let mut params = HashMap::new();
3274        params.insert("StackName".to_string(), "out-stack".to_string());
3275        params.insert("TemplateBody".to_string(), template.to_string());
3276        svc.create_stack(&make_request("CreateStack", params))
3277            .await
3278            .expect("create returns StackId");
3279
3280        let mut url = String::new();
3281        for _ in 0..200 {
3282            {
3283                let accounts = svc.state.read();
3284                if let Some(stack) = accounts
3285                    .get("123456789012")
3286                    .and_then(|s| s.stacks.get("out-stack"))
3287                {
3288                    if stack.status != "CREATE_IN_PROGRESS" {
3289                        url = stack
3290                            .outputs
3291                            .iter()
3292                            .find(|o| o.key == "Url")
3293                            .map(|o| o.value.clone())
3294                            .unwrap_or_default();
3295                        break;
3296                    }
3297                }
3298            }
3299            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3300        }
3301        assert!(
3302            url.contains("out-q") && url != "Queue.QueueUrl",
3303            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
3304        );
3305    }
3306
3307    // ── Service error paths ──
3308
3309    #[tokio::test]
3310    async fn create_stack_missing_name_errors() {
3311        let svc = make_service();
3312        let mut params = HashMap::new();
3313        params.insert("TemplateBody".to_string(), "{}".to_string());
3314        let req = make_request("CreateStack", params);
3315        assert!(svc.create_stack(&req).await.is_err());
3316    }
3317
3318    #[tokio::test]
3319    async fn create_stack_missing_template_creates_empty_stack() {
3320        // `TemplateBody` isn't `@required` in Smithy and CreateStack
3321        // declares no `ValidationError` shape, so missing/placeholder
3322        // bodies now create an empty stack rather than rejecting with
3323        // an undeclared wire code (strict-mode conformance gap).
3324        let svc = make_service();
3325        let mut params = HashMap::new();
3326        params.insert("StackName".to_string(), "s".to_string());
3327        let req = make_request("CreateStack", params);
3328        svc.create_stack(&req)
3329            .await
3330            .expect("empty-body create succeeds");
3331    }
3332
3333    #[tokio::test]
3334    async fn create_stack_duplicate_errors() {
3335        let svc = make_service();
3336        let mut params = HashMap::new();
3337        params.insert("StackName".to_string(), "dup".to_string());
3338        params.insert(
3339            "TemplateBody".to_string(),
3340            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
3341                .to_string(),
3342        );
3343        let req = make_request("CreateStack", params.clone());
3344        svc.create_stack(&req).await.unwrap();
3345        let req = make_request("CreateStack", params);
3346        assert!(svc.create_stack(&req).await.is_err());
3347    }
3348
3349    #[tokio::test]
3350    async fn create_stack_invalid_template_creates_empty_stack() {
3351        // CreateStack's Smithy `errors` list has no `ValidationError`
3352        // shape, so unparseable bodies degrade to an empty parsed
3353        // template instead of raising an undeclared wire code.
3354        let svc = make_service();
3355        let mut params = HashMap::new();
3356        params.insert("StackName".to_string(), "bad".to_string());
3357        params.insert("TemplateBody".to_string(), "not json".to_string());
3358        let req = make_request("CreateStack", params);
3359        svc.create_stack(&req)
3360            .await
3361            .expect("bad-body create succeeds");
3362    }
3363
3364    #[tokio::test]
3365    async fn delete_stack_unknown_is_noop() {
3366        let svc = make_service();
3367        let mut params = HashMap::new();
3368        params.insert("StackName".to_string(), "ghost".to_string());
3369        let req = make_request("DeleteStack", params);
3370        assert!(svc.delete_stack(&req).await.is_ok());
3371    }
3372
3373    #[test]
3374    fn describe_stacks_nonexistent_errors() {
3375        // Querying an explicit, unknown StackName returns AWS's
3376        // `ValidationError: Stack with id <name> does not exist` so deploy
3377        // tools that probe stack existence (SAM, `aws cloudformation
3378        // deploy`) get the signal they expect (issue #1646).
3379        let svc = make_service();
3380        let mut params = HashMap::new();
3381        params.insert("StackName".to_string(), "ghost".to_string());
3382        let req = make_request("DescribeStacks", params);
3383        match svc.describe_stacks(&req) {
3384            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
3385            Err(e) => {
3386                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
3387                assert_eq!(e.code(), "ValidationError");
3388                assert!(
3389                    e.message().contains("does not exist"),
3390                    "got: {}",
3391                    e.message()
3392                );
3393            }
3394        }
3395    }
3396
3397    #[test]
3398    fn describe_stacks_empty_returns_all() {
3399        let svc = make_service();
3400        let req = make_request("DescribeStacks", HashMap::new());
3401        let resp = svc.describe_stacks(&req).unwrap();
3402        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3403        assert!(b.contains("DescribeStacksResult"));
3404    }
3405
3406    #[test]
3407    fn list_stacks_empty_returns_ok() {
3408        let svc = make_service();
3409        let req = make_request("ListStacks", HashMap::new());
3410        let resp = svc.list_stacks(&req).unwrap();
3411        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3412        assert!(b.contains("ListStacksResult"));
3413    }
3414
3415    #[test]
3416    fn list_stack_resources_missing_name_returns_validation_error() {
3417        // ListStackResources declares no `errors` in Smithy, so any
3418        // AWS-shaped 4xx counts as a handler response. We reject an
3419        // omitted StackName with `ValidationError` to keep negative
3420        // conformance variants honest; unknown-but-supplied names still
3421        // resolve to an empty list (see the test below).
3422        let svc = make_service();
3423        let req = make_request("ListStackResources", HashMap::new());
3424        let err = match svc.list_stack_resources(&req) {
3425            Err(e) => e,
3426            Ok(_) => panic!("omitted StackName must be rejected"),
3427        };
3428        assert_eq!(err.code(), "ValidationError");
3429    }
3430
3431    #[test]
3432    fn list_stack_resources_unknown_stack_returns_empty() {
3433        let svc = make_service();
3434        let mut params = HashMap::new();
3435        params.insert("StackName".to_string(), "ghost".to_string());
3436        let req = make_request("ListStackResources", params);
3437        svc.list_stack_resources(&req).expect("unknown is empty");
3438    }
3439
3440    #[test]
3441    fn describe_stack_resources_missing_name_returns_empty() {
3442        let svc = make_service();
3443        let req = make_request("DescribeStackResources", HashMap::new());
3444        svc.describe_stack_resources(&req)
3445            .expect("missing name is ok");
3446    }
3447
3448    #[test]
3449    fn get_template_missing_name_returns_empty_body() {
3450        let svc = make_service();
3451        let req = make_request("GetTemplate", HashMap::new());
3452        svc.get_template(&req).expect("missing name is ok");
3453    }
3454
3455    #[test]
3456    fn get_template_unknown_stack_returns_empty_body() {
3457        let svc = make_service();
3458        let mut params = HashMap::new();
3459        params.insert("StackName".to_string(), "ghost".to_string());
3460        let req = make_request("GetTemplate", params);
3461        svc.get_template(&req).expect("unknown is empty");
3462    }
3463
3464    #[tokio::test]
3465    async fn update_stack_missing_name_errors() {
3466        let svc = make_service();
3467        let mut params = HashMap::new();
3468        params.insert("TemplateBody".to_string(), "{}".to_string());
3469        let req = make_request("UpdateStack", params);
3470        assert!(svc.update_stack(&req).await.is_err());
3471    }
3472
3473    #[tokio::test]
3474    async fn update_stack_unknown_stack_returns_synthetic_id() {
3475        // UpdateStack declares only `InsufficientCapabilitiesException`
3476        // and `TokenAlreadyExistsException`, neither of which fits
3477        // "stack does not exist". Synthetic conformance inputs target
3478        // a placeholder stack, so we return a synthetic StackId rather
3479        // than an undeclared `ValidationError`. Real callers create
3480        // the stack first.
3481        let svc = make_service();
3482        let mut params = HashMap::new();
3483        params.insert("StackName".to_string(), "ghost".to_string());
3484        params.insert(
3485            "TemplateBody".to_string(),
3486            r#"{"Resources":{}}"#.to_string(),
3487        );
3488        let req = make_request("UpdateStack", params);
3489        let resp = svc
3490            .update_stack(&req)
3491            .await
3492            .expect("ghost update is synthetic");
3493        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3494        assert!(b.contains("UpdateStackResult"));
3495    }
3496
3497    #[tokio::test]
3498    async fn create_stack_resolves_outputs_and_records_export() {
3499        let svc = make_service();
3500        let template = r#"{
3501            "Resources": {
3502                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
3503            },
3504            "Outputs": {
3505                "QueueUrl": {
3506                    "Value": {"Ref": "Q"},
3507                    "Description": "Url",
3508                    "Export": {"Name": "TheQueueUrl"}
3509                }
3510            }
3511        }"#;
3512        let mut params = HashMap::new();
3513        params.insert("StackName".to_string(), "outs".to_string());
3514        params.insert("TemplateBody".to_string(), template.to_string());
3515        let req = make_request("CreateStack", params);
3516        svc.create_stack(&req).await.expect("create stack");
3517
3518        let accounts = svc.state.read();
3519        let stack = accounts
3520            .get("123456789012")
3521            .unwrap()
3522            .stacks
3523            .get("outs")
3524            .unwrap();
3525        assert_eq!(stack.outputs.len(), 1);
3526        assert_eq!(stack.outputs[0].key, "QueueUrl");
3527        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
3528        assert!(!stack.outputs[0].value.is_empty());
3529    }
3530
3531    #[tokio::test]
3532    async fn create_stack_rejects_duplicate_export_name() {
3533        let svc = make_service();
3534        let mk = |name: &str| {
3535            let template = format!(
3536                r#"{{
3537                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
3538                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
3539                }}"#
3540            );
3541            let mut params = HashMap::new();
3542            params.insert("StackName".to_string(), name.to_string());
3543            params.insert("TemplateBody".to_string(), template);
3544            make_request("CreateStack", params)
3545        };
3546        match svc.create_stack(&mk("first")).await {
3547            Ok(_) => {}
3548            Err(e) => panic!("first stack: {e:?}"),
3549        }
3550        // The second stack's export collides with the first. Since
3551        // provisioning is now asynchronous, the collision can no longer be a
3552        // synchronous CreateStack error — it surfaces as a failed create.
3553        // On the current-thread test runtime the provisioning task runs
3554        // inline, so the stack is already CREATE_FAILED on return.
3555        svc.create_stack(&mk("second"))
3556            .await
3557            .expect("CreateStack returns StackId even when provisioning fails");
3558        let accounts = svc.state.read();
3559        let stack = accounts
3560            .get("123456789012")
3561            .unwrap()
3562            .stacks
3563            .get("second")
3564            .expect("second stack recorded");
3565        assert_eq!(stack.status, "CREATE_FAILED");
3566        // The first stack keeps the export.
3567        let exports = &accounts.get("123456789012").unwrap().exports;
3568        assert_eq!(
3569            exports
3570                .get("DupExport")
3571                .map(|e| e.exporting_stack_name.as_str()),
3572            Some("first")
3573        );
3574    }
3575
3576    #[tokio::test]
3577    async fn import_value_resolves_against_other_stack_export() {
3578        let svc = make_service();
3579
3580        let producer_tpl = r#"{
3581            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3582            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3583        }"#;
3584        let mut p = HashMap::new();
3585        p.insert("StackName".to_string(), "producer".to_string());
3586        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3587        svc.create_stack(&make_request("CreateStack", p))
3588            .await
3589            .expect("producer");
3590
3591        let consumer_tpl = r#"{
3592            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3593            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3594        }"#;
3595        let mut p = HashMap::new();
3596        p.insert("StackName".to_string(), "consumer".to_string());
3597        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3598        svc.create_stack(&make_request("CreateStack", p))
3599            .await
3600            .expect("consumer");
3601
3602        let accounts = svc.state.read();
3603        let prod_url = accounts
3604            .get("123456789012")
3605            .unwrap()
3606            .stacks
3607            .get("producer")
3608            .unwrap()
3609            .outputs[0]
3610            .value
3611            .clone();
3612        let cons = accounts
3613            .get("123456789012")
3614            .unwrap()
3615            .stacks
3616            .get("consumer")
3617            .unwrap();
3618        assert_eq!(cons.outputs[0].value, prod_url);
3619    }
3620
3621    #[tokio::test]
3622    async fn create_stack_records_export_in_state_registry() {
3623        let svc = make_service();
3624        let template = r#"{
3625            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3626            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3627        }"#;
3628        let mut params = HashMap::new();
3629        params.insert("StackName".to_string(), "reg".to_string());
3630        params.insert("TemplateBody".to_string(), template.to_string());
3631        svc.create_stack(&make_request("CreateStack", params))
3632            .await
3633            .expect("create");
3634
3635        let accounts = svc.state.read();
3636        let state = accounts.get("123456789012").unwrap();
3637        let export = state
3638            .exports
3639            .get("reg-url")
3640            .expect("export registered in state.exports");
3641        assert_eq!(export.exporting_stack_name, "reg");
3642        assert!(!export.value.is_empty());
3643        assert!(export.exporting_stack_id.contains("reg"));
3644    }
3645
3646    #[tokio::test]
3647    async fn import_value_with_unknown_export_errors() {
3648        let svc = make_service();
3649        let consumer_tpl = r#"{
3650            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3651                "QueueName": {"Fn::ImportValue":"missing-export"}
3652            }}}
3653        }"#;
3654        let mut p = HashMap::new();
3655        p.insert("StackName".to_string(), "bad-consumer".to_string());
3656        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3657        match svc.create_stack(&make_request("CreateStack", p)).await {
3658            Ok(_) => panic!("expected ValidationError for unknown export"),
3659            Err(e) => {
3660                let msg = format!("{e:?}");
3661                assert!(msg.contains("No export named missing-export"), "got {msg}");
3662            }
3663        }
3664    }
3665
3666    #[tokio::test]
3667    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3668        let svc = make_service();
3669
3670        let producer_tpl = r#"{
3671            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3672            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3673        }"#;
3674        let mut p = HashMap::new();
3675        p.insert("StackName".to_string(), "producer".to_string());
3676        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3677        svc.create_stack(&make_request("CreateStack", p))
3678            .await
3679            .expect("producer");
3680
3681        let consumer_tpl = r#"{
3682            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3683                "QueueName": "cons-q",
3684                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3685            }}}
3686        }"#;
3687        let mut p = HashMap::new();
3688        p.insert("StackName".to_string(), "consumer".to_string());
3689        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3690        svc.create_stack(&make_request("CreateStack", p))
3691            .await
3692            .expect("consumer");
3693
3694        // Producer delete must fail while consumer still imports.
3695        let mut p = HashMap::new();
3696        p.insert("StackName".to_string(), "producer".to_string());
3697        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3698            Ok(_) => panic!("delete must fail while imports exist"),
3699            Err(e) => {
3700                let msg = format!("{e:?}");
3701                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3702            }
3703        }
3704
3705        // Delete consumer first.
3706        let mut p = HashMap::new();
3707        p.insert("StackName".to_string(), "consumer".to_string());
3708        svc.delete_stack(&make_request("DeleteStack", p))
3709            .await
3710            .expect("consumer delete");
3711
3712        // Now producer delete succeeds.
3713        let mut p = HashMap::new();
3714        p.insert("StackName".to_string(), "producer".to_string());
3715        svc.delete_stack(&make_request("DeleteStack", p))
3716            .await
3717            .expect("producer delete after consumer gone");
3718
3719        let accounts = svc.state.read();
3720        let state = accounts.get("123456789012").unwrap();
3721        assert!(state.exports.is_empty(), "exports cleared after delete");
3722        assert!(state.imports.is_empty(), "imports cleared after delete");
3723    }
3724
3725    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3726
3727    use std::sync::atomic::{AtomicUsize, Ordering};
3728
3729    /// A snapshot hook that counts how many times it fires, standing in for a
3730    /// real service's whole-state persist.
3731    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3732        Arc::new(move || {
3733            let counter = counter.clone();
3734            Box::pin(async move {
3735                counter.fetch_add(1, Ordering::SeqCst);
3736            })
3737        })
3738    }
3739
3740    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3741        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3742        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3743            tmp.path().to_path_buf(),
3744            cache,
3745        ))
3746    }
3747
/// Template shared by the persistence tests: an SQS queue and an SNS topic
/// (both snapshot-backed) plus an S3 bucket (S3Store write-through).
///
/// It deliberately contains no Lambda resource. Some tests register a
/// "lambda" hook anyway, and that hook must stay silent -- which is how they
/// prove hooks fire per touched service only.
const PERSIST_TEMPLATE: &str = r#"{"Resources":{
        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
    }}"#;
3756
3757    fn create_req(stack: &str) -> AwsRequest {
3758        let mut p = HashMap::new();
3759        p.insert("StackName".to_string(), stack.to_string());
3760        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3761        make_request("CreateStack", p)
3762    }
3763
3764    #[tokio::test]
3765    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3766        let tmp = tempfile::tempdir().unwrap();
3767        let store = disk_s3_store(&tmp);
3768        let counter = Arc::new(AtomicUsize::new(0));
3769        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3770            BTreeMap::new();
3771        hooks.insert("sqs", counting_hook(counter.clone()));
3772        hooks.insert("sns", counting_hook(counter.clone()));
3773        // Registered but not in the template -> must not fire.
3774        hooks.insert("lambda", counting_hook(counter.clone()));
3775        let svc = make_service()
3776            .with_s3_store(store.clone())
3777            .with_snapshot_hooks(hooks);
3778
3779        svc.create_stack(&create_req("probe")).await.unwrap();
3780
3781        // sqs + sns fired once each; lambda untouched.
3782        assert_eq!(counter.load(Ordering::SeqCst), 2);
3783        // The bucket was written through to the S3 store, not just the in-memory map.
3784        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3785        assert!(
3786            loaded.buckets.contains_key("cfn-bucket"),
3787            "CFN bucket should be persisted to the S3 store"
3788        );
3789    }
3790
3791    #[tokio::test]
3792    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3793        let tmp = tempfile::tempdir().unwrap();
3794        let store = disk_s3_store(&tmp);
3795        let counter = Arc::new(AtomicUsize::new(0));
3796        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3797            BTreeMap::new();
3798        hooks.insert("sqs", counting_hook(counter.clone()));
3799        hooks.insert("sns", counting_hook(counter.clone()));
3800        let svc = make_service()
3801            .with_s3_store(store.clone())
3802            .with_snapshot_hooks(hooks);
3803
3804        svc.create_stack(&create_req("probe")).await.unwrap();
3805        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3806
3807        let mut p = HashMap::new();
3808        p.insert("StackName".to_string(), "probe".to_string());
3809        svc.delete_stack(&make_request("DeleteStack", p))
3810            .await
3811            .unwrap();
3812
3813        // Delete fired the touched services again (sqs + sns).
3814        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3815        // And the CFN-deleted bucket is gone from the store, so it does not
3816        // reappear after a restart.
3817        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3818        assert!(
3819            !loaded.buckets.contains_key("cfn-bucket"),
3820            "CFN-deleted bucket should be removed from the S3 store"
3821        );
3822    }
3823
3824    #[tokio::test]
3825    async fn cfn_persist_skips_services_without_a_registered_hook() {
3826        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3827        // hooks must be silently skipped (no panic), and "sqs" fires once.
3828        let tmp = tempfile::tempdir().unwrap();
3829        let store = disk_s3_store(&tmp);
3830        let counter = Arc::new(AtomicUsize::new(0));
3831        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3832            BTreeMap::new();
3833        hooks.insert("sqs", counting_hook(counter.clone()));
3834        let svc = make_service()
3835            .with_s3_store(store.clone())
3836            .with_snapshot_hooks(hooks);
3837
3838        svc.create_stack(&create_req("probe")).await.unwrap();
3839        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3840    }
3841
3842    #[tokio::test]
3843    async fn cfn_update_persists_touched_services() {
3844        // Create with just SQS, then update to a template that adds SNS + a
3845        // bucket; the update must persist the services it touches.
3846        let tmp = tempfile::tempdir().unwrap();
3847        let store = disk_s3_store(&tmp);
3848        let counter = Arc::new(AtomicUsize::new(0));
3849        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3850            BTreeMap::new();
3851        hooks.insert("sqs", counting_hook(counter.clone()));
3852        hooks.insert("sns", counting_hook(counter.clone()));
3853        let svc = make_service()
3854            .with_s3_store(store.clone())
3855            .with_snapshot_hooks(hooks);
3856
3857        let mut create = HashMap::new();
3858        create.insert("StackName".to_string(), "upd".to_string());
3859        create.insert(
3860            "TemplateBody".to_string(),
3861            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3862                .to_string(),
3863        );
3864        svc.create_stack(&make_request("CreateStack", create))
3865            .await
3866            .unwrap();
3867        let after_create = counter.load(Ordering::SeqCst);
3868
3869        let mut update = HashMap::new();
3870        update.insert("StackName".to_string(), "upd".to_string());
3871        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3872        svc.update_stack(&make_request("UpdateStack", update))
3873            .await
3874            .unwrap();
3875
3876        // The update touched at least SNS (added); the hook count must grow.
3877        assert!(
3878            counter.load(Ordering::SeqCst) > after_create,
3879            "update should persist the services it touched"
3880        );
3881        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3882        assert!(loaded.buckets.contains_key("cfn-bucket"));
3883    }
3884
3885    #[tokio::test]
3886    async fn cfn_execute_change_set_persists_touched_services() {
3887        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3888        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3889        // write provisioned services through to disk the same way CreateStack
3890        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3891        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3892        let tmp = tempfile::tempdir().unwrap();
3893        let store = disk_s3_store(&tmp);
3894        let counter = Arc::new(AtomicUsize::new(0));
3895        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3896            BTreeMap::new();
3897        hooks.insert("sqs", counting_hook(counter.clone()));
3898        let svc = make_service()
3899            .with_s3_store(store.clone())
3900            .with_snapshot_hooks(hooks);
3901
3902        let mut create = HashMap::new();
3903        create.insert("StackName".to_string(), "cs-stack".to_string());
3904        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3905        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3906        create.insert(
3907            "TemplateBody".to_string(),
3908            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3909                .to_string(),
3910        );
3911        svc.handle(make_request("CreateChangeSet", create))
3912            .await
3913            .unwrap();
3914        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3915        let before = counter.load(Ordering::SeqCst);
3916
3917        let mut exec = HashMap::new();
3918        exec.insert("StackName".to_string(), "cs-stack".to_string());
3919        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3920        svc.handle(make_request("ExecuteChangeSet", exec))
3921            .await
3922            .unwrap();
3923
3924        assert!(
3925            counter.load(Ordering::SeqCst) > before,
3926            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3927             queue survives a restart"
3928        );
3929    }
3930
/// Table-driven check of `service_key_for_type`: resource types that map to
/// a fakecloud service key, and types that must map to nothing.
#[test]
fn service_key_for_type_maps_services_and_aliases() {
    let mapped: &[(&str, &str)] = &[
        // Direct service segments.
        ("AWS::Lambda::Function", "lambda"),
        ("AWS::SecretsManager::Secret", "secretsmanager"),
        ("AWS::SQS::Queue", "sqs"),
        ("AWS::IAM::Role", "iam"),
        ("AWS::StepFunctions::StateMachine", "stepfunctions"),
        // Namespace aliases that differ from the fakecloud service name.
        ("AWS::Events::Rule", "eventbridge"),
        ("AWS::Logs::LogGroup", "logs"),
        ("AWS::ElastiCache::CacheCluster", "elasticache"),
        // Snapshot-backed services whose CFN namespace differs from the
        // fakecloud service name (these were missing from the map, #1766 class).
        ("AWS::CertificateManager::Certificate", "acm"),
        ("AWS::ElasticLoadBalancingV2::LoadBalancer", "elbv2"),
        ("AWS::CloudFront::Distribution", "cloudfront"),
        ("AWS::Route53::HostedZone", "route53"),
        ("AWS::KinesisFirehose::DeliveryStream", "firehose"),
        ("AWS::Glue::Database", "glue"),
        ("AWS::WAFv2::WebACL", "wafv2"),
        ("AWS::Athena::WorkGroup", "athena"),
        ("AWS::Organizations::Organization", "organizations"),
    ];
    for &(resource_type, expected_key) in mapped {
        assert_eq!(service_key_for_type(resource_type), Some(expected_key));
    }

    let unmapped = [
        // S3 has no snapshot hook (it persists via the S3Store write-through).
        "AWS::S3::Bucket",
        // Malformed / non-AWS types.
        "AWS::Lambda",
        "Custom::Thing::Resource",
        "AWS",
        "",
    ];
    for resource_type in unmapped {
        assert_eq!(service_key_for_type(resource_type), None);
    }
}
3998
3999    #[tokio::test]
4000    async fn persist_touched_services_noop_with_empty_hooks() {
4001        // No registered hooks -> nothing to do, must not panic.
4002        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
4003        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
4004    }
4005
4006    #[tokio::test]
4007    async fn cfn_bucket_policy_write_through_create_update_delete() {
4008        let tmp = tempfile::tempdir().unwrap();
4009        let store = disk_s3_store(&tmp);
4010        let svc = make_service().with_s3_store(store.clone());
4011
4012        // Create a bucket + bucket policy.
4013        let mut create = HashMap::new();
4014        create.insert("StackName".to_string(), "pol".to_string());
4015        create.insert(
4016            "TemplateBody".to_string(),
4017            r#"{"Resources":{
4018                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
4019                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
4020            }}"#
4021            .to_string(),
4022        );
4023        svc.create_stack(&make_request("CreateStack", create))
4024            .await
4025            .unwrap();
4026        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4027        let policy = loaded.buckets["pol-bucket"]
4028            .subresources
4029            .get("policy.toml")
4030            .cloned()
4031            .expect("bucket policy persisted on create");
4032        assert!(policy.contains("s3:GetObject"));
4033
4034        // Update the policy document; the update must write through.
4035        let mut update = HashMap::new();
4036        update.insert("StackName".to_string(), "pol".to_string());
4037        update.insert(
4038            "TemplateBody".to_string(),
4039            r#"{"Resources":{
4040                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
4041                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
4042            }}"#
4043            .to_string(),
4044        );
4045        svc.update_stack(&make_request("UpdateStack", update))
4046            .await
4047            .unwrap();
4048        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4049        let policy = loaded.buckets["pol-bucket"]
4050            .subresources
4051            .get("policy.toml")
4052            .cloned()
4053            .expect("bucket policy still persisted after update");
4054        assert!(
4055            policy.contains("s3:PutObject"),
4056            "updated policy should be written through"
4057        );
4058
4059        // Delete the stack; the bucket (and its policy) must be removed from disk.
4060        let mut del = HashMap::new();
4061        del.insert("StackName".to_string(), "pol".to_string());
4062        svc.delete_stack(&make_request("DeleteStack", del))
4063            .await
4064            .unwrap();
4065        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4066        assert!(
4067            !loaded.buckets.contains_key("pol-bucket"),
4068            "CFN-deleted bucket and policy should be gone from the store"
4069        );
4070    }
4071}