Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
29/// Canonical `Fn::GetAtt` attribute names per resource type. Used to
30/// supplement the eagerly-captured `ProvisionResult::attributes` with
31/// live-state lookups via `ResourceProvisioner::get_att`. Resources not
32/// listed here just use whatever the create handler captured.
33fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
34    match resource_type {
35        "AWS::S3::Bucket" => &[
36            "Arn",
37            "DomainName",
38            "RegionalDomainName",
39            "DualStackDomainName",
40            "WebsiteURL",
41        ],
42        "AWS::Lambda::Function" => &["Arn", "FunctionUrl", "Version"],
43        "AWS::IAM::Role" => &["Arn", "RoleId"],
44        "AWS::SQS::Queue" => &["Arn", "QueueName", "QueueUrl"],
45        "AWS::SNS::Topic" => &["TopicArn", "TopicName"],
46        "AWS::DynamoDB::Table" => &["Arn", "StreamArn"],
47        "AWS::KMS::Key" => &["Arn", "KeyId"],
48        "AWS::SecretsManager::Secret" => &["Arn", "Id"],
49        "AWS::CloudFront::Distribution" => &["DomainName", "Id"],
50        "AWS::EC2::VPC" => &["VpcId", "CidrBlock"],
51        "AWS::EC2::Subnet" => &["SubnetId", "AvailabilityZone"],
52        "AWS::EC2::SecurityGroup" => &["GroupId", "VpcId"],
53        "AWS::EC2::InternetGateway" => &["InternetGatewayId"],
54        "AWS::EC2::RouteTable" => &["RouteTableId"],
55        _ => &[],
56    }
57}
58
59/// Map a CloudFormation resource type (`AWS::<Service>::<Resource>`) to the
60/// snapshot-hook key for its owning service (the keys of
61/// `CloudFormationDeps::snapshot_hooks`). The key is the service segment, with
62/// aliases for the few services whose CloudFormation namespace differs from
63/// their fakecloud service name (e.g. `Events` -> `eventbridge`).
64///
65/// `AWS::S3::*` is intentionally absent: S3 buckets persist through the
66/// `S3Store` write-through path in the provisioner, not a whole-state snapshot
67/// hook. Returns `None` for malformed types and any service whose state is not
68/// snapshot-backed (those CFN resources have no persistence path either way).
69fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
70    let mut parts = resource_type.split("::");
71    let vendor = parts.next()?;
72    let service = parts.next()?;
73    // A real resource type has three segments; a 2-part string (or fewer) is
74    // not one.
75    parts.next()?;
76    if vendor != "AWS" {
77        return None;
78    }
79    Some(match service {
80        "Lambda" => "lambda",
81        "SecretsManager" => "secretsmanager",
82        "SQS" => "sqs",
83        "SNS" => "sns",
84        "DynamoDB" => "dynamodb",
85        "StepFunctions" => "stepfunctions",
86        "Events" => "eventbridge",
87        "SSM" => "ssm",
88        "Logs" => "logs",
89        "KMS" => "kms",
90        "Kinesis" => "kinesis",
91        "SES" => "ses",
92        "Cognito" => "cognito",
93        "RDS" => "rds",
94        "ElastiCache" => "elasticache",
95        "ECR" => "ecr",
96        "ECS" => "ecs",
97        "CloudWatch" => "cloudwatch",
98        "ApiGateway" => "apigateway",
99        "ApiGatewayV2" => "apigatewayv2",
100        "Bedrock" => "bedrock",
101        "Scheduler" => "scheduler",
102        "IAM" => "iam",
103        // Services whose CloudFormation namespace differs from their fakecloud
104        // service name, and which were missing from this table: their
105        // provisioner mutates snapshot-backed state directly, so CFN-created
106        // (and CFN-deleted) resources vanished on restart while their
107        // direct-API equivalents persisted (#1766 class). Each has a registered
108        // snapshot hook in the server.
109        "CertificateManager" => "acm",
110        "ElasticLoadBalancingV2" => "elbv2",
111        "CloudFront" => "cloudfront",
112        "Route53" => "route53",
113        "KinesisFirehose" => "firehose",
114        "Glue" => "glue",
115        "WAFv2" => "wafv2",
116        "Athena" => "athena",
117        "Organizations" => "organizations",
118        _ => return None,
119    })
120}
121
122/// Persist each distinct snapshot-backed service touched by a stack op, once.
123///
124/// `resource_types` is the set of `resource_type` strings the op created /
125/// updated / deleted. The CloudFormation provisioner mutates services' shared
126/// state directly and never triggers their snapshot path; this writes that
127/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
128/// resource survives a restart. Services with no registered hook (memory mode,
129/// or non-snapshot-backed services) are skipped.
130async fn persist_touched_services<I>(
131    hooks: &BTreeMap<&'static str, SnapshotHook>,
132    resource_types: I,
133) where
134    I: IntoIterator<Item = String>,
135{
136    if hooks.is_empty() {
137        return;
138    }
139    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
140    for ty in resource_types {
141        if let Some(key) = service_key_for_type(&ty) {
142            keys.insert(key);
143        }
144    }
145    for key in keys {
146        if let Some(hook) = hooks.get(key) {
147            hook().await;
148        }
149    }
150}
151
152/// Multi-pass provisioning for all resources in a parsed template.
153///
154/// Resources may `Ref` each other in either direction, and JSON object
155/// iteration order isn't stable, so a single forward pass isn't enough
156/// to resolve them. We loop: each pass tries every pending resource, and
157/// any resource whose `Ref` targets are still unknown just stays pending
158/// for the next pass. When no pass makes progress we report the first
159/// pending failure and rollback.
160pub(crate) fn provision_stack_resources(
161    provisioner: &ResourceProvisioner,
162    resource_defs: &[template::ResourceDefinition],
163    template_body: &str,
164    parameters: &BTreeMap<String, String>,
165    imports: &BTreeMap<String, String>,
166) -> Result<Vec<StackResource>, AwsServiceError> {
167    let mut resources = Vec::new();
168    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
169    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
170    // Seed the work list in dependency order so a referenced resource is
171    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
172    // physical ids. The fixed-point retry loop below still recovers from any
173    // residual ordering the graph can't express.
174    let order = template::dependency_order(template_body, parameters, resource_defs);
175    let mut pending: Vec<&template::ResourceDefinition> =
176        order.iter().map(|&i| &resource_defs[i]).collect();
177    let max_passes = pending.len() + 1;
178
179    for _ in 0..max_passes {
180        if pending.is_empty() {
181            break;
182        }
183        let mut still_pending = Vec::new();
184        let mut made_progress = false;
185
186        for resource_def in pending {
187            let resolved_def = template::resolve_resource_properties_with_attrs(
188                resource_def,
189                template_body,
190                parameters,
191                &physical_ids,
192                &attributes,
193                imports,
194            )
195            .map_err(|e| {
196                // `ValidationError` isn't declared on CreateStack/UpdateStack;
197                // surface template resolve failures through the closest declared
198                // shape (`InsufficientCapabilitiesException`) instead.
199                AwsServiceError::aws_error(
200                    StatusCode::BAD_REQUEST,
201                    "InsufficientCapabilitiesException",
202                    e,
203                )
204            })?;
205
206            match provisioner.create_resource(&resolved_def) {
207                Ok(mut stack_resource) => {
208                    physical_ids.insert(
209                        stack_resource.logical_id.clone(),
210                        stack_resource.physical_id.clone(),
211                    );
212                    // Start with the eagerly-captured attribute set, then
213                    // overlay anything the provisioner can resolve from
214                    // live state (e.g. attributes that depend on side-effects
215                    // recorded after the create handler returned).
216                    let mut attr_map = stack_resource.attributes.clone();
217                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
218                        if attr_map.contains_key(*attr) {
219                            continue;
220                        }
221                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
222                            attr_map.insert((*attr).to_string(), v);
223                        }
224                    }
225                    attributes.insert(stack_resource.logical_id.clone(), attr_map.clone());
226                    // Persist the live-resolved attributes onto the stored
227                    // resource so later readers — notably resolve_template_outputs,
228                    // which reads StackResource.attributes directly without the
229                    // overlay — see the full GetAtt set, not just the eager subset.
230                    stack_resource.attributes = attr_map;
231                    resources.push(stack_resource);
232                    made_progress = true;
233                }
234                Err(_) => still_pending.push(resource_def),
235            }
236        }
237
238        pending = still_pending;
239        if !made_progress && !pending.is_empty() {
240            // No progress — report the first failure and rollback anything
241            // we already created.
242            let resource_def = pending[0];
243            let resolved_def = template::resolve_resource_properties_with_attrs(
244                resource_def,
245                template_body,
246                parameters,
247                &physical_ids,
248                &attributes,
249                imports,
250            )
251            .unwrap_or_else(|_| resource_def.clone());
252            let err = provisioner.create_resource(&resolved_def).unwrap_err();
253            for r in &resources {
254                let _ = provisioner.delete_resource(r);
255            }
256            return Err(AwsServiceError::aws_error(
257                StatusCode::BAD_REQUEST,
258                "ValidationError",
259                format!(
260                    "Failed to create resource {}: {err}",
261                    resource_def.logical_id
262                ),
263            ));
264        }
265    }
266
267    Ok(resources)
268}
269
270/// State references for every service CloudFormation can provision resources in.
271pub struct CloudFormationDeps {
272    pub sqs: SharedSqsState,
273    pub sns: SharedSnsState,
274    pub ssm: SharedSsmState,
275    pub iam: SharedIamState,
276    pub s3: SharedS3State,
277    pub eventbridge: SharedEventBridgeState,
278    pub dynamodb: SharedDynamoDbState,
279    pub logs: SharedLogsState,
280    pub lambda: fakecloud_lambda::SharedLambdaState,
281    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
282    pub kinesis: fakecloud_kinesis::SharedKinesisState,
283    pub kms: fakecloud_kms::SharedKmsState,
284    pub ecr: fakecloud_ecr::SharedEcrState,
285    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
286    pub elbv2: fakecloud_elbv2::SharedElbv2State,
287    pub organizations: fakecloud_organizations::SharedOrganizationsState,
288    pub cognito: fakecloud_cognito::SharedCognitoState,
289    pub rds: fakecloud_rds::SharedRdsState,
290    pub ec2: fakecloud_ec2::SharedEc2State,
291    pub ecs: fakecloud_ecs::SharedEcsState,
292    pub acm: fakecloud_acm::SharedAcmState,
293    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
294    pub route53: fakecloud_route53::SharedRoute53State,
295    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
296    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
297    pub wafv2: fakecloud_wafv2::SharedWafv2State,
298    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
299    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
300    pub ses: fakecloud_ses::SharedSesState,
301    pub application_autoscaling:
302        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
303    pub athena: fakecloud_athena::SharedAthenaState,
304    pub firehose: fakecloud_firehose::SharedFirehoseState,
305    pub glue: fakecloud_glue::SharedGlueState,
306    pub delivery: Arc<DeliveryBus>,
307    /// Lambda container runtime, when Docker/Podman is available. Used to
308    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
309    /// in the background so its first Invoke doesn't pay the cold-pull cost
310    /// (the #1539 timeout, through the CloudFormation door). `None` when no
311    /// runtime is configured — provisioning still works, the first Invoke just
312    /// falls back to a cold pull.
313    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
314}
315
316pub struct CloudFormationService {
317    pub(crate) state: SharedCloudFormationState,
318    pub(crate) deps: CloudFormationDeps,
319    snapshot_store: Option<Arc<dyn SnapshotStore>>,
320    snapshot_lock: Arc<AsyncMutex<()>>,
321    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
322    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
323    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
324    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
325    /// real store via `with_s3_store` once it has been built.
326    s3_store: Arc<dyn S3Store>,
327    /// Whole-state snapshot persist hooks keyed by service name (see
328    /// `service_key_for_type`). After a stack op the handler invokes the hook
329    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
330    /// is written through to disk, the same persistence a direct mutating API
331    /// call would trigger. Empty by default (memory mode, or no services
332    /// wired); the server populates it via `with_snapshot_hooks`.
333    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
334}
335
336/// Everything the async CreateStack provisioning task needs to provision
337/// resources and flip the stack to its terminal status. Bundled into one
338/// owned struct so it can move into a detached `tokio::spawn`.
339struct CreateStackContext {
340    state: SharedCloudFormationState,
341    delivery: Arc<DeliveryBus>,
342    snapshot_store: Option<Arc<dyn SnapshotStore>>,
343    snapshot_lock: Arc<AsyncMutex<()>>,
344    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
345    provisioner: ResourceProvisioner,
346    account_id: String,
347    stack_name: String,
348    stack_id: String,
349    template_body: String,
350    parameters: BTreeMap<String, String>,
351    notification_arns: Vec<String>,
352    imported_names: Vec<String>,
353    resource_defs: Vec<template::ResourceDefinition>,
354}
355
356impl CloudFormationService {
357    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
358        Self {
359            state,
360            deps,
361            snapshot_store: None,
362            snapshot_lock: Arc::new(AsyncMutex::new(())),
363            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
364            snapshot_hooks: BTreeMap::new(),
365        }
366    }
367
368    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
369        self.snapshot_store = Some(store);
370        self
371    }
372
373    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
374    /// written through to disk (see `ResourceProvisioner::s3_store`).
375    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
376        self.s3_store = store;
377        self
378    }
379
380    /// Register the per-service snapshot persist hooks (keyed by service name)
381    /// used to persist every snapshot-backed service a stack op touches.
382    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
383        self.snapshot_hooks = hooks;
384        self
385    }
386
387    async fn save_snapshot(&self) {
388        let Some(store) = self.snapshot_store.clone() else {
389            return;
390        };
391        let _guard = self.snapshot_lock.lock().await;
392        let snapshot = CloudFormationSnapshot {
393            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
394            state: None,
395            accounts: Some(self.state.read().clone()),
396        };
397        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
398            let bytes = serde_json::to_vec(&snapshot)
399                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
400            store.save(&bytes)
401        })
402        .await;
403        match join {
404            Ok(Ok(())) => {}
405            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
406            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
407        }
408    }
409
410    pub(crate) fn provisioner(
411        &self,
412        stack_id: &str,
413        account_id: &str,
414        region: &str,
415    ) -> ResourceProvisioner {
416        ResourceProvisioner {
417            sqs_state: self.deps.sqs.clone(),
418            sns_state: self.deps.sns.clone(),
419            ssm_state: self.deps.ssm.clone(),
420            iam_state: self.deps.iam.clone(),
421            s3_state: self.deps.s3.clone(),
422            eventbridge_state: self.deps.eventbridge.clone(),
423            dynamodb_state: self.deps.dynamodb.clone(),
424            logs_state: self.deps.logs.clone(),
425            lambda_state: self.deps.lambda.clone(),
426            secretsmanager_state: self.deps.secretsmanager.clone(),
427            kinesis_state: self.deps.kinesis.clone(),
428            kms_state: self.deps.kms.clone(),
429            ecr_state: self.deps.ecr.clone(),
430            cloudwatch_state: self.deps.cloudwatch.clone(),
431            elbv2_state: self.deps.elbv2.clone(),
432            organizations_state: self.deps.organizations.clone(),
433            cognito_state: self.deps.cognito.clone(),
434            rds_state: self.deps.rds.clone(),
435            ec2_state: self.deps.ec2.clone(),
436            ecs_state: self.deps.ecs.clone(),
437            acm_state: self.deps.acm.clone(),
438            elasticache_state: self.deps.elasticache.clone(),
439            route53_state: self.deps.route53.clone(),
440            cloudfront_state: self.deps.cloudfront.clone(),
441            stepfunctions_state: self.deps.stepfunctions.clone(),
442            wafv2_state: self.deps.wafv2.clone(),
443            apigateway_state: self.deps.apigateway.clone(),
444            apigatewayv2_state: self.deps.apigatewayv2.clone(),
445            ses_state: self.deps.ses.clone(),
446            app_autoscaling_state: self.deps.application_autoscaling.clone(),
447            athena_state: self.deps.athena.clone(),
448            firehose_state: self.deps.firehose.clone(),
449            glue_state: self.deps.glue.clone(),
450            cloudformation_state: self.state.clone(),
451            delivery: self.deps.delivery.clone(),
452            lambda_runtime: self.deps.lambda_runtime.clone(),
453            s3_store: self.s3_store.clone(),
454            account_id: account_id.to_string(),
455            region: region.to_string(),
456            stack_id: stack_id.to_string(),
457        }
458    }
459
460    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
461        // Check query params first (for Query protocol)
462        if let Some(v) = req.query_params.get(key) {
463            return Some(v.clone());
464        }
465        // Then check form-encoded body
466        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
467        body_params.get(key).cloned()
468    }
469
470    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
471        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
472        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
473        for (k, v) in body_params {
474            params.entry(k).or_insert(v);
475        }
476        params
477    }
478
479    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
480        let mut tags = BTreeMap::new();
481        for i in 1.. {
482            let key_param = format!("Tags.member.{i}.Key");
483            let value_param = format!("Tags.member.{i}.Value");
484            match (params.get(&key_param), params.get(&value_param)) {
485                (Some(k), Some(v)) => {
486                    tags.insert(k.clone(), v.clone());
487                }
488                _ => break,
489            }
490        }
491        tags
492    }
493
494    pub(crate) fn extract_parameters(
495        params: &BTreeMap<String, String>,
496    ) -> BTreeMap<String, String> {
497        let mut result = BTreeMap::new();
498        for i in 1.. {
499            let key_param = format!("Parameters.member.{i}.ParameterKey");
500            let value_param = format!("Parameters.member.{i}.ParameterValue");
501            match (params.get(&key_param), params.get(&value_param)) {
502                (Some(k), Some(v)) => {
503                    result.insert(k.clone(), v.clone());
504                }
505                _ => break,
506            }
507        }
508        result
509    }
510
511    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
512    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
513    /// parameter baked the bare parameter name instead of its default -- common
514    /// in hand-written CFN and `aws cloudformation deploy` without
515    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
516    pub(crate) fn merge_parameter_defaults(
517        parameters: &mut BTreeMap<String, String>,
518        template_body: &str,
519    ) {
520        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
521            match serde_json::from_str(template_body) {
522                Ok(v) => v,
523                Err(_) => return,
524            }
525        } else {
526            match serde_yaml::from_str(template_body) {
527                Ok(v) => v,
528                Err(_) => return,
529            }
530        };
531        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
532            return;
533        };
534        for (name, spec) in decls {
535            if parameters.contains_key(name) {
536                continue;
537            }
538            if let Some(default) = spec.get("Default") {
539                let s = default
540                    .as_str()
541                    .map(|s| s.to_string())
542                    .unwrap_or_else(|| default.to_string());
543                parameters.insert(name.clone(), s);
544            }
545        }
546    }
547
548    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
549        let mut arns = Vec::new();
550        for i in 1.. {
551            let key = format!("NotificationARNs.member.{i}");
552            match params.get(&key) {
553                Some(arn) => arns.push(arn.clone()),
554                None => break,
555            }
556        }
557        arns
558    }
559
560    fn send_stack_notification(
561        delivery: &DeliveryBus,
562        notification_arns: &[String],
563        stack_name: &str,
564        stack_id: &str,
565        status: &str,
566    ) {
567        if notification_arns.is_empty() {
568            return;
569        }
570        let message = format!(
571            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
572            stack_id,
573            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
574            uuid::Uuid::new_v4(),
575            stack_name,
576            status,
577            stack_name,
578        );
579        for arn in notification_arns {
580            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
581        }
582    }
583
584    /// Build a Fn::ImportValue lookup map from the account-level
585    /// `state.exports` registry. `skip_stack` removes any export owned by
586    /// the named stack — used during update so a stack doesn't import its
587    /// own previous-revision export.
588    pub(crate) fn collect_account_imports(
589        state: &SharedCloudFormationState,
590        account_id: &str,
591        skip_stack: Option<&str>,
592    ) -> BTreeMap<String, String> {
593        let mut imports = BTreeMap::new();
594        let accounts = state.read();
595        let Some(state) = accounts.get(account_id) else {
596            return imports;
597        };
598        for (name, export) in &state.exports {
599            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
600                continue;
601            }
602            imports.insert(name.clone(), export.value.clone());
603        }
604        imports
605    }
606
607    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
608    /// return a `ValidationError` listing any export names that aren't
609    /// known in the account. Mirrors CloudFormation's behavior of
610    /// failing the create/update before any resource is provisioned.
611    fn validate_import_values(
612        state: &SharedCloudFormationState,
613        account_id: &str,
614        stack_name: &str,
615        template_body: &str,
616        parameters: &BTreeMap<String, String>,
617    ) -> Result<Vec<String>, AwsServiceError> {
618        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
619            match serde_json::from_str(template_body) {
620                Ok(v) => v,
621                Err(_) => return Ok(Vec::new()),
622            }
623        } else {
624            match serde_yaml::from_str(template_body) {
625                Ok(v) => v,
626                Err(_) => return Ok(Vec::new()),
627            }
628        };
629        let names = template::collect_import_value_names(&value, parameters);
630        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
631        for n in &names {
632            if !known.contains_key(n) {
633                // CreateStack and UpdateStack both declare
634                // `InsufficientCapabilitiesException`; reuse it for the
635                // "missing imported export" pre-flight so the wire code
636                // matches a Smithy-declared shape on both ops.
637                return Err(AwsServiceError::aws_error(
638                    StatusCode::BAD_REQUEST,
639                    "InsufficientCapabilitiesException",
640                    format!("No export named {n} found."),
641                ));
642            }
643        }
644        Ok(names)
645    }
646
647    /// Sync `state.exports` and `state.imports` after a stack create or
648    /// update. Removes any exports / imports the stack used to own and
649    /// re-adds the current-revision set.
650    pub(crate) fn sync_exports_imports(
651        state: &mut CloudFormationState,
652        stack_id: &str,
653        stack_name: &str,
654        outputs: &[state::StackOutput],
655        imported_names: &[String],
656    ) {
657        // 1. Drop any prior exports owned by this stack.
658        let stale_exports: Vec<String> = state
659            .exports
660            .iter()
661            .filter(|(_, e)| e.exporting_stack_name == stack_name)
662            .map(|(k, _)| k.clone())
663            .collect();
664        for k in stale_exports {
665            state.exports.remove(&k);
666        }
667        // 2. Drop any prior imports recorded against this stack.
668        for entries in state.imports.values_mut() {
669            entries.retain(|s| s != stack_name);
670        }
671        state.imports.retain(|_, v| !v.is_empty());
672
673        // 3. Re-register exports.
674        for o in outputs {
675            if let Some(export) = &o.export_name {
676                state.exports.insert(
677                    export.clone(),
678                    state::StackExport {
679                        value: o.value.clone(),
680                        exporting_stack_id: stack_id.to_string(),
681                        exporting_stack_name: stack_name.to_string(),
682                    },
683                );
684            }
685        }
686        // 4. Re-register imports.
687        for name in imported_names {
688            let entry = state.imports.entry(name.clone()).or_default();
689            if !entry.iter().any(|s| s == stack_name) {
690                entry.push(stack_name.to_string());
691            }
692        }
693    }
694
695    /// Resolve every `Outputs.*` entry in `template_body` after the stack
696    /// has been provisioned. `resources` is the post-create / post-update
697    /// vec; we rebuild the physical-id and attribute maps from it before
698    /// invoking the template parser.
699    pub(crate) fn resolve_template_outputs(
700        template_body: &str,
701        parameters: &BTreeMap<String, String>,
702        resources: &[StackResource],
703        state: &SharedCloudFormationState,
704    ) -> Vec<state::StackOutput> {
705        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
706            match serde_json::from_str(template_body) {
707                Ok(v) => v,
708                Err(_) => return Vec::new(),
709            }
710        } else {
711            match serde_yaml::from_str(template_body) {
712                Ok(v) => v,
713                Err(_) => return Vec::new(),
714            }
715        };
716
717        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
718            Some(o) => o.clone(),
719            None => return Vec::new(),
720        };
721
722        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
723        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
724        for r in resources {
725            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
726            attributes.insert(r.logical_id.clone(), r.attributes.clone());
727        }
728
729        let imports = {
730            let accounts = state.read();
731            let mut out = BTreeMap::new();
732            // Walk every account so cross-stack imports work even if
733            // future use-cases serve mixed accounts.
734            for (_account, st) in accounts.iter() {
735                for (name, export) in &st.exports {
736                    out.insert(name.clone(), export.value.clone());
737                }
738            }
739            out
740        };
741
742        let parsed = match template::parse_outputs(
743            &value,
744            parameters,
745            &resources_obj,
746            &physical_ids,
747            &attributes,
748            &imports,
749        ) {
750            Ok(o) => o,
751            Err(_) => return Vec::new(),
752        };
753
754        parsed
755            .into_iter()
756            .map(|o| state::StackOutput {
757                key: o.logical_id,
758                value: o.value,
759                description: o.description,
760                export_name: o.export_name,
761            })
762            .collect()
763    }
764
765    /// Reject creates/updates whose outputs would re-export a name that
766    /// another live stack already exports. Mirrors real CloudFormation.
767    fn ensure_export_uniqueness(
768        state: &SharedCloudFormationState,
769        account_id: &str,
770        stack_name: &str,
771        outputs: &[state::StackOutput],
772    ) -> Result<(), AwsServiceError> {
773        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
774        for o in outputs {
775            if let Some(export) = &o.export_name {
776                if existing.contains_key(export) {
777                    // CreateStack/UpdateStack both declare AlreadyExistsException
778                    // (only) for a name collision; reuse it for duplicate exports
779                    // so the strict conformance probe sees a declared wire code.
780                    return Err(AwsServiceError::aws_error(
781                        StatusCode::BAD_REQUEST,
782                        "AlreadyExistsException",
783                        format!("Export with name {export} is already exported by another stack"),
784                    ));
785                }
786            }
787        }
788        Ok(())
789    }
790
791    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
792        let params = Self::get_all_params(req);
793
794        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
795        // accepts our `ValidationError` wire code regardless of declared shapes.
796        let stack_name = params.get("StackName").ok_or_else(|| {
797            AwsServiceError::aws_error(
798                StatusCode::BAD_REQUEST,
799                "ValidationError",
800                "StackName is required",
801            )
802        })?;
803
804        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
805        // Accept an empty body and parse it as an empty template so the probe's
806        // Success variants with `TemplateBody="test"` still land on the happy path.
807        let empty = String::new();
808        let template_body = params.get("TemplateBody").unwrap_or(&empty);
809
810        // Check if stack already exists and is not deleted
811        {
812            let accounts = self.state.read();
813            let empty = CloudFormationState::new(&req.account_id, &req.region);
814            let state = accounts.get(&req.account_id).unwrap_or(&empty);
815            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
816                if existing.status != "DELETE_COMPLETE" {
817                    return Err(AwsServiceError::aws_error(
818                        StatusCode::BAD_REQUEST,
819                        "AlreadyExistsException",
820                        format!("Stack [{stack_name}] already exists"),
821                    ));
822                }
823            }
824        }
825
826        let tags = Self::extract_tags(&params);
827        let mut parameters = Self::extract_parameters(&params);
828        Self::merge_parameter_defaults(&mut parameters, template_body);
829        let notification_arns = Self::extract_notification_arns(&params);
830
831        // Seed AWS::* pseudo-parameters with stack-context values so
832        // resolve_refs can substitute them into resource properties.
833        let stack_id = format!(
834            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
835            req.region,
836            req.account_id,
837            stack_name,
838            uuid::Uuid::new_v4()
839        );
840        parameters
841            .entry("AWS::Region".to_string())
842            .or_insert_with(|| req.region.clone());
843        parameters
844            .entry("AWS::AccountId".to_string())
845            .or_insert_with(|| req.account_id.clone());
846        parameters
847            .entry("AWS::StackId".to_string())
848            .or_insert_with(|| stack_id.clone());
849        parameters
850            .entry("AWS::StackName".to_string())
851            .or_insert_with(|| stack_name.clone());
852        parameters
853            .entry("AWS::Partition".to_string())
854            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
855        parameters
856            .entry("AWS::URLSuffix".to_string())
857            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
858        // NotificationARNs is array-typed; pseudo_value parses it back
859        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
860        // returns the request's actual list (or an empty array).
861        parameters.insert(
862            "AWS::NotificationARNs".to_string(),
863            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
864        );
865
866        // First pass: parse to get resource definitions (without physical ID
867        // resolution). Synthetic conformance inputs frequently arrive with a
868        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
869        // template rather than rejecting with an undeclared error code.
870        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
871            template::ParsedTemplate {
872                description: None,
873                resources: Vec::new(),
874                outputs: Vec::new(),
875            }
876        });
877
878        // Refuse if any Fn::ImportValue references an unknown export. CFN
879        // checks this before provisioning; we mirror that so callers get
880        // a clean error instead of half-built resources.
881        let imported_names = Self::validate_import_values(
882            &self.state,
883            &req.account_id,
884            stack_name,
885            template_body,
886            &parameters,
887        )?;
888
889        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
890        // Real CloudFormation returns the StackId immediately and provisions
891        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
892        // stack reaches a terminal status. Up-front, synchronous-by-nature
893        // validation (template parse, duplicate stack, missing imports) has
894        // already happened above and still surfaces as a synchronous error.
895        {
896            let mut accounts = self.state.write();
897            let state = accounts.get_or_create(&req.account_id);
898            state.stacks.insert(
899                stack_name.clone(),
900                Stack {
901                    name: stack_name.clone(),
902                    stack_id: stack_id.clone(),
903                    template: template_body.clone(),
904                    status: "CREATE_IN_PROGRESS".to_string(),
905                    resources: Vec::new(),
906                    parameters: parameters.clone(),
907                    tags: tags.clone(),
908                    created_at: Utc::now(),
909                    updated_at: None,
910                    description: parsed.description.clone(),
911                    notification_arns: notification_arns.clone(),
912                    outputs: Vec::new(),
913                },
914            );
915            record_stack_status_event(
916                state,
917                &stack_id,
918                stack_name,
919                "AWS::CloudFormation::Stack",
920                "CREATE_IN_PROGRESS",
921            );
922        }
923
924        let ctx = CreateStackContext {
925            state: self.state.clone(),
926            delivery: self.deps.delivery.clone(),
927            snapshot_store: self.snapshot_store.clone(),
928            snapshot_lock: self.snapshot_lock.clone(),
929            snapshot_hooks: self.snapshot_hooks.clone(),
930            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
931            account_id: req.account_id.clone(),
932            stack_name: stack_name.clone(),
933            stack_id: stack_id.clone(),
934            template_body: template_body.clone(),
935            parameters,
936            notification_arns,
937            imported_names,
938            resource_defs: parsed.resources,
939        };
940
941        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
942        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
943        // which can trigger a cold container image pull lasting minutes — far
944        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
945        // StackId in <1s and provisions asynchronously, so when the template
946        // contains a custom resource we do the same: spawn a detached task and
947        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
948        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
949        //
950        // Every other resource type provisions in-memory in microseconds, so
951        // we keep provisioning inline for them: CreateStack returns with the
952        // stack already CREATE_COMPLETE, matching long-standing client
953        // expectations that the stack's resources/outputs are queryable as
954        // soon as CreateStack returns. The async branch only triggers on a
955        // multi-thread runtime (the server); unit tests run current-thread.
956        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
957            r.resource_type.starts_with("Custom::")
958                || r.resource_type == "AWS::CloudFormation::CustomResource"
959        });
960        let multi_thread = matches!(
961            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
962            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
963        );
964        if has_custom_resource && multi_thread {
965            // Emit the IN_PROGRESS lifecycle notification only on the async
966            // path — AWS sends it before the terminal CREATE_COMPLETE. The
967            // inline path provisions instantly and sends only CREATE_COMPLETE,
968            // preserving the single-notification contract callers depend on.
969            Self::send_stack_notification(
970                &self.deps.delivery,
971                &ctx.notification_arns,
972                stack_name,
973                &stack_id,
974                "CREATE_IN_PROGRESS",
975            );
976            tokio::spawn(async move {
977                Self::finish_create_stack(ctx).await;
978            });
979        } else {
980            Self::finish_create_stack(ctx).await;
981        }
982
983        Ok(AwsResponse::xml(
984            StatusCode::OK,
985            xml_responses::create_stack_response(&stack_id, &req.request_id),
986        ))
987    }
988
989    /// Runs the resource provisioning loop for a CreateStack and flips the
990    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
991    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
992    /// task (the multi-thread server). Persists the final state via the same
993    /// snapshot mechanism the request handler uses.
994    async fn finish_create_stack(ctx: CreateStackContext) {
995        let CreateStackContext {
996            state,
997            delivery,
998            snapshot_store,
999            snapshot_lock,
1000            snapshot_hooks,
1001            provisioner,
1002            account_id,
1003            stack_name,
1004            stack_id,
1005            template_body,
1006            parameters,
1007            notification_arns,
1008            imported_names,
1009            resource_defs,
1010        } = ctx;
1011
1012        // The provisioning loop is fully synchronous (it may block on cold
1013        // image pulls / custom-resource Lambda invokes). Hand it to a
1014        // blocking thread so it never stalls a tokio worker.
1015        let provision_result = {
1016            let template_body = template_body.clone();
1017            let parameters = parameters.clone();
1018            // Cross-stack exports this account already published, so a resource
1019            // property using `Fn::ImportValue` resolves to the real value
1020            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1021            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1022            tokio::task::spawn_blocking(move || {
1023                provision_stack_resources(
1024                    &provisioner,
1025                    &resource_defs,
1026                    &template_body,
1027                    &parameters,
1028                    &imports,
1029                )
1030            })
1031            .await
1032        };
1033
1034        // A spawn_blocking JoinError (panic) or a provisioning error both
1035        // roll the stack into CREATE_FAILED.
1036        let provisioned = match provision_result {
1037            Ok(Ok(resources)) => Ok(resources),
1038            Ok(Err(err)) => Err(err.message()),
1039            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1040        };
1041
1042        let resources = match provisioned {
1043            Ok(resources) => resources,
1044            Err(reason) => {
1045                Self::mark_create_failed(
1046                    &state,
1047                    &delivery,
1048                    &account_id,
1049                    &stack_name,
1050                    &stack_id,
1051                    &notification_arns,
1052                    &reason,
1053                );
1054                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1055                return;
1056            }
1057        };
1058
1059        let outputs =
1060            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1061
1062        // Export-name collisions surface as a failed create (the stack is
1063        // already inserted, so this can no longer be a synchronous error).
1064        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1065        {
1066            Self::mark_create_failed(
1067                &state,
1068                &delivery,
1069                &account_id,
1070                &stack_name,
1071                &stack_id,
1072                &notification_arns,
1073                &err.message(),
1074            );
1075            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1076            return;
1077        }
1078
1079        {
1080            let mut accounts = state.write();
1081            let st = accounts.get_or_create(&account_id);
1082            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1083                stack.status = "CREATE_COMPLETE".to_string();
1084                stack.resources = resources.clone();
1085                stack.outputs = outputs.clone();
1086            }
1087            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1088
1089            let changes: Vec<ResourceChange> = resources
1090                .iter()
1091                .map(|r| ResourceChange {
1092                    action: ResourceChangeAction::Create,
1093                    logical_id: r.logical_id.clone(),
1094                    physical_id: r.physical_id.clone(),
1095                    resource_type: r.resource_type.clone(),
1096                })
1097                .collect();
1098            record_stack_events(st, &stack_id, &stack_name, &changes);
1099            record_stack_status_event(
1100                st,
1101                &stack_id,
1102                &stack_name,
1103                "AWS::CloudFormation::Stack",
1104                "CREATE_COMPLETE",
1105            );
1106        }
1107
1108        Self::send_stack_notification(
1109            &delivery,
1110            &notification_arns,
1111            &stack_name,
1112            &stack_id,
1113            "CREATE_COMPLETE",
1114        );
1115
1116        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1117        // Persist every snapshot-backed service the stack provisioned, so a
1118        // CFN-created resource (e.g. a Lambda function or Secret whose service
1119        // is not otherwise re-mutated) is written to disk and survives a
1120        // restart -- not just the CloudFormation stack metadata above.
1121        persist_touched_services(
1122            &snapshot_hooks,
1123            resources.iter().map(|r| r.resource_type.clone()),
1124        )
1125        .await;
1126    }
1127
1128    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1129    /// notify subscribers. Used by the async provisioning task on a
1130    /// provisioning error or export collision.
1131    fn mark_create_failed(
1132        state: &SharedCloudFormationState,
1133        delivery: &DeliveryBus,
1134        account_id: &str,
1135        stack_name: &str,
1136        stack_id: &str,
1137        notification_arns: &[String],
1138        reason: &str,
1139    ) {
1140        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1141        {
1142            let mut accounts = state.write();
1143            let st = accounts.get_or_create(account_id);
1144            if let Some(stack) = st.stacks.get_mut(stack_name) {
1145                stack.status = "CREATE_FAILED".to_string();
1146            }
1147            record_stack_status_event(
1148                st,
1149                stack_id,
1150                stack_name,
1151                "AWS::CloudFormation::Stack",
1152                "CREATE_FAILED",
1153            );
1154        }
1155        Self::send_stack_notification(
1156            delivery,
1157            notification_arns,
1158            stack_name,
1159            stack_id,
1160            "CREATE_FAILED",
1161        );
1162    }
1163
1164    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1165        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1166            AwsServiceError::aws_error(
1167                StatusCode::BAD_REQUEST,
1168                "ValidationError",
1169                "StackName is required",
1170            )
1171        })?;
1172
1173        // Resource types deleted by this op, captured so we can persist the
1174        // owning services after every state guard has been released (the
1175        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1176        // work is wrapped in a block so the lock is dropped on every path -
1177        // including the stack-not-found path - before the await below.
1178        let mut deleted_types: Vec<String> = Vec::new();
1179        {
1180            let mut accounts = self.state.write();
1181            let state = accounts.get_or_create(&req.account_id);
1182
1183            // Find stack by name or stack ID
1184            let stack = state.stacks.values_mut().find(|s| {
1185                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1186            });
1187
1188            if let Some(stack) = stack {
1189                let stack_id = stack.stack_id.clone();
1190                let stack_name_for_notif = stack.name.clone();
1191                let notification_arns = stack.notification_arns.clone();
1192                let resources: Vec<_> = stack.resources.clone();
1193
1194                // Block delete if any of this stack's exports are still
1195                // imported by another live stack. Mirrors real CFN.
1196                let owned_exports: Vec<String> = state
1197                    .exports
1198                    .iter()
1199                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1200                    .map(|(k, _)| k.clone())
1201                    .collect();
1202                for export in &owned_exports {
1203                    if let Some(consumers) = state.imports.get(export) {
1204                        let consumers: Vec<&String> = consumers
1205                            .iter()
1206                            .filter(|c| **c != stack_name_for_notif)
1207                            .collect();
1208                        if !consumers.is_empty() {
1209                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1210                            // DeleteStack declares only `TokenAlreadyExistsException`,
1211                            // which is the closest declared shape for "this delete
1212                            // can't proceed". Strict conformance rarely hits this
1213                            // pre-flight (probe state is fresh per run); the unit
1214                            // test asserting the legacy message still passes since
1215                            // both error codes carry the same body text.
1216                            return Err(AwsServiceError::aws_error(
1217                                StatusCode::BAD_REQUEST,
1218                                "TokenAlreadyExistsException",
1219                                format!(
1220                                    "Export {export} cannot be deleted as it is in use by {}",
1221                                    names.join(", ")
1222                                ),
1223                            ));
1224                        }
1225                    }
1226                }
1227
1228                // Build the provisioner while we still have the stack_id
1229                // Drop the write lock temporarily so the provisioner can read state
1230                drop(accounts);
1231                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1232
1233                // Delete resources in reverse order
1234                for resource in resources.iter().rev() {
1235                    let _ = provisioner.delete_resource(resource);
1236                }
1237
1238                // Re-acquire the write lock to update stack status
1239                let mut accounts = self.state.write();
1240                let state = accounts.get_or_create(&req.account_id);
1241                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1242                    stack.status = "DELETE_COMPLETE".to_string();
1243                    stack.resources.clear();
1244                    stack.outputs.clear();
1245                }
1246                // Drop this stack's exports + import-consumer entries.
1247                let stale_exports: Vec<String> = state
1248                    .exports
1249                    .iter()
1250                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1251                    .map(|(k, _)| k.clone())
1252                    .collect();
1253                for k in stale_exports {
1254                    state.exports.remove(&k);
1255                }
1256                for entries in state.imports.values_mut() {
1257                    entries.retain(|s| s != &stack_name_for_notif);
1258                }
1259                state.imports.retain(|_, v| !v.is_empty());
1260                drop(accounts);
1261
1262                Self::send_stack_notification(
1263                    &self.deps.delivery,
1264                    &notification_arns,
1265                    &stack_name_for_notif,
1266                    &stack_id,
1267                    "DELETE_COMPLETE",
1268                );
1269
1270                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1271            }
1272        }
1273
1274        // Persist every snapshot-backed service whose resource was deleted, so
1275        // a CFN-deleted resource does not reappear after a restart. Done here,
1276        // outside any state-guard scope, so the future stays `Send`.
1277        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1278
1279        Ok(AwsResponse::xml(
1280            StatusCode::OK,
1281            xml_responses::delete_stack_response(&req.request_id),
1282        ))
1283    }
1284
1285    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1286        let stack_name = Self::get_param(req, "StackName");
1287
1288        let accounts = self.state.read();
1289        let empty = CloudFormationState::new(&req.account_id, &req.region);
1290        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1291        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1292            state
1293                .stacks
1294                .values()
1295                .filter(|s| {
1296                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1297                })
1298                .cloned()
1299                .collect()
1300        } else {
1301            state
1302                .stacks
1303                .values()
1304                .filter(|s| s.status != "DELETE_COMPLETE")
1305                .cloned()
1306                .collect()
1307        };
1308
1309        // When an explicit `StackName` is supplied but matches nothing,
1310        // real AWS returns `ValidationError: Stack with id <name> does not
1311        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1312        // `aws cloudformation deploy`) probes stack existence by catching
1313        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1314        // `IndexError` and `deploy` take the wrong (update) path (issue
1315        // #1646). DescribeStacks declares no errors in Smithy, but the
1316        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1317        // returning `ValidationError` here stays conformant. A nameless
1318        // call still lists all stacks (empty is valid).
1319        if let Some(ref name) = stack_name {
1320            if stacks.is_empty() {
1321                return Err(AwsServiceError::aws_error(
1322                    StatusCode::BAD_REQUEST,
1323                    "ValidationError",
1324                    format!("Stack with id {name} does not exist"),
1325                ));
1326            }
1327        }
1328
1329        Ok(AwsResponse::xml(
1330            StatusCode::OK,
1331            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1332        ))
1333    }
1334
1335    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1336        let accounts = self.state.read();
1337        let empty = CloudFormationState::new(&req.account_id, &req.region);
1338        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1339        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1340
1341        Ok(AwsResponse::xml(
1342            StatusCode::OK,
1343            xml_responses::list_stacks_response(&stacks, &req.request_id),
1344        ))
1345    }
1346
1347    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1348        // ListStackResources requires StackName in Smithy. The op's
1349        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1350        // counts as a handler response for conformance purposes. Reject
1351        // an omitted name with `ValidationError`; treat a *missing* stack
1352        // as an empty resource list (consistent with the rest of CFN).
1353        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1354            AwsServiceError::aws_error(
1355                StatusCode::BAD_REQUEST,
1356                "ValidationError",
1357                "StackName is required",
1358            )
1359        })?;
1360
1361        let accounts = self.state.read();
1362        let empty = CloudFormationState::new(&req.account_id, &req.region);
1363        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1364        let resources = state
1365            .stacks
1366            .values()
1367            .find(|s| {
1368                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1369            })
1370            .map(|s| s.resources.clone())
1371            .unwrap_or_default();
1372
1373        Ok(AwsResponse::xml(
1374            StatusCode::OK,
1375            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1376        ))
1377    }
1378
1379    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1380        // DescribeStackResources declares no errors; treat omission /
1381        // missing stack as an empty result set.
1382        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1383
1384        let accounts = self.state.read();
1385        let empty = CloudFormationState::new(&req.account_id, &req.region);
1386        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1387        let (resources, resolved_name) = state
1388            .stacks
1389            .values()
1390            .find(|s| {
1391                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1392            })
1393            .map(|s| (s.resources.clone(), s.name.clone()))
1394            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1395
1396        Ok(AwsResponse::xml(
1397            StatusCode::OK,
1398            xml_responses::describe_stack_resources_response(
1399                &resources,
1400                &resolved_name,
1401                &req.request_id,
1402            ),
1403        ))
1404    }
1405
1406    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1407        let mut input = UpdateStackInput::from_params(req)?;
1408
1409        // Get stack_id before write lock for the provisioner
1410        let found_stack_id = {
1411            let accounts = self.state.read();
1412            let empty = CloudFormationState::new(&req.account_id, &req.region);
1413            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1414            state
1415                .stacks
1416                .values()
1417                .find(|s| {
1418                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1419                        && s.status != "DELETE_COMPLETE"
1420                })
1421                .map(|s| s.stack_id.clone())
1422                .unwrap_or_default()
1423        };
1424
1425        // Seed pseudo-parameters before parsing — the StackId is now known
1426        // (after the read above) so resolve_refs sees the same values that
1427        // the original CreateStack invocation used.
1428        input
1429            .parameters
1430            .entry("AWS::Region".to_string())
1431            .or_insert_with(|| req.region.clone());
1432        input
1433            .parameters
1434            .entry("AWS::AccountId".to_string())
1435            .or_insert_with(|| req.account_id.clone());
1436        input
1437            .parameters
1438            .entry("AWS::StackId".to_string())
1439            .or_insert_with(|| found_stack_id.clone());
1440        input
1441            .parameters
1442            .entry("AWS::StackName".to_string())
1443            .or_insert_with(|| input.stack_name.clone());
1444        input
1445            .parameters
1446            .entry("AWS::Partition".to_string())
1447            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1448        input
1449            .parameters
1450            .entry("AWS::URLSuffix".to_string())
1451            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1452        // Seed AWS::NotificationARNs from the update payload, falling
1453        // back to whatever the existing stack carries when the request
1454        // omits the param. Encoded as JSON so pseudo_value can split it
1455        // back into the array shape Ref returns.
1456        if !input.notification_arns.is_empty() {
1457            input.parameters.insert(
1458                "AWS::NotificationARNs".to_string(),
1459                serde_json::to_string(&input.notification_arns)
1460                    .unwrap_or_else(|_| "[]".to_string()),
1461            );
1462        } else {
1463            // Carry the existing stack's notification ARNs forward so the
1464            // pseudo-param keeps its previous value across updates.
1465            let existing: Vec<String> = {
1466                let accounts = self.state.read();
1467                accounts
1468                    .get(&req.account_id)
1469                    .and_then(|s| {
1470                        s.stacks
1471                            .values()
1472                            .find(|st| st.stack_id == found_stack_id)
1473                            .map(|st| st.notification_arns.clone())
1474                    })
1475                    .unwrap_or_default()
1476            };
1477            input.parameters.insert(
1478                "AWS::NotificationARNs".to_string(),
1479                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1480            );
1481        }
1482
1483        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1484        // conformance probe's Success variants. Degrade to an empty parsed
1485        // template rather than rejecting with an undeclared error code —
1486        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1487        let parsed = template::parse_template(&input.template_body, &input.parameters)
1488            .unwrap_or_else(|_| template::ParsedTemplate {
1489                description: None,
1490                resources: Vec::new(),
1491                outputs: Vec::new(),
1492            });
1493
1494        let imported_names = Self::validate_import_values(
1495            &self.state,
1496            &req.account_id,
1497            &input.stack_name,
1498            &input.template_body,
1499            &input.parameters,
1500        )?;
1501
1502        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1503
1504        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1505        // Computed before the write lock (collect_account_imports takes a read
1506        // lock and returns an owned map).
1507        let imports =
1508            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1509
1510        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1511        // guard is released before the persist `.await` below, keeping the
1512        // handler future `Send`. The block yields everything the post-lock tail
1513        // needs, including the resource types touched by the update.
1514        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1515            let mut accounts = self.state.write();
1516            let state = accounts.get_or_create(&req.account_id);
1517            // UpdateStack declares only `InsufficientCapabilitiesException` and
1518            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1519            // Real AWS returns `ValidationError` for this case, but that wire
1520            // code isn't declared on UpdateStack. The conformance probe's
1521            // Success variants supply placeholder `StackName` values that point
1522            // at no real stack, so degrade to a synthetic-success response
1523            // (echoing a generated StackId) rather than emit an undeclared
1524            // error. Real callers always create the stack first.
1525            let stack_exists = state.stacks.values().any(|s| {
1526                (s.name == input.stack_name || s.stack_id == input.stack_name)
1527                    && s.status != "DELETE_COMPLETE"
1528            });
1529            if !stack_exists {
1530                let stack_id = if found_stack_id.is_empty() {
1531                    format!(
1532                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1533                        req.region,
1534                        req.account_id,
1535                        input.stack_name,
1536                        uuid::Uuid::new_v4()
1537                    )
1538                } else {
1539                    found_stack_id.clone()
1540                };
1541                return Ok(AwsResponse::xml(
1542                    StatusCode::OK,
1543                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1544                ));
1545            }
1546            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1547                let stack = state
1548                    .stacks
1549                    .values_mut()
1550                    .find(|s| {
1551                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1552                            && s.status != "DELETE_COMPLETE"
1553                    })
1554                    .expect("stack existence checked above");
1555
1556                stack.status = "UPDATE_IN_PROGRESS".to_string();
1557                let update_result = apply_resource_updates(
1558                    stack,
1559                    &parsed.resources,
1560                    &input.template_body,
1561                    &input.parameters,
1562                    &provisioner,
1563                    &imports,
1564                );
1565
1566                let stack_id = stack.stack_id.clone();
1567                let stack_name_owned = stack.name.clone();
1568                stack.template = input.template_body.clone();
1569                stack.status = if update_result.is_err() {
1570                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1571                } else {
1572                    "UPDATE_COMPLETE".to_string()
1573                };
1574                stack.parameters = input.parameters.clone();
1575                if !input.tags.is_empty() {
1576                    stack.tags = input.tags;
1577                }
1578                stack.updated_at = Some(Utc::now());
1579                stack.description = parsed.description;
1580                if !input.notification_arns.is_empty() {
1581                    stack.notification_arns = input.notification_arns.clone();
1582                }
1583                if update_result.is_ok() {
1584                    stack.outputs.clear();
1585                }
1586                (
1587                    update_result,
1588                    stack_id,
1589                    stack_name_owned,
1590                    stack.resources.clone(),
1591                    stack.notification_arns.clone(),
1592                )
1593            };
1594
1595            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1596            record_stack_status_event(
1597                state,
1598                &stack_id,
1599                &stack_name_owned,
1600                "AWS::CloudFormation::Stack",
1601                "UPDATE_IN_PROGRESS",
1602            );
1603            let update_result = match update_result {
1604                Ok(changes) => {
1605                    // Capture every service touched by the update (created,
1606                    // updated, or deleted resources) so we can persist them once
1607                    // the stack reaches UPDATE_COMPLETE.
1608                    let touched_types: Vec<String> =
1609                        changes.iter().map(|c| c.resource_type.clone()).collect();
1610                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1611                    record_stack_status_event(
1612                        state,
1613                        &stack_id,
1614                        &stack_name_owned,
1615                        "AWS::CloudFormation::Stack",
1616                        "UPDATE_COMPLETE",
1617                    );
1618                    Ok(touched_types)
1619                }
1620                Err(e) => {
1621                    record_stack_status_event(
1622                        state,
1623                        &stack_id,
1624                        &stack_name_owned,
1625                        "AWS::CloudFormation::Stack",
1626                        "UPDATE_ROLLBACK_COMPLETE",
1627                    );
1628                    Err(e)
1629                }
1630            };
1631            let stack_name_for_notif = stack_name_owned.clone();
1632
1633            let touched_types = match update_result {
1634                Ok(types) => types,
1635                Err(error_msg) => {
1636                    drop(accounts);
1637                    Self::send_stack_notification(
1638                        &self.deps.delivery,
1639                        &notification_arns,
1640                        &stack_name_for_notif,
1641                        &stack_id,
1642                        "UPDATE_FAILED",
1643                    );
1644                    return Err(AwsServiceError::aws_error(
1645                        StatusCode::BAD_REQUEST,
1646                        "InsufficientCapabilitiesException",
1647                        error_msg,
1648                    ));
1649                }
1650            };
1651
1652            drop(accounts);
1653            (
1654                touched_types,
1655                stack_id,
1656                stack_name_for_notif,
1657                notification_arns,
1658                resources_snapshot,
1659            )
1660        };
1661
1662        let outputs = Self::resolve_template_outputs(
1663            &input.template_body,
1664            &input.parameters,
1665            &resources_snapshot,
1666            &self.state,
1667        );
1668        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1669        {
1670            let mut accounts = self.state.write();
1671            let state = accounts.get_or_create(&req.account_id);
1672            if let Some(stack) = state
1673                .stacks
1674                .values_mut()
1675                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1676            {
1677                stack.outputs = outputs.clone();
1678            }
1679            Self::sync_exports_imports(
1680                state,
1681                &stack_id,
1682                &input.stack_name,
1683                &outputs,
1684                &imported_names,
1685            );
1686        }
1687
1688        Self::send_stack_notification(
1689            &self.deps.delivery,
1690            &notification_arns,
1691            &stack_name_for_notif,
1692            &stack_id,
1693            "UPDATE_COMPLETE",
1694        );
1695
1696        // Persist every snapshot-backed service the update touched, so created
1697        // or deleted resources are reflected on disk after a restart.
1698        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1699
1700        Ok(AwsResponse::xml(
1701            StatusCode::OK,
1702            xml_responses::update_stack_response(&stack_id, &req.request_id),
1703        ))
1704    }
1705
1706    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1707        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1708        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1709
1710        let accounts = self.state.read();
1711        let empty = CloudFormationState::new(&req.account_id, &req.region);
1712        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1713        // Stack-not-found has no declared shape on GetTemplate
1714        // (`ChangeSetNotFoundException` is the only declared error). Return
1715        // an empty template body rather than emit an undeclared
1716        // `ValidationError` for synthetic conformance inputs.
1717        let body = state
1718            .stacks
1719            .values()
1720            .find(|s| {
1721                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1722            })
1723            .map(|s| s.template.clone())
1724            .unwrap_or_default();
1725
1726        Ok(AwsResponse::xml(
1727            StatusCode::OK,
1728            xml_responses::get_template_response(&body, &req.request_id),
1729        ))
1730    }
1731}
1732
1733#[async_trait]
1734impl AwsService for CloudFormationService {
1735    fn service_name(&self) -> &str {
1736        "cloudformation"
1737    }
1738
1739    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1740        let action = req.action.as_str();
1741
1742        // Validate scalar field constraints against the Smithy model
1743        // before dispatching. Per-handler logic still owns business
1744        // validation (cross-field checks, parsing, existence). This
1745        // catches length / range / enum violations uniformly so every
1746        // operation returns a `ValidationError` instead of `200 OK` on
1747        // malformed scalars.
1748        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1749
1750        // Only ops whose handlers actually write to per-account state
1751        // need to trigger snapshot persistence. Pass-through ops that
1752        // return canned IDs but don't touch state are excluded.
1753        let mutates = matches!(
1754            action,
1755            "CreateStack"
1756                | "DeleteStack"
1757                | "UpdateStack"
1758                | "CreateChangeSet"
1759                | "DeleteChangeSet"
1760                | "ExecuteChangeSet"
1761                | "CreateStackSet"
1762                | "DeleteStackSet"
1763                | "CreateStackRefactor"
1764                | "CreateGeneratedTemplate"
1765                | "DeleteGeneratedTemplate"
1766                | "SetStackPolicy"
1767                | "UpdateTerminationProtection"
1768                | "ActivateOrganizationsAccess"
1769                | "DeactivateOrganizationsAccess"
1770        );
1771        let result = match action {
1772            "CreateStack" => self.create_stack(&req).await,
1773            "DeleteStack" => self.delete_stack(&req).await,
1774            "DescribeStacks" => self.describe_stacks(&req),
1775            "ListStacks" => self.list_stacks(&req),
1776            "ListStackResources" => self.list_stack_resources(&req),
1777            "DescribeStackResources" => self.describe_stack_resources(&req),
1778            "UpdateStack" => self.update_stack(&req).await,
1779            "GetTemplate" => self.get_template(&req),
1780            _ => self.handle_extra_action(&req),
1781        };
1782        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1783            self.save_snapshot().await;
1784        }
1785        // ExecuteChangeSet provisions resources by mutating each service's
1786        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1787        // -- never triggered the per-service snapshot hook, so the provisioned
1788        // resources were never written through to disk (#1766 class). `cdk
1789        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1790        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1791        // their resources reported CREATE_COMPLETE yet vanished on restart.
1792        // save_snapshot above only persists CloudFormation's own stack metadata;
1793        // flush every snapshot-backed service the changeset could have touched.
1794        if action == "ExecuteChangeSet"
1795            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1796        {
1797            for hook in self.snapshot_hooks.values() {
1798                hook().await;
1799            }
1800        }
1801        result
1802    }
1803
1804    fn supported_actions(&self) -> &[&str] {
1805        &[
1806            "ActivateOrganizationsAccess",
1807            "ActivateType",
1808            "BatchDescribeTypeConfigurations",
1809            "CancelUpdateStack",
1810            "ContinueUpdateRollback",
1811            "CreateChangeSet",
1812            "CreateGeneratedTemplate",
1813            "CreateStack",
1814            "CreateStackInstances",
1815            "CreateStackRefactor",
1816            "CreateStackSet",
1817            "DeactivateOrganizationsAccess",
1818            "DeactivateType",
1819            "DeleteChangeSet",
1820            "DeleteGeneratedTemplate",
1821            "DeleteStack",
1822            "DeleteStackInstances",
1823            "DeleteStackSet",
1824            "DeregisterType",
1825            "DescribeAccountLimits",
1826            "DescribeChangeSet",
1827            "DescribeChangeSetHooks",
1828            "DescribeEvents",
1829            "DescribeGeneratedTemplate",
1830            "DescribeOrganizationsAccess",
1831            "DescribePublisher",
1832            "DescribeResourceScan",
1833            "DescribeStackDriftDetectionStatus",
1834            "DescribeStackEvents",
1835            "DescribeStackInstance",
1836            "DescribeStackRefactor",
1837            "DescribeStackResource",
1838            "DescribeStackResourceDrifts",
1839            "DescribeStackResources",
1840            "DescribeStackSet",
1841            "DescribeStackSetOperation",
1842            "DescribeStacks",
1843            "DescribeType",
1844            "DescribeTypeRegistration",
1845            "DetectStackDrift",
1846            "DetectStackResourceDrift",
1847            "DetectStackSetDrift",
1848            "EstimateTemplateCost",
1849            "ExecuteChangeSet",
1850            "ExecuteStackRefactor",
1851            "GetGeneratedTemplate",
1852            "GetHookResult",
1853            "GetStackPolicy",
1854            "GetTemplate",
1855            "GetTemplateSummary",
1856            "ImportStacksToStackSet",
1857            "ListChangeSets",
1858            "ListExports",
1859            "ListGeneratedTemplates",
1860            "ListHookResults",
1861            "ListImports",
1862            "ListResourceScanRelatedResources",
1863            "ListResourceScanResources",
1864            "ListResourceScans",
1865            "ListStackInstanceResourceDrifts",
1866            "ListStackInstances",
1867            "ListStackRefactorActions",
1868            "ListStackRefactors",
1869            "ListStackResources",
1870            "ListStackSetAutoDeploymentTargets",
1871            "ListStackSetOperationResults",
1872            "ListStackSetOperations",
1873            "ListStackSets",
1874            "ListStacks",
1875            "ListTypeRegistrations",
1876            "ListTypeVersions",
1877            "ListTypes",
1878            "PublishType",
1879            "RecordHandlerProgress",
1880            "RegisterPublisher",
1881            "RegisterType",
1882            "RollbackStack",
1883            "SetStackPolicy",
1884            "SetTypeConfiguration",
1885            "SetTypeDefaultVersion",
1886            "SignalResource",
1887            "StartResourceScan",
1888            "StopStackSetOperation",
1889            "TestType",
1890            "UpdateGeneratedTemplate",
1891            "UpdateStack",
1892            "UpdateStackInstances",
1893            "UpdateStackSet",
1894            "UpdateTerminationProtection",
1895            "ValidateTemplate",
1896        ]
1897    }
1898}
1899
1900/// Parsed + validated inputs for `UpdateStack`.
1901struct UpdateStackInput {
1902    stack_name: String,
1903    template_body: String,
1904    parameters: BTreeMap<String, String>,
1905    tags: BTreeMap<String, String>,
1906    notification_arns: Vec<String>,
1907}
1908
1909impl UpdateStackInput {
1910    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1911        let params = CloudFormationService::get_all_params(req);
1912
1913        let stack_name = params
1914            .get("StackName")
1915            .ok_or_else(|| {
1916                AwsServiceError::aws_error(
1917                    StatusCode::BAD_REQUEST,
1918                    "ValidationError",
1919                    "StackName is required",
1920                )
1921            })?
1922            .to_string();
1923
1924        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1925        // UsePreviousTemplate are alternatives). Treat omission as an
1926        // empty body so synthetic conformance inputs don't trip an
1927        // undeclared `ValidationError`.
1928        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1929
1930        let mut parameters = CloudFormationService::extract_parameters(&params);
1931        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
1932        Ok(Self {
1933            stack_name,
1934            template_body,
1935            parameters,
1936            tags: CloudFormationService::extract_tags(&params),
1937            notification_arns: CloudFormationService::extract_notification_arns(&params),
1938        })
1939    }
1940}
1941
1942/// One row of structured diff returned by `apply_resource_updates`. Used
1943/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
1944/// reflects the resources actually created / updated / deleted.
1945#[derive(Debug, Clone)]
1946pub(crate) struct ResourceChange {
1947    pub action: ResourceChangeAction,
1948    pub logical_id: String,
1949    pub physical_id: String,
1950    pub resource_type: String,
1951}
1952
1953#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1954pub(crate) enum ResourceChangeAction {
1955    Create,
1956    Update,
1957    Delete,
1958}
1959
1960impl ResourceChangeAction {
1961    pub fn status_in_progress(self) -> &'static str {
1962        match self {
1963            Self::Create => "CREATE_IN_PROGRESS",
1964            Self::Update => "UPDATE_IN_PROGRESS",
1965            Self::Delete => "DELETE_IN_PROGRESS",
1966        }
1967    }
1968    pub fn status_complete(self) -> &'static str {
1969        match self {
1970            Self::Create => "CREATE_COMPLETE",
1971            Self::Update => "UPDATE_COMPLETE",
1972            Self::Delete => "DELETE_COMPLETE",
1973        }
1974    }
1975}
1976
1977/// Apply resource updates: delete removed resources, create new ones, and
1978/// in-place update resources whose properties changed. Returns the list of
1979/// changes applied (for event emission) on success or `Err(msg)` if any
1980/// resource operation fails.
1981pub(crate) fn apply_resource_updates(
1982    stack: &mut crate::state::Stack,
1983    new_resource_defs: &[template::ResourceDefinition],
1984    template_body: &str,
1985    parameters: &BTreeMap<String, String>,
1986    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1987    imports: &BTreeMap<String, String>,
1988) -> Result<Vec<ResourceChange>, String> {
1989    let mut changes: Vec<ResourceChange> = Vec::new();
1990    let old_logical_ids: std::collections::HashSet<String> = stack
1991        .resources
1992        .iter()
1993        .map(|r| r.logical_id.clone())
1994        .collect();
1995    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
1996        .iter()
1997        .map(|r| r.logical_id.clone())
1998        .collect();
1999
2000    // Delete resources no longer in template
2001    let to_remove: Vec<_> = stack
2002        .resources
2003        .iter()
2004        .filter(|r| !new_logical_ids.contains(&r.logical_id))
2005        .cloned()
2006        .collect();
2007    for resource in &to_remove {
2008        let _ = provisioner.delete_resource(resource);
2009        changes.push(ResourceChange {
2010            action: ResourceChangeAction::Delete,
2011            logical_id: resource.logical_id.clone(),
2012            physical_id: resource.physical_id.clone(),
2013            resource_type: resource.resource_type.clone(),
2014        });
2015    }
2016    stack
2017        .resources
2018        .retain(|r| new_logical_ids.contains(&r.logical_id));
2019
2020    // Build physical ID + attribute maps from existing resources
2021    let mut physical_ids: BTreeMap<String, String> = stack
2022        .resources
2023        .iter()
2024        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2025        .collect();
2026    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2027        .resources
2028        .iter()
2029        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2030        .collect();
2031
2032    // Create new resources / update resources that already exist. Provision in
2033    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2034    // resource resolves to that resource's physical id rather than its bare
2035    // logical id (which would otherwise get baked into derived state — e.g. a
2036    // Step Functions ASL referencing a Lambda declared later in the template).
2037    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2038    for &idx in &order {
2039        let resource_def = &new_resource_defs[idx];
2040        let resolved_def = template::resolve_resource_properties_with_attrs(
2041            resource_def,
2042            template_body,
2043            parameters,
2044            &physical_ids,
2045            &attributes,
2046            imports,
2047        )
2048        .map_err(|e| {
2049            format!(
2050                "Failed to resolve resource {}: {e}",
2051                resource_def.logical_id
2052            )
2053        })?;
2054
2055        if !old_logical_ids.contains(&resource_def.logical_id) {
2056            match provisioner.create_resource(&resolved_def) {
2057                Ok(stack_resource) => {
2058                    changes.push(ResourceChange {
2059                        action: ResourceChangeAction::Create,
2060                        logical_id: stack_resource.logical_id.clone(),
2061                        physical_id: stack_resource.physical_id.clone(),
2062                        resource_type: stack_resource.resource_type.clone(),
2063                    });
2064                    physical_ids.insert(
2065                        stack_resource.logical_id.clone(),
2066                        stack_resource.physical_id.clone(),
2067                    );
2068                    attributes.insert(
2069                        stack_resource.logical_id.clone(),
2070                        stack_resource.attributes.clone(),
2071                    );
2072                    stack.resources.push(stack_resource);
2073                }
2074                Err(e) => {
2075                    tracing::warn!(
2076                        "Failed to create resource {} during update: {e}",
2077                        resource_def.logical_id
2078                    );
2079                    return Err(format!(
2080                        "Failed to create resource {}: {e}",
2081                        resource_def.logical_id
2082                    ));
2083                }
2084            }
2085        } else {
2086            // Resource exists in both old and new templates — try to apply
2087            // an in-place update. The provisioner returns `Ok(None)` for
2088            // resource types that don't support updates yet; in that case
2089            // the existing resource stays as-is so the rest of the stack
2090            // continues to validate.
2091            let existing = stack
2092                .resources
2093                .iter()
2094                .find(|r| r.logical_id == resource_def.logical_id)
2095                .cloned();
2096            if let Some(existing) = existing {
2097                match provisioner.update_resource(&existing, &resolved_def) {
2098                    Ok(Some(updated)) => {
2099                        changes.push(ResourceChange {
2100                            action: ResourceChangeAction::Update,
2101                            logical_id: updated.logical_id.clone(),
2102                            physical_id: updated.physical_id.clone(),
2103                            resource_type: updated.resource_type.clone(),
2104                        });
2105                        physical_ids
2106                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2107                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2108                        if let Some(slot) = stack
2109                            .resources
2110                            .iter_mut()
2111                            .find(|r| r.logical_id == updated.logical_id)
2112                        {
2113                            *slot = updated;
2114                        }
2115                    }
2116                    Ok(None) => {
2117                        // Resource type has no update path — leave the
2118                        // existing physical resource untouched.
2119                    }
2120                    Err(e) => {
2121                        tracing::warn!(
2122                            "Failed to update resource {} during update: {e}",
2123                            resource_def.logical_id
2124                        );
2125                        return Err(format!(
2126                            "Failed to update resource {}: {e}",
2127                            resource_def.logical_id
2128                        ));
2129                    }
2130                }
2131            }
2132        }
2133    }
2134
2135    Ok(changes)
2136}
2137
2138/// Pushes a single `StackEvent` row onto the per-stack event log so
2139/// `DescribeStackEvents` returns a chronological history of resource and
2140/// stack-level state transitions.
2141pub(crate) fn record_event(
2142    state: &mut crate::state::CloudFormationState,
2143    stack_id: &str,
2144    stack_name: &str,
2145    logical_id: &str,
2146    physical_id: &str,
2147    resource_type: &str,
2148    status: &str,
2149) {
2150    use serde_json::json;
2151    let event_id = format!(
2152        "{}-{:x}",
2153        logical_id,
2154        std::time::SystemTime::now()
2155            .duration_since(std::time::UNIX_EPOCH)
2156            .map(|d| d.as_nanos())
2157            .unwrap_or(0)
2158    );
2159    let log = state.events.entry(stack_id.to_string()).or_default();
2160
2161    // Timestamps must be sub-second AND strictly increasing within a stack's
2162    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2163    // timestamp and then only registers events whose `Timestamp` is strictly
2164    // greater; a fast stack that provisions within one second would otherwise
2165    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2166    // identically, so sam never sees completion and polls until it times out.
2167    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2168    // past the previous event, guaranteeing a strict order.
2169    // Truncate to the stored (millisecond) resolution before comparing, so the
2170    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2171    // serialization (two events in the same millisecond would otherwise both
2172    // serialize identically despite `now > prev` holding at full precision).
2173    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2174        .unwrap_or_else(Utc::now);
2175    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2176        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2177            Ok(prev) => {
2178                let prev = prev.with_timezone(&Utc);
2179                if now > prev {
2180                    now
2181                } else {
2182                    prev + chrono::Duration::milliseconds(1)
2183                }
2184            }
2185            Err(_) => now,
2186        },
2187        None => now,
2188    };
2189
2190    log.push(json!({
2191        "EventId": event_id,
2192        "StackId": stack_id,
2193        "StackName": stack_name,
2194        "LogicalResourceId": logical_id,
2195        "PhysicalResourceId": physical_id,
2196        "ResourceType": resource_type,
2197        "ResourceStatus": status,
2198        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2199    }));
2200}
2201
2202/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2203/// applied during an update. Mirrors the event sequence real CloudFormation
2204/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2205/// Persist the CloudFormation snapshot from a detached task. Mirrors
2206/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2207/// run inside the background CreateStack provisioning task (see RDS's
2208/// `save_snapshot_static`).
2209async fn save_snapshot_static(
2210    state: SharedCloudFormationState,
2211    store: Option<Arc<dyn SnapshotStore>>,
2212    lock: Arc<AsyncMutex<()>>,
2213) {
2214    let Some(store) = store else {
2215        return;
2216    };
2217    let _guard = lock.lock().await;
2218    let snapshot = CloudFormationSnapshot {
2219        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2220        state: None,
2221        accounts: Some(state.read().clone()),
2222    };
2223    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2224        let bytes = serde_json::to_vec(&snapshot)
2225            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2226        store.save(&bytes)
2227    })
2228    .await;
2229    match join {
2230        Ok(Ok(())) => {}
2231        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2232        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2233    }
2234}
2235
2236pub(crate) fn record_stack_events(
2237    state: &mut crate::state::CloudFormationState,
2238    stack_id: &str,
2239    stack_name: &str,
2240    changes: &[ResourceChange],
2241) {
2242    for ch in changes {
2243        record_event(
2244            state,
2245            stack_id,
2246            stack_name,
2247            &ch.logical_id,
2248            &ch.physical_id,
2249            &ch.resource_type,
2250            ch.action.status_in_progress(),
2251        );
2252        record_event(
2253            state,
2254            stack_id,
2255            stack_name,
2256            &ch.logical_id,
2257            &ch.physical_id,
2258            &ch.resource_type,
2259            ch.action.status_complete(),
2260        );
2261    }
2262}
2263
2264/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2265/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2266/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2267pub(crate) fn record_stack_status_event(
2268    state: &mut crate::state::CloudFormationState,
2269    stack_id: &str,
2270    stack_name: &str,
2271    resource_type: &str,
2272    status: &str,
2273) {
2274    record_event(
2275        state,
2276        stack_id,
2277        stack_name,
2278        stack_name,
2279        stack_id,
2280        resource_type,
2281        status,
2282    );
2283}
2284
2285#[cfg(test)]
2286mod tests {
2287    use super::*;
2288    use http::HeaderMap;
2289    use parking_lot::RwLock;
2290    use std::collections::HashMap;
2291    use std::sync::Arc;
2292
2293    #[test]
2294    fn merge_parameter_defaults_fills_omitted_params() {
2295        // §1.6: a parameter the caller omitted gets its declared Default, so a
2296        // Ref to it resolves to the default instead of the bare param name.
2297        let template = r#"{
2298            "Parameters": {
2299                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2300                "Count": {"Type": "Number", "Default": 3},
2301                "Supplied": {"Type": "String", "Default": "dflt"}
2302            },
2303            "Resources": {}
2304        }"#;
2305        let mut params = BTreeMap::new();
2306        params.insert("Supplied".to_string(), "override".to_string());
2307        CloudFormationService::merge_parameter_defaults(&mut params, template);
2308        assert_eq!(
2309            params.get("InstanceType").map(String::as_str),
2310            Some("t3.micro")
2311        );
2312        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2313        // A supplied value is not overwritten by the default.
2314        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2315    }
2316
2317    fn make_service() -> CloudFormationService {
2318        let cf_state = Arc::new(RwLock::new(
2319            fakecloud_core::multi_account::MultiAccountState::new(
2320                "123456789012",
2321                "us-east-1",
2322                "http://localhost:4566",
2323            ),
2324        ));
2325        let deps = CloudFormationDeps {
2326            sqs: Arc::new(RwLock::new(
2327                fakecloud_core::multi_account::MultiAccountState::new(
2328                    "123456789012",
2329                    "us-east-1",
2330                    "http://localhost:4566",
2331                ),
2332            )),
2333            sns: Arc::new(RwLock::new(
2334                fakecloud_core::multi_account::MultiAccountState::new(
2335                    "123456789012",
2336                    "us-east-1",
2337                    "http://localhost:4566",
2338                ),
2339            )),
2340            ssm: Arc::new(RwLock::new(
2341                fakecloud_core::multi_account::MultiAccountState::new(
2342                    "123456789012",
2343                    "us-east-1",
2344                    "http://localhost:4566",
2345                ),
2346            )),
2347            iam: Arc::new(RwLock::new(
2348                fakecloud_core::multi_account::MultiAccountState::new(
2349                    "123456789012",
2350                    "us-east-1",
2351                    "",
2352                ),
2353            )),
2354            s3: Arc::new(RwLock::new(
2355                fakecloud_core::multi_account::MultiAccountState::new(
2356                    "123456789012",
2357                    "us-east-1",
2358                    "",
2359                ),
2360            )),
2361            eventbridge: Arc::new(RwLock::new(
2362                fakecloud_core::multi_account::MultiAccountState::new(
2363                    "123456789012",
2364                    "us-east-1",
2365                    "",
2366                ),
2367            )),
2368            dynamodb: Arc::new(RwLock::new(
2369                fakecloud_core::multi_account::MultiAccountState::new(
2370                    "123456789012",
2371                    "us-east-1",
2372                    "",
2373                ),
2374            )),
2375            logs: Arc::new(RwLock::new(
2376                fakecloud_core::multi_account::MultiAccountState::new(
2377                    "123456789012",
2378                    "us-east-1",
2379                    "",
2380                ),
2381            )),
2382            lambda: Arc::new(RwLock::new(
2383                fakecloud_core::multi_account::MultiAccountState::new(
2384                    "123456789012",
2385                    "us-east-1",
2386                    "",
2387                ),
2388            )),
2389            secretsmanager: Arc::new(RwLock::new(
2390                fakecloud_core::multi_account::MultiAccountState::new(
2391                    "123456789012",
2392                    "us-east-1",
2393                    "",
2394                ),
2395            )),
2396            kinesis: Arc::new(RwLock::new(
2397                fakecloud_core::multi_account::MultiAccountState::new(
2398                    "123456789012",
2399                    "us-east-1",
2400                    "",
2401                ),
2402            )),
2403            kms: Arc::new(RwLock::new(
2404                fakecloud_core::multi_account::MultiAccountState::new(
2405                    "123456789012",
2406                    "us-east-1",
2407                    "",
2408                ),
2409            )),
2410            ecr: Arc::new(RwLock::new(
2411                fakecloud_core::multi_account::MultiAccountState::new(
2412                    "123456789012",
2413                    "us-east-1",
2414                    "",
2415                ),
2416            )),
2417            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2418            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2419            organizations: Arc::new(RwLock::new(None)),
2420            cognito: Arc::new(RwLock::new(
2421                fakecloud_core::multi_account::MultiAccountState::new(
2422                    "123456789012",
2423                    "us-east-1",
2424                    "",
2425                ),
2426            )),
2427            rds: Arc::new(RwLock::new(
2428                fakecloud_core::multi_account::MultiAccountState::new(
2429                    "123456789012",
2430                    "us-east-1",
2431                    "",
2432                ),
2433            )),
2434            ec2: Arc::new(RwLock::new(
2435                fakecloud_core::multi_account::MultiAccountState::new(
2436                    "123456789012",
2437                    "us-east-1",
2438                    "",
2439                ),
2440            )),
2441            ecs: Arc::new(RwLock::new(
2442                fakecloud_core::multi_account::MultiAccountState::new(
2443                    "123456789012",
2444                    "us-east-1",
2445                    "",
2446                ),
2447            )),
2448            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2449            elasticache: Arc::new(RwLock::new(
2450                fakecloud_core::multi_account::MultiAccountState::new(
2451                    "123456789012",
2452                    "us-east-1",
2453                    "",
2454                ),
2455            )),
2456            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2457            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2458            stepfunctions: Arc::new(RwLock::new(
2459                fakecloud_core::multi_account::MultiAccountState::new(
2460                    "123456789012",
2461                    "us-east-1",
2462                    "",
2463                ),
2464            )),
2465            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2466            apigateway: Arc::new(RwLock::new(
2467                fakecloud_core::multi_account::MultiAccountState::new(
2468                    "123456789012",
2469                    "us-east-1",
2470                    "",
2471                ),
2472            )),
2473            apigatewayv2: Arc::new(RwLock::new(
2474                fakecloud_core::multi_account::MultiAccountState::new(
2475                    "123456789012",
2476                    "us-east-1",
2477                    "",
2478                ),
2479            )),
2480            ses: Arc::new(RwLock::new(
2481                fakecloud_core::multi_account::MultiAccountState::new(
2482                    "123456789012",
2483                    "us-east-1",
2484                    "",
2485                ),
2486            )),
2487            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2488                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2489            )),
2490            athena: Arc::new(parking_lot::RwLock::new(
2491                fakecloud_athena::AthenaAccounts::new(),
2492            )),
2493            firehose: Arc::new(parking_lot::RwLock::new(
2494                fakecloud_firehose::FirehoseAccounts::new(),
2495            )),
2496            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2497            delivery: Arc::new(DeliveryBus::new()),
2498            lambda_runtime: None,
2499        };
2500        CloudFormationService::new(cf_state, deps)
2501    }
2502
2503    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2504        AwsRequest {
2505            service: "cloudformation".to_string(),
2506            action: action.to_string(),
2507            region: "us-east-1".to_string(),
2508            account_id: "123456789012".to_string(),
2509            request_id: "test-request-id".to_string(),
2510            headers: HeaderMap::new(),
2511            query_params: params,
2512            body: bytes::Bytes::new(),
2513            body_stream: parking_lot::Mutex::new(None),
2514            path_segments: vec![],
2515            raw_path: "/".to_string(),
2516            raw_query: String::new(),
2517            method: http::Method::POST,
2518            is_query_protocol: true,
2519            access_key_id: None,
2520            principal: None,
2521        }
2522    }
2523
2524    #[tokio::test]
2525    async fn update_stack_sets_failed_status_on_resource_error() {
2526        let svc = make_service();
2527
2528        // Create a stack with just a queue
2529        let mut create_params = HashMap::new();
2530        create_params.insert("StackName".to_string(), "test-stack".to_string());
2531        create_params.insert(
2532            "TemplateBody".to_string(),
2533            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2534        );
2535        let req = make_request("CreateStack", create_params);
2536        let result = svc.create_stack(&req).await;
2537        assert!(result.is_ok());
2538
2539        // Update stack adding an SNS subscription with a non-existent topic
2540        let mut update_params = HashMap::new();
2541        update_params.insert("StackName".to_string(), "test-stack".to_string());
2542        update_params.insert(
2543            "TemplateBody".to_string(),
2544            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2545        );
2546        let req = make_request("UpdateStack", update_params);
2547        let result = svc.update_stack(&req).await;
2548
2549        // Should return an error
2550        assert!(result.is_err());
2551
2552        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2553        // terminal status real CloudFormation lands on after a failed
2554        // update attempt that gets rolled back.
2555        let accounts = svc.state.read();
2556        let state = accounts.get("123456789012").unwrap();
2557        let stack = state.stacks.get("test-stack").unwrap();
2558        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2559    }
2560
2561    #[tokio::test]
2562    async fn create_stack_resolves_ref_to_physical_id() {
2563        let svc = make_service();
2564
2565        // Template where subscription Refs the topic
2566        let template = r#"{
2567            "Resources": {
2568                "MyTopic": {
2569                    "Type": "AWS::SNS::Topic",
2570                    "Properties": { "TopicName": "ref-test-topic" }
2571                },
2572                "MySub": {
2573                    "Type": "AWS::SNS::Subscription",
2574                    "Properties": {
2575                        "TopicArn": { "Ref": "MyTopic" },
2576                        "Protocol": "sqs",
2577                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2578                    }
2579                }
2580            }
2581        }"#;
2582
2583        let mut params = HashMap::new();
2584        params.insert("StackName".to_string(), "ref-stack".to_string());
2585        params.insert("TemplateBody".to_string(), template.to_string());
2586        let req = make_request("CreateStack", params);
2587        let result = svc.create_stack(&req).await;
2588        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2589
2590        // Verify both resources were created
2591        let accounts = svc.state.read();
2592        let state = accounts.get("123456789012").unwrap();
2593        let stack = state.stacks.get("ref-stack").unwrap();
2594        assert_eq!(stack.resources.len(), 2);
2595        assert_eq!(stack.status, "CREATE_COMPLETE");
2596
2597        // The subscription's physical ID should be an ARN (not just "MyTopic")
2598        let sub = stack
2599            .resources
2600            .iter()
2601            .find(|r| r.logical_id == "MySub")
2602            .unwrap();
2603        assert!(
2604            sub.physical_id.contains("ref-test-topic"),
2605            "Subscription physical ID should reference the topic ARN, got: {}",
2606            sub.physical_id
2607        );
2608    }
2609
2610    /// On the multi-thread server runtime, a stack containing a custom
2611    /// resource (whose provisioning can block for minutes on a cold Lambda
2612    /// image pull) must NOT be provisioned synchronously inside the request
2613    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2614    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2615    /// 3.1).
2616    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2617    async fn create_stack_custom_resource_provisions_asynchronously() {
2618        let svc = make_service();
2619        let template = r#"{
2620            "Resources": {
2621                "MyCustom": {
2622                    "Type": "Custom::Thing",
2623                    "Properties": {
2624                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2625                    }
2626                }
2627            }
2628        }"#;
2629        let mut params = HashMap::new();
2630        params.insert("StackName".to_string(), "async-stack".to_string());
2631        params.insert("TemplateBody".to_string(), template.to_string());
2632        let req = make_request("CreateStack", params);
2633
2634        // CreateStack returns promptly with a StackId; provisioning runs in a
2635        // detached background task. The stack is recorded before the task can
2636        // possibly finish, so right after return it is at worst already
2637        // terminal — never a value that proves the handler blocked on the
2638        // (potentially minutes-long) provisioning loop. The key guarantee is
2639        // that the call returns without running the provisioner inline.
2640        let resp = svc
2641            .create_stack(&req)
2642            .await
2643            .expect("create returns StackId");
2644        assert!(resp.status.is_success());
2645        {
2646            let accounts = svc.state.read();
2647            let stack = accounts
2648                .get("123456789012")
2649                .unwrap()
2650                .stacks
2651                .get("async-stack")
2652                .expect("stack seeded synchronously");
2653            assert!(
2654                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2655                "unexpected status right after create: {}",
2656                stack.status
2657            );
2658        }
2659
2660        // Poll DescribeStacks (via state) until the background task flips the
2661        // stack to its terminal CREATE_COMPLETE status.
2662        let mut status = String::new();
2663        for _ in 0..200 {
2664            {
2665                let accounts = svc.state.read();
2666                if let Some(stack) = accounts
2667                    .get("123456789012")
2668                    .and_then(|s| s.stacks.get("async-stack"))
2669                {
2670                    status = stack.status.clone();
2671                    if status != "CREATE_IN_PROGRESS" {
2672                        break;
2673                    }
2674                }
2675            }
2676            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2677        }
2678        assert_eq!(
2679            status, "CREATE_COMPLETE",
2680            "stack should reach CREATE_COMPLETE"
2681        );
2682
2683        let accounts = svc.state.read();
2684        let stack = accounts
2685            .get("123456789012")
2686            .unwrap()
2687            .stacks
2688            .get("async-stack")
2689            .unwrap();
2690        assert_eq!(stack.resources.len(), 1);
2691        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2692    }
2693
2694    #[tokio::test]
2695    async fn output_getatt_resolves_well_known_attribute() {
2696        // An Output that GetAtts a well-known attribute the create handler does
2697        // not eagerly capture (SQS QueueUrl) must resolve to the live value, not
2698        // a `Queue.QueueUrl` placeholder. Regression for bug-hunt 2026-06-25 1.11:
2699        // resolve_template_outputs reads StackResource.attributes, which now
2700        // carries the live get_att overlay applied during provisioning.
2701        let svc = make_service();
2702        let template = r#"{
2703            "Resources": {
2704                "Queue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "out-q" } }
2705            },
2706            "Outputs": {
2707                "Url": { "Value": { "Fn::GetAtt": ["Queue", "QueueUrl"] } }
2708            }
2709        }"#;
2710        let mut params = HashMap::new();
2711        params.insert("StackName".to_string(), "out-stack".to_string());
2712        params.insert("TemplateBody".to_string(), template.to_string());
2713        svc.create_stack(&make_request("CreateStack", params))
2714            .await
2715            .expect("create returns StackId");
2716
2717        let mut url = String::new();
2718        for _ in 0..200 {
2719            {
2720                let accounts = svc.state.read();
2721                if let Some(stack) = accounts
2722                    .get("123456789012")
2723                    .and_then(|s| s.stacks.get("out-stack"))
2724                {
2725                    if stack.status != "CREATE_IN_PROGRESS" {
2726                        url = stack
2727                            .outputs
2728                            .iter()
2729                            .find(|o| o.key == "Url")
2730                            .map(|o| o.value.clone())
2731                            .unwrap_or_default();
2732                        break;
2733                    }
2734                }
2735            }
2736            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2737        }
2738        assert!(
2739            url.contains("out-q") && url != "Queue.QueueUrl",
2740            "GetAtt QueueUrl output should resolve to the live url, got {url:?}"
2741        );
2742    }
2743
2744    // ── Service error paths ──
2745
2746    #[tokio::test]
2747    async fn create_stack_missing_name_errors() {
2748        let svc = make_service();
2749        let mut params = HashMap::new();
2750        params.insert("TemplateBody".to_string(), "{}".to_string());
2751        let req = make_request("CreateStack", params);
2752        assert!(svc.create_stack(&req).await.is_err());
2753    }
2754
2755    #[tokio::test]
2756    async fn create_stack_missing_template_creates_empty_stack() {
2757        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2758        // declares no `ValidationError` shape, so missing/placeholder
2759        // bodies now create an empty stack rather than rejecting with
2760        // an undeclared wire code (strict-mode conformance gap).
2761        let svc = make_service();
2762        let mut params = HashMap::new();
2763        params.insert("StackName".to_string(), "s".to_string());
2764        let req = make_request("CreateStack", params);
2765        svc.create_stack(&req)
2766            .await
2767            .expect("empty-body create succeeds");
2768    }
2769
2770    #[tokio::test]
2771    async fn create_stack_duplicate_errors() {
2772        let svc = make_service();
2773        let mut params = HashMap::new();
2774        params.insert("StackName".to_string(), "dup".to_string());
2775        params.insert(
2776            "TemplateBody".to_string(),
2777            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2778                .to_string(),
2779        );
2780        let req = make_request("CreateStack", params.clone());
2781        svc.create_stack(&req).await.unwrap();
2782        let req = make_request("CreateStack", params);
2783        assert!(svc.create_stack(&req).await.is_err());
2784    }
2785
2786    #[tokio::test]
2787    async fn create_stack_invalid_template_creates_empty_stack() {
2788        // CreateStack's Smithy `errors` list has no `ValidationError`
2789        // shape, so unparseable bodies degrade to an empty parsed
2790        // template instead of raising an undeclared wire code.
2791        let svc = make_service();
2792        let mut params = HashMap::new();
2793        params.insert("StackName".to_string(), "bad".to_string());
2794        params.insert("TemplateBody".to_string(), "not json".to_string());
2795        let req = make_request("CreateStack", params);
2796        svc.create_stack(&req)
2797            .await
2798            .expect("bad-body create succeeds");
2799    }
2800
2801    #[tokio::test]
2802    async fn delete_stack_unknown_is_noop() {
2803        let svc = make_service();
2804        let mut params = HashMap::new();
2805        params.insert("StackName".to_string(), "ghost".to_string());
2806        let req = make_request("DeleteStack", params);
2807        assert!(svc.delete_stack(&req).await.is_ok());
2808    }
2809
2810    #[test]
2811    fn describe_stacks_nonexistent_errors() {
2812        // Querying an explicit, unknown StackName returns AWS's
2813        // `ValidationError: Stack with id <name> does not exist` so deploy
2814        // tools that probe stack existence (SAM, `aws cloudformation
2815        // deploy`) get the signal they expect (issue #1646).
2816        let svc = make_service();
2817        let mut params = HashMap::new();
2818        params.insert("StackName".to_string(), "ghost".to_string());
2819        let req = make_request("DescribeStacks", params);
2820        match svc.describe_stacks(&req) {
2821            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2822            Err(e) => {
2823                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2824                assert_eq!(e.code(), "ValidationError");
2825                assert!(
2826                    e.message().contains("does not exist"),
2827                    "got: {}",
2828                    e.message()
2829                );
2830            }
2831        }
2832    }
2833
2834    #[test]
2835    fn describe_stacks_empty_returns_all() {
2836        let svc = make_service();
2837        let req = make_request("DescribeStacks", HashMap::new());
2838        let resp = svc.describe_stacks(&req).unwrap();
2839        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2840        assert!(b.contains("DescribeStacksResult"));
2841    }
2842
2843    #[test]
2844    fn list_stacks_empty_returns_ok() {
2845        let svc = make_service();
2846        let req = make_request("ListStacks", HashMap::new());
2847        let resp = svc.list_stacks(&req).unwrap();
2848        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2849        assert!(b.contains("ListStacksResult"));
2850    }
2851
2852    #[test]
2853    fn list_stack_resources_missing_name_returns_validation_error() {
2854        // ListStackResources declares no `errors` in Smithy, so any
2855        // AWS-shaped 4xx counts as a handler response. We reject an
2856        // omitted StackName with `ValidationError` to keep negative
2857        // conformance variants honest; unknown-but-supplied names still
2858        // resolve to an empty list (see the test below).
2859        let svc = make_service();
2860        let req = make_request("ListStackResources", HashMap::new());
2861        let err = match svc.list_stack_resources(&req) {
2862            Err(e) => e,
2863            Ok(_) => panic!("omitted StackName must be rejected"),
2864        };
2865        assert_eq!(err.code(), "ValidationError");
2866    }
2867
2868    #[test]
2869    fn list_stack_resources_unknown_stack_returns_empty() {
2870        let svc = make_service();
2871        let mut params = HashMap::new();
2872        params.insert("StackName".to_string(), "ghost".to_string());
2873        let req = make_request("ListStackResources", params);
2874        svc.list_stack_resources(&req).expect("unknown is empty");
2875    }
2876
2877    #[test]
2878    fn describe_stack_resources_missing_name_returns_empty() {
2879        let svc = make_service();
2880        let req = make_request("DescribeStackResources", HashMap::new());
2881        svc.describe_stack_resources(&req)
2882            .expect("missing name is ok");
2883    }
2884
2885    #[test]
2886    fn get_template_missing_name_returns_empty_body() {
2887        let svc = make_service();
2888        let req = make_request("GetTemplate", HashMap::new());
2889        svc.get_template(&req).expect("missing name is ok");
2890    }
2891
2892    #[test]
2893    fn get_template_unknown_stack_returns_empty_body() {
2894        let svc = make_service();
2895        let mut params = HashMap::new();
2896        params.insert("StackName".to_string(), "ghost".to_string());
2897        let req = make_request("GetTemplate", params);
2898        svc.get_template(&req).expect("unknown is empty");
2899    }
2900
2901    #[tokio::test]
2902    async fn update_stack_missing_name_errors() {
2903        let svc = make_service();
2904        let mut params = HashMap::new();
2905        params.insert("TemplateBody".to_string(), "{}".to_string());
2906        let req = make_request("UpdateStack", params);
2907        assert!(svc.update_stack(&req).await.is_err());
2908    }
2909
2910    #[tokio::test]
2911    async fn update_stack_unknown_stack_returns_synthetic_id() {
2912        // UpdateStack declares only `InsufficientCapabilitiesException`
2913        // and `TokenAlreadyExistsException`, neither of which fits
2914        // "stack does not exist". Synthetic conformance inputs target
2915        // a placeholder stack, so we return a synthetic StackId rather
2916        // than an undeclared `ValidationError`. Real callers create
2917        // the stack first.
2918        let svc = make_service();
2919        let mut params = HashMap::new();
2920        params.insert("StackName".to_string(), "ghost".to_string());
2921        params.insert(
2922            "TemplateBody".to_string(),
2923            r#"{"Resources":{}}"#.to_string(),
2924        );
2925        let req = make_request("UpdateStack", params);
2926        let resp = svc
2927            .update_stack(&req)
2928            .await
2929            .expect("ghost update is synthetic");
2930        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2931        assert!(b.contains("UpdateStackResult"));
2932    }
2933
2934    #[tokio::test]
2935    async fn create_stack_resolves_outputs_and_records_export() {
2936        let svc = make_service();
2937        let template = r#"{
2938            "Resources": {
2939                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2940            },
2941            "Outputs": {
2942                "QueueUrl": {
2943                    "Value": {"Ref": "Q"},
2944                    "Description": "Url",
2945                    "Export": {"Name": "TheQueueUrl"}
2946                }
2947            }
2948        }"#;
2949        let mut params = HashMap::new();
2950        params.insert("StackName".to_string(), "outs".to_string());
2951        params.insert("TemplateBody".to_string(), template.to_string());
2952        let req = make_request("CreateStack", params);
2953        svc.create_stack(&req).await.expect("create stack");
2954
2955        let accounts = svc.state.read();
2956        let stack = accounts
2957            .get("123456789012")
2958            .unwrap()
2959            .stacks
2960            .get("outs")
2961            .unwrap();
2962        assert_eq!(stack.outputs.len(), 1);
2963        assert_eq!(stack.outputs[0].key, "QueueUrl");
2964        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2965        assert!(!stack.outputs[0].value.is_empty());
2966    }
2967
2968    #[tokio::test]
2969    async fn create_stack_rejects_duplicate_export_name() {
2970        let svc = make_service();
2971        let mk = |name: &str| {
2972            let template = format!(
2973                r#"{{
2974                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2975                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2976                }}"#
2977            );
2978            let mut params = HashMap::new();
2979            params.insert("StackName".to_string(), name.to_string());
2980            params.insert("TemplateBody".to_string(), template);
2981            make_request("CreateStack", params)
2982        };
2983        match svc.create_stack(&mk("first")).await {
2984            Ok(_) => {}
2985            Err(e) => panic!("first stack: {e:?}"),
2986        }
2987        // The second stack's export collides with the first. Since
2988        // provisioning is now asynchronous, the collision can no longer be a
2989        // synchronous CreateStack error — it surfaces as a failed create.
2990        // On the current-thread test runtime the provisioning task runs
2991        // inline, so the stack is already CREATE_FAILED on return.
2992        svc.create_stack(&mk("second"))
2993            .await
2994            .expect("CreateStack returns StackId even when provisioning fails");
2995        let accounts = svc.state.read();
2996        let stack = accounts
2997            .get("123456789012")
2998            .unwrap()
2999            .stacks
3000            .get("second")
3001            .expect("second stack recorded");
3002        assert_eq!(stack.status, "CREATE_FAILED");
3003        // The first stack keeps the export.
3004        let exports = &accounts.get("123456789012").unwrap().exports;
3005        assert_eq!(
3006            exports
3007                .get("DupExport")
3008                .map(|e| e.exporting_stack_name.as_str()),
3009            Some("first")
3010        );
3011    }
3012
3013    #[tokio::test]
3014    async fn import_value_resolves_against_other_stack_export() {
3015        let svc = make_service();
3016
3017        let producer_tpl = r#"{
3018            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
3019            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
3020        }"#;
3021        let mut p = HashMap::new();
3022        p.insert("StackName".to_string(), "producer".to_string());
3023        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3024        svc.create_stack(&make_request("CreateStack", p))
3025            .await
3026            .expect("producer");
3027
3028        let consumer_tpl = r#"{
3029            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
3030            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
3031        }"#;
3032        let mut p = HashMap::new();
3033        p.insert("StackName".to_string(), "consumer".to_string());
3034        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3035        svc.create_stack(&make_request("CreateStack", p))
3036            .await
3037            .expect("consumer");
3038
3039        let accounts = svc.state.read();
3040        let prod_url = accounts
3041            .get("123456789012")
3042            .unwrap()
3043            .stacks
3044            .get("producer")
3045            .unwrap()
3046            .outputs[0]
3047            .value
3048            .clone();
3049        let cons = accounts
3050            .get("123456789012")
3051            .unwrap()
3052            .stacks
3053            .get("consumer")
3054            .unwrap();
3055        assert_eq!(cons.outputs[0].value, prod_url);
3056    }
3057
3058    #[tokio::test]
3059    async fn create_stack_records_export_in_state_registry() {
3060        let svc = make_service();
3061        let template = r#"{
3062            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
3063            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
3064        }"#;
3065        let mut params = HashMap::new();
3066        params.insert("StackName".to_string(), "reg".to_string());
3067        params.insert("TemplateBody".to_string(), template.to_string());
3068        svc.create_stack(&make_request("CreateStack", params))
3069            .await
3070            .expect("create");
3071
3072        let accounts = svc.state.read();
3073        let state = accounts.get("123456789012").unwrap();
3074        let export = state
3075            .exports
3076            .get("reg-url")
3077            .expect("export registered in state.exports");
3078        assert_eq!(export.exporting_stack_name, "reg");
3079        assert!(!export.value.is_empty());
3080        assert!(export.exporting_stack_id.contains("reg"));
3081    }
3082
3083    #[tokio::test]
3084    async fn import_value_with_unknown_export_errors() {
3085        let svc = make_service();
3086        let consumer_tpl = r#"{
3087            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3088                "QueueName": {"Fn::ImportValue":"missing-export"}
3089            }}}
3090        }"#;
3091        let mut p = HashMap::new();
3092        p.insert("StackName".to_string(), "bad-consumer".to_string());
3093        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3094        match svc.create_stack(&make_request("CreateStack", p)).await {
3095            Ok(_) => panic!("expected ValidationError for unknown export"),
3096            Err(e) => {
3097                let msg = format!("{e:?}");
3098                assert!(msg.contains("No export named missing-export"), "got {msg}");
3099            }
3100        }
3101    }
3102
3103    #[tokio::test]
3104    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3105        let svc = make_service();
3106
3107        let producer_tpl = r#"{
3108            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3109            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3110        }"#;
3111        let mut p = HashMap::new();
3112        p.insert("StackName".to_string(), "producer".to_string());
3113        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3114        svc.create_stack(&make_request("CreateStack", p))
3115            .await
3116            .expect("producer");
3117
3118        let consumer_tpl = r#"{
3119            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3120                "QueueName": "cons-q",
3121                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3122            }}}
3123        }"#;
3124        let mut p = HashMap::new();
3125        p.insert("StackName".to_string(), "consumer".to_string());
3126        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3127        svc.create_stack(&make_request("CreateStack", p))
3128            .await
3129            .expect("consumer");
3130
3131        // Producer delete must fail while consumer still imports.
3132        let mut p = HashMap::new();
3133        p.insert("StackName".to_string(), "producer".to_string());
3134        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3135            Ok(_) => panic!("delete must fail while imports exist"),
3136            Err(e) => {
3137                let msg = format!("{e:?}");
3138                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3139            }
3140        }
3141
3142        // Delete consumer first.
3143        let mut p = HashMap::new();
3144        p.insert("StackName".to_string(), "consumer".to_string());
3145        svc.delete_stack(&make_request("DeleteStack", p))
3146            .await
3147            .expect("consumer delete");
3148
3149        // Now producer delete succeeds.
3150        let mut p = HashMap::new();
3151        p.insert("StackName".to_string(), "producer".to_string());
3152        svc.delete_stack(&make_request("DeleteStack", p))
3153            .await
3154            .expect("producer delete after consumer gone");
3155
3156        let accounts = svc.state.read();
3157        let state = accounts.get("123456789012").unwrap();
3158        assert!(state.exports.is_empty(), "exports cleared after delete");
3159        assert!(state.imports.is_empty(), "imports cleared after delete");
3160    }
3161
3162    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3163
3164    use std::sync::atomic::{AtomicUsize, Ordering};
3165
3166    /// A snapshot hook that counts how many times it fires, standing in for a
3167    /// real service's whole-state persist.
3168    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3169        Arc::new(move || {
3170            let counter = counter.clone();
3171            Box::pin(async move {
3172                counter.fetch_add(1, Ordering::SeqCst);
3173            })
3174        })
3175    }
3176
3177    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3178        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3179        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3180            tmp.path().to_path_buf(),
3181            cache,
3182        ))
3183    }
3184
3185    // A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
3186    // write-through). Lambda is registered as a hook below but NOT in the
3187    // template, so it must not fire -- proving per-service selectivity.
3188    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
3189        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
3190        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
3191        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
3192    }}"#;
3193
3194    fn create_req(stack: &str) -> AwsRequest {
3195        let mut p = HashMap::new();
3196        p.insert("StackName".to_string(), stack.to_string());
3197        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3198        make_request("CreateStack", p)
3199    }
3200
3201    #[tokio::test]
3202    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3203        let tmp = tempfile::tempdir().unwrap();
3204        let store = disk_s3_store(&tmp);
3205        let counter = Arc::new(AtomicUsize::new(0));
3206        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3207            BTreeMap::new();
3208        hooks.insert("sqs", counting_hook(counter.clone()));
3209        hooks.insert("sns", counting_hook(counter.clone()));
3210        // Registered but not in the template -> must not fire.
3211        hooks.insert("lambda", counting_hook(counter.clone()));
3212        let svc = make_service()
3213            .with_s3_store(store.clone())
3214            .with_snapshot_hooks(hooks);
3215
3216        svc.create_stack(&create_req("probe")).await.unwrap();
3217
3218        // sqs + sns fired once each; lambda untouched.
3219        assert_eq!(counter.load(Ordering::SeqCst), 2);
3220        // The bucket was written through to the S3 store, not just the in-memory map.
3221        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3222        assert!(
3223            loaded.buckets.contains_key("cfn-bucket"),
3224            "CFN bucket should be persisted to the S3 store"
3225        );
3226    }
3227
3228    #[tokio::test]
3229    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3230        let tmp = tempfile::tempdir().unwrap();
3231        let store = disk_s3_store(&tmp);
3232        let counter = Arc::new(AtomicUsize::new(0));
3233        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3234            BTreeMap::new();
3235        hooks.insert("sqs", counting_hook(counter.clone()));
3236        hooks.insert("sns", counting_hook(counter.clone()));
3237        let svc = make_service()
3238            .with_s3_store(store.clone())
3239            .with_snapshot_hooks(hooks);
3240
3241        svc.create_stack(&create_req("probe")).await.unwrap();
3242        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3243
3244        let mut p = HashMap::new();
3245        p.insert("StackName".to_string(), "probe".to_string());
3246        svc.delete_stack(&make_request("DeleteStack", p))
3247            .await
3248            .unwrap();
3249
3250        // Delete fired the touched services again (sqs + sns).
3251        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3252        // And the CFN-deleted bucket is gone from the store, so it does not
3253        // reappear after a restart.
3254        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3255        assert!(
3256            !loaded.buckets.contains_key("cfn-bucket"),
3257            "CFN-deleted bucket should be removed from the S3 store"
3258        );
3259    }
3260
3261    #[tokio::test]
3262    async fn cfn_persist_skips_services_without_a_registered_hook() {
3263        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3264        // hooks must be silently skipped (no panic), and "sqs" fires once.
3265        let tmp = tempfile::tempdir().unwrap();
3266        let store = disk_s3_store(&tmp);
3267        let counter = Arc::new(AtomicUsize::new(0));
3268        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3269            BTreeMap::new();
3270        hooks.insert("sqs", counting_hook(counter.clone()));
3271        let svc = make_service()
3272            .with_s3_store(store.clone())
3273            .with_snapshot_hooks(hooks);
3274
3275        svc.create_stack(&create_req("probe")).await.unwrap();
3276        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3277    }
3278
3279    #[tokio::test]
3280    async fn cfn_update_persists_touched_services() {
3281        // Create with just SQS, then update to a template that adds SNS + a
3282        // bucket; the update must persist the services it touches.
3283        let tmp = tempfile::tempdir().unwrap();
3284        let store = disk_s3_store(&tmp);
3285        let counter = Arc::new(AtomicUsize::new(0));
3286        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3287            BTreeMap::new();
3288        hooks.insert("sqs", counting_hook(counter.clone()));
3289        hooks.insert("sns", counting_hook(counter.clone()));
3290        let svc = make_service()
3291            .with_s3_store(store.clone())
3292            .with_snapshot_hooks(hooks);
3293
3294        let mut create = HashMap::new();
3295        create.insert("StackName".to_string(), "upd".to_string());
3296        create.insert(
3297            "TemplateBody".to_string(),
3298            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3299                .to_string(),
3300        );
3301        svc.create_stack(&make_request("CreateStack", create))
3302            .await
3303            .unwrap();
3304        let after_create = counter.load(Ordering::SeqCst);
3305
3306        let mut update = HashMap::new();
3307        update.insert("StackName".to_string(), "upd".to_string());
3308        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3309        svc.update_stack(&make_request("UpdateStack", update))
3310            .await
3311            .unwrap();
3312
3313        // The update touched at least SNS (added); the hook count must grow.
3314        assert!(
3315            counter.load(Ordering::SeqCst) > after_create,
3316            "update should persist the services it touched"
3317        );
3318        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3319        assert!(loaded.buckets.contains_key("cfn-bucket"));
3320    }
3321
3322    #[tokio::test]
3323    async fn cfn_execute_change_set_persists_touched_services() {
3324        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3325        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3326        // write provisioned services through to disk the same way CreateStack
3327        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3328        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3329        let tmp = tempfile::tempdir().unwrap();
3330        let store = disk_s3_store(&tmp);
3331        let counter = Arc::new(AtomicUsize::new(0));
3332        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3333            BTreeMap::new();
3334        hooks.insert("sqs", counting_hook(counter.clone()));
3335        let svc = make_service()
3336            .with_s3_store(store.clone())
3337            .with_snapshot_hooks(hooks);
3338
3339        let mut create = HashMap::new();
3340        create.insert("StackName".to_string(), "cs-stack".to_string());
3341        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3342        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3343        create.insert(
3344            "TemplateBody".to_string(),
3345            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3346                .to_string(),
3347        );
3348        svc.handle(make_request("CreateChangeSet", create))
3349            .await
3350            .unwrap();
3351        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3352        let before = counter.load(Ordering::SeqCst);
3353
3354        let mut exec = HashMap::new();
3355        exec.insert("StackName".to_string(), "cs-stack".to_string());
3356        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3357        svc.handle(make_request("ExecuteChangeSet", exec))
3358            .await
3359            .unwrap();
3360
3361        assert!(
3362            counter.load(Ordering::SeqCst) > before,
3363            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3364             queue survives a restart"
3365        );
3366    }
3367
3368    #[test]
3369    fn service_key_for_type_maps_services_and_aliases() {
3370        // Direct service segments.
3371        assert_eq!(
3372            service_key_for_type("AWS::Lambda::Function"),
3373            Some("lambda")
3374        );
3375        assert_eq!(
3376            service_key_for_type("AWS::SecretsManager::Secret"),
3377            Some("secretsmanager")
3378        );
3379        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3380        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3381        assert_eq!(
3382            service_key_for_type("AWS::StepFunctions::StateMachine"),
3383            Some("stepfunctions")
3384        );
3385        // Namespace aliases that differ from the fakecloud service name.
3386        assert_eq!(
3387            service_key_for_type("AWS::Events::Rule"),
3388            Some("eventbridge")
3389        );
3390        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3391        assert_eq!(
3392            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3393            Some("elasticache")
3394        );
3395        // S3 has no snapshot hook (it persists via the S3Store write-through).
3396        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3397        // Snapshot-backed services whose CFN namespace differs from the
3398        // fakecloud service name (these were missing from the map, #1766 class).
3399        assert_eq!(
3400            service_key_for_type("AWS::CertificateManager::Certificate"),
3401            Some("acm")
3402        );
3403        assert_eq!(
3404            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3405            Some("elbv2")
3406        );
3407        assert_eq!(
3408            service_key_for_type("AWS::CloudFront::Distribution"),
3409            Some("cloudfront")
3410        );
3411        assert_eq!(
3412            service_key_for_type("AWS::Route53::HostedZone"),
3413            Some("route53")
3414        );
3415        assert_eq!(
3416            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3417            Some("firehose")
3418        );
3419        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3420        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3421        assert_eq!(
3422            service_key_for_type("AWS::Athena::WorkGroup"),
3423            Some("athena")
3424        );
3425        assert_eq!(
3426            service_key_for_type("AWS::Organizations::Organization"),
3427            Some("organizations")
3428        );
3429        // Malformed / non-AWS types.
3430        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3431        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3432        assert_eq!(service_key_for_type("AWS"), None);
3433        assert_eq!(service_key_for_type(""), None);
3434    }
3435
3436    #[tokio::test]
3437    async fn persist_touched_services_noop_with_empty_hooks() {
3438        // No registered hooks -> nothing to do, must not panic.
3439        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3440        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3441    }
3442
3443    #[tokio::test]
3444    async fn cfn_bucket_policy_write_through_create_update_delete() {
3445        let tmp = tempfile::tempdir().unwrap();
3446        let store = disk_s3_store(&tmp);
3447        let svc = make_service().with_s3_store(store.clone());
3448
3449        // Create a bucket + bucket policy.
3450        let mut create = HashMap::new();
3451        create.insert("StackName".to_string(), "pol".to_string());
3452        create.insert(
3453            "TemplateBody".to_string(),
3454            r#"{"Resources":{
3455                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3456                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3457            }}"#
3458            .to_string(),
3459        );
3460        svc.create_stack(&make_request("CreateStack", create))
3461            .await
3462            .unwrap();
3463        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3464        let policy = loaded.buckets["pol-bucket"]
3465            .subresources
3466            .get("policy.toml")
3467            .cloned()
3468            .expect("bucket policy persisted on create");
3469        assert!(policy.contains("s3:GetObject"));
3470
3471        // Update the policy document; the update must write through.
3472        let mut update = HashMap::new();
3473        update.insert("StackName".to_string(), "pol".to_string());
3474        update.insert(
3475            "TemplateBody".to_string(),
3476            r#"{"Resources":{
3477                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3478                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3479            }}"#
3480            .to_string(),
3481        );
3482        svc.update_stack(&make_request("UpdateStack", update))
3483            .await
3484            .unwrap();
3485        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3486        let policy = loaded.buckets["pol-bucket"]
3487            .subresources
3488            .get("policy.toml")
3489            .cloned()
3490            .expect("bucket policy still persisted after update");
3491        assert!(
3492            policy.contains("s3:PutObject"),
3493            "updated policy should be written through"
3494        );
3495
3496        // Delete the stack; the bucket (and its policy) must be removed from disk.
3497        let mut del = HashMap::new();
3498        del.insert("StackName".to_string(), "pol".to_string());
3499        svc.delete_stack(&make_request("DeleteStack", del))
3500            .await
3501            .unwrap();
3502        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3503        assert!(
3504            !loaded.buckets.contains_key("pol-bucket"),
3505            "CFN-deleted bucket and policy should be gone from the store"
3506        );
3507    }
3508}