Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
29/// Canonical `Fn::GetAtt` attribute names per resource type. Used to
30/// supplement the eagerly-captured `ProvisionResult::attributes` with
31/// live-state lookups via `ResourceProvisioner::get_att`. Resources not
32/// listed here just use whatever the create handler captured.
33fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
34    match resource_type {
35        "AWS::S3::Bucket" => &[
36            "Arn",
37            "DomainName",
38            "RegionalDomainName",
39            "DualStackDomainName",
40            "WebsiteURL",
41        ],
42        "AWS::Lambda::Function" => &["Arn", "FunctionUrl", "Version"],
43        "AWS::IAM::Role" => &["Arn", "RoleId"],
44        "AWS::SQS::Queue" => &["Arn", "QueueName", "QueueUrl"],
45        "AWS::SNS::Topic" => &["TopicArn", "TopicName"],
46        "AWS::DynamoDB::Table" => &["Arn", "StreamArn"],
47        "AWS::KMS::Key" => &["Arn", "KeyId"],
48        "AWS::SecretsManager::Secret" => &["Arn", "Id"],
49        "AWS::CloudFront::Distribution" => &["DomainName", "Id"],
50        "AWS::EC2::VPC" => &["VpcId", "CidrBlock"],
51        "AWS::EC2::Subnet" => &["SubnetId", "AvailabilityZone", "VpcId", "CidrBlock"],
52        "AWS::EC2::SecurityGroup" => &["GroupId", "VpcId"],
53        "AWS::EC2::InternetGateway" => &["InternetGatewayId"],
54        "AWS::EC2::RouteTable" => &["RouteTableId"],
55        "AWS::Pipes::Pipe" => &["Arn"],
56        _ => &[],
57    }
58}
59
60/// Map a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
61/// snapshot-hook key for its owning service (the keys of
62/// `CloudFormationDeps::snapshot_hooks`). The key is the service segment, with
63/// aliases for the few services whose CloudFormation namespace differs from
64/// their fakecloud service name (e.g. `Events` -> `eventbridge`).
65///
66/// `AWS::S3::*` is intentionally absent: S3 buckets persist through the
67/// `S3Store` write-through path in the provisioner, not a whole-state snapshot
68/// hook. Returns `None` for malformed types and any service whose state is not
69/// snapshot-backed (those CFN resources have no persistence path either way).
70fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
71    let mut parts = resource_type.split("::");
72    let vendor = parts.next()?;
73    let service = parts.next()?;
74    // A real resource type has three segments; a 2-part string (or fewer) is
75    // not one.
76    parts.next()?;
77    if vendor != "AWS" {
78        return None;
79    }
80    Some(match service {
81        "Lambda" => "lambda",
82        "SecretsManager" => "secretsmanager",
83        "SQS" => "sqs",
84        "SNS" => "sns",
85        "DynamoDB" => "dynamodb",
86        "StepFunctions" => "stepfunctions",
87        "Events" => "eventbridge",
88        "SSM" => "ssm",
89        "Logs" => "logs",
90        "KMS" => "kms",
91        "Kinesis" => "kinesis",
92        "SES" => "ses",
93        "Cognito" => "cognito",
94        "RDS" => "rds",
95        "ElastiCache" => "elasticache",
96        "ECR" => "ecr",
97        "ECS" => "ecs",
98        "CloudWatch" => "cloudwatch",
99        "ApiGateway" => "apigateway",
100        "ApiGatewayV2" => "apigatewayv2",
101        "Bedrock" => "bedrock",
102        "Scheduler" => "scheduler",
103        "IAM" => "iam",
104        // Services whose CloudFormation namespace differs from their fakecloud
105        // service name, and which were missing from this table: their
106        // provisioner mutates snapshot-backed state directly, so CFN-created
107        // (and CFN-deleted) resources vanished on restart while their
108        // direct-API equivalents persisted (#1766 class). Each has a registered
109        // snapshot hook in the server.
110        "CertificateManager" => "acm",
111        "ElasticLoadBalancingV2" => "elbv2",
112        "CloudFront" => "cloudfront",
113        "Route53" => "route53",
114        "KinesisFirehose" => "firehose",
115        "Glue" => "glue",
116        "WAFv2" => "wafv2",
117        "Athena" => "athena",
118        "Organizations" => "organizations",
119        // EC2 and ApplicationAutoScaling were wired into the provisioner after
120        // this table was last audited (EC2: 2026-06-25 #1957;
121        // application-autoscaling: persistence sweep), so their CFN-created VPC
122        // / Subnet / SecurityGroup / scalable-target state mutated in memory and
123        // vanished on restart (#1766 class). Both have a registered snapshot
124        // hook in the server.
125        "EC2" => "ec2",
126        "AutoScaling" => "autoscaling",
127        "Batch" => "batch",
128        "ApplicationAutoScaling" => "application-autoscaling",
129        "Pipes" => "pipes",
130        _ => return None,
131    })
132}
133
134/// Persist each distinct snapshot-backed service touched by a stack op, once.
135///
136/// `resource_types` is the set of `resource_type` strings the op created /
137/// updated / deleted. The CloudFormation provisioner mutates services' shared
138/// state directly and never triggers their snapshot path; this writes that
139/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
140/// resource survives a restart. Services with no registered hook (memory mode,
141/// or non-snapshot-backed services) are skipped.
142async fn persist_touched_services<I>(
143    hooks: &BTreeMap<&'static str, SnapshotHook>,
144    resource_types: I,
145) where
146    I: IntoIterator<Item = String>,
147{
148    if hooks.is_empty() {
149        return;
150    }
151    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
152    for ty in resource_types {
153        if let Some(key) = service_key_for_type(&ty) {
154            keys.insert(key);
155        }
156    }
157    for key in keys {
158        if let Some(hook) = hooks.get(key) {
159            hook().await;
160        }
161    }
162}
163
164/// Multi-pass provisioning for all resources in a parsed template.
165///
166/// Resources may `Ref` each other in either direction, and JSON object
167/// iteration order isn't stable, so a single forward pass isn't enough
168/// to resolve them. We loop: each pass tries every pending resource, and
169/// any resource whose `Ref` targets are still unknown just stays pending
170/// for the next pass. When no pass makes progress we report the first
171/// pending failure and rollback.
172pub(crate) fn provision_stack_resources(
173    provisioner: &ResourceProvisioner,
174    resource_defs: &[template::ResourceDefinition],
175    template_body: &str,
176    parameters: &BTreeMap<String, String>,
177    imports: &BTreeMap<String, String>,
178) -> Result<Vec<StackResource>, AwsServiceError> {
179    let mut resources = Vec::new();
180    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
181    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
182    // Seed the work list in dependency order so a referenced resource is
183    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
184    // physical ids. The fixed-point retry loop below still recovers from any
185    // residual ordering the graph can't express.
186    let order = template::dependency_order(template_body, parameters, resource_defs);
187    let mut pending: Vec<&template::ResourceDefinition> =
188        order.iter().map(|&i| &resource_defs[i]).collect();
189    let max_passes = pending.len() + 1;
190
191    for _ in 0..max_passes {
192        if pending.is_empty() {
193            break;
194        }
195        let mut still_pending = Vec::new();
196        let mut made_progress = false;
197
198        for resource_def in pending {
199            let resolved_def = template::resolve_resource_properties_with_attrs(
200                resource_def,
201                template_body,
202                parameters,
203                &physical_ids,
204                &attributes,
205                imports,
206            )
207            .map_err(|e| {
208                // `ValidationError` isn't declared on CreateStack/UpdateStack;
209                // surface template resolve failures through the closest declared
210                // shape (`InsufficientCapabilitiesException`) instead.
211                AwsServiceError::aws_error(
212                    StatusCode::BAD_REQUEST,
213                    "InsufficientCapabilitiesException",
214                    e,
215                )
216            })?;
217
218            match provisioner.create_resource(&resolved_def) {
219                Ok(mut stack_resource) => {
220                    physical_ids.insert(
221                        stack_resource.logical_id.clone(),
222                        stack_resource.physical_id.clone(),
223                    );
224                    // Start with the eagerly-captured attribute set, then
225                    // overlay anything the provisioner can resolve from
226                    // live state (e.g. attributes that depend on side-effects
227                    // recorded after the create handler returned).
228                    let mut attr_map = stack_resource.attributes.clone();
229                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
230                        if attr_map.contains_key(*attr) {
231                            continue;
232                        }
233                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
234                            attr_map.insert((*attr).to_string(), v);
235                        }
236                    }
237                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
238                    // Persist the live-resolved attributes onto the stored
239                    // resource so later readers — notably resolve_template_outputs,
240                    // which reads StackResource.attributes directly without the
241                    // overlay — see the full GetAtt set, not just the eager subset.
242                    stack_resource.attributes = attr_map;
243                    resources.push(stack_resource);
244                    made_progress = true;
245                }
246                Err(_) => still_pending.push(resource_def),
247            }
248        }
249
250        pending = still_pending;
251        if !made_progress && !pending.is_empty() {
252            // No progress — report the first failure and rollback anything
253            // we already created.
254            let resource_def = pending[0];
255            let resolved_def = template::resolve_resource_properties_with_attrs(
256                resource_def,
257                template_body,
258                parameters,
259                &physical_ids,
260                &attributes,
261                imports,
262            )
263            .unwrap_or_else(|_| resource_def.clone());
264            let err = provisioner.create_resource(&resolved_def).unwrap_err();
265            for r in &resources {
266                let _ = provisioner.delete_resource(r);
267            }
268            return Err(AwsServiceError::aws_error(
269                StatusCode::BAD_REQUEST,
270                "ValidationError",
271                format!(
272                    "Failed to create resource {}: {err}",
273                    resource_def.logical_id
274                ),
275            ));
276        }
277    }
278
279    Ok(resources)
280}
281
282/// State references for every service CloudFormation can provision resources in.
283/// The result of a Cloud Control API resource provisioning call: the resource's
284/// primary identifier plus its `GetAtt`-resolvable attributes. Kept free of the
285/// crate-internal `StackResource` type so the `cloudcontrol` crate can consume
286/// it directly.
287#[derive(Debug, Clone)]
288pub struct CloudControlOutcome {
289    pub physical_id: String,
290    pub attributes: BTreeMap<String, String>,
291}
292
293impl CloudControlOutcome {
294    fn from(resource: &StackResource) -> Self {
295        Self {
296            physical_id: resource.physical_id.clone(),
297            attributes: resource.attributes.clone(),
298        }
299    }
300}
301
302/// Rebuild the `StackResource` shape the update/delete handlers expect from the
303/// identity Cloud Control persists between calls.
304fn reconstruct_stack_resource(
305    type_name: &str,
306    physical_id: &str,
307    attributes: &BTreeMap<String, String>,
308) -> StackResource {
309    StackResource {
310        logical_id: "Resource".to_string(),
311        physical_id: physical_id.to_string(),
312        resource_type: type_name.to_string(),
313        status: "CREATE_COMPLETE".to_string(),
314        service_token: None,
315        attributes: attributes.clone(),
316    }
317}
318
319pub struct CloudFormationDeps {
320    pub sqs: SharedSqsState,
321    pub sns: SharedSnsState,
322    pub ssm: SharedSsmState,
323    pub iam: SharedIamState,
324    pub s3: SharedS3State,
325    pub eventbridge: SharedEventBridgeState,
326    pub dynamodb: SharedDynamoDbState,
327    pub logs: SharedLogsState,
328    pub lambda: fakecloud_lambda::SharedLambdaState,
329    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
330    pub kinesis: fakecloud_kinesis::SharedKinesisState,
331    pub kms: fakecloud_kms::SharedKmsState,
332    pub ecr: fakecloud_ecr::SharedEcrState,
333    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
334    pub elbv2: fakecloud_elbv2::SharedElbv2State,
335    pub organizations: fakecloud_organizations::SharedOrganizationsState,
336    pub cognito: fakecloud_cognito::SharedCognitoState,
337    pub rds: fakecloud_rds::SharedRdsState,
338    pub ec2: fakecloud_ec2::SharedEc2State,
339    pub autoscaling: fakecloud_autoscaling::SharedAutoScalingState,
340    pub batch: fakecloud_batch::SharedBatchState,
341    pub pipes: fakecloud_pipes::SharedPipesState,
342    pub ecs: fakecloud_ecs::SharedEcsState,
343    pub acm: fakecloud_acm::SharedAcmState,
344    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
345    pub route53: fakecloud_route53::SharedRoute53State,
346    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
347    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
348    pub wafv2: fakecloud_wafv2::SharedWafv2State,
349    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
350    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
351    pub ses: fakecloud_ses::SharedSesState,
352    pub application_autoscaling:
353        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
354    pub athena: fakecloud_athena::SharedAthenaState,
355    pub firehose: fakecloud_firehose::SharedFirehoseState,
356    pub glue: fakecloud_glue::SharedGlueState,
357    pub delivery: Arc<DeliveryBus>,
358    /// Lambda container runtime, when Docker/Podman is available. Used to
359    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
360    /// in the background so its first Invoke doesn't pay the cold-pull cost
361    /// (the #1539 timeout, through the CloudFormation door). `None` when no
362    /// runtime is configured — provisioning still works, the first Invoke just
363    /// falls back to a cold pull.
364    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
365    /// Container runtimes for the stateful services whose CFN-provisioned
366    /// resources must be backed by REAL containers (not phantom metadata).
367    /// When present, the provisioner inserts the resource record synchronously
368    /// (so `Ref`/`GetAtt` resolve during provisioning) and then backs it with a
369    /// real container in the background — the same container the direct API
370    /// path spawns. `None` (no Docker/Podman, e.g. CI) keeps the metadata-only
371    /// behavior, matching how the direct API degrades without a runtime.
372    pub rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
373    pub ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
374    pub ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
375    pub elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
376}
377
378pub struct CloudFormationService {
379    pub(crate) state: SharedCloudFormationState,
380    pub(crate) deps: CloudFormationDeps,
381    snapshot_store: Option<Arc<dyn SnapshotStore>>,
382    snapshot_lock: Arc<AsyncMutex<()>>,
383    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
384    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
385    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
386    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
387    /// real store via `with_s3_store` once it has been built.
388    s3_store: Arc<dyn S3Store>,
389    /// Whole-state snapshot persist hooks keyed by service name (see
390    /// `service_key_for_type`). After a stack op the handler invokes the hook
391    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
392    /// is written through to disk, the same persistence a direct mutating API
393    /// call would trigger. Empty by default (memory mode, or no services
394    /// wired); the server populates it via `with_snapshot_hooks`.
395    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
396}
397
398/// Everything the async CreateStack provisioning task needs to provision
399/// resources and flip the stack to its terminal status. Bundled into one
400/// owned struct so it can move into a detached `tokio::spawn`.
401struct CreateStackContext {
402    state: SharedCloudFormationState,
403    delivery: Arc<DeliveryBus>,
404    snapshot_store: Option<Arc<dyn SnapshotStore>>,
405    snapshot_lock: Arc<AsyncMutex<()>>,
406    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
407    provisioner: ResourceProvisioner,
408    account_id: String,
409    stack_name: String,
410    stack_id: String,
411    template_body: String,
412    parameters: BTreeMap<String, String>,
413    notification_arns: Vec<String>,
414    imported_names: Vec<String>,
415    resource_defs: Vec<template::ResourceDefinition>,
416}
417
418/// Owned clones of the service-state + container-runtime handles a provisioner
419/// holds, so the spawn / teardown drains can `tokio::spawn` REAL container work
420/// after the provisioner itself has been moved (CreateStack moves it into
421/// `spawn_blocking`) or after the CFN state lock has been dropped (the
422/// changeset/update/delete paths). Centralizes the per-resource-type match that
423/// backs a freshly-provisioned container resource or reaps a deleted one, so the
424/// CreateStack / ExecuteChangeSet / UpdateStack / DeleteStack paths stay in
425/// lockstep (the #2031-#2034 create-side hardening reaching every path).
426pub(crate) struct ContainerBackingHandles {
427    account_id: String,
428    region: String,
429    rds_state: fakecloud_rds::SharedRdsState,
430    rds_runtime: Option<Arc<fakecloud_rds::runtime::RdsRuntime>>,
431    ec2_state: fakecloud_ec2::SharedEc2State,
432    ec2_runtime: Option<Arc<fakecloud_ec2::runtime::Ec2Runtime>>,
433    autoscaling_state: fakecloud_autoscaling::SharedAutoScalingState,
434    elasticache_state: fakecloud_elasticache::SharedElastiCacheState,
435    elasticache_runtime: Option<Arc<fakecloud_elasticache::runtime::ElastiCacheRuntime>>,
436    ecs_state: fakecloud_ecs::SharedEcsState,
437    ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
438}
439
440impl ContainerBackingHandles {
441    pub(crate) fn from_provisioner(p: &ResourceProvisioner) -> Self {
442        Self {
443            account_id: p.account_id.clone(),
444            region: p.region.clone(),
445            rds_state: p.rds_state.clone(),
446            rds_runtime: p.rds_runtime.clone(),
447            ec2_state: p.ec2_state.clone(),
448            ec2_runtime: p.ec2_runtime.clone(),
449            autoscaling_state: p.autoscaling_state.clone(),
450            elasticache_state: p.elasticache_state.clone(),
451            elasticache_runtime: p.elasticache_runtime.clone(),
452            ecs_state: p.ecs_state.clone(),
453            ecs_runtime: p.ecs_runtime.clone(),
454        }
455    }
456
457    /// Back each freshly-inserted container resource with a REAL container in a
458    /// detached task, so the stack op never blocks on a container boot/pull (the
459    /// #1539/#1730 timeout lesson). Drains the provisioner's queued spawn intents.
460    pub(crate) fn spawn_container_intents(
461        &self,
462        intents: Vec<crate::resource_provisioner::ContainerSpawnIntent>,
463    ) {
464        use crate::resource_provisioner::ContainerSpawnIntent;
465        for intent in intents {
466            match intent {
467                ContainerSpawnIntent::RdsInstance { identifier } => {
468                    if let Some(runtime) = self.rds_runtime.clone() {
469                        let rds_state = self.rds_state.clone();
470                        let account = self.account_id.clone();
471                        let region = self.region.clone();
472                        tokio::spawn(async move {
473                            fakecloud_rds::cfn_provision::cfn_ensure_instance_container(
474                                rds_state, runtime, identifier, account, region,
475                            )
476                            .await;
477                        });
478                    }
479                }
480                ContainerSpawnIntent::AsgInstances { group_name } => {
481                    let asg_state = self.autoscaling_state.clone();
482                    let ec2_state = self.ec2_state.clone();
483                    let ec2_runtime = self.ec2_runtime.clone();
484                    let account = self.account_id.clone();
485                    let region = self.region.clone();
486                    tokio::spawn(async move {
487                        fakecloud_autoscaling::cfn_provision::cfn_reconcile_capacity(
488                            asg_state,
489                            ec2_state,
490                            ec2_runtime,
491                            group_name,
492                            account,
493                            region,
494                        )
495                        .await;
496                    });
497                }
498                ContainerSpawnIntent::Ec2Instance { instance_id } => {
499                    let ec2_state = self.ec2_state.clone();
500                    let ec2_runtime = self.ec2_runtime.clone();
501                    let account = self.account_id.clone();
502                    tokio::spawn(async move {
503                        fakecloud_ec2::cfn_provision::cfn_back_instance(
504                            ec2_state,
505                            ec2_runtime,
506                            account,
507                            instance_id,
508                        )
509                        .await;
510                    });
511                }
512                ContainerSpawnIntent::ElastiCacheCluster { cache_cluster_id } => {
513                    if let Some(runtime) = self.elasticache_runtime.clone() {
514                        let ec_state = self.elasticache_state.clone();
515                        let account = self.account_id.clone();
516                        tokio::spawn(async move {
517                            fakecloud_elasticache::cfn_provision::cfn_ensure_cluster_container(
518                                ec_state,
519                                runtime,
520                                cache_cluster_id,
521                                account,
522                            )
523                            .await;
524                        });
525                    }
526                }
527                ContainerSpawnIntent::ElastiCacheReplicationGroup {
528                    replication_group_id,
529                } => {
530                    if let Some(runtime) = self.elasticache_runtime.clone() {
531                        let ec_state = self.elasticache_state.clone();
532                        let account = self.account_id.clone();
533                        tokio::spawn(async move {
534                            fakecloud_elasticache::cfn_provision::cfn_ensure_replication_group_container(
535                                ec_state,
536                                runtime,
537                                replication_group_id,
538                                account,
539                            )
540                            .await;
541                        });
542                    }
543                }
544                ContainerSpawnIntent::EcsServiceTasks {
545                    cluster_name,
546                    service_name,
547                } => {
548                    if let Some(runtime) = self.ecs_runtime.clone() {
549                        let ecs_state = self.ecs_state.clone();
550                        let account = self.account_id.clone();
551                        tokio::spawn(async move {
552                            fakecloud_ecs::cfn_provision::cfn_launch_service_tasks(
553                                ecs_state,
554                                runtime,
555                                cluster_name,
556                                service_name,
557                                account,
558                            )
559                            .await;
560                        });
561                    }
562                }
563            }
564        }
565    }
566
567    /// Reap the REAL backing container for each deleted container resource in a
568    /// detached task, so a stack delete (or update-removed) never leaks a
569    /// running RDS / ElastiCache / ECS / EC2 container. Drains the provisioner's
570    /// queued teardown intents.
571    pub(crate) fn spawn_teardown_intents(
572        &self,
573        intents: Vec<crate::resource_provisioner::ContainerTeardownIntent>,
574    ) {
575        use crate::resource_provisioner::ContainerTeardownIntent;
576        for intent in intents {
577            match intent {
578                ContainerTeardownIntent::RdsInstance { identifier } => {
579                    if let Some(runtime) = self.rds_runtime.clone() {
580                        let account = self.account_id.clone();
581                        tokio::spawn(async move {
582                            fakecloud_rds::cfn_provision::cfn_teardown_instance_container(
583                                runtime, identifier, account,
584                            )
585                            .await;
586                        });
587                    }
588                }
589                ContainerTeardownIntent::ElastiCacheCluster { cache_cluster_id } => {
590                    if let Some(runtime) = self.elasticache_runtime.clone() {
591                        tokio::spawn(async move {
592                            fakecloud_elasticache::cfn_provision::cfn_teardown_cluster_container(
593                                runtime,
594                                cache_cluster_id,
595                            )
596                            .await;
597                        });
598                    }
599                }
600                ContainerTeardownIntent::ElastiCacheReplicationGroup {
601                    replication_group_id,
602                } => {
603                    if let Some(runtime) = self.elasticache_runtime.clone() {
604                        tokio::spawn(async move {
605                            fakecloud_elasticache::cfn_provision::cfn_teardown_replication_group_container(
606                                runtime,
607                                replication_group_id,
608                            )
609                            .await;
610                        });
611                    }
612                }
613                ContainerTeardownIntent::EcsService {
614                    cluster_name,
615                    service_name,
616                } => {
617                    if let Some(runtime) = self.ecs_runtime.clone() {
618                        let ecs_state = self.ecs_state.clone();
619                        let account = self.account_id.clone();
620                        tokio::spawn(async move {
621                            fakecloud_ecs::cfn_provision::cfn_stop_service_tasks(
622                                ecs_state,
623                                runtime,
624                                cluster_name,
625                                service_name,
626                                account,
627                            )
628                            .await;
629                        });
630                    }
631                }
632                ContainerTeardownIntent::Ec2Instance { instance_id } => {
633                    let ec2_state = self.ec2_state.clone();
634                    let ec2_runtime = self.ec2_runtime.clone();
635                    let account = self.account_id.clone();
636                    let region = self.region.clone();
637                    tokio::spawn(async move {
638                        fakecloud_ec2::cfn_provision::cfn_terminate(
639                            ec2_state,
640                            ec2_runtime,
641                            account,
642                            region,
643                            instance_id,
644                        )
645                        .await;
646                    });
647                }
648                ContainerTeardownIntent::AsgInstances { instance_ids } => {
649                    let asg_state = self.autoscaling_state.clone();
650                    let ec2_state = self.ec2_state.clone();
651                    let ec2_runtime = self.ec2_runtime.clone();
652                    let account = self.account_id.clone();
653                    let region = self.region.clone();
654                    tokio::spawn(async move {
655                        fakecloud_autoscaling::cfn_provision::cfn_terminate_instances(
656                            asg_state,
657                            ec2_state,
658                            ec2_runtime,
659                            instance_ids,
660                            account,
661                            region,
662                        )
663                        .await;
664                    });
665                }
666            }
667        }
668    }
669}
670
671/// Drain a provisioner's queued custom-resource Lambda invokes (`Custom::*`),
672/// running each off the request path in a detached task so a cold image pull
673/// never blocks the client or stalls the CFN state lock. Used by the
674/// changeset/update/delete paths (where `defer_custom_invokes` is set).
675pub(crate) fn spawn_custom_invokes(provisioner: &ResourceProvisioner) {
676    let intents = std::mem::take(&mut *provisioner.pending_custom_invokes.lock());
677    if intents.is_empty() {
678        return;
679    }
680    let delivery = provisioner.delivery.clone();
681    for intent in intents {
682        let delivery = delivery.clone();
683        tokio::spawn(async move {
684            match delivery
685                .invoke_lambda(&intent.service_token, &intent.payload)
686                .await
687            {
688                Some(Ok(_)) => {
689                    tracing::info!(
690                        "Custom resource Lambda {} invoked successfully",
691                        intent.service_token
692                    );
693                }
694                Some(Err(e)) => {
695                    tracing::warn!(
696                        "Custom resource Lambda {} invocation failed: {e}",
697                        intent.service_token
698                    );
699                }
700                None => {}
701            }
702        });
703    }
704}
705
706impl CloudFormationService {
707    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
708        Self {
709            state,
710            deps,
711            snapshot_store: None,
712            snapshot_lock: Arc::new(AsyncMutex::new(())),
713            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
714            snapshot_hooks: BTreeMap::new(),
715        }
716    }
717
718    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
719        self.snapshot_store = Some(store);
720        self
721    }
722
723    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
724    /// written through to disk (see `ResourceProvisioner::s3_store`).
725    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
726        self.s3_store = store;
727        self
728    }
729
730    /// Register the per-service snapshot persist hooks (keyed by service name)
731    /// used to persist every snapshot-backed service a stack op touches.
732    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
733        self.snapshot_hooks = hooks;
734        self
735    }
736
737    async fn save_snapshot(&self) {
738        let Some(store) = self.snapshot_store.clone() else {
739            return;
740        };
741        let _guard = self.snapshot_lock.lock().await;
742        let snapshot = CloudFormationSnapshot {
743            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
744            state: None,
745            accounts: Some(self.state.read().clone()),
746        };
747        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
748            let bytes = serde_json::to_vec(&snapshot)
749                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
750            store.save(&bytes)
751        })
752        .await;
753        match join {
754            Ok(Ok(())) => {}
755            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
756            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
757        }
758    }
759
760    pub(crate) fn provisioner(
761        &self,
762        stack_id: &str,
763        account_id: &str,
764        region: &str,
765    ) -> ResourceProvisioner {
766        ResourceProvisioner {
767            sqs_state: self.deps.sqs.clone(),
768            sns_state: self.deps.sns.clone(),
769            ssm_state: self.deps.ssm.clone(),
770            iam_state: self.deps.iam.clone(),
771            s3_state: self.deps.s3.clone(),
772            eventbridge_state: self.deps.eventbridge.clone(),
773            dynamodb_state: self.deps.dynamodb.clone(),
774            logs_state: self.deps.logs.clone(),
775            lambda_state: self.deps.lambda.clone(),
776            secretsmanager_state: self.deps.secretsmanager.clone(),
777            kinesis_state: self.deps.kinesis.clone(),
778            kms_state: self.deps.kms.clone(),
779            ecr_state: self.deps.ecr.clone(),
780            cloudwatch_state: self.deps.cloudwatch.clone(),
781            elbv2_state: self.deps.elbv2.clone(),
782            organizations_state: self.deps.organizations.clone(),
783            cognito_state: self.deps.cognito.clone(),
784            rds_state: self.deps.rds.clone(),
785            ec2_state: self.deps.ec2.clone(),
786            autoscaling_state: self.deps.autoscaling.clone(),
787            batch_state: self.deps.batch.clone(),
788            pipes_state: self.deps.pipes.clone(),
789            ecs_state: self.deps.ecs.clone(),
790            acm_state: self.deps.acm.clone(),
791            elasticache_state: self.deps.elasticache.clone(),
792            route53_state: self.deps.route53.clone(),
793            cloudfront_state: self.deps.cloudfront.clone(),
794            stepfunctions_state: self.deps.stepfunctions.clone(),
795            wafv2_state: self.deps.wafv2.clone(),
796            apigateway_state: self.deps.apigateway.clone(),
797            apigatewayv2_state: self.deps.apigatewayv2.clone(),
798            ses_state: self.deps.ses.clone(),
799            app_autoscaling_state: self.deps.application_autoscaling.clone(),
800            athena_state: self.deps.athena.clone(),
801            firehose_state: self.deps.firehose.clone(),
802            glue_state: self.deps.glue.clone(),
803            cloudformation_state: self.state.clone(),
804            delivery: self.deps.delivery.clone(),
805            lambda_runtime: self.deps.lambda_runtime.clone(),
806            rds_runtime: self.deps.rds_runtime.clone(),
807            ec2_runtime: self.deps.ec2_runtime.clone(),
808            ecs_runtime: self.deps.ecs_runtime.clone(),
809            elasticache_runtime: self.deps.elasticache_runtime.clone(),
810            pending_container_spawns: Arc::new(parking_lot::Mutex::new(Vec::new())),
811            pending_container_teardowns: Arc::new(parking_lot::Mutex::new(Vec::new())),
812            pending_custom_invokes: Arc::new(parking_lot::Mutex::new(Vec::new())),
813            // CreateStack provisions in a detached task, so its synchronous
814            // custom-resource invoke never blocks the client or the state lock.
815            // The changeset/update/delete paths run inside the request, so they
816            // flip this on (see `provisioner_deferred`) to queue the invoke and
817            // drain it off the request path instead.
818            defer_custom_invokes: false,
819            s3_store: self.s3_store.clone(),
820            account_id: account_id.to_string(),
821            region: region.to_string(),
822            stack_id: stack_id.to_string(),
823            // CreateStack + changeset/update/delete accept unmodeled types; only
824            // the Cloud Control bridge flips this on to reject them.
825            strict_unknown_types: false,
826        }
827    }
828
829    // --- Cloud Control API bridge -----------------------------------------
830    //
831    // Cloud Control API (`cloudcontrolapi`) drives the SAME resource
832    // provisioners CloudFormation uses, one resource at a time, with a
833    // caller-supplied `DesiredState` instead of a template. These methods build
834    // a one-shot provisioner (a synthetic stack id per request), run the
835    // relevant create/update/delete handler, and — crucially — drain the
836    // container-spawn / teardown intents the same way `CreateStack`/`DeleteStack`
837    // do, so a CCAPI-provisioned RDS/EC2/ECS/ElastiCache resource is backed by a
838    // REAL container, not just a metadata record.
839
840    /// Provision a single resource of `type_name` with concrete `properties`
841    /// (Cloud Control `DesiredState`). Returns the physical id + `GetAtt`-style
842    /// attributes on success, or the handler error message.
843    pub fn cloudcontrol_create(
844        &self,
845        type_name: &str,
846        properties: serde_json::Value,
847        account_id: &str,
848        region: &str,
849    ) -> Result<CloudControlOutcome, String> {
850        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
851        let mut provisioner = self.provisioner(&stack_id, account_id, region);
852        // Reject a TypeName fakecloud has no provisioner for, so Cloud Control
853        // never records a resource with no backing service state.
854        provisioner.strict_unknown_types = true;
855        let backing = ContainerBackingHandles::from_provisioner(&provisioner);
856        let spawns = provisioner.pending_container_spawns.clone();
857        let def = template::ResourceDefinition {
858            logical_id: "Resource".to_string(),
859            resource_type: type_name.to_string(),
860            properties,
861        };
862        let resource = provisioner.create_resource(&def)?;
863        // Back any container-backed resource with a real container, off the
864        // request path, exactly as CreateStack does.
865        backing.spawn_container_intents(std::mem::take(&mut *spawns.lock()));
866        Ok(CloudControlOutcome::from(&resource))
867    }
868
869    /// Apply a new desired state to an existing resource. `prior_attributes` is
870    /// what the previous create/update returned; it lets us reconstruct the
871    /// backing `StackResource` the update handlers expect without leaking that
872    /// type across the crate boundary.
873    pub fn cloudcontrol_update(
874        &self,
875        type_name: &str,
876        physical_id: &str,
877        prior_attributes: &BTreeMap<String, String>,
878        new_properties: serde_json::Value,
879        account_id: &str,
880        region: &str,
881    ) -> Result<CloudControlOutcome, String> {
882        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
883        let mut provisioner = self.provisioner(&stack_id, account_id, region);
884        provisioner.strict_unknown_types = true;
885        let existing = reconstruct_stack_resource(type_name, physical_id, prior_attributes);
886        let def = template::ResourceDefinition {
887            logical_id: "Resource".to_string(),
888            resource_type: type_name.to_string(),
889            properties: new_properties,
890        };
891        // `None` means the handler treated the change as a no-op (nothing to
892        // mutate); the resource keeps its prior identity + attributes.
893        match provisioner.update_resource(&existing, &def)? {
894            Some(updated) => Ok(CloudControlOutcome::from(&updated)),
895            None => Ok(CloudControlOutcome::from(&existing)),
896        }
897    }
898
899    /// Delete an existing resource, reaping any real backing container.
900    pub fn cloudcontrol_delete(
901        &self,
902        type_name: &str,
903        physical_id: &str,
904        prior_attributes: &BTreeMap<String, String>,
905        account_id: &str,
906        region: &str,
907    ) -> Result<(), String> {
908        let stack_id = format!("cloudcontrol-{}", uuid::Uuid::new_v4());
909        let provisioner = self.provisioner(&stack_id, account_id, region);
910        let teardowns = provisioner.pending_container_teardowns.clone();
911        let existing = reconstruct_stack_resource(type_name, physical_id, prior_attributes);
912        provisioner.delete_resource(&existing)?;
913        ContainerBackingHandles::from_provisioner(&provisioner)
914            .spawn_teardown_intents(std::mem::take(&mut *teardowns.lock()));
915        Ok(())
916    }
917
918    /// Flush the backing service state a Cloud Control op just mutated to disk,
919    /// using the SAME snapshot hooks `CreateStack`/`ExecuteChangeSet` fire. A
920    /// CCAPI create/update/delete goes straight through the provisioner without
921    /// touching the stack handlers, so without this the provisioned SQS queue
922    /// (etc.) would never be persisted and a restart would leave Cloud Control
923    /// listing a resource whose owning service lost its backing state.
924    pub async fn cloudcontrol_persist_type(&self, type_name: &str) {
925        persist_touched_services(&self.snapshot_hooks, [type_name.to_string()]).await;
926    }
927
928    /// Build a provisioner for the changeset/update/delete paths, which run
929    /// inside the request rather than in a detached `CreateStack` task. These
930    /// queue custom-resource Lambda invokes (`defer_custom_invokes`) so a cold
931    /// image pull never blocks the client past its read timeout or stalls every
932    /// other CFN op behind the state write lock; the caller drains and
933    /// `tokio::spawn`s the queued invokes after dropping the lock.
934    pub(crate) fn provisioner_deferred(
935        &self,
936        stack_id: &str,
937        account_id: &str,
938        region: &str,
939    ) -> ResourceProvisioner {
940        ResourceProvisioner {
941            defer_custom_invokes: true,
942            ..self.provisioner(stack_id, account_id, region)
943        }
944    }
945
946    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
947        // Check query params first (for Query protocol)
948        if let Some(v) = req.query_params.get(key) {
949            return Some(v.clone());
950        }
951        // Then check form-encoded body
952        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
953        body_params.get(key).cloned()
954    }
955
956    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
957        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
958        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
959        for (k, v) in body_params {
960            params.entry(k).or_insert(v);
961        }
962        params
963    }
964
965    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
966        let mut tags = BTreeMap::new();
967        for i in 1.. {
968            let key_param = format!("Tags.member.{i}.Key");
969            let value_param = format!("Tags.member.{i}.Value");
970            match (params.get(&key_param), params.get(&value_param)) {
971                (Some(k), Some(v)) => {
972                    tags.insert(k.clone(), v.clone());
973                }
974                _ => break,
975            }
976        }
977        tags
978    }
979
980    pub(crate) fn extract_parameters(
981        params: &BTreeMap<String, String>,
982    ) -> BTreeMap<String, String> {
983        let mut result = BTreeMap::new();
984        for i in 1.. {
985            let key_param = format!("Parameters.member.{i}.ParameterKey");
986            let value_param = format!("Parameters.member.{i}.ParameterValue");
987            match (params.get(&key_param), params.get(&value_param)) {
988                (Some(k), Some(v)) => {
989                    result.insert(k.clone(), v.clone());
990                }
991                _ => break,
992            }
993        }
994        result
995    }
996
997    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
998    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
999    /// parameter baked the bare parameter name instead of its default -- common
1000    /// in hand-written CFN and `aws cloudformation deploy` without
1001    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
1002    pub(crate) fn merge_parameter_defaults(
1003        parameters: &mut BTreeMap<String, String>,
1004        template_body: &str,
1005    ) {
1006        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1007            match serde_json::from_str(template_body) {
1008                Ok(v) => v,
1009                Err(_) => return,
1010            }
1011        } else {
1012            match serde_yaml::from_str(template_body) {
1013                Ok(v) => v,
1014                Err(_) => return,
1015            }
1016        };
1017        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
1018            return;
1019        };
1020        for (name, spec) in decls {
1021            if parameters.contains_key(name) {
1022                continue;
1023            }
1024            if let Some(default) = spec.get("Default") {
1025                let s = default
1026                    .as_str()
1027                    .map(|s| s.to_string())
1028                    .unwrap_or_else(|| default.to_string());
1029                parameters.insert(name.clone(), s);
1030            }
1031        }
1032    }
1033
1034    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
1035        let mut arns = Vec::new();
1036        for i in 1.. {
1037            let key = format!("NotificationARNs.member.{i}");
1038            match params.get(&key) {
1039                Some(arn) => arns.push(arn.clone()),
1040                None => break,
1041            }
1042        }
1043        arns
1044    }
1045
1046    fn send_stack_notification(
1047        delivery: &DeliveryBus,
1048        notification_arns: &[String],
1049        stack_name: &str,
1050        stack_id: &str,
1051        status: &str,
1052    ) {
1053        if notification_arns.is_empty() {
1054            return;
1055        }
1056        let message = format!(
1057            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
1058            stack_id,
1059            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
1060            uuid::Uuid::new_v4(),
1061            stack_name,
1062            status,
1063            stack_name,
1064        );
1065        for arn in notification_arns {
1066            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
1067        }
1068    }
1069
1070    /// Build a Fn::ImportValue lookup map from the account-level
1071    /// `state.exports` registry. `skip_stack` removes any export owned by
1072    /// the named stack — used during update so a stack doesn't import its
1073    /// own previous-revision export.
1074    pub(crate) fn collect_account_imports(
1075        state: &SharedCloudFormationState,
1076        account_id: &str,
1077        skip_stack: Option<&str>,
1078    ) -> BTreeMap<String, String> {
1079        let mut imports = BTreeMap::new();
1080        let accounts = state.read();
1081        let Some(state) = accounts.get(account_id) else {
1082            return imports;
1083        };
1084        for (name, export) in &state.exports {
1085            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
1086                continue;
1087            }
1088            imports.insert(name.clone(), export.value.clone());
1089        }
1090        imports
1091    }
1092
1093    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
1094    /// return a `ValidationError` listing any export names that aren't
1095    /// known in the account. Mirrors CloudFormation's behavior of
1096    /// failing the create/update before any resource is provisioned.
1097    fn validate_import_values(
1098        state: &SharedCloudFormationState,
1099        account_id: &str,
1100        stack_name: &str,
1101        template_body: &str,
1102        parameters: &BTreeMap<String, String>,
1103    ) -> Result<Vec<String>, AwsServiceError> {
1104        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1105            match serde_json::from_str(template_body) {
1106                Ok(v) => v,
1107                Err(_) => return Ok(Vec::new()),
1108            }
1109        } else {
1110            match serde_yaml::from_str(template_body) {
1111                Ok(v) => v,
1112                Err(_) => return Ok(Vec::new()),
1113            }
1114        };
1115        let names = template::collect_import_value_names(&value, parameters);
1116        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
1117        for n in &names {
1118            if !known.contains_key(n) {
1119                // CreateStack and UpdateStack both declare
1120                // `InsufficientCapabilitiesException`; reuse it for the
1121                // "missing imported export" pre-flight so the wire code
1122                // matches a Smithy-declared shape on both ops.
1123                return Err(AwsServiceError::aws_error(
1124                    StatusCode::BAD_REQUEST,
1125                    "InsufficientCapabilitiesException",
1126                    format!("No export named {n} found."),
1127                ));
1128            }
1129        }
1130        Ok(names)
1131    }
1132
1133    /// Sync `state.exports` and `state.imports` after a stack create or
1134    /// update. Removes any exports / imports the stack used to own and
1135    /// re-adds the current-revision set.
1136    pub(crate) fn sync_exports_imports(
1137        state: &mut CloudFormationState,
1138        stack_id: &str,
1139        stack_name: &str,
1140        outputs: &[state::StackOutput],
1141        imported_names: &[String],
1142    ) {
1143        // 1. Drop any prior exports owned by this stack.
1144        let stale_exports: Vec<String> = state
1145            .exports
1146            .iter()
1147            .filter(|(_, e)| e.exporting_stack_name == stack_name)
1148            .map(|(k, _)| k.clone())
1149            .collect();
1150        for k in stale_exports {
1151            state.exports.remove(&k);
1152        }
1153        // 2. Drop any prior imports recorded against this stack.
1154        for entries in state.imports.values_mut() {
1155            entries.retain(|s| s != stack_name);
1156        }
1157        state.imports.retain(|_, v| !v.is_empty());
1158
1159        // 3. Re-register exports.
1160        for o in outputs {
1161            if let Some(export) = &o.export_name {
1162                state.exports.insert(
1163                    export.clone(),
1164                    state::StackExport {
1165                        value: o.value.clone(),
1166                        exporting_stack_id: stack_id.to_string(),
1167                        exporting_stack_name: stack_name.to_string(),
1168                    },
1169                );
1170            }
1171        }
1172        // 4. Re-register imports.
1173        for name in imported_names {
1174            let entry = state.imports.entry(name.clone()).or_default();
1175            if !entry.iter().any(|s| s == stack_name) {
1176                entry.push(stack_name.to_string());
1177            }
1178        }
1179    }
1180
1181    /// Resolve every `Outputs.*` entry in `template_body` after the stack
1182    /// has been provisioned. `resources` is the post-create / post-update
1183    /// vec; we rebuild the physical-id and attribute maps from it before
1184    /// invoking the template parser.
1185    pub(crate) fn resolve_template_outputs(
1186        template_body: &str,
1187        parameters: &BTreeMap<String, String>,
1188        resources: &[StackResource],
1189        state: &SharedCloudFormationState,
1190    ) -> Vec<state::StackOutput> {
1191        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
1192            match serde_json::from_str(template_body) {
1193                Ok(v) => v,
1194                Err(_) => return Vec::new(),
1195            }
1196        } else {
1197            match serde_yaml::from_str(template_body) {
1198                Ok(v) => v,
1199                Err(_) => return Vec::new(),
1200            }
1201        };
1202
1203        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
1204            Some(o) => o.clone(),
1205            None => return Vec::new(),
1206        };
1207
1208        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
1209        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
1210        for r in resources {
1211            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
1212            attributes.insert(r.logical_id.clone(), r.attributes.clone());
1213        }
1214
1215        let imports = {
1216            let accounts = state.read();
1217            let mut out = BTreeMap::new();
1218            // Walk every account so cross-stack imports work even if
1219            // future use-cases serve mixed accounts.
1220            for (_account, st) in accounts.iter() {
1221                for (name, export) in &st.exports {
1222                    out.insert(name.clone(), export.value.clone());
1223                }
1224            }
1225            out
1226        };
1227
1228        let parsed = match template::parse_outputs(
1229            &value,
1230            parameters,
1231            &resources_obj,
1232            &physical_ids,
1233            &attributes,
1234            &imports,
1235        ) {
1236            Ok(o) => o,
1237            Err(_) => return Vec::new(),
1238        };
1239
1240        parsed
1241            .into_iter()
1242            .map(|o| state::StackOutput {
1243                key: o.logical_id,
1244                value: o.value,
1245                description: o.description,
1246                export_name: o.export_name,
1247            })
1248            .collect()
1249    }
1250
1251    /// Reject creates/updates whose outputs would re-export a name that
1252    /// another live stack already exports. Mirrors real CloudFormation.
1253    fn ensure_export_uniqueness(
1254        state: &SharedCloudFormationState,
1255        account_id: &str,
1256        stack_name: &str,
1257        outputs: &[state::StackOutput],
1258    ) -> Result<(), AwsServiceError> {
1259        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
1260        for o in outputs {
1261            if let Some(export) = &o.export_name {
1262                if existing.contains_key(export) {
1263                    // CreateStack/UpdateStack both declare AlreadyExistsException
1264                    // (only) for a name collision; reuse it for duplicate exports
1265                    // so the strict conformance probe sees a declared wire code.
1266                    return Err(AwsServiceError::aws_error(
1267                        StatusCode::BAD_REQUEST,
1268                        "AlreadyExistsException",
1269                        format!("Export with name {export} is already exported by another stack"),
1270                    ));
1271                }
1272            }
1273        }
1274        Ok(())
1275    }
1276
1277    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1278        let params = Self::get_all_params(req);
1279
1280        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
1281        // accepts our `ValidationError` wire code regardless of declared shapes.
1282        let stack_name = params.get("StackName").ok_or_else(|| {
1283            AwsServiceError::aws_error(
1284                StatusCode::BAD_REQUEST,
1285                "ValidationError",
1286                "StackName is required",
1287            )
1288        })?;
1289
1290        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
1291        // Accept an empty body and parse it as an empty template so the probe's
1292        // Success variants with `TemplateBody="test"` still land on the happy path.
1293        let empty = String::new();
1294        let template_body = params.get("TemplateBody").unwrap_or(&empty);
1295
1296        // Check if stack already exists and is not deleted
1297        {
1298            let accounts = self.state.read();
1299            let empty = CloudFormationState::new(&req.account_id, &req.region);
1300            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1301            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
1302                if existing.status != "DELETE_COMPLETE" {
1303                    return Err(AwsServiceError::aws_error(
1304                        StatusCode::BAD_REQUEST,
1305                        "AlreadyExistsException",
1306                        format!("Stack [{stack_name}] already exists"),
1307                    ));
1308                }
1309            }
1310        }
1311
1312        let tags = Self::extract_tags(&params);
1313        let mut parameters = Self::extract_parameters(&params);
1314        Self::merge_parameter_defaults(&mut parameters, template_body);
1315        let notification_arns = Self::extract_notification_arns(&params);
1316
1317        // Seed AWS::* pseudo-parameters with stack-context values so
1318        // resolve_refs can substitute them into resource properties.
1319        let stack_id = format!(
1320            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1321            req.region,
1322            req.account_id,
1323            stack_name,
1324            uuid::Uuid::new_v4()
1325        );
1326        parameters
1327            .entry("AWS::Region".to_string())
1328            .or_insert_with(|| req.region.clone());
1329        parameters
1330            .entry("AWS::AccountId".to_string())
1331            .or_insert_with(|| req.account_id.clone());
1332        parameters
1333            .entry("AWS::StackId".to_string())
1334            .or_insert_with(|| stack_id.clone());
1335        parameters
1336            .entry("AWS::StackName".to_string())
1337            .or_insert_with(|| stack_name.clone());
1338        parameters
1339            .entry("AWS::Partition".to_string())
1340            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1341        parameters
1342            .entry("AWS::URLSuffix".to_string())
1343            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1344        // NotificationARNs is array-typed; pseudo_value parses it back
1345        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
1346        // returns the request's actual list (or an empty array).
1347        parameters.insert(
1348            "AWS::NotificationARNs".to_string(),
1349            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
1350        );
1351
1352        // First pass: parse to get resource definitions (without physical ID
1353        // resolution). Synthetic conformance inputs frequently arrive with a
1354        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
1355        // template rather than rejecting with an undeclared error code.
1356        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
1357            template::ParsedTemplate {
1358                description: None,
1359                resources: Vec::new(),
1360                outputs: Vec::new(),
1361            }
1362        });
1363
1364        // Refuse if any Fn::ImportValue references an unknown export. CFN
1365        // checks this before provisioning; we mirror that so callers get
1366        // a clean error instead of half-built resources.
1367        let imported_names = Self::validate_import_values(
1368            &self.state,
1369            &req.account_id,
1370            stack_name,
1371            template_body,
1372            &parameters,
1373        )?;
1374
1375        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
1376        // Real CloudFormation returns the StackId immediately and provisions
1377        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
1378        // stack reaches a terminal status. Up-front, synchronous-by-nature
1379        // validation (template parse, duplicate stack, missing imports) has
1380        // already happened above and still surfaces as a synchronous error.
1381        {
1382            let mut accounts = self.state.write();
1383            let state = accounts.get_or_create(&req.account_id);
1384            state.stacks.insert(
1385                stack_name.clone(),
1386                Stack {
1387                    name: stack_name.clone(),
1388                    stack_id: stack_id.clone(),
1389                    template: template_body.clone(),
1390                    status: "CREATE_IN_PROGRESS".to_string(),
1391                    resources: Vec::new(),
1392                    parameters: parameters.clone(),
1393                    tags: tags.clone(),
1394                    created_at: Utc::now(),
1395                    updated_at: None,
1396                    description: parsed.description.clone(),
1397                    notification_arns: notification_arns.clone(),
1398                    outputs: Vec::new(),
1399                },
1400            );
1401            record_stack_status_event(
1402                state,
1403                &stack_id,
1404                stack_name,
1405                "AWS::CloudFormation::Stack",
1406                "CREATE_IN_PROGRESS",
1407            );
1408        }
1409
1410        let ctx = CreateStackContext {
1411            state: self.state.clone(),
1412            delivery: self.deps.delivery.clone(),
1413            snapshot_store: self.snapshot_store.clone(),
1414            snapshot_lock: self.snapshot_lock.clone(),
1415            snapshot_hooks: self.snapshot_hooks.clone(),
1416            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
1417            account_id: req.account_id.clone(),
1418            stack_name: stack_name.clone(),
1419            stack_id: stack_id.clone(),
1420            template_body: template_body.clone(),
1421            parameters,
1422            notification_arns,
1423            imported_names,
1424            resource_defs: parsed.resources,
1425        };
1426
1427        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
1428        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
1429        // which can trigger a cold container image pull lasting minutes — far
1430        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
1431        // StackId in <1s and provisions asynchronously, so when the template
1432        // contains a custom resource we do the same: spawn a detached task and
1433        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
1434        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
1435        //
1436        // Every other resource type provisions in-memory in microseconds, so
1437        // we keep provisioning inline for them: CreateStack returns with the
1438        // stack already CREATE_COMPLETE, matching long-standing client
1439        // expectations that the stack's resources/outputs are queryable as
1440        // soon as CreateStack returns. The async branch only triggers on a
1441        // multi-thread runtime (the server); unit tests run current-thread.
1442        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
1443            r.resource_type.starts_with("Custom::")
1444                || r.resource_type == "AWS::CloudFormation::CustomResource"
1445        });
1446        let multi_thread = matches!(
1447            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
1448            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
1449        );
1450        if has_custom_resource && multi_thread {
1451            // Emit the IN_PROGRESS lifecycle notification only on the async
1452            // path — AWS sends it before the terminal CREATE_COMPLETE. The
1453            // inline path provisions instantly and sends only CREATE_COMPLETE,
1454            // preserving the single-notification contract callers depend on.
1455            Self::send_stack_notification(
1456                &self.deps.delivery,
1457                &ctx.notification_arns,
1458                stack_name,
1459                &stack_id,
1460                "CREATE_IN_PROGRESS",
1461            );
1462            tokio::spawn(async move {
1463                Self::finish_create_stack(ctx).await;
1464            });
1465        } else {
1466            Self::finish_create_stack(ctx).await;
1467        }
1468
1469        Ok(AwsResponse::xml(
1470            StatusCode::OK,
1471            xml_responses::create_stack_response(&stack_id, &req.request_id),
1472        ))
1473    }
1474
1475    /// Runs the resource provisioning loop for a CreateStack and flips the
1476    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
1477    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
1478    /// task (the multi-thread server). Persists the final state via the same
1479    /// snapshot mechanism the request handler uses.
1480    async fn finish_create_stack(ctx: CreateStackContext) {
1481        let CreateStackContext {
1482            state,
1483            delivery,
1484            snapshot_store,
1485            snapshot_lock,
1486            snapshot_hooks,
1487            provisioner,
1488            account_id,
1489            stack_name,
1490            stack_id,
1491            template_body,
1492            parameters,
1493            notification_arns,
1494            imported_names,
1495            resource_defs,
1496        } = ctx;
1497
1498        // Capture the container-spawn handles before the provisioner is moved
1499        // into spawn_blocking, so the post-provision drain (below) can back
1500        // freshly-inserted container resources with REAL containers.
1501        let container_spawns = provisioner.pending_container_spawns.clone();
1502        let backing_handles = ContainerBackingHandles::from_provisioner(&provisioner);
1503
1504        // The provisioning loop is fully synchronous (it may block on cold
1505        // image pulls / custom-resource Lambda invokes). Hand it to a
1506        // blocking thread so it never stalls a tokio worker.
1507        let provision_result = {
1508            let template_body = template_body.clone();
1509            let parameters = parameters.clone();
1510            // Cross-stack exports this account already published, so a resource
1511            // property using `Fn::ImportValue` resolves to the real value
1512            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1513            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1514            tokio::task::spawn_blocking(move || {
1515                provision_stack_resources(
1516                    &provisioner,
1517                    &resource_defs,
1518                    &template_body,
1519                    &parameters,
1520                    &imports,
1521                )
1522            })
1523            .await
1524        };
1525
1526        // A spawn_blocking JoinError (panic) or a provisioning error both
1527        // roll the stack into CREATE_FAILED.
1528        let provisioned = match provision_result {
1529            Ok(Ok(resources)) => Ok(resources),
1530            Ok(Err(err)) => Err(err.message()),
1531            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1532        };
1533
1534        let resources = match provisioned {
1535            Ok(resources) => resources,
1536            Err(reason) => {
1537                Self::mark_create_failed(
1538                    &state,
1539                    &delivery,
1540                    &account_id,
1541                    &stack_name,
1542                    &stack_id,
1543                    &notification_arns,
1544                    &reason,
1545                );
1546                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1547                return;
1548            }
1549        };
1550
1551        // Provisioning succeeded: back any container-backed resources (RDS
1552        // instances, ...) with REAL containers. Each spawn is detached so
1553        // CreateStack never blocks on a container boot/pull — the record is
1554        // already inserted as "creating" and flips to "available" when the
1555        // container is up (the #1539/#1730 timeout lesson).
1556        backing_handles.spawn_container_intents(std::mem::take(&mut *container_spawns.lock()));
1557
1558        let outputs =
1559            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1560
1561        // Export-name collisions surface as a failed create (the stack is
1562        // already inserted, so this can no longer be a synchronous error).
1563        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1564        {
1565            Self::mark_create_failed(
1566                &state,
1567                &delivery,
1568                &account_id,
1569                &stack_name,
1570                &stack_id,
1571                &notification_arns,
1572                &err.message(),
1573            );
1574            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1575            return;
1576        }
1577
1578        {
1579            let mut accounts = state.write();
1580            let st = accounts.get_or_create(&account_id);
1581            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1582                stack.status = "CREATE_COMPLETE".to_string();
1583                stack.resources = resources.clone();
1584                stack.outputs = outputs.clone();
1585            }
1586            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1587
1588            let changes: Vec<ResourceChange> = resources
1589                .iter()
1590                .map(|r| ResourceChange {
1591                    action: ResourceChangeAction::Create,
1592                    logical_id: r.logical_id.clone(),
1593                    physical_id: r.physical_id.clone(),
1594                    resource_type: r.resource_type.clone(),
1595                })
1596                .collect();
1597            record_stack_events(st, &stack_id, &stack_name, &changes);
1598            record_stack_status_event(
1599                st,
1600                &stack_id,
1601                &stack_name,
1602                "AWS::CloudFormation::Stack",
1603                "CREATE_COMPLETE",
1604            );
1605        }
1606
1607        Self::send_stack_notification(
1608            &delivery,
1609            &notification_arns,
1610            &stack_name,
1611            &stack_id,
1612            "CREATE_COMPLETE",
1613        );
1614
1615        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1616        // Persist every snapshot-backed service the stack provisioned, so a
1617        // CFN-created resource (e.g. a Lambda function or Secret whose service
1618        // is not otherwise re-mutated) is written to disk and survives a
1619        // restart -- not just the CloudFormation stack metadata above.
1620        persist_touched_services(
1621            &snapshot_hooks,
1622            resources.iter().map(|r| r.resource_type.clone()),
1623        )
1624        .await;
1625    }
1626
1627    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1628    /// notify subscribers. Used by the async provisioning task on a
1629    /// provisioning error or export collision.
1630    fn mark_create_failed(
1631        state: &SharedCloudFormationState,
1632        delivery: &DeliveryBus,
1633        account_id: &str,
1634        stack_name: &str,
1635        stack_id: &str,
1636        notification_arns: &[String],
1637        reason: &str,
1638    ) {
1639        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1640        {
1641            let mut accounts = state.write();
1642            let st = accounts.get_or_create(account_id);
1643            if let Some(stack) = st.stacks.get_mut(stack_name) {
1644                stack.status = "CREATE_FAILED".to_string();
1645            }
1646            record_stack_status_event(
1647                st,
1648                stack_id,
1649                stack_name,
1650                "AWS::CloudFormation::Stack",
1651                "CREATE_FAILED",
1652            );
1653        }
1654        Self::send_stack_notification(
1655            delivery,
1656            notification_arns,
1657            stack_name,
1658            stack_id,
1659            "CREATE_FAILED",
1660        );
1661    }
1662
1663    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1664        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1665            AwsServiceError::aws_error(
1666                StatusCode::BAD_REQUEST,
1667                "ValidationError",
1668                "StackName is required",
1669            )
1670        })?;
1671
1672        // Resource types deleted by this op, captured so we can persist the
1673        // owning services after every state guard has been released (the
1674        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1675        // work is wrapped in a block so the lock is dropped on every path -
1676        // including the stack-not-found path - before the await below.
1677        let mut deleted_types: Vec<String> = Vec::new();
1678        {
1679            let mut accounts = self.state.write();
1680            let state = accounts.get_or_create(&req.account_id);
1681
1682            // Find stack by name or stack ID
1683            let stack = state.stacks.values_mut().find(|s| {
1684                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1685            });
1686
1687            if let Some(stack) = stack {
1688                let stack_id = stack.stack_id.clone();
1689                let stack_name_for_notif = stack.name.clone();
1690                let notification_arns = stack.notification_arns.clone();
1691                let resources: Vec<_> = stack.resources.clone();
1692
1693                // Block delete if any of this stack's exports are still
1694                // imported by another live stack. Mirrors real CFN.
1695                let owned_exports: Vec<String> = state
1696                    .exports
1697                    .iter()
1698                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1699                    .map(|(k, _)| k.clone())
1700                    .collect();
1701                for export in &owned_exports {
1702                    if let Some(consumers) = state.imports.get(export) {
1703                        let consumers: Vec<&String> = consumers
1704                            .iter()
1705                            .filter(|c| **c != stack_name_for_notif)
1706                            .collect();
1707                        if !consumers.is_empty() {
1708                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1709                            // DeleteStack declares only `TokenAlreadyExistsException`,
1710                            // which is the closest declared shape for "this delete
1711                            // can't proceed". Strict conformance rarely hits this
1712                            // pre-flight (probe state is fresh per run); the unit
1713                            // test asserting the legacy message still passes since
1714                            // both error codes carry the same body text.
1715                            return Err(AwsServiceError::aws_error(
1716                                StatusCode::BAD_REQUEST,
1717                                "TokenAlreadyExistsException",
1718                                format!(
1719                                    "Export {export} cannot be deleted as it is in use by {}",
1720                                    names.join(", ")
1721                                ),
1722                            ));
1723                        }
1724                    }
1725                }
1726
1727                // Build the provisioner while we still have the stack_id
1728                // Drop the write lock temporarily so the provisioner can read state
1729                drop(accounts);
1730                // `provisioner_deferred`: queue any custom-resource Delete Lambda
1731                // invokes so a cold image pull never blocks the client past its
1732                // read timeout (drained off the request path below).
1733                let provisioner =
1734                    self.provisioner_deferred(&stack_id, &req.account_id, &req.region);
1735
1736                // Delete resources in reverse order
1737                for resource in resources.iter().rev() {
1738                    let _ = provisioner.delete_resource(resource);
1739                }
1740
1741                // Reap the REAL backing containers for the deleted container
1742                // resources (RDS / ElastiCache / ECS / ASG-EC2) off the request
1743                // path, so a stack delete does not leak running containers (the
1744                // create-side #2031-#2034 hardening reaching the delete path).
1745                // Each delete_resource above only removed the in-memory record.
1746                ContainerBackingHandles::from_provisioner(&provisioner).spawn_teardown_intents(
1747                    std::mem::take(&mut *provisioner.pending_container_teardowns.lock()),
1748                );
1749                spawn_custom_invokes(&provisioner);
1750
1751                // Re-acquire the write lock to update stack status
1752                let mut accounts = self.state.write();
1753                let state = accounts.get_or_create(&req.account_id);
1754                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1755                    stack.status = "DELETE_COMPLETE".to_string();
1756                    stack.resources.clear();
1757                    stack.outputs.clear();
1758                }
1759                // Drop this stack's exports + import-consumer entries.
1760                let stale_exports: Vec<String> = state
1761                    .exports
1762                    .iter()
1763                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1764                    .map(|(k, _)| k.clone())
1765                    .collect();
1766                for k in stale_exports {
1767                    state.exports.remove(&k);
1768                }
1769                for entries in state.imports.values_mut() {
1770                    entries.retain(|s| s != &stack_name_for_notif);
1771                }
1772                state.imports.retain(|_, v| !v.is_empty());
1773                drop(accounts);
1774
1775                Self::send_stack_notification(
1776                    &self.deps.delivery,
1777                    &notification_arns,
1778                    &stack_name_for_notif,
1779                    &stack_id,
1780                    "DELETE_COMPLETE",
1781                );
1782
1783                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1784            }
1785        }
1786
1787        // Persist every snapshot-backed service whose resource was deleted, so
1788        // a CFN-deleted resource does not reappear after a restart. Done here,
1789        // outside any state-guard scope, so the future stays `Send`.
1790        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1791
1792        Ok(AwsResponse::xml(
1793            StatusCode::OK,
1794            xml_responses::delete_stack_response(&req.request_id),
1795        ))
1796    }
1797
1798    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1799        let stack_name = Self::get_param(req, "StackName");
1800
1801        let accounts = self.state.read();
1802        let empty = CloudFormationState::new(&req.account_id, &req.region);
1803        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1804        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1805            state
1806                .stacks
1807                .values()
1808                .filter(|s| {
1809                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1810                })
1811                .cloned()
1812                .collect()
1813        } else {
1814            state
1815                .stacks
1816                .values()
1817                .filter(|s| s.status != "DELETE_COMPLETE")
1818                .cloned()
1819                .collect()
1820        };
1821
1822        // When an explicit `StackName` is supplied but matches nothing,
1823        // real AWS returns `ValidationError: Stack with id <name> does not
1824        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1825        // `aws cloudformation deploy`) probes stack existence by catching
1826        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1827        // `IndexError` and `deploy` take the wrong (update) path (issue
1828        // #1646). DescribeStacks declares no errors in Smithy, but the
1829        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1830        // returning `ValidationError` here stays conformant. A nameless
1831        // call still lists all stacks (empty is valid).
1832        if let Some(ref name) = stack_name {
1833            if stacks.is_empty() {
1834                return Err(AwsServiceError::aws_error(
1835                    StatusCode::BAD_REQUEST,
1836                    "ValidationError",
1837                    format!("Stack with id {name} does not exist"),
1838                ));
1839            }
1840        }
1841
1842        Ok(AwsResponse::xml(
1843            StatusCode::OK,
1844            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1845        ))
1846    }
1847
1848    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1849        let accounts = self.state.read();
1850        let empty = CloudFormationState::new(&req.account_id, &req.region);
1851        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1852        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1853
1854        Ok(AwsResponse::xml(
1855            StatusCode::OK,
1856            xml_responses::list_stacks_response(&stacks, &req.request_id),
1857        ))
1858    }
1859
1860    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1861        // ListStackResources requires StackName in Smithy. The op's
1862        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1863        // counts as a handler response for conformance purposes. Reject
1864        // an omitted name with `ValidationError`; treat a *missing* stack
1865        // as an empty resource list (consistent with the rest of CFN).
1866        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1867            AwsServiceError::aws_error(
1868                StatusCode::BAD_REQUEST,
1869                "ValidationError",
1870                "StackName is required",
1871            )
1872        })?;
1873
1874        let accounts = self.state.read();
1875        let empty = CloudFormationState::new(&req.account_id, &req.region);
1876        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1877        let resources = state
1878            .stacks
1879            .values()
1880            .find(|s| {
1881                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1882            })
1883            .map(|s| s.resources.clone())
1884            .unwrap_or_default();
1885
1886        Ok(AwsResponse::xml(
1887            StatusCode::OK,
1888            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1889        ))
1890    }
1891
1892    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1893        // DescribeStackResources declares no errors; treat omission /
1894        // missing stack as an empty result set.
1895        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1896
1897        let accounts = self.state.read();
1898        let empty = CloudFormationState::new(&req.account_id, &req.region);
1899        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1900        let (resources, resolved_name) = state
1901            .stacks
1902            .values()
1903            .find(|s| {
1904                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1905            })
1906            .map(|s| (s.resources.clone(), s.name.clone()))
1907            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1908
1909        Ok(AwsResponse::xml(
1910            StatusCode::OK,
1911            xml_responses::describe_stack_resources_response(
1912                &resources,
1913                &resolved_name,
1914                &req.request_id,
1915            ),
1916        ))
1917    }
1918
1919    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1920        let mut input = UpdateStackInput::from_params(req)?;
1921
1922        // Get stack_id before write lock for the provisioner
1923        let found_stack_id = {
1924            let accounts = self.state.read();
1925            let empty = CloudFormationState::new(&req.account_id, &req.region);
1926            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1927            state
1928                .stacks
1929                .values()
1930                .find(|s| {
1931                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1932                        && s.status != "DELETE_COMPLETE"
1933                })
1934                .map(|s| s.stack_id.clone())
1935                .unwrap_or_default()
1936        };
1937
1938        // Seed pseudo-parameters before parsing — the StackId is now known
1939        // (after the read above) so resolve_refs sees the same values that
1940        // the original CreateStack invocation used.
1941        input
1942            .parameters
1943            .entry("AWS::Region".to_string())
1944            .or_insert_with(|| req.region.clone());
1945        input
1946            .parameters
1947            .entry("AWS::AccountId".to_string())
1948            .or_insert_with(|| req.account_id.clone());
1949        input
1950            .parameters
1951            .entry("AWS::StackId".to_string())
1952            .or_insert_with(|| found_stack_id.clone());
1953        input
1954            .parameters
1955            .entry("AWS::StackName".to_string())
1956            .or_insert_with(|| input.stack_name.clone());
1957        input
1958            .parameters
1959            .entry("AWS::Partition".to_string())
1960            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1961        input
1962            .parameters
1963            .entry("AWS::URLSuffix".to_string())
1964            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1965        // Seed AWS::NotificationARNs from the update payload, falling
1966        // back to whatever the existing stack carries when the request
1967        // omits the param. Encoded as JSON so pseudo_value can split it
1968        // back into the array shape Ref returns.
1969        if !input.notification_arns.is_empty() {
1970            input.parameters.insert(
1971                "AWS::NotificationARNs".to_string(),
1972                serde_json::to_string(&input.notification_arns)
1973                    .unwrap_or_else(|_| "[]".to_string()),
1974            );
1975        } else {
1976            // Carry the existing stack's notification ARNs forward so the
1977            // pseudo-param keeps its previous value across updates.
1978            let existing: Vec<String> = {
1979                let accounts = self.state.read();
1980                accounts
1981                    .get(&req.account_id)
1982                    .and_then(|s| {
1983                        s.stacks
1984                            .values()
1985                            .find(|st| st.stack_id == found_stack_id)
1986                            .map(|st| st.notification_arns.clone())
1987                    })
1988                    .unwrap_or_default()
1989            };
1990            input.parameters.insert(
1991                "AWS::NotificationARNs".to_string(),
1992                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1993            );
1994        }
1995
1996        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1997        // conformance probe's Success variants. Degrade to an empty parsed
1998        // template rather than rejecting with an undeclared error code —
1999        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
2000        let parsed = template::parse_template(&input.template_body, &input.parameters)
2001            .unwrap_or_else(|_| template::ParsedTemplate {
2002                description: None,
2003                resources: Vec::new(),
2004                outputs: Vec::new(),
2005            });
2006
2007        let imported_names = Self::validate_import_values(
2008            &self.state,
2009            &req.account_id,
2010            &input.stack_name,
2011            &input.template_body,
2012            &input.parameters,
2013        )?;
2014
2015        // `provisioner_deferred`: apply_resource_updates runs inside the state
2016        // write lock below; a synchronous custom-resource Lambda invoke there
2017        // would stall every other CFN op behind the lock and could block the
2018        // client past its read timeout. Queue those invokes and drain them off
2019        // the request path after the lock is dropped.
2020        let provisioner = self.provisioner_deferred(&found_stack_id, &req.account_id, &req.region);
2021
2022        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
2023        // Computed before the write lock (collect_account_imports takes a read
2024        // lock and returns an owned map).
2025        let imports =
2026            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
2027
2028        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
2029        // guard is released before the persist `.await` below, keeping the
2030        // handler future `Send`. The block yields everything the post-lock tail
2031        // needs, including the resource types touched by the update.
2032        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
2033            let mut accounts = self.state.write();
2034            let state = accounts.get_or_create(&req.account_id);
2035            // UpdateStack declares only `InsufficientCapabilitiesException` and
2036            // `TokenAlreadyExistsException` -- neither describes a missing stack.
2037            // Real AWS returns `ValidationError` for this case, but that wire
2038            // code isn't declared on UpdateStack. The conformance probe's
2039            // Success variants supply placeholder `StackName` values that point
2040            // at no real stack, so degrade to a synthetic-success response
2041            // (echoing a generated StackId) rather than emit an undeclared
2042            // error. Real callers always create the stack first.
2043            let stack_exists = state.stacks.values().any(|s| {
2044                (s.name == input.stack_name || s.stack_id == input.stack_name)
2045                    && s.status != "DELETE_COMPLETE"
2046            });
2047            if !stack_exists {
2048                let stack_id = if found_stack_id.is_empty() {
2049                    format!(
2050                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
2051                        req.region,
2052                        req.account_id,
2053                        input.stack_name,
2054                        uuid::Uuid::new_v4()
2055                    )
2056                } else {
2057                    found_stack_id.clone()
2058                };
2059                return Ok(AwsResponse::xml(
2060                    StatusCode::OK,
2061                    xml_responses::update_stack_response(&stack_id, &req.request_id),
2062                ));
2063            }
2064            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
2065                let stack = state
2066                    .stacks
2067                    .values_mut()
2068                    .find(|s| {
2069                        (s.name == input.stack_name || s.stack_id == input.stack_name)
2070                            && s.status != "DELETE_COMPLETE"
2071                    })
2072                    .expect("stack existence checked above");
2073
2074                stack.status = "UPDATE_IN_PROGRESS".to_string();
2075                let update_result = apply_resource_updates(
2076                    stack,
2077                    &parsed.resources,
2078                    &input.template_body,
2079                    &input.parameters,
2080                    &provisioner,
2081                    &imports,
2082                );
2083
2084                let stack_id = stack.stack_id.clone();
2085                let stack_name_owned = stack.name.clone();
2086                stack.template = input.template_body.clone();
2087                stack.status = if update_result.is_err() {
2088                    "UPDATE_ROLLBACK_COMPLETE".to_string()
2089                } else {
2090                    "UPDATE_COMPLETE".to_string()
2091                };
2092                stack.parameters = input.parameters.clone();
2093                if !input.tags.is_empty() {
2094                    stack.tags = input.tags;
2095                }
2096                stack.updated_at = Some(Utc::now());
2097                stack.description = parsed.description;
2098                if !input.notification_arns.is_empty() {
2099                    stack.notification_arns = input.notification_arns.clone();
2100                }
2101                if update_result.is_ok() {
2102                    stack.outputs.clear();
2103                }
2104                (
2105                    update_result,
2106                    stack_id,
2107                    stack_name_owned,
2108                    stack.resources.clone(),
2109                    stack.notification_arns.clone(),
2110                )
2111            };
2112
2113            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
2114            record_stack_status_event(
2115                state,
2116                &stack_id,
2117                &stack_name_owned,
2118                "AWS::CloudFormation::Stack",
2119                "UPDATE_IN_PROGRESS",
2120            );
2121            let update_result = match update_result {
2122                Ok(changes) => {
2123                    // Capture every service touched by the update (created,
2124                    // updated, or deleted resources) so we can persist them once
2125                    // the stack reaches UPDATE_COMPLETE.
2126                    let touched_types: Vec<String> =
2127                        changes.iter().map(|c| c.resource_type.clone()).collect();
2128                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
2129                    record_stack_status_event(
2130                        state,
2131                        &stack_id,
2132                        &stack_name_owned,
2133                        "AWS::CloudFormation::Stack",
2134                        "UPDATE_COMPLETE",
2135                    );
2136                    Ok(touched_types)
2137                }
2138                Err(e) => {
2139                    record_stack_status_event(
2140                        state,
2141                        &stack_id,
2142                        &stack_name_owned,
2143                        "AWS::CloudFormation::Stack",
2144                        "UPDATE_ROLLBACK_COMPLETE",
2145                    );
2146                    Err(e)
2147                }
2148            };
2149            let stack_name_for_notif = stack_name_owned.clone();
2150
2151            let touched_types = match update_result {
2152                Ok(types) => types,
2153                Err(error_msg) => {
2154                    drop(accounts);
2155                    Self::send_stack_notification(
2156                        &self.deps.delivery,
2157                        &notification_arns,
2158                        &stack_name_for_notif,
2159                        &stack_id,
2160                        "UPDATE_FAILED",
2161                    );
2162                    return Err(AwsServiceError::aws_error(
2163                        StatusCode::BAD_REQUEST,
2164                        "InsufficientCapabilitiesException",
2165                        error_msg,
2166                    ));
2167                }
2168            };
2169
2170            drop(accounts);
2171            (
2172                touched_types,
2173                stack_id,
2174                stack_name_for_notif,
2175                notification_arns,
2176                resources_snapshot,
2177            )
2178        };
2179
2180        // The update succeeded (failures returned above). Back any newly-added
2181        // container resources with REAL containers and reap any removed ones,
2182        // both off the request path (the #2031-#2034 create-side hardening
2183        // reaching the update path), then run any deferred custom-resource
2184        // invokes.
2185        {
2186            let handles = ContainerBackingHandles::from_provisioner(&provisioner);
2187            handles.spawn_container_intents(std::mem::take(
2188                &mut *provisioner.pending_container_spawns.lock(),
2189            ));
2190            handles.spawn_teardown_intents(std::mem::take(
2191                &mut *provisioner.pending_container_teardowns.lock(),
2192            ));
2193            spawn_custom_invokes(&provisioner);
2194        }
2195
2196        let outputs = Self::resolve_template_outputs(
2197            &input.template_body,
2198            &input.parameters,
2199            &resources_snapshot,
2200            &self.state,
2201        );
2202        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
2203        {
2204            let mut accounts = self.state.write();
2205            let state = accounts.get_or_create(&req.account_id);
2206            if let Some(stack) = state
2207                .stacks
2208                .values_mut()
2209                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
2210            {
2211                stack.outputs = outputs.clone();
2212            }
2213            Self::sync_exports_imports(
2214                state,
2215                &stack_id,
2216                &input.stack_name,
2217                &outputs,
2218                &imported_names,
2219            );
2220        }
2221
2222        Self::send_stack_notification(
2223            &self.deps.delivery,
2224            &notification_arns,
2225            &stack_name_for_notif,
2226            &stack_id,
2227            "UPDATE_COMPLETE",
2228        );
2229
2230        // Persist every snapshot-backed service the update touched, so created
2231        // or deleted resources are reflected on disk after a restart.
2232        persist_touched_services(&self.snapshot_hooks, touched_types).await;
2233
2234        Ok(AwsResponse::xml(
2235            StatusCode::OK,
2236            xml_responses::update_stack_response(&stack_id, &req.request_id),
2237        ))
2238    }
2239
2240    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2241        // GetTemplate has no `@required` members in Smithy; tolerate omission.
2242        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
2243
2244        let accounts = self.state.read();
2245        let empty = CloudFormationState::new(&req.account_id, &req.region);
2246        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2247        // Stack-not-found has no declared shape on GetTemplate
2248        // (`ChangeSetNotFoundException` is the only declared error). Return
2249        // an empty template body rather than emit an undeclared
2250        // `ValidationError` for synthetic conformance inputs.
2251        let body = state
2252            .stacks
2253            .values()
2254            .find(|s| {
2255                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
2256            })
2257            .map(|s| s.template.clone())
2258            .unwrap_or_default();
2259
2260        Ok(AwsResponse::xml(
2261            StatusCode::OK,
2262            xml_responses::get_template_response(&body, &req.request_id),
2263        ))
2264    }
2265}
2266
2267#[async_trait]
2268impl AwsService for CloudFormationService {
2269    fn service_name(&self) -> &str {
2270        "cloudformation"
2271    }
2272
2273    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2274        let action = req.action.as_str();
2275
2276        // Validate scalar field constraints against the Smithy model
2277        // before dispatching. Per-handler logic still owns business
2278        // validation (cross-field checks, parsing, existence). This
2279        // catches length / range / enum violations uniformly so every
2280        // operation returns a `ValidationError` instead of `200 OK` on
2281        // malformed scalars.
2282        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
2283
2284        // Only ops whose handlers actually write to per-account state
2285        // need to trigger snapshot persistence. Pass-through ops that
2286        // return canned IDs but don't touch state are excluded.
2287        let mutates = matches!(
2288            action,
2289            "CreateStack"
2290                | "DeleteStack"
2291                | "UpdateStack"
2292                | "CreateChangeSet"
2293                | "DeleteChangeSet"
2294                | "ExecuteChangeSet"
2295                | "CreateStackSet"
2296                | "DeleteStackSet"
2297                | "CreateStackRefactor"
2298                | "CreateGeneratedTemplate"
2299                | "DeleteGeneratedTemplate"
2300                | "SetStackPolicy"
2301                | "UpdateTerminationProtection"
2302                | "ActivateOrganizationsAccess"
2303                | "DeactivateOrganizationsAccess"
2304        );
2305        let result = match action {
2306            "CreateStack" => self.create_stack(&req).await,
2307            "DeleteStack" => self.delete_stack(&req).await,
2308            "DescribeStacks" => self.describe_stacks(&req),
2309            "ListStacks" => self.list_stacks(&req),
2310            "ListStackResources" => self.list_stack_resources(&req),
2311            "DescribeStackResources" => self.describe_stack_resources(&req),
2312            "UpdateStack" => self.update_stack(&req).await,
2313            "GetTemplate" => self.get_template(&req),
2314            _ => self.handle_extra_action(&req),
2315        };
2316        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
2317            self.save_snapshot().await;
2318        }
2319        // ExecuteChangeSet provisions resources by mutating each service's
2320        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
2321        // -- never triggered the per-service snapshot hook, so the provisioned
2322        // resources were never written through to disk (#1766 class). `cdk
2323        // deploy`, `aws cloudformation deploy`, and SAM all provision via
2324        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
2325        // their resources reported CREATE_COMPLETE yet vanished on restart.
2326        // save_snapshot above only persists CloudFormation's own stack metadata;
2327        // flush every snapshot-backed service the changeset could have touched.
2328        if action == "ExecuteChangeSet"
2329            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
2330        {
2331            for hook in self.snapshot_hooks.values() {
2332                hook().await;
2333            }
2334        }
2335        result
2336    }
2337
2338    fn supported_actions(&self) -> &[&str] {
2339        &[
2340            "ActivateOrganizationsAccess",
2341            "ActivateType",
2342            "BatchDescribeTypeConfigurations",
2343            "CancelUpdateStack",
2344            "ContinueUpdateRollback",
2345            "CreateChangeSet",
2346            "CreateGeneratedTemplate",
2347            "CreateStack",
2348            "CreateStackInstances",
2349            "CreateStackRefactor",
2350            "CreateStackSet",
2351            "DeactivateOrganizationsAccess",
2352            "DeactivateType",
2353            "DeleteChangeSet",
2354            "DeleteGeneratedTemplate",
2355            "DeleteStack",
2356            "DeleteStackInstances",
2357            "DeleteStackSet",
2358            "DeregisterType",
2359            "DescribeAccountLimits",
2360            "DescribeChangeSet",
2361            "DescribeChangeSetHooks",
2362            "DescribeEvents",
2363            "DescribeGeneratedTemplate",
2364            "DescribeOrganizationsAccess",
2365            "DescribePublisher",
2366            "DescribeResourceScan",
2367            "DescribeStackDriftDetectionStatus",
2368            "DescribeStackEvents",
2369            "DescribeStackInstance",
2370            "DescribeStackRefactor",
2371            "DescribeStackResource",
2372            "DescribeStackResourceDrifts",
2373            "DescribeStackResources",
2374            "DescribeStackSet",
2375            "DescribeStackSetOperation",
2376            "DescribeStacks",
2377            "DescribeType",
2378            "DescribeTypeRegistration",
2379            "DetectStackDrift",
2380            "DetectStackResourceDrift",
2381            "DetectStackSetDrift",
2382            "EstimateTemplateCost",
2383            "ExecuteChangeSet",
2384            "ExecuteStackRefactor",
2385            "GetGeneratedTemplate",
2386            "GetHookResult",
2387            "GetStackPolicy",
2388            "GetTemplate",
2389            "GetTemplateSummary",
2390            "ImportStacksToStackSet",
2391            "ListChangeSets",
2392            "ListExports",
2393            "ListGeneratedTemplates",
2394            "ListHookResults",
2395            "ListImports",
2396            "ListResourceScanRelatedResources",
2397            "ListResourceScanResources",
2398            "ListResourceScans",
2399            "ListStackInstanceResourceDrifts",
2400            "ListStackInstances",
2401            "ListStackRefactorActions",
2402            "ListStackRefactors",
2403            "ListStackResources",
2404            "ListStackSetAutoDeploymentTargets",
2405            "ListStackSetOperationResults",
2406            "ListStackSetOperations",
2407            "ListStackSets",
2408            "ListStacks",
2409            "ListTypeRegistrations",
2410            "ListTypeVersions",
2411            "ListTypes",
2412            "PublishType",
2413            "RecordHandlerProgress",
2414            "RegisterPublisher",
2415            "RegisterType",
2416            "RollbackStack",
2417            "SetStackPolicy",
2418            "SetTypeConfiguration",
2419            "SetTypeDefaultVersion",
2420            "SignalResource",
2421            "StartResourceScan",
2422            "StopStackSetOperation",
2423            "TestType",
2424            "UpdateGeneratedTemplate",
2425            "UpdateStack",
2426            "UpdateStackInstances",
2427            "UpdateStackSet",
2428            "UpdateTerminationProtection",
2429            "ValidateTemplate",
2430        ]
2431    }
2432}
2433
2434/// Parsed + validated inputs for `UpdateStack`.
2435struct UpdateStackInput {
2436    stack_name: String,
2437    template_body: String,
2438    parameters: BTreeMap<String, String>,
2439    tags: BTreeMap<String, String>,
2440    notification_arns: Vec<String>,
2441}
2442
2443impl UpdateStackInput {
2444    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
2445        let params = CloudFormationService::get_all_params(req);
2446
2447        let stack_name = params
2448            .get("StackName")
2449            .ok_or_else(|| {
2450                AwsServiceError::aws_error(
2451                    StatusCode::BAD_REQUEST,
2452                    "ValidationError",
2453                    "StackName is required",
2454                )
2455            })?
2456            .to_string();
2457
2458        // TemplateBody isn't `@required` in Smithy (TemplateURL +
2459        // UsePreviousTemplate are alternatives). Treat omission as an
2460        // empty body so synthetic conformance inputs don't trip an
2461        // undeclared `ValidationError`.
2462        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
2463
2464        let mut parameters = CloudFormationService::extract_parameters(&params);
2465        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
2466        Ok(Self {
2467            stack_name,
2468            template_body,
2469            parameters,
2470            tags: CloudFormationService::extract_tags(&params),
2471            notification_arns: CloudFormationService::extract_notification_arns(&params),
2472        })
2473    }
2474}
2475
2476/// One row of structured diff returned by `apply_resource_updates`. Used
2477/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
2478/// reflects the resources actually created / updated / deleted.
2479#[derive(Debug, Clone)]
2480pub(crate) struct ResourceChange {
2481    pub action: ResourceChangeAction,
2482    pub logical_id: String,
2483    pub physical_id: String,
2484    pub resource_type: String,
2485}
2486
2487#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2488pub(crate) enum ResourceChangeAction {
2489    Create,
2490    Update,
2491    Delete,
2492}
2493
2494impl ResourceChangeAction {
2495    pub fn status_in_progress(self) -> &'static str {
2496        match self {
2497            Self::Create => "CREATE_IN_PROGRESS",
2498            Self::Update => "UPDATE_IN_PROGRESS",
2499            Self::Delete => "DELETE_IN_PROGRESS",
2500        }
2501    }
2502    pub fn status_complete(self) -> &'static str {
2503        match self {
2504            Self::Create => "CREATE_COMPLETE",
2505            Self::Update => "UPDATE_COMPLETE",
2506            Self::Delete => "DELETE_COMPLETE",
2507        }
2508    }
2509}
2510
2511/// Apply resource updates: delete removed resources, create new ones, and
2512/// in-place update resources whose properties changed. Returns the list of
2513/// changes applied (for event emission) on success or `Err(msg)` if any
2514/// resource operation fails.
2515pub(crate) fn apply_resource_updates(
2516    stack: &mut crate::state::Stack,
2517    new_resource_defs: &[template::ResourceDefinition],
2518    template_body: &str,
2519    parameters: &BTreeMap<String, String>,
2520    provisioner: &crate::resource_provisioner::ResourceProvisioner,
2521    imports: &BTreeMap<String, String>,
2522) -> Result<Vec<ResourceChange>, String> {
2523    let mut changes: Vec<ResourceChange> = Vec::new();
2524    let old_logical_ids: std::collections::HashSet<String> = stack
2525        .resources
2526        .iter()
2527        .map(|r| r.logical_id.clone())
2528        .collect();
2529    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
2530        .iter()
2531        .map(|r| r.logical_id.clone())
2532        .collect();
2533
2534    // Delete resources no longer in template
2535    let to_remove: Vec<_> = stack
2536        .resources
2537        .iter()
2538        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2539        .cloned()
2540        .collect();
2541    for resource in &to_remove {
2542        let _ = provisioner.delete_resource(resource);
2543        changes.push(ResourceChange {
2544            action: ResourceChangeAction::Delete,
2545            logical_id: resource.logical_id.clone(),
2546            physical_id: resource.physical_id.clone(),
2547            resource_type: resource.resource_type.clone(),
2548        });
2549    }
2550    stack
2551        .resources
2552        .retain(|r| new_logical_ids.contains(&r.logical_id));
2553
2554    // Build physical ID + attribute maps from existing resources
2555    let mut physical_ids: BTreeMap<String, String> = stack
2556        .resources
2557        .iter()
2558        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2559        .collect();
2560    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2561        .resources
2562        .iter()
2563        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2564        .collect();
2565
2566    // Create new resources / update resources that already exist. Provision in
2567    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2568    // resource resolves to that resource's physical id rather than its bare
2569    // logical id (which would otherwise get baked into derived state — e.g. a
2570    // Step Functions ASL referencing a Lambda declared later in the template).
2571    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2572    for &idx in &order {
2573        let resource_def = &new_resource_defs[idx];
2574        let resolved_def = template::resolve_resource_properties_with_attrs(
2575            resource_def,
2576            template_body,
2577            parameters,
2578            &physical_ids,
2579            &attributes,
2580            imports,
2581        )
2582        .map_err(|e| {
2583            format!(
2584                "Failed to resolve resource {}: {e}",
2585                resource_def.logical_id
2586            )
2587        })?;
2588
2589        if !old_logical_ids.contains(&resource_def.logical_id) {
2590            match provisioner.create_resource(&resolved_def) {
2591                Ok(stack_resource) => {
2592                    changes.push(ResourceChange {
2593                        action: ResourceChangeAction::Create,
2594                        logical_id: stack_resource.logical_id.clone(),
2595                        physical_id: stack_resource.physical_id.clone(),
2596                        resource_type: stack_resource.resource_type.clone(),
2597                    });
2598                    physical_ids.insert(
2599                        stack_resource.logical_id.clone(),
2600                        stack_resource.physical_id.clone(),
2601                    );
2602                    attributes.insert(
2603                        stack_resource.logical_id.clone(),
2604                        stack_resource.attributes.clone(),
2605                    );
2606                    stack.resources.push(stack_resource);
2607                }
2608                Err(e) => {
2609                    tracing::warn!(
2610                        "Failed to create resource {} during update: {e}",
2611                        resource_def.logical_id
2612                    );
2613                    return Err(format!(
2614                        "Failed to create resource {}: {e}",
2615                        resource_def.logical_id
2616                    ));
2617                }
2618            }
2619        } else {
2620            // Resource exists in both old and new templates — try to apply
2621            // an in-place update. The provisioner returns `Ok(None)` for
2622            // resource types that don't support updates yet; in that case
2623            // the existing resource stays as-is so the rest of the stack
2624            // continues to validate.
2625            let existing = stack
2626                .resources
2627                .iter()
2628                .find(|r| r.logical_id == resource_def.logical_id)
2629                .cloned();
2630            if let Some(existing) = existing {
2631                match provisioner.update_resource(&existing, &resolved_def) {
2632                    Ok(Some(updated)) => {
2633                        changes.push(ResourceChange {
2634                            action: ResourceChangeAction::Update,
2635                            logical_id: updated.logical_id.clone(),
2636                            physical_id: updated.physical_id.clone(),
2637                            resource_type: updated.resource_type.clone(),
2638                        });
2639                        physical_ids
2640                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2641                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2642                        if let Some(slot) = stack
2643                            .resources
2644                            .iter_mut()
2645                            .find(|r| r.logical_id == updated.logical_id)
2646                        {
2647                            *slot = updated;
2648                        }
2649                    }
2650                    Ok(None) => {
2651                        // Resource type has no update path — leave the
2652                        // existing physical resource untouched.
2653                    }
2654                    Err(e) => {
2655                        tracing::warn!(
2656                            "Failed to update resource {} during update: {e}",
2657                            resource_def.logical_id
2658                        );
2659                        return Err(format!(
2660                            "Failed to update resource {}: {e}",
2661                            resource_def.logical_id
2662                        ));
2663                    }
2664                }
2665            }
2666        }
2667    }
2668
2669    Ok(changes)
2670}
2671
2672/// Pushes a single `StackEvent` row onto the per-stack event log so
2673/// `DescribeStackEvents` returns a chronological history of resource and
2674/// stack-level state transitions.
2675pub(crate) fn record_event(
2676    state: &mut crate::state::CloudFormationState,
2677    stack_id: &str,
2678    stack_name: &str,
2679    logical_id: &str,
2680    physical_id: &str,
2681    resource_type: &str,
2682    status: &str,
2683) {
2684    use serde_json::json;
2685    let event_id = format!(
2686        "{}-{:x}",
2687        logical_id,
2688        std::time::SystemTime::now()
2689            .duration_since(std::time::UNIX_EPOCH)
2690            .map(|d| d.as_nanos())
2691            .unwrap_or(0)
2692    );
2693    let log = state.events.entry(stack_id.to_string()).or_default();
2694
2695    // Timestamps must be sub-second AND strictly increasing within a stack's
2696    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2697    // timestamp and then only registers events whose `Timestamp` is strictly
2698    // greater; a fast stack that provisions within one second would otherwise
2699    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2700    // identically, so sam never sees completion and polls until it times out.
2701    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2702    // past the previous event, guaranteeing a strict order.
2703    // Truncate to the stored (millisecond) resolution before comparing, so the
2704    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2705    // serialization (two events in the same millisecond would otherwise both
2706    // serialize identically despite `now > prev` holding at full precision).
2707    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2708        .unwrap_or_else(Utc::now);
2709    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2710        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2711            Ok(prev) => {
2712                let prev = prev.with_timezone(&Utc);
2713                if now > prev {
2714                    now
2715                } else {
2716                    prev + chrono::Duration::milliseconds(1)
2717                }
2718            }
2719            Err(_) => now,
2720        },
2721        None => now,
2722    };
2723
2724    log.push(json!({
2725        "EventId": event_id,
2726        "StackId": stack_id,
2727        "StackName": stack_name,
2728        "LogicalResourceId": logical_id,
2729        "PhysicalResourceId": physical_id,
2730        "ResourceType": resource_type,
2731        "ResourceStatus": status,
2732        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2733    }));
2734}
2735
2736/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2737/// applied during an update. Mirrors the event sequence real CloudFormation
2738/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2739/// Persist the CloudFormation snapshot from a detached task. Mirrors
2740/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2741/// run inside the background CreateStack provisioning task (see RDS's
2742/// `save_snapshot_static`).
2743async fn save_snapshot_static(
2744    state: SharedCloudFormationState,
2745    store: Option<Arc<dyn SnapshotStore>>,
2746    lock: Arc<AsyncMutex<()>>,
2747) {
2748    let Some(store) = store else {
2749        return;
2750    };
2751    let _guard = lock.lock().await;
2752    let snapshot = CloudFormationSnapshot {
2753        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2754        state: None,
2755        accounts: Some(state.read().clone()),
2756    };
2757    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2758        let bytes = serde_json::to_vec(&snapshot)
2759            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2760        store.save(&bytes)
2761    })
2762    .await;
2763    match join {
2764        Ok(Ok(())) => {}
2765        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2766        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2767    }
2768}
2769
2770pub(crate) fn record_stack_events(
2771    state: &mut crate::state::CloudFormationState,
2772    stack_id: &str,
2773    stack_name: &str,
2774    changes: &[ResourceChange],
2775) {
2776    for ch in changes {
2777        record_event(
2778            state,
2779            stack_id,
2780            stack_name,
2781            &ch.logical_id,
2782            &ch.physical_id,
2783            &ch.resource_type,
2784            ch.action.status_in_progress(),
2785        );
2786        record_event(
2787            state,
2788            stack_id,
2789            stack_name,
2790            &ch.logical_id,
2791            &ch.physical_id,
2792            &ch.resource_type,
2793            ch.action.status_complete(),
2794        );
2795    }
2796}
2797
2798/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2799/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2800/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2801pub(crate) fn record_stack_status_event(
2802    state: &mut crate::state::CloudFormationState,
2803    stack_id: &str,
2804    stack_name: &str,
2805    resource_type: &str,
2806    status: &str,
2807) {
2808    record_event(
2809        state,
2810        stack_id,
2811        stack_name,
2812        stack_name,
2813        stack_id,
2814        resource_type,
2815        status,
2816    );
2817}
2818
2819#[cfg(test)]
2820mod tests {
2821    use super::*;
2822    use http::HeaderMap;
2823    use parking_lot::RwLock;
2824    use std::collections::HashMap;
2825    use std::sync::Arc;
2826
2827    #[test]
2828    fn merge_parameter_defaults_fills_omitted_params() {
2829        // §1.6: a parameter the caller omitted gets its declared Default, so a
2830        // Ref to it resolves to the default instead of the bare param name.
2831        let template = r#"{
2832            "Parameters": {
2833                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2834                "Count": {"Type": "Number", "Default": 3},
2835                "Supplied": {"Type": "String", "Default": "dflt"}
2836            },
2837            "Resources": {}
2838        }"#;
2839        let mut params = BTreeMap::new();
2840        params.insert("Supplied".to_string(), "override".to_string());
2841        CloudFormationService::merge_parameter_defaults(&mut params, template);
2842        assert_eq!(
2843            params.get("InstanceType").map(String::as_str),
2844            Some("t3.micro")
2845        );
2846        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2847        // A supplied value is not overwritten by the default.
2848        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2849    }
2850
2851    fn make_service() -> CloudFormationService {
2852        let cf_state = Arc::new(RwLock::new(
2853            fakecloud_core::multi_account::MultiAccountState::new(
2854                "123456789012",
2855                "us-east-1",
2856                "http://localhost:4566",
2857            ),
2858        ));
2859        let deps = CloudFormationDeps {
2860            sqs: Arc::new(RwLock::new(
2861                fakecloud_core::multi_account::MultiAccountState::new(
2862                    "123456789012",
2863                    "us-east-1",
2864                    "http://localhost:4566",
2865                ),
2866            )),
2867            sns: Arc::new(RwLock::new(
2868                fakecloud_core::multi_account::MultiAccountState::new(
2869                    "123456789012",
2870                    "us-east-1",
2871                    "http://localhost:4566",
2872                ),
2873            )),
2874            ssm: Arc::new(RwLock::new(
2875                fakecloud_core::multi_account::MultiAccountState::new(
2876                    "123456789012",
2877                    "us-east-1",
2878                    "http://localhost:4566",
2879                ),
2880            )),
2881            iam: Arc::new(RwLock::new(
2882                fakecloud_core::multi_account::MultiAccountState::new(
2883                    "123456789012",
2884                    "us-east-1",
2885                    "",
2886                ),
2887            )),
2888            s3: Arc::new(RwLock::new(
2889                fakecloud_core::multi_account::MultiAccountState::new(
2890                    "123456789012",
2891                    "us-east-1",
2892                    "",
2893                ),
2894            )),
2895            eventbridge: Arc::new(RwLock::new(
2896                fakecloud_core::multi_account::MultiAccountState::new(
2897                    "123456789012",
2898                    "us-east-1",
2899                    "",
2900                ),
2901            )),
2902            dynamodb: Arc::new(RwLock::new(
2903                fakecloud_core::multi_account::MultiAccountState::new(
2904                    "123456789012",
2905                    "us-east-1",
2906                    "",
2907                ),
2908            )),
2909            logs: Arc::new(RwLock::new(
2910                fakecloud_core::multi_account::MultiAccountState::new(
2911                    "123456789012",
2912                    "us-east-1",
2913                    "",
2914                ),
2915            )),
2916            lambda: Arc::new(RwLock::new(
2917                fakecloud_core::multi_account::MultiAccountState::new(
2918                    "123456789012",
2919                    "us-east-1",
2920                    "",
2921                ),
2922            )),
2923            secretsmanager: Arc::new(RwLock::new(
2924                fakecloud_core::multi_account::MultiAccountState::new(
2925                    "123456789012",
2926                    "us-east-1",
2927                    "",
2928                ),
2929            )),
2930            kinesis: Arc::new(RwLock::new(
2931                fakecloud_core::multi_account::MultiAccountState::new(
2932                    "123456789012",
2933                    "us-east-1",
2934                    "",
2935                ),
2936            )),
2937            kms: Arc::new(RwLock::new(
2938                fakecloud_core::multi_account::MultiAccountState::new(
2939                    "123456789012",
2940                    "us-east-1",
2941                    "",
2942                ),
2943            )),
2944            ecr: Arc::new(RwLock::new(
2945                fakecloud_core::multi_account::MultiAccountState::new(
2946                    "123456789012",
2947                    "us-east-1",
2948                    "",
2949                ),
2950            )),
2951            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2952            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2953            organizations: Arc::new(RwLock::new(None)),
2954            cognito: Arc::new(RwLock::new(
2955                fakecloud_core::multi_account::MultiAccountState::new(
2956                    "123456789012",
2957                    "us-east-1",
2958                    "",
2959                ),
2960            )),
2961            rds: Arc::new(RwLock::new(
2962                fakecloud_core::multi_account::MultiAccountState::new(
2963                    "123456789012",
2964                    "us-east-1",
2965                    "",
2966                ),
2967            )),
2968            ec2: Arc::new(RwLock::new(
2969                fakecloud_core::multi_account::MultiAccountState::new(
2970                    "123456789012",
2971                    "us-east-1",
2972                    "",
2973                ),
2974            )),
2975            autoscaling: Arc::new(RwLock::new(
2976                fakecloud_autoscaling::AutoScalingAccounts::new(),
2977            )),
2978            batch: Arc::new(RwLock::new(fakecloud_batch::BatchAccounts::new())),
2979            pipes: Arc::new(RwLock::new(fakecloud_pipes::PipesAccounts::new())),
2980            ecs: Arc::new(RwLock::new(
2981                fakecloud_core::multi_account::MultiAccountState::new(
2982                    "123456789012",
2983                    "us-east-1",
2984                    "",
2985                ),
2986            )),
2987            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2988            elasticache: Arc::new(RwLock::new(
2989                fakecloud_core::multi_account::MultiAccountState::new(
2990                    "123456789012",
2991                    "us-east-1",
2992                    "",
2993                ),
2994            )),
2995            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2996            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2997            stepfunctions: Arc::new(RwLock::new(
2998                fakecloud_core::multi_account::MultiAccountState::new(
2999                    "123456789012",
3000                    "us-east-1",
3001                    "",
3002                ),
3003            )),
3004            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
3005            apigateway: Arc::new(RwLock::new(
3006                fakecloud_core::multi_account::MultiAccountState::new(
3007                    "123456789012",
3008                    "us-east-1",
3009                    "",
3010                ),
3011            )),
3012            apigatewayv2: Arc::new(RwLock::new(
3013                fakecloud_core::multi_account::MultiAccountState::new(
3014                    "123456789012",
3015                    "us-east-1",
3016                    "",
3017                ),
3018            )),
3019            ses: Arc::new(RwLock::new(
3020                fakecloud_core::multi_account::MultiAccountState::new(
3021                    "123456789012",
3022                    "us-east-1",
3023                    "",
3024                ),
3025            )),
3026            application_autoscaling: Arc::new(parking_lot::RwLock::new(
3027                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
3028            )),
3029            athena: Arc::new(parking_lot::RwLock::new(
3030                fakecloud_athena::AthenaAccounts::new(),
3031            )),
3032            firehose: Arc::new(parking_lot::RwLock::new(
3033                fakecloud_firehose::FirehoseAccounts::new(),
3034            )),
3035            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
3036            delivery: Arc::new(DeliveryBus::new()),
3037            lambda_runtime: None,
3038            rds_runtime: None,
3039            ec2_runtime: None,
3040            ecs_runtime: None,
3041            elasticache_runtime: None,
3042        };
3043        CloudFormationService::new(cf_state, deps)
3044    }
3045
3046    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
3047        AwsRequest {
3048            service: "cloudformation".to_string(),
3049            action: action.to_string(),
3050            region: "us-east-1".to_string(),
3051            account_id: "123456789012".to_string(),
3052            request_id: "test-request-id".to_string(),
3053            headers: HeaderMap::new(),
3054            query_params: params,
3055            body: bytes::Bytes::new(),
3056            body_stream: parking_lot::Mutex::new(None),
3057            path_segments: vec![],
3058            raw_path: "/".to_string(),
3059            raw_query: String::new(),
3060            method: http::Method::POST,
3061            is_query_protocol: true,
3062            access_key_id: None,
3063            principal: None,
3064        }
3065    }
3066
3067    #[tokio::test]
3068    async fn update_stack_sets_failed_status_on_resource_error() {
3069        let svc = make_service();
3070
3071        // Create a stack with just a queue
3072        let mut create_params = HashMap::new();
3073        create_params.insert("StackName".to_string(), "test-stack".to_string());
3074        create_params.insert(
3075            "TemplateBody".to_string(),
3076            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
3077        );
3078        let req = make_request("CreateStack", create_params);
3079        let result = svc.create_stack(&req).await;
3080        assert!(result.is_ok());
3081
3082        // Update stack adding an SNS subscription with a non-existent topic
3083        let mut update_params = HashMap::new();
3084        update_params.insert("StackName".to_string(), "test-stack".to_string());
3085        update_params.insert(
3086            "TemplateBody".to_string(),
3087            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
3088        );
3089        let req = make_request("UpdateStack", update_params);
3090        let result = svc.update_stack(&req).await;
3091
3092        // Should return an error
3093        assert!(result.is_err());
3094
3095        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
3096        // terminal status real CloudFormation lands on after a failed
3097        // update attempt that gets rolled back.
3098        let accounts = svc.state.read();
3099        let state = accounts.get("123456789012").unwrap();
3100        let stack = state.stacks.get("test-stack").unwrap();
3101        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
3102    }
3103
3104    #[tokio::test]
3105    async fn create_stack_resolves_ref_to_physical_id() {
3106        let svc = make_service();
3107
3108        // Template where subscription Refs the topic
3109        let template = r#"{
3110            "Resources": {
3111                "MyTopic": {
3112                    "Type": "AWS::SNS::Topic",
3113                    "Properties": { "TopicName": "ref-test-topic" }
3114                },
3115                "MySub": {
3116                    "Type": "AWS::SNS::Subscription",
3117                    "Properties": {
3118                        "TopicArn": { "Ref": "MyTopic" },
3119                        "Protocol": "sqs",
3120                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
3121                    }
3122                }
3123            }
3124        }"#;
3125
3126        let mut params = HashMap::new();
3127        params.insert("StackName".to_string(), "ref-stack".to_string());
3128        params.insert("TemplateBody".to_string(), template.to_string());
3129        let req = make_request("CreateStack", params);
3130        let result = svc.create_stack(&req).await;
3131        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
3132
3133        // Verify both resources were created
3134        let accounts = svc.state.read();
3135        let state = accounts.get("123456789012").unwrap();
3136        let stack = state.stacks.get("ref-stack").unwrap();
3137        assert_eq!(stack.resources.len(), 2);
3138        assert_eq!(stack.status, "CREATE_COMPLETE");
3139
3140        // The subscription's physical ID should be an ARN (not just "MyTopic")
3141        let sub = stack
3142            .resources
3143            .iter()
3144            .find(|r| r.logical_id == "MySub")
3145            .unwrap();
3146        assert!(
3147            sub.physical_id.contains("ref-test-topic"),
3148            "Subscription physical ID should reference the topic ARN, got: {}",
3149            sub.physical_id
3150        );
3151    }
3152
3153    /// On the multi-thread server runtime, a stack containing a custom
3154    /// resource (whose provisioning can block for minutes on a cold Lambda
3155    /// image pull) must NOT be provisioned synchronously inside the request
3156    /// handler — CreateStack returns the StackId immediately and DescribeStacks
3157    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
3158    /// 3.1).
3159    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3160    async fn create_stack_custom_resource_provisions_asynchronously() {
3161        let svc = make_service();
3162        let template = r#"{
3163            "Resources": {
3164                "MyCustom": {
3165                    "Type": "Custom::Thing",
3166                    "Properties": {
3167                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
3168                    }
3169                }
3170            }
3171        }"#;
3172        let mut params = HashMap::new();
3173        params.insert("StackName".to_string(), "async-stack".to_string());
3174        params.insert("TemplateBody".to_string(), template.to_string());
3175        let req = make_request("CreateStack", params);
3176
3177        // CreateStack returns promptly with a StackId; provisioning runs in a
3178        // detached background task. The stack is recorded before the task can
3179        // possibly finish, so right after return it is at worst already
3180        // terminal — never a value that proves the handler blocked on the
3181        // (potentially minutes-long) provisioning loop. The key guarantee is
3182        // that the call returns without running the provisioner inline.
3183        let resp = svc
3184            .create_stack(&req)
3185            .await
3186            .expect("create returns StackId");
3187        assert!(resp.status.is_success());
3188        {
3189            let accounts = svc.state.read();
3190            let stack = accounts
3191                .get("123456789012")
3192                .unwrap()
3193                .stacks
3194                .get("async-stack")
3195                .expect("stack seeded synchronously");
3196            assert!(
3197                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
3198                "unexpected status right after create: {}",
3199                stack.status
3200            );
3201        }
3202
3203        // Poll DescribeStacks (via state) until the background task flips the
3204        // stack to its terminal CREATE_COMPLETE status.
3205        let mut status = String::new();
3206        for _ in 0..200 {
3207            {
3208                let accounts = svc.state.read();
3209                if let Some(stack) = accounts
3210                    .get("123456789012")
3211                    .and_then(|s| s.stacks.get("async-stack"))
3212                {
3213                    status = stack.status.clone();
3214                    if status != "CREATE_IN_PROGRESS" {
3215                        break;
3216                    }
3217                }
3218            }
3219            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3220        }
3221        assert_eq!(
3222            status, "CREATE_COMPLETE",
3223            "stack should reach CREATE_COMPLETE"
3224        );
3225
3226        let accounts = svc.state.read();
3227        let stack = accounts
3228            .get("123456789012")
3229            .unwrap()
3230            .stacks
3231            .get("async-stack")
3232            .unwrap();
3233        assert_eq!(stack.resources.len(), 1);
3234        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
3235    }
3236
3237    #[tokio::test]
3238    async fn output_getatt_resolves_well_known_attribute() {
3239        // An Output that GetAtts a well-known attribute the create handler does
3240        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
3241        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
3242        // resolve_template_outputs reads StackResource.attributes, which now
3243        // carries the live get_att overlay applied during provisioning.
3244        let svc = make_service();
3245        let template = r#"{
3246            "Resources": {
3247                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
3248            },
3249            "Outputs": {
3250                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
3251            }
3252        }"#;
3253        let mut params = HashMap::new();
3254        params.insert("StackName".to_string(), "out-stack".to_string());
3255        params.insert("TemplateBody".to_string(), template.to_string());
3256        svc.create_stack(&make_request("CreateStack", params))
3257            .await
3258            .expect("create returns StackId");
3259
3260        let mut url = String::new();
3261        for _ in 0..200 {
3262            {
3263                let accounts = svc.state.read();
3264                if let Some(stack) = accounts
3265                    .get("123456789012")
3266                    .and_then(|s| s.stacks.get("out-stack"))
3267                {
3268                    if stack.status != "CREATE_IN_PROGRESS" {
3269                        url = stack
3270                            .outputs
3271                            .iter()
3272                            .find(|o| o.key == "Url")
3273                            .map(|o| o.value.clone())
3274                            .unwrap_or_default();
3275                        break;
3276                    }
3277                }
3278            }
3279            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3280        }
3281        assert!(
3282            url.contains("out-q") && url != "Queue.QueueUrl",
3283            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
3284        );
3285    }
3286
3287    // ── Service error paths ──
3288
3289    #[tokio::test]
3290    async fn create_stack_missing_name_errors() {
3291        let svc = make_service();
3292        let mut params = HashMap::new();
3293        params.insert("TemplateBody".to_string(), "{}".to_string());
3294        let req = make_request("CreateStack", params);
3295        assert!(svc.create_stack(&req).await.is_err());
3296    }
3297
3298    #[tokio::test]
3299    async fn create_stack_missing_template_creates_empty_stack() {
3300        // `TemplateBody` isn't `@required` in Smithy and CreateStack
3301        // declares no `ValidationError` shape, so missing/placeholder
3302        // bodies now create an empty stack rather than rejecting with
3303        // an undeclared wire code (strict-mode conformance gap).
3304        let svc = make_service();
3305        let mut params = HashMap::new();
3306        params.insert("StackName".to_string(), "s".to_string());
3307        let req = make_request("CreateStack", params);
3308        svc.create_stack(&req)
3309            .await
3310            .expect("empty-body create succeeds");
3311    }
3312
3313    #[tokio::test]
3314    async fn create_stack_duplicate_errors() {
3315        let svc = make_service();
3316        let mut params = HashMap::new();
3317        params.insert("StackName".to_string(), "dup".to_string());
3318        params.insert(
3319            "TemplateBody".to_string(),
3320            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
3321                .to_string(),
3322        );
3323        let req = make_request("CreateStack", params.clone());
3324        svc.create_stack(&req).await.unwrap();
3325        let req = make_request("CreateStack", params);
3326        assert!(svc.create_stack(&req).await.is_err());
3327    }
3328
3329    #[tokio::test]
3330    async fn create_stack_invalid_template_creates_empty_stack() {
3331        // CreateStack's Smithy `errors` list has no `ValidationError`
3332        // shape, so unparseable bodies degrade to an empty parsed
3333        // template instead of raising an undeclared wire code.
3334        let svc = make_service();
3335        let mut params = HashMap::new();
3336        params.insert("StackName".to_string(), "bad".to_string());
3337        params.insert("TemplateBody".to_string(), "not json".to_string());
3338        let req = make_request("CreateStack", params);
3339        svc.create_stack(&req)
3340            .await
3341            .expect("bad-body create succeeds");
3342    }
3343
3344    #[tokio::test]
3345    async fn delete_stack_unknown_is_noop() {
3346        let svc = make_service();
3347        let mut params = HashMap::new();
3348        params.insert("StackName".to_string(), "ghost".to_string());
3349        let req = make_request("DeleteStack", params);
3350        assert!(svc.delete_stack(&req).await.is_ok());
3351    }
3352
3353    #[test]
3354    fn describe_stacks_nonexistent_errors() {
3355        // Querying an explicit, unknown StackName returns AWS's
3356        // `ValidationError: Stack with id <name> does not exist` so deploy
3357        // tools that probe stack existence (SAM, `aws cloudformation
3358        // deploy`) get the signal they expect (issue #1646).
3359        let svc = make_service();
3360        let mut params = HashMap::new();
3361        params.insert("StackName".to_string(), "ghost".to_string());
3362        let req = make_request("DescribeStacks", params);
3363        match svc.describe_stacks(&req) {
3364            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
3365            Err(e) => {
3366                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
3367                assert_eq!(e.code(), "ValidationError");
3368                assert!(
3369                    e.message().contains("does not exist"),
3370                    "got: {}",
3371                    e.message()
3372                );
3373            }
3374        }
3375    }
3376
3377    #[test]
3378    fn describe_stacks_empty_returns_all() {
3379        let svc = make_service();
3380        let req = make_request("DescribeStacks", HashMap::new());
3381        let resp = svc.describe_stacks(&req).unwrap();
3382        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3383        assert!(b.contains("DescribeStacksResult"));
3384    }
3385
3386    #[test]
3387    fn list_stacks_empty_returns_ok() {
3388        let svc = make_service();
3389        let req = make_request("ListStacks", HashMap::new());
3390        let resp = svc.list_stacks(&req).unwrap();
3391        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3392        assert!(b.contains("ListStacksResult"));
3393    }
3394
3395    #[test]
3396    fn list_stack_resources_missing_name_returns_validation_error() {
3397        // ListStackResources declares no `errors` in Smithy, so any
3398        // AWS-shaped 4xx counts as a handler response. We reject an
3399        // omitted StackName with `ValidationError` to keep negative
3400        // conformance variants honest; unknown-but-supplied names still
3401        // resolve to an empty list (see the test below).
3402        let svc = make_service();
3403        let req = make_request("ListStackResources", HashMap::new());
3404        let err = match svc.list_stack_resources(&req) {
3405            Err(e) => e,
3406            Ok(_) => panic!("omitted StackName must be rejected"),
3407        };
3408        assert_eq!(err.code(), "ValidationError");
3409    }
3410
3411    #[test]
3412    fn list_stack_resources_unknown_stack_returns_empty() {
3413        let svc = make_service();
3414        let mut params = HashMap::new();
3415        params.insert("StackName".to_string(), "ghost".to_string());
3416        let req = make_request("ListStackResources", params);
3417        svc.list_stack_resources(&req).expect("unknown is empty");
3418    }
3419
3420    #[test]
3421    fn describe_stack_resources_missing_name_returns_empty() {
3422        let svc = make_service();
3423        let req = make_request("DescribeStackResources", HashMap::new());
3424        svc.describe_stack_resources(&req)
3425            .expect("missing name is ok");
3426    }
3427
3428    #[test]
3429    fn get_template_missing_name_returns_empty_body() {
3430        let svc = make_service();
3431        let req = make_request("GetTemplate", HashMap::new());
3432        svc.get_template(&req).expect("missing name is ok");
3433    }
3434
3435    #[test]
3436    fn get_template_unknown_stack_returns_empty_body() {
3437        let svc = make_service();
3438        let mut params = HashMap::new();
3439        params.insert("StackName".to_string(), "ghost".to_string());
3440        let req = make_request("GetTemplate", params);
3441        svc.get_template(&req).expect("unknown is empty");
3442    }
3443
3444    #[tokio::test]
3445    async fn update_stack_missing_name_errors() {
3446        let svc = make_service();
3447        let mut params = HashMap::new();
3448        params.insert("TemplateBody".to_string(), "{}".to_string());
3449        let req = make_request("UpdateStack", params);
3450        assert!(svc.update_stack(&req).await.is_err());
3451    }
3452
3453    #[tokio::test]
3454    async fn update_stack_unknown_stack_returns_synthetic_id() {
3455        // UpdateStack declares only `InsufficientCapabilitiesException`
3456        // and `TokenAlreadyExistsException`, neither of which fits
3457        // "stack does not exist". Synthetic conformance inputs target
3458        // a placeholder stack, so we return a synthetic StackId rather
3459        // than an undeclared `ValidationError`. Real callers create
3460        // the stack first.
3461        let svc = make_service();
3462        let mut params = HashMap::new();
3463        params.insert("StackName".to_string(), "ghost".to_string());
3464        params.insert(
3465            "TemplateBody".to_string(),
3466            r#"{"Resources":{}}"#.to_string(),
3467        );
3468        let req = make_request("UpdateStack", params);
3469        let resp = svc
3470            .update_stack(&req)
3471            .await
3472            .expect("ghost update is synthetic");
3473        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
3474        assert!(b.contains("UpdateStackResult"));
3475    }
3476
3477    #[tokio::test]
3478    async fn create_stack_resolves_outputs_and_records_export() {
3479        let svc = make_service();
3480        let template = r#"{
3481            "Resources": {
3482                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
3483            },
3484            "Outputs": {
3485                "QueueUrl": {
3486                    "Value": {"Ref": "Q"},
3487                    "Description": "Url",
3488                    "Export": {"Name": "TheQueueUrl"}
3489                }
3490            }
3491        }"#;
3492        let mut params = HashMap::new();
3493        params.insert("StackName".to_string(), "outs".to_string());
3494        params.insert("TemplateBody".to_string(), template.to_string());
3495        let req = make_request("CreateStack", params);
3496        svc.create_stack(&req).await.expect("create stack");
3497
3498        let accounts = svc.state.read();
3499        let stack = accounts
3500            .get("123456789012")
3501            .unwrap()
3502            .stacks
3503            .get("outs")
3504            .unwrap();
3505        assert_eq!(stack.outputs.len(), 1);
3506        assert_eq!(stack.outputs[0].key, "QueueUrl");
3507        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
3508        assert!(!stack.outputs[0].value.is_empty());
3509    }
3510
3511    #[tokio::test]
3512    async fn create_stack_rejects_duplicate_export_name() {
3513        let svc = make_service();
3514        let mk = |name: &str| {
3515            let template = format!(
3516                r#"{{
3517                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
3518                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
3519                }}"#
3520            );
3521            let mut params = HashMap::new();
3522            params.insert("StackName".to_string(), name.to_string());
3523            params.insert("TemplateBody".to_string(), template);
3524            make_request("CreateStack", params)
3525        };
3526        match svc.create_stack(&mk("first")).await {
3527            Ok(_) => {}
3528            Err(e) => panic!("first stack: {e:?}"),
3529        }
3530        // The second stack's export collides with the first. Since
3531        // provisioning is now asynchronous, the collision can no longer be a
3532        // synchronous CreateStack error — it surfaces as a failed create.
3533        // On the current-thread test runtime the provisioning task runs
3534        // inline, so the stack is already CREATE_FAILED on return.
3535        svc.create_stack(&mk("second"))
3536            .await
3537            .expect("CreateStack returns StackId even when provisioning fails");
3538        let accounts = svc.state.read();
3539        let stack = accounts
3540            .get("123456789012")
3541            .unwrap()
3542            .stacks
3543            .get("second")
3544            .expect("second stack recorded");
3545        assert_eq!(stack.status, "CREATE_FAILED");
3546        // The first stack keeps the export.
3547        let exports = &accounts.get("123456789012").unwrap().exports;
3548        assert_eq!(
3549            exports
3550                .get("DupExport")
3551                .map(|e| e.exporting_stack_name.as_str()),
3552            Some("first")
3553        );
3554    }
3555
3556    #[tokio::test]
3557    async fn import_value_resolves_against_other_stack_export() {
3558        let svc = make_service();
3559
3560        let producer_tpl = r#"{
3561            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3562            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3563        }"#;
3564        let mut p = HashMap::new();
3565        p.insert("StackName".to_string(), "producer".to_string());
3566        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3567        svc.create_stack(&make_request("CreateStack", p))
3568            .await
3569            .expect("producer");
3570
3571        let consumer_tpl = r#"{
3572            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3573            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3574        }"#;
3575        let mut p = HashMap::new();
3576        p.insert("StackName".to_string(), "consumer".to_string());
3577        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3578        svc.create_stack(&make_request("CreateStack", p))
3579            .await
3580            .expect("consumer");
3581
3582        let accounts = svc.state.read();
3583        let prod_url = accounts
3584            .get("123456789012")
3585            .unwrap()
3586            .stacks
3587            .get("producer")
3588            .unwrap()
3589            .outputs[0]
3590            .value
3591            .clone();
3592        let cons = accounts
3593            .get("123456789012")
3594            .unwrap()
3595            .stacks
3596            .get("consumer")
3597            .unwrap();
3598        assert_eq!(cons.outputs[0].value, prod_url);
3599    }
3600
3601    #[tokio::test]
3602    async fn create_stack_records_export_in_state_registry() {
3603        let svc = make_service();
3604        let template = r#"{
3605            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3606            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3607        }"#;
3608        let mut params = HashMap::new();
3609        params.insert("StackName".to_string(), "reg".to_string());
3610        params.insert("TemplateBody".to_string(), template.to_string());
3611        svc.create_stack(&make_request("CreateStack", params))
3612            .await
3613            .expect("create");
3614
3615        let accounts = svc.state.read();
3616        let state = accounts.get("123456789012").unwrap();
3617        let export = state
3618            .exports
3619            .get("reg-url")
3620            .expect("export registered in state.exports");
3621        assert_eq!(export.exporting_stack_name, "reg");
3622        assert!(!export.value.is_empty());
3623        assert!(export.exporting_stack_id.contains("reg"));
3624    }
3625
3626    #[tokio::test]
3627    async fn import_value_with_unknown_export_errors() {
3628        let svc = make_service();
3629        let consumer_tpl = r#"{
3630            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3631                "QueueName": {"Fn::ImportValue":"missing-export"}
3632            }}}
3633        }"#;
3634        let mut p = HashMap::new();
3635        p.insert("StackName".to_string(), "bad-consumer".to_string());
3636        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3637        match svc.create_stack(&make_request("CreateStack", p)).await {
3638            Ok(_) => panic!("expected ValidationError for unknown export"),
3639            Err(e) => {
3640                let msg = format!("{e:?}");
3641                assert!(msg.contains("No export named missing-export"), "got {msg}");
3642            }
3643        }
3644    }
3645
3646    #[tokio::test]
3647    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3648        let svc = make_service();
3649
3650        let producer_tpl = r#"{
3651            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3652            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3653        }"#;
3654        let mut p = HashMap::new();
3655        p.insert("StackName".to_string(), "producer".to_string());
3656        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3657        svc.create_stack(&make_request("CreateStack", p))
3658            .await
3659            .expect("producer");
3660
3661        let consumer_tpl = r#"{
3662            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3663                "QueueName": "cons-q",
3664                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3665            }}}
3666        }"#;
3667        let mut p = HashMap::new();
3668        p.insert("StackName".to_string(), "consumer".to_string());
3669        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3670        svc.create_stack(&make_request("CreateStack", p))
3671            .await
3672            .expect("consumer");
3673
3674        // Producer delete must fail while consumer still imports.
3675        let mut p = HashMap::new();
3676        p.insert("StackName".to_string(), "producer".to_string());
3677        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3678            Ok(_) => panic!("delete must fail while imports exist"),
3679            Err(e) => {
3680                let msg = format!("{e:?}");
3681                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3682            }
3683        }
3684
3685        // Delete consumer first.
3686        let mut p = HashMap::new();
3687        p.insert("StackName".to_string(), "consumer".to_string());
3688        svc.delete_stack(&make_request("DeleteStack", p))
3689            .await
3690            .expect("consumer delete");
3691
3692        // Now producer delete succeeds.
3693        let mut p = HashMap::new();
3694        p.insert("StackName".to_string(), "producer".to_string());
3695        svc.delete_stack(&make_request("DeleteStack", p))
3696            .await
3697            .expect("producer delete after consumer gone");
3698
3699        let accounts = svc.state.read();
3700        let state = accounts.get("123456789012").unwrap();
3701        assert!(state.exports.is_empty(), "exports cleared after delete");
3702        assert!(state.imports.is_empty(), "imports cleared after delete");
3703    }
3704
3705    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3706
3707    use std::sync::atomic::{AtomicUsize, Ordering};
3708
3709    /// A snapshot hook that counts how many times it fires, standing in for a
3710    /// real service's whole-state persist.
3711    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3712        Arc::new(move || {
3713            let counter = counter.clone();
3714            Box::pin(async move {
3715                counter.fetch_add(1, Ordering::SeqCst);
3716            })
3717        })
3718    }
3719
3720    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3721        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3722        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3723            tmp.path().to_path_buf(),
3724            cache,
3725        ))
3726    }
3727
3728    // A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
3729    // write-through). Lambda is registered as a hook below but NOT in the
3730    // template, so it must not fire -- proving per-service selectivity.
3731    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
3732        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
3733        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
3734        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
3735    }}"#;
3736
3737    fn create_req(stack: &str) -> AwsRequest {
3738        let mut p = HashMap::new();
3739        p.insert("StackName".to_string(), stack.to_string());
3740        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3741        make_request("CreateStack", p)
3742    }
3743
3744    #[tokio::test]
3745    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3746        let tmp = tempfile::tempdir().unwrap();
3747        let store = disk_s3_store(&tmp);
3748        let counter = Arc::new(AtomicUsize::new(0));
3749        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3750            BTreeMap::new();
3751        hooks.insert("sqs", counting_hook(counter.clone()));
3752        hooks.insert("sns", counting_hook(counter.clone()));
3753        // Registered but not in the template -> must not fire.
3754        hooks.insert("lambda", counting_hook(counter.clone()));
3755        let svc = make_service()
3756            .with_s3_store(store.clone())
3757            .with_snapshot_hooks(hooks);
3758
3759        svc.create_stack(&create_req("probe")).await.unwrap();
3760
3761        // sqs + sns fired once each; lambda untouched.
3762        assert_eq!(counter.load(Ordering::SeqCst), 2);
3763        // The bucket was written through to the S3 store, not just the in-memory map.
3764        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3765        assert!(
3766            loaded.buckets.contains_key("cfn-bucket"),
3767            "CFN bucket should be persisted to the S3 store"
3768        );
3769    }
3770
3771    #[tokio::test]
3772    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3773        let tmp = tempfile::tempdir().unwrap();
3774        let store = disk_s3_store(&tmp);
3775        let counter = Arc::new(AtomicUsize::new(0));
3776        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3777            BTreeMap::new();
3778        hooks.insert("sqs", counting_hook(counter.clone()));
3779        hooks.insert("sns", counting_hook(counter.clone()));
3780        let svc = make_service()
3781            .with_s3_store(store.clone())
3782            .with_snapshot_hooks(hooks);
3783
3784        svc.create_stack(&create_req("probe")).await.unwrap();
3785        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3786
3787        let mut p = HashMap::new();
3788        p.insert("StackName".to_string(), "probe".to_string());
3789        svc.delete_stack(&make_request("DeleteStack", p))
3790            .await
3791            .unwrap();
3792
3793        // Delete fired the touched services again (sqs + sns).
3794        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3795        // And the CFN-deleted bucket is gone from the store, so it does not
3796        // reappear after a restart.
3797        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3798        assert!(
3799            !loaded.buckets.contains_key("cfn-bucket"),
3800            "CFN-deleted bucket should be removed from the S3 store"
3801        );
3802    }
3803
3804    #[tokio::test]
3805    async fn cfn_persist_skips_services_without_a_registered_hook() {
3806        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3807        // hooks must be silently skipped (no panic), and "sqs" fires once.
3808        let tmp = tempfile::tempdir().unwrap();
3809        let store = disk_s3_store(&tmp);
3810        let counter = Arc::new(AtomicUsize::new(0));
3811        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3812            BTreeMap::new();
3813        hooks.insert("sqs", counting_hook(counter.clone()));
3814        let svc = make_service()
3815            .with_s3_store(store.clone())
3816            .with_snapshot_hooks(hooks);
3817
3818        svc.create_stack(&create_req("probe")).await.unwrap();
3819        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3820    }
3821
3822    #[tokio::test]
3823    async fn cfn_update_persists_touched_services() {
3824        // Create with just SQS, then update to a template that adds SNS + a
3825        // bucket; the update must persist the services it touches.
3826        let tmp = tempfile::tempdir().unwrap();
3827        let store = disk_s3_store(&tmp);
3828        let counter = Arc::new(AtomicUsize::new(0));
3829        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3830            BTreeMap::new();
3831        hooks.insert("sqs", counting_hook(counter.clone()));
3832        hooks.insert("sns", counting_hook(counter.clone()));
3833        let svc = make_service()
3834            .with_s3_store(store.clone())
3835            .with_snapshot_hooks(hooks);
3836
3837        let mut create = HashMap::new();
3838        create.insert("StackName".to_string(), "upd".to_string());
3839        create.insert(
3840            "TemplateBody".to_string(),
3841            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3842                .to_string(),
3843        );
3844        svc.create_stack(&make_request("CreateStack", create))
3845            .await
3846            .unwrap();
3847        let after_create = counter.load(Ordering::SeqCst);
3848
3849        let mut update = HashMap::new();
3850        update.insert("StackName".to_string(), "upd".to_string());
3851        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3852        svc.update_stack(&make_request("UpdateStack", update))
3853            .await
3854            .unwrap();
3855
3856        // The update touched at least SNS (added); the hook count must grow.
3857        assert!(
3858            counter.load(Ordering::SeqCst) > after_create,
3859            "update should persist the services it touched"
3860        );
3861        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3862        assert!(loaded.buckets.contains_key("cfn-bucket"));
3863    }
3864
3865    #[tokio::test]
3866    async fn cfn_execute_change_set_persists_touched_services() {
3867        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3868        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3869        // write provisioned services through to disk the same way CreateStack
3870        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3871        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3872        let tmp = tempfile::tempdir().unwrap();
3873        let store = disk_s3_store(&tmp);
3874        let counter = Arc::new(AtomicUsize::new(0));
3875        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3876            BTreeMap::new();
3877        hooks.insert("sqs", counting_hook(counter.clone()));
3878        let svc = make_service()
3879            .with_s3_store(store.clone())
3880            .with_snapshot_hooks(hooks);
3881
3882        let mut create = HashMap::new();
3883        create.insert("StackName".to_string(), "cs-stack".to_string());
3884        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3885        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3886        create.insert(
3887            "TemplateBody".to_string(),
3888            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3889                .to_string(),
3890        );
3891        svc.handle(make_request("CreateChangeSet", create))
3892            .await
3893            .unwrap();
3894        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3895        let before = counter.load(Ordering::SeqCst);
3896
3897        let mut exec = HashMap::new();
3898        exec.insert("StackName".to_string(), "cs-stack".to_string());
3899        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3900        svc.handle(make_request("ExecuteChangeSet", exec))
3901            .await
3902            .unwrap();
3903
3904        assert!(
3905            counter.load(Ordering::SeqCst) > before,
3906            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3907             queue survives a restart"
3908        );
3909    }
3910
3911    #[test]
3912    fn service_key_for_type_maps_services_and_aliases() {
3913        // Direct service segments.
3914        assert_eq!(
3915            service_key_for_type("AWS::Lambda::Function"),
3916            Some("lambda")
3917        );
3918        assert_eq!(
3919            service_key_for_type("AWS::SecretsManager::Secret"),
3920            Some("secretsmanager")
3921        );
3922        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3923        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3924        assert_eq!(
3925            service_key_for_type("AWS::StepFunctions::StateMachine"),
3926            Some("stepfunctions")
3927        );
3928        // Namespace aliases that differ from the fakecloud service name.
3929        assert_eq!(
3930            service_key_for_type("AWS::Events::Rule"),
3931            Some("eventbridge")
3932        );
3933        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3934        assert_eq!(
3935            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3936            Some("elasticache")
3937        );
3938        // S3 has no snapshot hook (it persists via the S3Store write-through).
3939        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3940        // Snapshot-backed services whose CFN namespace differs from the
3941        // fakecloud service name (these were missing from the map, #1766 class).
3942        assert_eq!(
3943            service_key_for_type("AWS::CertificateManager::Certificate"),
3944            Some("acm")
3945        );
3946        assert_eq!(
3947            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3948            Some("elbv2")
3949        );
3950        assert_eq!(
3951            service_key_for_type("AWS::CloudFront::Distribution"),
3952            Some("cloudfront")
3953        );
3954        assert_eq!(
3955            service_key_for_type("AWS::Route53::HostedZone"),
3956            Some("route53")
3957        );
3958        assert_eq!(
3959            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3960            Some("firehose")
3961        );
3962        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3963        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3964        assert_eq!(
3965            service_key_for_type("AWS::Athena::WorkGroup"),
3966            Some("athena")
3967        );
3968        assert_eq!(
3969            service_key_for_type("AWS::Organizations::Organization"),
3970            Some("organizations")
3971        );
3972        // Malformed / non-AWS types.
3973        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3974        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3975        assert_eq!(service_key_for_type("AWS"), None);
3976        assert_eq!(service_key_for_type(""), None);
3977    }
3978
3979    #[tokio::test]
3980    async fn persist_touched_services_noop_with_empty_hooks() {
3981        // No registered hooks -> nothing to do, must not panic.
3982        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3983        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3984    }
3985
3986    #[tokio::test]
3987    async fn cfn_bucket_policy_write_through_create_update_delete() {
3988        let tmp = tempfile::tempdir().unwrap();
3989        let store = disk_s3_store(&tmp);
3990        let svc = make_service().with_s3_store(store.clone());
3991
3992        // Create a bucket + bucket policy.
3993        let mut create = HashMap::new();
3994        create.insert("StackName".to_string(), "pol".to_string());
3995        create.insert(
3996            "TemplateBody".to_string(),
3997            r#"{"Resources":{
3998                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3999                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
4000            }}"#
4001            .to_string(),
4002        );
4003        svc.create_stack(&make_request("CreateStack", create))
4004            .await
4005            .unwrap();
4006        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4007        let policy = loaded.buckets["pol-bucket"]
4008            .subresources
4009            .get("policy.toml")
4010            .cloned()
4011            .expect("bucket policy persisted on create");
4012        assert!(policy.contains("s3:GetObject"));
4013
4014        // Update the policy document; the update must write through.
4015        let mut update = HashMap::new();
4016        update.insert("StackName".to_string(), "pol".to_string());
4017        update.insert(
4018            "TemplateBody".to_string(),
4019            r#"{"Resources":{
4020                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
4021                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
4022            }}"#
4023            .to_string(),
4024        );
4025        svc.update_stack(&make_request("UpdateStack", update))
4026            .await
4027            .unwrap();
4028        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4029        let policy = loaded.buckets["pol-bucket"]
4030            .subresources
4031            .get("policy.toml")
4032            .cloned()
4033            .expect("bucket policy still persisted after update");
4034        assert!(
4035            policy.contains("s3:PutObject"),
4036            "updated policy should be written through"
4037        );
4038
4039        // Delete the stack; the bucket (and its policy) must be removed from disk.
4040        let mut del = HashMap::new();
4041        del.insert("StackName".to_string(), "pol".to_string());
4042        svc.delete_stack(&make_request("DeleteStack", del))
4043            .await
4044            .unwrap();
4045        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
4046        assert!(
4047            !loaded.buckets.contains_key("pol-bucket"),
4048            "CFN-deleted bucket and policy should be gone from the store"
4049        );
4050    }
4051}