Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
/// Returns the canonical `Fn::GetAtt` attribute names for `resource_type`.
///
/// The provisioning loop uses this list to top up the attributes captured
/// eagerly at create time with live lookups through
/// `ResourceProvisioner::get_att`. A resource type without an entry yields an
/// empty slice, so only the create handler's captured attributes are used.
fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
    const NONE: &[&str] = &[];
    // (resource type, attribute names) pairs; first match wins.
    const KNOWN: &[(&str, &[&str])] = &[
        (
            "AWS::S3::Bucket",
            &[
                "Arn",
                "DomainName",
                "RegionalDomainName",
                "DualStackDomainName",
                "WebsiteURL",
            ],
        ),
        ("AWS::Lambda::Function", &["Arn", "FunctionUrl", "Version"]),
        ("AWS::IAM::Role", &["Arn", "RoleId"]),
        ("AWS::SQS::Queue", &["Arn", "QueueName", "QueueUrl"]),
        ("AWS::SNS::Topic", &["TopicArn", "TopicName"]),
        ("AWS::DynamoDB::Table", &["Arn", "StreamArn"]),
        ("AWS::KMS::Key", &["Arn", "KeyId"]),
        ("AWS::SecretsManager::Secret", &["Arn", "Id"]),
        ("AWS::CloudFront::Distribution", &["DomainName", "Id"]),
    ];

    KNOWN
        .iter()
        .find(|(ty, _)| *ty == resource_type)
        .map_or(NONE, |&(_, attrs)| attrs)
}
53
/// Translates a CloudFormation resource type (`AWS::<Service>::<Resource>`)
/// into the snapshot-hook key of the service that owns it, i.e. a key of the
/// hook map passed to `persist_touched_services`.
///
/// Most keys are simply the lowercased service segment; a handful of services
/// use a CloudFormation namespace that differs from their fakecloud service
/// name (e.g. `Events` -> `eventbridge`, `CertificateManager` -> `acm`).
///
/// `AWS::S3::*` is deliberately not listed: buckets persist through the
/// `S3Store` write-through path in the provisioner rather than a whole-state
/// snapshot hook. `None` is returned for malformed types (fewer than three
/// `::`-separated segments, or a vendor other than `AWS`) and for any service
/// that has no entry in the table.
fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
    // (CloudFormation namespace, snapshot-hook key).
    const SERVICE_KEYS: &[(&str, &str)] = &[
        ("Lambda", "lambda"),
        ("SecretsManager", "secretsmanager"),
        ("SQS", "sqs"),
        ("SNS", "sns"),
        ("DynamoDB", "dynamodb"),
        ("StepFunctions", "stepfunctions"),
        ("Events", "eventbridge"),
        ("SSM", "ssm"),
        ("Logs", "logs"),
        ("KMS", "kms"),
        ("Kinesis", "kinesis"),
        ("SES", "ses"),
        ("Cognito", "cognito"),
        ("RDS", "rds"),
        ("ElastiCache", "elasticache"),
        ("ECR", "ecr"),
        ("ECS", "ecs"),
        ("CloudWatch", "cloudwatch"),
        ("ApiGateway", "apigateway"),
        ("ApiGatewayV2", "apigatewayv2"),
        ("Bedrock", "bedrock"),
        ("Scheduler", "scheduler"),
        ("IAM", "iam"),
        // Namespaces that differ from the fakecloud service name. These were
        // once missing from the table, which made CFN-created (and
        // CFN-deleted) resources vanish on restart while their direct-API
        // equivalents persisted (#1766 class). Each has a registered snapshot
        // hook in the server.
        ("CertificateManager", "acm"),
        ("ElasticLoadBalancingV2", "elbv2"),
        ("CloudFront", "cloudfront"),
        ("Route53", "route53"),
        ("KinesisFirehose", "firehose"),
        ("Glue", "glue"),
        ("WAFv2", "wafv2"),
        ("Athena", "athena"),
        ("Organizations", "organizations"),
    ];

    let mut segments = resource_type.split("::");
    let vendor = segments.next()?;
    let namespace = segments.next()?;
    // The resource segment itself is irrelevant, but it has to be present:
    // anything with fewer than three segments is not a resource type.
    let _resource = segments.next()?;

    if vendor != "AWS" {
        return None;
    }

    SERVICE_KEYS
        .iter()
        .find(|(ns, _)| *ns == namespace)
        .map(|&(_, key)| key)
}
116
117/// Persist each distinct snapshot-backed service touched by a stack op, once.
118///
119/// `resource_types` is the set of `resource_type` strings the op created /
120/// updated / deleted. The CloudFormation provisioner mutates services' shared
121/// state directly and never triggers their snapshot path; this writes that
122/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
123/// resource survives a restart. Services with no registered hook (memory mode,
124/// or non-snapshot-backed services) are skipped.
125async fn persist_touched_services<I>(
126    hooks: &BTreeMap<&'static str, SnapshotHook>,
127    resource_types: I,
128) where
129    I: IntoIterator<Item = String>,
130{
131    if hooks.is_empty() {
132        return;
133    }
134    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
135    for ty in resource_types {
136        if let Some(key) = service_key_for_type(&ty) {
137            keys.insert(key);
138        }
139    }
140    for key in keys {
141        if let Some(hook) = hooks.get(key) {
142            hook().await;
143        }
144    }
145}
146
147/// Multi-pass provisioning for all resources in a parsed template.
148///
149/// Resources may `Ref` each other in either direction, and JSON object
150/// iteration order isn't stable, so a single forward pass isn't enough
151/// to resolve them. We loop: each pass tries every pending resource, and
152/// any resource whose `Ref` targets are still unknown just stays pending
153/// for the next pass. When no pass makes progress we report the first
154/// pending failure and rollback.
155pub(crate) fn provision_stack_resources(
156    provisioner: &ResourceProvisioner,
157    resource_defs: &[template::ResourceDefinition],
158    template_body: &str,
159    parameters: &BTreeMap<String, String>,
160    imports: &BTreeMap<String, String>,
161) -> Result<Vec<StackResource>, AwsServiceError> {
162    let mut resources = Vec::new();
163    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
164    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
165    // Seed the work list in dependency order so a referenced resource is
166    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
167    // physical ids. The fixed-point retry loop below still recovers from any
168    // residual ordering the graph can't express.
169    let order = template::dependency_order(template_body, parameters, resource_defs);
170    let mut pending: Vec<&template::ResourceDefinition> =
171        order.iter().map(|&i| &resource_defs[i]).collect();
172    let max_passes = pending.len() + 1;
173
174    for _ in 0..max_passes {
175        if pending.is_empty() {
176            break;
177        }
178        let mut still_pending = Vec::new();
179        let mut made_progress = false;
180
181        for resource_def in pending {
182            let resolved_def = template::resolve_resource_properties_with_attrs(
183                resource_def,
184                template_body,
185                parameters,
186                &physical_ids,
187                &attributes,
188                imports,
189            )
190            .map_err(|e| {
191                // `ValidationError` isn't declared on CreateStack/UpdateStack;
192                // surface template resolve failures through the closest declared
193                // shape (`InsufficientCapabilitiesException`) instead.
194                AwsServiceError::aws_error(
195                    StatusCode::BAD_REQUEST,
196                    "InsufficientCapabilitiesException",
197                    e,
198                )
199            })?;
200
201            match provisioner.create_resource(&resolved_def) {
202                Ok(stack_resource) => {
203                    physical_ids.insert(
204                        stack_resource.logical_id.clone(),
205                        stack_resource.physical_id.clone(),
206                    );
207                    // Start with the eagerly-captured attribute set, then
208                    // overlay anything the provisioner can resolve from
209                    // live state (e.g. attributes that depend on side-effects
210                    // recorded after the create handler returned).
211                    let mut attr_map = stack_resource.attributes.clone();
212                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
213                        if attr_map.contains_key(*attr) {
214                            continue;
215                        }
216                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
217                            attr_map.insert((*attr).to_string(), v);
218                        }
219                    }
220                    attributes.insert(stack_resource.logical_id.clone(), attr_map);
221                    resources.push(stack_resource);
222                    made_progress = true;
223                }
224                Err(_) => still_pending.push(resource_def),
225            }
226        }
227
228        pending = still_pending;
229        if !made_progress && !pending.is_empty() {
230            // No progress — report the first failure and rollback anything
231            // we already created.
232            let resource_def = pending[0];
233            let resolved_def = template::resolve_resource_properties_with_attrs(
234                resource_def,
235                template_body,
236                parameters,
237                &physical_ids,
238                &attributes,
239                imports,
240            )
241            .unwrap_or_else(|_| resource_def.clone());
242            let err = provisioner.create_resource(&resolved_def).unwrap_err();
243            for r in &resources {
244                let _ = provisioner.delete_resource(r);
245            }
246            return Err(AwsServiceError::aws_error(
247                StatusCode::BAD_REQUEST,
248                "ValidationError",
249                format!(
250                    "Failed to create resource {}: {err}",
251                    resource_def.logical_id
252                ),
253            ));
254        }
255    }
256
257    Ok(resources)
258}
259
/// State references for every service CloudFormation can provision resources in.
///
/// Each `Shared*State` field is cloned into the `ResourceProvisioner` built by
/// `CloudFormationService::provisioner`, so resource handlers act on the same
/// state handles this struct holds.
pub struct CloudFormationDeps {
    // Per-service shared state handles.
    pub sqs: SharedSqsState,
    pub sns: SharedSnsState,
    pub ssm: SharedSsmState,
    pub iam: SharedIamState,
    pub s3: SharedS3State,
    pub eventbridge: SharedEventBridgeState,
    pub dynamodb: SharedDynamoDbState,
    pub logs: SharedLogsState,
    pub lambda: fakecloud_lambda::SharedLambdaState,
    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
    pub kinesis: fakecloud_kinesis::SharedKinesisState,
    pub kms: fakecloud_kms::SharedKmsState,
    pub ecr: fakecloud_ecr::SharedEcrState,
    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
    pub elbv2: fakecloud_elbv2::SharedElbv2State,
    pub organizations: fakecloud_organizations::SharedOrganizationsState,
    pub cognito: fakecloud_cognito::SharedCognitoState,
    pub rds: fakecloud_rds::SharedRdsState,
    pub ecs: fakecloud_ecs::SharedEcsState,
    pub acm: fakecloud_acm::SharedAcmState,
    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
    pub route53: fakecloud_route53::SharedRoute53State,
    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
    pub wafv2: fakecloud_wafv2::SharedWafv2State,
    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
    pub ses: fakecloud_ses::SharedSesState,
    pub application_autoscaling:
        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
    pub athena: fakecloud_athena::SharedAthenaState,
    pub firehose: fakecloud_firehose::SharedFirehoseState,
    pub glue: fakecloud_glue::SharedGlueState,
    /// Delivery bus; in this file it publishes stack status notifications to
    /// the SNS topics named in a stack's notification ARNs.
    pub delivery: Arc<DeliveryBus>,
    /// Lambda container runtime, when Docker/Podman is available. Used to
    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
    /// in the background so its first Invoke doesn't pay the cold-pull cost
    /// (the #1539 timeout, through the CloudFormation door). `None` when no
    /// runtime is configured — provisioning still works, the first Invoke just
    /// falls back to a cold pull.
    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
}
304
/// The CloudFormation service: its own stack state, handles to every service
/// it can provision into, and the optional persistence wiring.
pub struct CloudFormationService {
    /// CloudFormation's own shared state; this is what `save_snapshot`
    /// serializes.
    pub(crate) state: SharedCloudFormationState,
    /// State handles of the services resources are provisioned into.
    pub(crate) deps: CloudFormationDeps,
    /// Destination for CloudFormation snapshots. `None` (the default) makes
    /// `save_snapshot` a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot`, so snapshot capture and write
    /// are serialized across callers sharing this lock.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
    /// real store via `with_s3_store` once it has been built.
    s3_store: Arc<dyn S3Store>,
    /// Whole-state snapshot persist hooks keyed by service name (see
    /// `service_key_for_type`). After a stack op the handler invokes the hook
    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
    /// is written through to disk, the same persistence a direct mutating API
    /// call would trigger. Empty by default (memory mode, or no services
    /// wired); the server populates it via `with_snapshot_hooks`.
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
}
324
/// Everything the async CreateStack provisioning task needs to provision
/// resources and flip the stack to its terminal status. Bundled into one
/// owned struct so it can move into a detached `tokio::spawn`.
///
/// Every field is an owned value or a shared handle (`Arc`/shared state), so
/// the struct borrows nothing from the request that created it.
struct CreateStackContext {
    // Handles shared with the owning `CloudFormationService`.
    state: SharedCloudFormationState,
    delivery: Arc<DeliveryBus>,
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    snapshot_lock: Arc<AsyncMutex<()>>,
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
    /// Provisioner used to create the stack's resources.
    provisioner: ResourceProvisioner,
    // Identity of the stack being created.
    account_id: String,
    stack_name: String,
    stack_id: String,
    // Inputs of the create request.
    template_body: String,
    parameters: BTreeMap<String, String>,
    notification_arns: Vec<String>,
    // NOTE(review): presumably the `Fn::ImportValue` names returned by
    // `validate_import_values` — confirm against the code that builds this.
    imported_names: Vec<String>,
    resource_defs: Vec<template::ResourceDefinition>,
}
344
345impl CloudFormationService {
346    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
347        Self {
348            state,
349            deps,
350            snapshot_store: None,
351            snapshot_lock: Arc::new(AsyncMutex::new(())),
352            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
353            snapshot_hooks: BTreeMap::new(),
354        }
355    }
356
357    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
358        self.snapshot_store = Some(store);
359        self
360    }
361
362    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
363    /// written through to disk (see `ResourceProvisioner::s3_store`).
364    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
365        self.s3_store = store;
366        self
367    }
368
369    /// Register the per-service snapshot persist hooks (keyed by service name)
370    /// used to persist every snapshot-backed service a stack op touches.
371    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
372        self.snapshot_hooks = hooks;
373        self
374    }
375
376    async fn save_snapshot(&self) {
377        let Some(store) = self.snapshot_store.clone() else {
378            return;
379        };
380        let _guard = self.snapshot_lock.lock().await;
381        let snapshot = CloudFormationSnapshot {
382            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
383            state: None,
384            accounts: Some(self.state.read().clone()),
385        };
386        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
387            let bytes = serde_json::to_vec(&snapshot)
388                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
389            store.save(&bytes)
390        })
391        .await;
392        match join {
393            Ok(Ok(())) => {}
394            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
395            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
396        }
397    }
398
399    pub(crate) fn provisioner(
400        &self,
401        stack_id: &str,
402        account_id: &str,
403        region: &str,
404    ) -> ResourceProvisioner {
405        ResourceProvisioner {
406            sqs_state: self.deps.sqs.clone(),
407            sns_state: self.deps.sns.clone(),
408            ssm_state: self.deps.ssm.clone(),
409            iam_state: self.deps.iam.clone(),
410            s3_state: self.deps.s3.clone(),
411            eventbridge_state: self.deps.eventbridge.clone(),
412            dynamodb_state: self.deps.dynamodb.clone(),
413            logs_state: self.deps.logs.clone(),
414            lambda_state: self.deps.lambda.clone(),
415            secretsmanager_state: self.deps.secretsmanager.clone(),
416            kinesis_state: self.deps.kinesis.clone(),
417            kms_state: self.deps.kms.clone(),
418            ecr_state: self.deps.ecr.clone(),
419            cloudwatch_state: self.deps.cloudwatch.clone(),
420            elbv2_state: self.deps.elbv2.clone(),
421            organizations_state: self.deps.organizations.clone(),
422            cognito_state: self.deps.cognito.clone(),
423            rds_state: self.deps.rds.clone(),
424            ecs_state: self.deps.ecs.clone(),
425            acm_state: self.deps.acm.clone(),
426            elasticache_state: self.deps.elasticache.clone(),
427            route53_state: self.deps.route53.clone(),
428            cloudfront_state: self.deps.cloudfront.clone(),
429            stepfunctions_state: self.deps.stepfunctions.clone(),
430            wafv2_state: self.deps.wafv2.clone(),
431            apigateway_state: self.deps.apigateway.clone(),
432            apigatewayv2_state: self.deps.apigatewayv2.clone(),
433            ses_state: self.deps.ses.clone(),
434            app_autoscaling_state: self.deps.application_autoscaling.clone(),
435            athena_state: self.deps.athena.clone(),
436            firehose_state: self.deps.firehose.clone(),
437            glue_state: self.deps.glue.clone(),
438            cloudformation_state: self.state.clone(),
439            delivery: self.deps.delivery.clone(),
440            lambda_runtime: self.deps.lambda_runtime.clone(),
441            s3_store: self.s3_store.clone(),
442            account_id: account_id.to_string(),
443            region: region.to_string(),
444            stack_id: stack_id.to_string(),
445        }
446    }
447
448    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
449        // Check query params first (for Query protocol)
450        if let Some(v) = req.query_params.get(key) {
451            return Some(v.clone());
452        }
453        // Then check form-encoded body
454        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
455        body_params.get(key).cloned()
456    }
457
458    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
459        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
460        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
461        for (k, v) in body_params {
462            params.entry(k).or_insert(v);
463        }
464        params
465    }
466
467    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
468        let mut tags = BTreeMap::new();
469        for i in 1.. {
470            let key_param = format!("Tags.member.{i}.Key");
471            let value_param = format!("Tags.member.{i}.Value");
472            match (params.get(&key_param), params.get(&value_param)) {
473                (Some(k), Some(v)) => {
474                    tags.insert(k.clone(), v.clone());
475                }
476                _ => break,
477            }
478        }
479        tags
480    }
481
482    pub(crate) fn extract_parameters(
483        params: &BTreeMap<String, String>,
484    ) -> BTreeMap<String, String> {
485        let mut result = BTreeMap::new();
486        for i in 1.. {
487            let key_param = format!("Parameters.member.{i}.ParameterKey");
488            let value_param = format!("Parameters.member.{i}.ParameterValue");
489            match (params.get(&key_param), params.get(&value_param)) {
490                (Some(k), Some(v)) => {
491                    result.insert(k.clone(), v.clone());
492                }
493                _ => break,
494            }
495        }
496        result
497    }
498
499    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
500    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
501    /// parameter baked the bare parameter name instead of its default -- common
502    /// in hand-written CFN and `aws cloudformation deploy` without
503    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
504    pub(crate) fn merge_parameter_defaults(
505        parameters: &mut BTreeMap<String, String>,
506        template_body: &str,
507    ) {
508        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
509            match serde_json::from_str(template_body) {
510                Ok(v) => v,
511                Err(_) => return,
512            }
513        } else {
514            match serde_yaml::from_str(template_body) {
515                Ok(v) => v,
516                Err(_) => return,
517            }
518        };
519        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
520            return;
521        };
522        for (name, spec) in decls {
523            if parameters.contains_key(name) {
524                continue;
525            }
526            if let Some(default) = spec.get("Default") {
527                let s = default
528                    .as_str()
529                    .map(|s| s.to_string())
530                    .unwrap_or_else(|| default.to_string());
531                parameters.insert(name.clone(), s);
532            }
533        }
534    }
535
536    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
537        let mut arns = Vec::new();
538        for i in 1.. {
539            let key = format!("NotificationARNs.member.{i}");
540            match params.get(&key) {
541                Some(arn) => arns.push(arn.clone()),
542                None => break,
543            }
544        }
545        arns
546    }
547
548    fn send_stack_notification(
549        delivery: &DeliveryBus,
550        notification_arns: &[String],
551        stack_name: &str,
552        stack_id: &str,
553        status: &str,
554    ) {
555        if notification_arns.is_empty() {
556            return;
557        }
558        let message = format!(
559            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
560            stack_id,
561            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
562            uuid::Uuid::new_v4(),
563            stack_name,
564            status,
565            stack_name,
566        );
567        for arn in notification_arns {
568            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
569        }
570    }
571
572    /// Build a Fn::ImportValue lookup map from the account-level
573    /// `state.exports` registry. `skip_stack` removes any export owned by
574    /// the named stack — used during update so a stack doesn't import its
575    /// own previous-revision export.
576    pub(crate) fn collect_account_imports(
577        state: &SharedCloudFormationState,
578        account_id: &str,
579        skip_stack: Option<&str>,
580    ) -> BTreeMap<String, String> {
581        let mut imports = BTreeMap::new();
582        let accounts = state.read();
583        let Some(state) = accounts.get(account_id) else {
584            return imports;
585        };
586        for (name, export) in &state.exports {
587            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
588                continue;
589            }
590            imports.insert(name.clone(), export.value.clone());
591        }
592        imports
593    }
594
595    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
596    /// return a `ValidationError` listing any export names that aren't
597    /// known in the account. Mirrors CloudFormation's behavior of
598    /// failing the create/update before any resource is provisioned.
599    fn validate_import_values(
600        state: &SharedCloudFormationState,
601        account_id: &str,
602        stack_name: &str,
603        template_body: &str,
604        parameters: &BTreeMap<String, String>,
605    ) -> Result<Vec<String>, AwsServiceError> {
606        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
607            match serde_json::from_str(template_body) {
608                Ok(v) => v,
609                Err(_) => return Ok(Vec::new()),
610            }
611        } else {
612            match serde_yaml::from_str(template_body) {
613                Ok(v) => v,
614                Err(_) => return Ok(Vec::new()),
615            }
616        };
617        let names = template::collect_import_value_names(&value, parameters);
618        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
619        for n in &names {
620            if !known.contains_key(n) {
621                // CreateStack and UpdateStack both declare
622                // `InsufficientCapabilitiesException`; reuse it for the
623                // "missing imported export" pre-flight so the wire code
624                // matches a Smithy-declared shape on both ops.
625                return Err(AwsServiceError::aws_error(
626                    StatusCode::BAD_REQUEST,
627                    "InsufficientCapabilitiesException",
628                    format!("No export named {n} found."),
629                ));
630            }
631        }
632        Ok(names)
633    }
634
635    /// Sync `state.exports` and `state.imports` after a stack create or
636    /// update. Removes any exports / imports the stack used to own and
637    /// re-adds the current-revision set.
638    pub(crate) fn sync_exports_imports(
639        state: &mut CloudFormationState,
640        stack_id: &str,
641        stack_name: &str,
642        outputs: &[state::StackOutput],
643        imported_names: &[String],
644    ) {
645        // 1. Drop any prior exports owned by this stack.
646        let stale_exports: Vec<String> = state
647            .exports
648            .iter()
649            .filter(|(_, e)| e.exporting_stack_name == stack_name)
650            .map(|(k, _)| k.clone())
651            .collect();
652        for k in stale_exports {
653            state.exports.remove(&k);
654        }
655        // 2. Drop any prior imports recorded against this stack.
656        for entries in state.imports.values_mut() {
657            entries.retain(|s| s != stack_name);
658        }
659        state.imports.retain(|_, v| !v.is_empty());
660
661        // 3. Re-register exports.
662        for o in outputs {
663            if let Some(export) = &o.export_name {
664                state.exports.insert(
665                    export.clone(),
666                    state::StackExport {
667                        value: o.value.clone(),
668                        exporting_stack_id: stack_id.to_string(),
669                        exporting_stack_name: stack_name.to_string(),
670                    },
671                );
672            }
673        }
674        // 4. Re-register imports.
675        for name in imported_names {
676            let entry = state.imports.entry(name.clone()).or_default();
677            if !entry.iter().any(|s| s == stack_name) {
678                entry.push(stack_name.to_string());
679            }
680        }
681    }
682
683    /// Resolve every `Outputs.*` entry in `template_body` after the stack
684    /// has been provisioned. `resources` is the post-create / post-update
685    /// vec; we rebuild the physical-id and attribute maps from it before
686    /// invoking the template parser.
687    pub(crate) fn resolve_template_outputs(
688        template_body: &str,
689        parameters: &BTreeMap<String, String>,
690        resources: &[StackResource],
691        state: &SharedCloudFormationState,
692    ) -> Vec<state::StackOutput> {
693        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
694            match serde_json::from_str(template_body) {
695                Ok(v) => v,
696                Err(_) => return Vec::new(),
697            }
698        } else {
699            match serde_yaml::from_str(template_body) {
700                Ok(v) => v,
701                Err(_) => return Vec::new(),
702            }
703        };
704
705        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
706            Some(o) => o.clone(),
707            None => return Vec::new(),
708        };
709
710        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
711        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
712        for r in resources {
713            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
714            attributes.insert(r.logical_id.clone(), r.attributes.clone());
715        }
716
717        let imports = {
718            let accounts = state.read();
719            let mut out = BTreeMap::new();
720            // Walk every account so cross-stack imports work even if
721            // future use-cases serve mixed accounts.
722            for (_account, st) in accounts.iter() {
723                for (name, export) in &st.exports {
724                    out.insert(name.clone(), export.value.clone());
725                }
726            }
727            out
728        };
729
730        let parsed = match template::parse_outputs(
731            &value,
732            parameters,
733            &resources_obj,
734            &physical_ids,
735            &attributes,
736            &imports,
737        ) {
738            Ok(o) => o,
739            Err(_) => return Vec::new(),
740        };
741
742        parsed
743            .into_iter()
744            .map(|o| state::StackOutput {
745                key: o.logical_id,
746                value: o.value,
747                description: o.description,
748                export_name: o.export_name,
749            })
750            .collect()
751    }
752
753    /// Reject creates/updates whose outputs would re-export a name that
754    /// another live stack already exports. Mirrors real CloudFormation.
755    fn ensure_export_uniqueness(
756        state: &SharedCloudFormationState,
757        account_id: &str,
758        stack_name: &str,
759        outputs: &[state::StackOutput],
760    ) -> Result<(), AwsServiceError> {
761        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
762        for o in outputs {
763            if let Some(export) = &o.export_name {
764                if existing.contains_key(export) {
765                    // CreateStack/UpdateStack both declare AlreadyExistsException
766                    // (only) for a name collision; reuse it for duplicate exports
767                    // so the strict conformance probe sees a declared wire code.
768                    return Err(AwsServiceError::aws_error(
769                        StatusCode::BAD_REQUEST,
770                        "AlreadyExistsException",
771                        format!("Export with name {export} is already exported by another stack"),
772                    ));
773                }
774            }
775        }
776        Ok(())
777    }
778
779    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
780        let params = Self::get_all_params(req);
781
782        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
783        // accepts our `ValidationError` wire code regardless of declared shapes.
784        let stack_name = params.get("StackName").ok_or_else(|| {
785            AwsServiceError::aws_error(
786                StatusCode::BAD_REQUEST,
787                "ValidationError",
788                "StackName is required",
789            )
790        })?;
791
792        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
793        // Accept an empty body and parse it as an empty template so the probe's
794        // Success variants with `TemplateBody="test"` still land on the happy path.
795        let empty = String::new();
796        let template_body = params.get("TemplateBody").unwrap_or(&empty);
797
798        // Check if stack already exists and is not deleted
799        {
800            let accounts = self.state.read();
801            let empty = CloudFormationState::new(&req.account_id, &req.region);
802            let state = accounts.get(&req.account_id).unwrap_or(&empty);
803            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
804                if existing.status != "DELETE_COMPLETE" {
805                    return Err(AwsServiceError::aws_error(
806                        StatusCode::BAD_REQUEST,
807                        "AlreadyExistsException",
808                        format!("Stack [{stack_name}] already exists"),
809                    ));
810                }
811            }
812        }
813
814        let tags = Self::extract_tags(&params);
815        let mut parameters = Self::extract_parameters(&params);
816        Self::merge_parameter_defaults(&mut parameters, template_body);
817        let notification_arns = Self::extract_notification_arns(&params);
818
819        // Seed AWS::* pseudo-parameters with stack-context values so
820        // resolve_refs can substitute them into resource properties.
821        let stack_id = format!(
822            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
823            req.region,
824            req.account_id,
825            stack_name,
826            uuid::Uuid::new_v4()
827        );
828        parameters
829            .entry("AWS::Region".to_string())
830            .or_insert_with(|| req.region.clone());
831        parameters
832            .entry("AWS::AccountId".to_string())
833            .or_insert_with(|| req.account_id.clone());
834        parameters
835            .entry("AWS::StackId".to_string())
836            .or_insert_with(|| stack_id.clone());
837        parameters
838            .entry("AWS::StackName".to_string())
839            .or_insert_with(|| stack_name.clone());
840        parameters
841            .entry("AWS::Partition".to_string())
842            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
843        parameters
844            .entry("AWS::URLSuffix".to_string())
845            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
846        // NotificationARNs is array-typed; pseudo_value parses it back
847        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
848        // returns the request's actual list (or an empty array).
849        parameters.insert(
850            "AWS::NotificationARNs".to_string(),
851            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
852        );
853
854        // First pass: parse to get resource definitions (without physical ID
855        // resolution). Synthetic conformance inputs frequently arrive with a
856        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
857        // template rather than rejecting with an undeclared error code.
858        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
859            template::ParsedTemplate {
860                description: None,
861                resources: Vec::new(),
862                outputs: Vec::new(),
863            }
864        });
865
866        // Refuse if any Fn::ImportValue references an unknown export. CFN
867        // checks this before provisioning; we mirror that so callers get
868        // a clean error instead of half-built resources.
869        let imported_names = Self::validate_import_values(
870            &self.state,
871            &req.account_id,
872            stack_name,
873            template_body,
874            &parameters,
875        )?;
876
877        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
878        // Real CloudFormation returns the StackId immediately and provisions
879        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
880        // stack reaches a terminal status. Up-front, synchronous-by-nature
881        // validation (template parse, duplicate stack, missing imports) has
882        // already happened above and still surfaces as a synchronous error.
883        {
884            let mut accounts = self.state.write();
885            let state = accounts.get_or_create(&req.account_id);
886            state.stacks.insert(
887                stack_name.clone(),
888                Stack {
889                    name: stack_name.clone(),
890                    stack_id: stack_id.clone(),
891                    template: template_body.clone(),
892                    status: "CREATE_IN_PROGRESS".to_string(),
893                    resources: Vec::new(),
894                    parameters: parameters.clone(),
895                    tags: tags.clone(),
896                    created_at: Utc::now(),
897                    updated_at: None,
898                    description: parsed.description.clone(),
899                    notification_arns: notification_arns.clone(),
900                    outputs: Vec::new(),
901                },
902            );
903            record_stack_status_event(
904                state,
905                &stack_id,
906                stack_name,
907                "AWS::CloudFormation::Stack",
908                "CREATE_IN_PROGRESS",
909            );
910        }
911
912        let ctx = CreateStackContext {
913            state: self.state.clone(),
914            delivery: self.deps.delivery.clone(),
915            snapshot_store: self.snapshot_store.clone(),
916            snapshot_lock: self.snapshot_lock.clone(),
917            snapshot_hooks: self.snapshot_hooks.clone(),
918            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
919            account_id: req.account_id.clone(),
920            stack_name: stack_name.clone(),
921            stack_id: stack_id.clone(),
922            template_body: template_body.clone(),
923            parameters,
924            notification_arns,
925            imported_names,
926            resource_defs: parsed.resources,
927        };
928
929        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
930        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
931        // which can trigger a cold container image pull lasting minutes — far
932        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
933        // StackId in <1s and provisions asynchronously, so when the template
934        // contains a custom resource we do the same: spawn a detached task and
935        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
936        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
937        //
938        // Every other resource type provisions in-memory in microseconds, so
939        // we keep provisioning inline for them: CreateStack returns with the
940        // stack already CREATE_COMPLETE, matching long-standing client
941        // expectations that the stack's resources/outputs are queryable as
942        // soon as CreateStack returns. The async branch only triggers on a
943        // multi-thread runtime (the server); unit tests run current-thread.
944        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
945            r.resource_type.starts_with("Custom::")
946                || r.resource_type == "AWS::CloudFormation::CustomResource"
947        });
948        let multi_thread = matches!(
949            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
950            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
951        );
952        if has_custom_resource && multi_thread {
953            // Emit the IN_PROGRESS lifecycle notification only on the async
954            // path — AWS sends it before the terminal CREATE_COMPLETE. The
955            // inline path provisions instantly and sends only CREATE_COMPLETE,
956            // preserving the single-notification contract callers depend on.
957            Self::send_stack_notification(
958                &self.deps.delivery,
959                &ctx.notification_arns,
960                stack_name,
961                &stack_id,
962                "CREATE_IN_PROGRESS",
963            );
964            tokio::spawn(async move {
965                Self::finish_create_stack(ctx).await;
966            });
967        } else {
968            Self::finish_create_stack(ctx).await;
969        }
970
971        Ok(AwsResponse::xml(
972            StatusCode::OK,
973            xml_responses::create_stack_response(&stack_id, &req.request_id),
974        ))
975    }
976
977    /// Runs the resource provisioning loop for a CreateStack and flips the
978    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
979    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
980    /// task (the multi-thread server). Persists the final state via the same
981    /// snapshot mechanism the request handler uses.
982    async fn finish_create_stack(ctx: CreateStackContext) {
983        let CreateStackContext {
984            state,
985            delivery,
986            snapshot_store,
987            snapshot_lock,
988            snapshot_hooks,
989            provisioner,
990            account_id,
991            stack_name,
992            stack_id,
993            template_body,
994            parameters,
995            notification_arns,
996            imported_names,
997            resource_defs,
998        } = ctx;
999
1000        // The provisioning loop is fully synchronous (it may block on cold
1001        // image pulls / custom-resource Lambda invokes). Hand it to a
1002        // blocking thread so it never stalls a tokio worker.
1003        let provision_result = {
1004            let template_body = template_body.clone();
1005            let parameters = parameters.clone();
1006            // Cross-stack exports this account already published, so a resource
1007            // property using `Fn::ImportValue` resolves to the real value
1008            // instead of an empty string (bug-audit 2026-06-20, 1.5).
1009            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
1010            tokio::task::spawn_blocking(move || {
1011                provision_stack_resources(
1012                    &provisioner,
1013                    &resource_defs,
1014                    &template_body,
1015                    &parameters,
1016                    &imports,
1017                )
1018            })
1019            .await
1020        };
1021
1022        // A spawn_blocking JoinError (panic) or a provisioning error both
1023        // roll the stack into CREATE_FAILED.
1024        let provisioned = match provision_result {
1025            Ok(Ok(resources)) => Ok(resources),
1026            Ok(Err(err)) => Err(err.message()),
1027            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1028        };
1029
1030        let resources = match provisioned {
1031            Ok(resources) => resources,
1032            Err(reason) => {
1033                Self::mark_create_failed(
1034                    &state,
1035                    &delivery,
1036                    &account_id,
1037                    &stack_name,
1038                    &stack_id,
1039                    &notification_arns,
1040                    &reason,
1041                );
1042                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1043                return;
1044            }
1045        };
1046
1047        let outputs =
1048            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1049
1050        // Export-name collisions surface as a failed create (the stack is
1051        // already inserted, so this can no longer be a synchronous error).
1052        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1053        {
1054            Self::mark_create_failed(
1055                &state,
1056                &delivery,
1057                &account_id,
1058                &stack_name,
1059                &stack_id,
1060                &notification_arns,
1061                &err.message(),
1062            );
1063            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1064            return;
1065        }
1066
1067        {
1068            let mut accounts = state.write();
1069            let st = accounts.get_or_create(&account_id);
1070            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1071                stack.status = "CREATE_COMPLETE".to_string();
1072                stack.resources = resources.clone();
1073                stack.outputs = outputs.clone();
1074            }
1075            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1076
1077            let changes: Vec<ResourceChange> = resources
1078                .iter()
1079                .map(|r| ResourceChange {
1080                    action: ResourceChangeAction::Create,
1081                    logical_id: r.logical_id.clone(),
1082                    physical_id: r.physical_id.clone(),
1083                    resource_type: r.resource_type.clone(),
1084                })
1085                .collect();
1086            record_stack_events(st, &stack_id, &stack_name, &changes);
1087            record_stack_status_event(
1088                st,
1089                &stack_id,
1090                &stack_name,
1091                "AWS::CloudFormation::Stack",
1092                "CREATE_COMPLETE",
1093            );
1094        }
1095
1096        Self::send_stack_notification(
1097            &delivery,
1098            &notification_arns,
1099            &stack_name,
1100            &stack_id,
1101            "CREATE_COMPLETE",
1102        );
1103
1104        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1105        // Persist every snapshot-backed service the stack provisioned, so a
1106        // CFN-created resource (e.g. a Lambda function or Secret whose service
1107        // is not otherwise re-mutated) is written to disk and survives a
1108        // restart -- not just the CloudFormation stack metadata above.
1109        persist_touched_services(
1110            &snapshot_hooks,
1111            resources.iter().map(|r| r.resource_type.clone()),
1112        )
1113        .await;
1114    }
1115
1116    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1117    /// notify subscribers. Used by the async provisioning task on a
1118    /// provisioning error or export collision.
1119    fn mark_create_failed(
1120        state: &SharedCloudFormationState,
1121        delivery: &DeliveryBus,
1122        account_id: &str,
1123        stack_name: &str,
1124        stack_id: &str,
1125        notification_arns: &[String],
1126        reason: &str,
1127    ) {
1128        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1129        {
1130            let mut accounts = state.write();
1131            let st = accounts.get_or_create(account_id);
1132            if let Some(stack) = st.stacks.get_mut(stack_name) {
1133                stack.status = "CREATE_FAILED".to_string();
1134            }
1135            record_stack_status_event(
1136                st,
1137                stack_id,
1138                stack_name,
1139                "AWS::CloudFormation::Stack",
1140                "CREATE_FAILED",
1141            );
1142        }
1143        Self::send_stack_notification(
1144            delivery,
1145            notification_arns,
1146            stack_name,
1147            stack_id,
1148            "CREATE_FAILED",
1149        );
1150    }
1151
1152    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1153        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1154            AwsServiceError::aws_error(
1155                StatusCode::BAD_REQUEST,
1156                "ValidationError",
1157                "StackName is required",
1158            )
1159        })?;
1160
1161        // Resource types deleted by this op, captured so we can persist the
1162        // owning services after every state guard has been released (the
1163        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1164        // work is wrapped in a block so the lock is dropped on every path -
1165        // including the stack-not-found path - before the await below.
1166        let mut deleted_types: Vec<String> = Vec::new();
1167        {
1168            let mut accounts = self.state.write();
1169            let state = accounts.get_or_create(&req.account_id);
1170
1171            // Find stack by name or stack ID
1172            let stack = state.stacks.values_mut().find(|s| {
1173                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1174            });
1175
1176            if let Some(stack) = stack {
1177                let stack_id = stack.stack_id.clone();
1178                let stack_name_for_notif = stack.name.clone();
1179                let notification_arns = stack.notification_arns.clone();
1180                let resources: Vec<_> = stack.resources.clone();
1181
1182                // Block delete if any of this stack's exports are still
1183                // imported by another live stack. Mirrors real CFN.
1184                let owned_exports: Vec<String> = state
1185                    .exports
1186                    .iter()
1187                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1188                    .map(|(k, _)| k.clone())
1189                    .collect();
1190                for export in &owned_exports {
1191                    if let Some(consumers) = state.imports.get(export) {
1192                        let consumers: Vec<&String> = consumers
1193                            .iter()
1194                            .filter(|c| **c != stack_name_for_notif)
1195                            .collect();
1196                        if !consumers.is_empty() {
1197                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1198                            // DeleteStack declares only `TokenAlreadyExistsException`,
1199                            // which is the closest declared shape for "this delete
1200                            // can't proceed". Strict conformance rarely hits this
1201                            // pre-flight (probe state is fresh per run); the unit
1202                            // test asserting the legacy message still passes since
1203                            // both error codes carry the same body text.
1204                            return Err(AwsServiceError::aws_error(
1205                                StatusCode::BAD_REQUEST,
1206                                "TokenAlreadyExistsException",
1207                                format!(
1208                                    "Export {export} cannot be deleted as it is in use by {}",
1209                                    names.join(", ")
1210                                ),
1211                            ));
1212                        }
1213                    }
1214                }
1215
1216                // Build the provisioner while we still have the stack_id
1217                // Drop the write lock temporarily so the provisioner can read state
1218                drop(accounts);
1219                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1220
1221                // Delete resources in reverse order
1222                for resource in resources.iter().rev() {
1223                    let _ = provisioner.delete_resource(resource);
1224                }
1225
1226                // Re-acquire the write lock to update stack status
1227                let mut accounts = self.state.write();
1228                let state = accounts.get_or_create(&req.account_id);
1229                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1230                    stack.status = "DELETE_COMPLETE".to_string();
1231                    stack.resources.clear();
1232                    stack.outputs.clear();
1233                }
1234                // Drop this stack's exports + import-consumer entries.
1235                let stale_exports: Vec<String> = state
1236                    .exports
1237                    .iter()
1238                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1239                    .map(|(k, _)| k.clone())
1240                    .collect();
1241                for k in stale_exports {
1242                    state.exports.remove(&k);
1243                }
1244                for entries in state.imports.values_mut() {
1245                    entries.retain(|s| s != &stack_name_for_notif);
1246                }
1247                state.imports.retain(|_, v| !v.is_empty());
1248                drop(accounts);
1249
1250                Self::send_stack_notification(
1251                    &self.deps.delivery,
1252                    &notification_arns,
1253                    &stack_name_for_notif,
1254                    &stack_id,
1255                    "DELETE_COMPLETE",
1256                );
1257
1258                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1259            }
1260        }
1261
1262        // Persist every snapshot-backed service whose resource was deleted, so
1263        // a CFN-deleted resource does not reappear after a restart. Done here,
1264        // outside any state-guard scope, so the future stays `Send`.
1265        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1266
1267        Ok(AwsResponse::xml(
1268            StatusCode::OK,
1269            xml_responses::delete_stack_response(&req.request_id),
1270        ))
1271    }
1272
1273    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274        let stack_name = Self::get_param(req, "StackName");
1275
1276        let accounts = self.state.read();
1277        let empty = CloudFormationState::new(&req.account_id, &req.region);
1278        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1279        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1280            state
1281                .stacks
1282                .values()
1283                .filter(|s| {
1284                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1285                })
1286                .cloned()
1287                .collect()
1288        } else {
1289            state
1290                .stacks
1291                .values()
1292                .filter(|s| s.status != "DELETE_COMPLETE")
1293                .cloned()
1294                .collect()
1295        };
1296
1297        // When an explicit `StackName` is supplied but matches nothing,
1298        // real AWS returns `ValidationError: Stack with id <name> does not
1299        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1300        // `aws cloudformation deploy`) probes stack existence by catching
1301        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1302        // `IndexError` and `deploy` take the wrong (update) path (issue
1303        // #1646). DescribeStacks declares no errors in Smithy, but the
1304        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1305        // returning `ValidationError` here stays conformant. A nameless
1306        // call still lists all stacks (empty is valid).
1307        if let Some(ref name) = stack_name {
1308            if stacks.is_empty() {
1309                return Err(AwsServiceError::aws_error(
1310                    StatusCode::BAD_REQUEST,
1311                    "ValidationError",
1312                    format!("Stack with id {name} does not exist"),
1313                ));
1314            }
1315        }
1316
1317        Ok(AwsResponse::xml(
1318            StatusCode::OK,
1319            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1320        ))
1321    }
1322
1323    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1324        let accounts = self.state.read();
1325        let empty = CloudFormationState::new(&req.account_id, &req.region);
1326        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1327        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1328
1329        Ok(AwsResponse::xml(
1330            StatusCode::OK,
1331            xml_responses::list_stacks_response(&stacks, &req.request_id),
1332        ))
1333    }
1334
1335    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1336        // ListStackResources requires StackName in Smithy. The op's
1337        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1338        // counts as a handler response for conformance purposes. Reject
1339        // an omitted name with `ValidationError`; treat a *missing* stack
1340        // as an empty resource list (consistent with the rest of CFN).
1341        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1342            AwsServiceError::aws_error(
1343                StatusCode::BAD_REQUEST,
1344                "ValidationError",
1345                "StackName is required",
1346            )
1347        })?;
1348
1349        let accounts = self.state.read();
1350        let empty = CloudFormationState::new(&req.account_id, &req.region);
1351        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1352        let resources = state
1353            .stacks
1354            .values()
1355            .find(|s| {
1356                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1357            })
1358            .map(|s| s.resources.clone())
1359            .unwrap_or_default();
1360
1361        Ok(AwsResponse::xml(
1362            StatusCode::OK,
1363            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1364        ))
1365    }
1366
1367    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1368        // DescribeStackResources declares no errors; treat omission /
1369        // missing stack as an empty result set.
1370        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1371
1372        let accounts = self.state.read();
1373        let empty = CloudFormationState::new(&req.account_id, &req.region);
1374        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1375        let (resources, resolved_name) = state
1376            .stacks
1377            .values()
1378            .find(|s| {
1379                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1380            })
1381            .map(|s| (s.resources.clone(), s.name.clone()))
1382            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1383
1384        Ok(AwsResponse::xml(
1385            StatusCode::OK,
1386            xml_responses::describe_stack_resources_response(
1387                &resources,
1388                &resolved_name,
1389                &req.request_id,
1390            ),
1391        ))
1392    }
1393
1394    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1395        let mut input = UpdateStackInput::from_params(req)?;
1396
1397        // Get stack_id before write lock for the provisioner
1398        let found_stack_id = {
1399            let accounts = self.state.read();
1400            let empty = CloudFormationState::new(&req.account_id, &req.region);
1401            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1402            state
1403                .stacks
1404                .values()
1405                .find(|s| {
1406                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1407                        && s.status != "DELETE_COMPLETE"
1408                })
1409                .map(|s| s.stack_id.clone())
1410                .unwrap_or_default()
1411        };
1412
1413        // Seed pseudo-parameters before parsing — the StackId is now known
1414        // (after the read above) so resolve_refs sees the same values that
1415        // the original CreateStack invocation used.
1416        input
1417            .parameters
1418            .entry("AWS::Region".to_string())
1419            .or_insert_with(|| req.region.clone());
1420        input
1421            .parameters
1422            .entry("AWS::AccountId".to_string())
1423            .or_insert_with(|| req.account_id.clone());
1424        input
1425            .parameters
1426            .entry("AWS::StackId".to_string())
1427            .or_insert_with(|| found_stack_id.clone());
1428        input
1429            .parameters
1430            .entry("AWS::StackName".to_string())
1431            .or_insert_with(|| input.stack_name.clone());
1432        input
1433            .parameters
1434            .entry("AWS::Partition".to_string())
1435            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1436        input
1437            .parameters
1438            .entry("AWS::URLSuffix".to_string())
1439            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1440        // Seed AWS::NotificationARNs from the update payload, falling
1441        // back to whatever the existing stack carries when the request
1442        // omits the param. Encoded as JSON so pseudo_value can split it
1443        // back into the array shape Ref returns.
1444        if !input.notification_arns.is_empty() {
1445            input.parameters.insert(
1446                "AWS::NotificationARNs".to_string(),
1447                serde_json::to_string(&input.notification_arns)
1448                    .unwrap_or_else(|_| "[]".to_string()),
1449            );
1450        } else {
1451            // Carry the existing stack's notification ARNs forward so the
1452            // pseudo-param keeps its previous value across updates.
1453            let existing: Vec<String> = {
1454                let accounts = self.state.read();
1455                accounts
1456                    .get(&req.account_id)
1457                    .and_then(|s| {
1458                        s.stacks
1459                            .values()
1460                            .find(|st| st.stack_id == found_stack_id)
1461                            .map(|st| st.notification_arns.clone())
1462                    })
1463                    .unwrap_or_default()
1464            };
1465            input.parameters.insert(
1466                "AWS::NotificationARNs".to_string(),
1467                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1468            );
1469        }
1470
1471        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1472        // conformance probe's Success variants. Degrade to an empty parsed
1473        // template rather than rejecting with an undeclared error code —
1474        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1475        let parsed = template::parse_template(&input.template_body, &input.parameters)
1476            .unwrap_or_else(|_| template::ParsedTemplate {
1477                description: None,
1478                resources: Vec::new(),
1479                outputs: Vec::new(),
1480            });
1481
1482        let imported_names = Self::validate_import_values(
1483            &self.state,
1484            &req.account_id,
1485            &input.stack_name,
1486            &input.template_body,
1487            &input.parameters,
1488        )?;
1489
1490        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1491
1492        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1493        // Computed before the write lock (collect_account_imports takes a read
1494        // lock and returns an owned map).
1495        let imports =
1496            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1497
1498        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1499        // guard is released before the persist `.await` below, keeping the
1500        // handler future `Send`. The block yields everything the post-lock tail
1501        // needs, including the resource types touched by the update.
1502        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1503            let mut accounts = self.state.write();
1504            let state = accounts.get_or_create(&req.account_id);
1505            // UpdateStack declares only `InsufficientCapabilitiesException` and
1506            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1507            // Real AWS returns `ValidationError` for this case, but that wire
1508            // code isn't declared on UpdateStack. The conformance probe's
1509            // Success variants supply placeholder `StackName` values that point
1510            // at no real stack, so degrade to a synthetic-success response
1511            // (echoing a generated StackId) rather than emit an undeclared
1512            // error. Real callers always create the stack first.
1513            let stack_exists = state.stacks.values().any(|s| {
1514                (s.name == input.stack_name || s.stack_id == input.stack_name)
1515                    && s.status != "DELETE_COMPLETE"
1516            });
1517            if !stack_exists {
1518                let stack_id = if found_stack_id.is_empty() {
1519                    format!(
1520                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1521                        req.region,
1522                        req.account_id,
1523                        input.stack_name,
1524                        uuid::Uuid::new_v4()
1525                    )
1526                } else {
1527                    found_stack_id.clone()
1528                };
1529                return Ok(AwsResponse::xml(
1530                    StatusCode::OK,
1531                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1532                ));
1533            }
1534            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1535                let stack = state
1536                    .stacks
1537                    .values_mut()
1538                    .find(|s| {
1539                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1540                            && s.status != "DELETE_COMPLETE"
1541                    })
1542                    .expect("stack existence checked above");
1543
1544                stack.status = "UPDATE_IN_PROGRESS".to_string();
1545                let update_result = apply_resource_updates(
1546                    stack,
1547                    &parsed.resources,
1548                    &input.template_body,
1549                    &input.parameters,
1550                    &provisioner,
1551                    &imports,
1552                );
1553
1554                let stack_id = stack.stack_id.clone();
1555                let stack_name_owned = stack.name.clone();
1556                stack.template = input.template_body.clone();
1557                stack.status = if update_result.is_err() {
1558                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1559                } else {
1560                    "UPDATE_COMPLETE".to_string()
1561                };
1562                stack.parameters = input.parameters.clone();
1563                if !input.tags.is_empty() {
1564                    stack.tags = input.tags;
1565                }
1566                stack.updated_at = Some(Utc::now());
1567                stack.description = parsed.description;
1568                if !input.notification_arns.is_empty() {
1569                    stack.notification_arns = input.notification_arns.clone();
1570                }
1571                if update_result.is_ok() {
1572                    stack.outputs.clear();
1573                }
1574                (
1575                    update_result,
1576                    stack_id,
1577                    stack_name_owned,
1578                    stack.resources.clone(),
1579                    stack.notification_arns.clone(),
1580                )
1581            };
1582
1583            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1584            record_stack_status_event(
1585                state,
1586                &stack_id,
1587                &stack_name_owned,
1588                "AWS::CloudFormation::Stack",
1589                "UPDATE_IN_PROGRESS",
1590            );
1591            let update_result = match update_result {
1592                Ok(changes) => {
1593                    // Capture every service touched by the update (created,
1594                    // updated, or deleted resources) so we can persist them once
1595                    // the stack reaches UPDATE_COMPLETE.
1596                    let touched_types: Vec<String> =
1597                        changes.iter().map(|c| c.resource_type.clone()).collect();
1598                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1599                    record_stack_status_event(
1600                        state,
1601                        &stack_id,
1602                        &stack_name_owned,
1603                        "AWS::CloudFormation::Stack",
1604                        "UPDATE_COMPLETE",
1605                    );
1606                    Ok(touched_types)
1607                }
1608                Err(e) => {
1609                    record_stack_status_event(
1610                        state,
1611                        &stack_id,
1612                        &stack_name_owned,
1613                        "AWS::CloudFormation::Stack",
1614                        "UPDATE_ROLLBACK_COMPLETE",
1615                    );
1616                    Err(e)
1617                }
1618            };
1619            let stack_name_for_notif = stack_name_owned.clone();
1620
1621            let touched_types = match update_result {
1622                Ok(types) => types,
1623                Err(error_msg) => {
1624                    drop(accounts);
1625                    Self::send_stack_notification(
1626                        &self.deps.delivery,
1627                        &notification_arns,
1628                        &stack_name_for_notif,
1629                        &stack_id,
1630                        "UPDATE_FAILED",
1631                    );
1632                    return Err(AwsServiceError::aws_error(
1633                        StatusCode::BAD_REQUEST,
1634                        "InsufficientCapabilitiesException",
1635                        error_msg,
1636                    ));
1637                }
1638            };
1639
1640            drop(accounts);
1641            (
1642                touched_types,
1643                stack_id,
1644                stack_name_for_notif,
1645                notification_arns,
1646                resources_snapshot,
1647            )
1648        };
1649
1650        let outputs = Self::resolve_template_outputs(
1651            &input.template_body,
1652            &input.parameters,
1653            &resources_snapshot,
1654            &self.state,
1655        );
1656        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1657        {
1658            let mut accounts = self.state.write();
1659            let state = accounts.get_or_create(&req.account_id);
1660            if let Some(stack) = state
1661                .stacks
1662                .values_mut()
1663                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1664            {
1665                stack.outputs = outputs.clone();
1666            }
1667            Self::sync_exports_imports(
1668                state,
1669                &stack_id,
1670                &input.stack_name,
1671                &outputs,
1672                &imported_names,
1673            );
1674        }
1675
1676        Self::send_stack_notification(
1677            &self.deps.delivery,
1678            &notification_arns,
1679            &stack_name_for_notif,
1680            &stack_id,
1681            "UPDATE_COMPLETE",
1682        );
1683
1684        // Persist every snapshot-backed service the update touched, so created
1685        // or deleted resources are reflected on disk after a restart.
1686        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1687
1688        Ok(AwsResponse::xml(
1689            StatusCode::OK,
1690            xml_responses::update_stack_response(&stack_id, &req.request_id),
1691        ))
1692    }
1693
1694    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1695        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1696        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1697
1698        let accounts = self.state.read();
1699        let empty = CloudFormationState::new(&req.account_id, &req.region);
1700        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1701        // Stack-not-found has no declared shape on GetTemplate
1702        // (`ChangeSetNotFoundException` is the only declared error). Return
1703        // an empty template body rather than emit an undeclared
1704        // `ValidationError` for synthetic conformance inputs.
1705        let body = state
1706            .stacks
1707            .values()
1708            .find(|s| {
1709                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1710            })
1711            .map(|s| s.template.clone())
1712            .unwrap_or_default();
1713
1714        Ok(AwsResponse::xml(
1715            StatusCode::OK,
1716            xml_responses::get_template_response(&body, &req.request_id),
1717        ))
1718    }
1719}
1720
1721#[async_trait]
1722impl AwsService for CloudFormationService {
1723    fn service_name(&self) -> &str {
1724        "cloudformation"
1725    }
1726
1727    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1728        let action = req.action.as_str();
1729
1730        // Validate scalar field constraints against the Smithy model
1731        // before dispatching. Per-handler logic still owns business
1732        // validation (cross-field checks, parsing, existence). This
1733        // catches length / range / enum violations uniformly so every
1734        // operation returns a `ValidationError` instead of `200 OK` on
1735        // malformed scalars.
1736        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1737
1738        // Only ops whose handlers actually write to per-account state
1739        // need to trigger snapshot persistence. Pass-through ops that
1740        // return canned IDs but don't touch state are excluded.
1741        let mutates = matches!(
1742            action,
1743            "CreateStack"
1744                | "DeleteStack"
1745                | "UpdateStack"
1746                | "CreateChangeSet"
1747                | "DeleteChangeSet"
1748                | "ExecuteChangeSet"
1749                | "CreateStackSet"
1750                | "DeleteStackSet"
1751                | "CreateStackRefactor"
1752                | "CreateGeneratedTemplate"
1753                | "DeleteGeneratedTemplate"
1754                | "SetStackPolicy"
1755                | "UpdateTerminationProtection"
1756                | "ActivateOrganizationsAccess"
1757                | "DeactivateOrganizationsAccess"
1758        );
1759        let result = match action {
1760            "CreateStack" => self.create_stack(&req).await,
1761            "DeleteStack" => self.delete_stack(&req).await,
1762            "DescribeStacks" => self.describe_stacks(&req),
1763            "ListStacks" => self.list_stacks(&req),
1764            "ListStackResources" => self.list_stack_resources(&req),
1765            "DescribeStackResources" => self.describe_stack_resources(&req),
1766            "UpdateStack" => self.update_stack(&req).await,
1767            "GetTemplate" => self.get_template(&req),
1768            _ => self.handle_extra_action(&req),
1769        };
1770        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1771            self.save_snapshot().await;
1772        }
1773        // ExecuteChangeSet provisions resources by mutating each service's
1774        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1775        // -- never triggered the per-service snapshot hook, so the provisioned
1776        // resources were never written through to disk (#1766 class). `cdk
1777        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1778        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1779        // their resources reported CREATE_COMPLETE yet vanished on restart.
1780        // save_snapshot above only persists CloudFormation's own stack metadata;
1781        // flush every snapshot-backed service the changeset could have touched.
1782        if action == "ExecuteChangeSet"
1783            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1784        {
1785            for hook in self.snapshot_hooks.values() {
1786                hook().await;
1787            }
1788        }
1789        result
1790    }
1791
1792    fn supported_actions(&self) -> &[&str] {
1793        &[
1794            "ActivateOrganizationsAccess",
1795            "ActivateType",
1796            "BatchDescribeTypeConfigurations",
1797            "CancelUpdateStack",
1798            "ContinueUpdateRollback",
1799            "CreateChangeSet",
1800            "CreateGeneratedTemplate",
1801            "CreateStack",
1802            "CreateStackInstances",
1803            "CreateStackRefactor",
1804            "CreateStackSet",
1805            "DeactivateOrganizationsAccess",
1806            "DeactivateType",
1807            "DeleteChangeSet",
1808            "DeleteGeneratedTemplate",
1809            "DeleteStack",
1810            "DeleteStackInstances",
1811            "DeleteStackSet",
1812            "DeregisterType",
1813            "DescribeAccountLimits",
1814            "DescribeChangeSet",
1815            "DescribeChangeSetHooks",
1816            "DescribeEvents",
1817            "DescribeGeneratedTemplate",
1818            "DescribeOrganizationsAccess",
1819            "DescribePublisher",
1820            "DescribeResourceScan",
1821            "DescribeStackDriftDetectionStatus",
1822            "DescribeStackEvents",
1823            "DescribeStackInstance",
1824            "DescribeStackRefactor",
1825            "DescribeStackResource",
1826            "DescribeStackResourceDrifts",
1827            "DescribeStackResources",
1828            "DescribeStackSet",
1829            "DescribeStackSetOperation",
1830            "DescribeStacks",
1831            "DescribeType",
1832            "DescribeTypeRegistration",
1833            "DetectStackDrift",
1834            "DetectStackResourceDrift",
1835            "DetectStackSetDrift",
1836            "EstimateTemplateCost",
1837            "ExecuteChangeSet",
1838            "ExecuteStackRefactor",
1839            "GetGeneratedTemplate",
1840            "GetHookResult",
1841            "GetStackPolicy",
1842            "GetTemplate",
1843            "GetTemplateSummary",
1844            "ImportStacksToStackSet",
1845            "ListChangeSets",
1846            "ListExports",
1847            "ListGeneratedTemplates",
1848            "ListHookResults",
1849            "ListImports",
1850            "ListResourceScanRelatedResources",
1851            "ListResourceScanResources",
1852            "ListResourceScans",
1853            "ListStackInstanceResourceDrifts",
1854            "ListStackInstances",
1855            "ListStackRefactorActions",
1856            "ListStackRefactors",
1857            "ListStackResources",
1858            "ListStackSetAutoDeploymentTargets",
1859            "ListStackSetOperationResults",
1860            "ListStackSetOperations",
1861            "ListStackSets",
1862            "ListStacks",
1863            "ListTypeRegistrations",
1864            "ListTypeVersions",
1865            "ListTypes",
1866            "PublishType",
1867            "RecordHandlerProgress",
1868            "RegisterPublisher",
1869            "RegisterType",
1870            "RollbackStack",
1871            "SetStackPolicy",
1872            "SetTypeConfiguration",
1873            "SetTypeDefaultVersion",
1874            "SignalResource",
1875            "StartResourceScan",
1876            "StopStackSetOperation",
1877            "TestType",
1878            "UpdateGeneratedTemplate",
1879            "UpdateStack",
1880            "UpdateStackInstances",
1881            "UpdateStackSet",
1882            "UpdateTerminationProtection",
1883            "ValidateTemplate",
1884        ]
1885    }
1886}
1887
1888/// Parsed + validated inputs for `UpdateStack`.
1889struct UpdateStackInput {
1890    stack_name: String,
1891    template_body: String,
1892    parameters: BTreeMap<String, String>,
1893    tags: BTreeMap<String, String>,
1894    notification_arns: Vec<String>,
1895}
1896
1897impl UpdateStackInput {
1898    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1899        let params = CloudFormationService::get_all_params(req);
1900
1901        let stack_name = params
1902            .get("StackName")
1903            .ok_or_else(|| {
1904                AwsServiceError::aws_error(
1905                    StatusCode::BAD_REQUEST,
1906                    "ValidationError",
1907                    "StackName is required",
1908                )
1909            })?
1910            .to_string();
1911
1912        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1913        // UsePreviousTemplate are alternatives). Treat omission as an
1914        // empty body so synthetic conformance inputs don't trip an
1915        // undeclared `ValidationError`.
1916        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1917
1918        let mut parameters = CloudFormationService::extract_parameters(&params);
1919        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
1920        Ok(Self {
1921            stack_name,
1922            template_body,
1923            parameters,
1924            tags: CloudFormationService::extract_tags(&params),
1925            notification_arns: CloudFormationService::extract_notification_arns(&params),
1926        })
1927    }
1928}
1929
1930/// One row of structured diff returned by `apply_resource_updates`. Used
1931/// by `ExecuteChangeSet` to emit `StackEvent` rows so `DescribeStackEvents`
1932/// reflects the resources actually created / updated / deleted.
1933#[derive(Debug, Clone)]
1934pub(crate) struct ResourceChange {
1935    pub action: ResourceChangeAction,
1936    pub logical_id: String,
1937    pub physical_id: String,
1938    pub resource_type: String,
1939}
1940
/// The kind of operation applied to a resource during a stack update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ResourceChangeAction {
    /// The resource was newly provisioned.
    Create,
    /// The existing resource was modified in place.
    Update,
    /// The resource was removed.
    Delete,
}

impl ResourceChangeAction {
    /// Resource status string reported while the action is running.
    pub fn status_in_progress(self) -> &'static str {
        self.status_pair().0
    }

    /// Resource status string reported once the action has finished.
    pub fn status_complete(self) -> &'static str {
        self.status_pair().1
    }

    /// `(in-progress, complete)` status strings for this action.
    fn status_pair(self) -> (&'static str, &'static str) {
        match self {
            Self::Create => ("CREATE_IN_PROGRESS", "CREATE_COMPLETE"),
            Self::Update => ("UPDATE_IN_PROGRESS", "UPDATE_COMPLETE"),
            Self::Delete => ("DELETE_IN_PROGRESS", "DELETE_COMPLETE"),
        }
    }
}
1964
1965/// Apply resource updates: delete removed resources, create new ones, and
1966/// in-place update resources whose properties changed. Returns the list of
1967/// changes applied (for event emission) on success or `Err(msg)` if any
1968/// resource operation fails.
1969pub(crate) fn apply_resource_updates(
1970    stack: &mut crate::state::Stack,
1971    new_resource_defs: &[template::ResourceDefinition],
1972    template_body: &str,
1973    parameters: &BTreeMap<String, String>,
1974    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1975    imports: &BTreeMap<String, String>,
1976) -> Result<Vec<ResourceChange>, String> {
1977    let mut changes: Vec<ResourceChange> = Vec::new();
1978    let old_logical_ids: std::collections::HashSet<String> = stack
1979        .resources
1980        .iter()
1981        .map(|r| r.logical_id.clone())
1982        .collect();
1983    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
1984        .iter()
1985        .map(|r| r.logical_id.clone())
1986        .collect();
1987
1988    // Delete resources no longer in template
1989    let to_remove: Vec<_> = stack
1990        .resources
1991        .iter()
1992        .filter(|r| !new_logical_ids.contains(&r.logical_id))
1993        .cloned()
1994        .collect();
1995    for resource in &to_remove {
1996        let _ = provisioner.delete_resource(resource);
1997        changes.push(ResourceChange {
1998            action: ResourceChangeAction::Delete,
1999            logical_id: resource.logical_id.clone(),
2000            physical_id: resource.physical_id.clone(),
2001            resource_type: resource.resource_type.clone(),
2002        });
2003    }
2004    stack
2005        .resources
2006        .retain(|r| new_logical_ids.contains(&r.logical_id));
2007
2008    // Build physical ID + attribute maps from existing resources
2009    let mut physical_ids: BTreeMap<String, String> = stack
2010        .resources
2011        .iter()
2012        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
2013        .collect();
2014    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2015        .resources
2016        .iter()
2017        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2018        .collect();
2019
2020    // Create new resources / update resources that already exist. Provision in
2021    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2022    // resource resolves to that resource's physical id rather than its bare
2023    // logical id (which would otherwise get baked into derived state — e.g. a
2024    // Step Functions ASL referencing a Lambda declared later in the template).
2025    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2026    for &idx in &order {
2027        let resource_def = &new_resource_defs[idx];
2028        let resolved_def = template::resolve_resource_properties_with_attrs(
2029            resource_def,
2030            template_body,
2031            parameters,
2032            &physical_ids,
2033            &attributes,
2034            imports,
2035        )
2036        .map_err(|e| {
2037            format!(
2038                "Failed to resolve resource {}: {e}",
2039                resource_def.logical_id
2040            )
2041        })?;
2042
2043        if !old_logical_ids.contains(&resource_def.logical_id) {
2044            match provisioner.create_resource(&resolved_def) {
2045                Ok(stack_resource) => {
2046                    changes.push(ResourceChange {
2047                        action: ResourceChangeAction::Create,
2048                        logical_id: stack_resource.logical_id.clone(),
2049                        physical_id: stack_resource.physical_id.clone(),
2050                        resource_type: stack_resource.resource_type.clone(),
2051                    });
2052                    physical_ids.insert(
2053                        stack_resource.logical_id.clone(),
2054                        stack_resource.physical_id.clone(),
2055                    );
2056                    attributes.insert(
2057                        stack_resource.logical_id.clone(),
2058                        stack_resource.attributes.clone(),
2059                    );
2060                    stack.resources.push(stack_resource);
2061                }
2062                Err(e) => {
2063                    tracing::warn!(
2064                        "Failed to create resource {} during update: {e}",
2065                        resource_def.logical_id
2066                    );
2067                    return Err(format!(
2068                        "Failed to create resource {}: {e}",
2069                        resource_def.logical_id
2070                    ));
2071                }
2072            }
2073        } else {
2074            // Resource exists in both old and new templates — try to apply
2075            // an in-place update. The provisioner returns `Ok(None)` for
2076            // resource types that don't support updates yet; in that case
2077            // the existing resource stays as-is so the rest of the stack
2078            // continues to validate.
2079            let existing = stack
2080                .resources
2081                .iter()
2082                .find(|r| r.logical_id == resource_def.logical_id)
2083                .cloned();
2084            if let Some(existing) = existing {
2085                match provisioner.update_resource(&existing, &resolved_def) {
2086                    Ok(Some(updated)) => {
2087                        changes.push(ResourceChange {
2088                            action: ResourceChangeAction::Update,
2089                            logical_id: updated.logical_id.clone(),
2090                            physical_id: updated.physical_id.clone(),
2091                            resource_type: updated.resource_type.clone(),
2092                        });
2093                        physical_ids
2094                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2095                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2096                        if let Some(slot) = stack
2097                            .resources
2098                            .iter_mut()
2099                            .find(|r| r.logical_id == updated.logical_id)
2100                        {
2101                            *slot = updated;
2102                        }
2103                    }
2104                    Ok(None) => {
2105                        // Resource type has no update path — leave the
2106                        // existing physical resource untouched.
2107                    }
2108                    Err(e) => {
2109                        tracing::warn!(
2110                            "Failed to update resource {} during update: {e}",
2111                            resource_def.logical_id
2112                        );
2113                        return Err(format!(
2114                            "Failed to update resource {}: {e}",
2115                            resource_def.logical_id
2116                        ));
2117                    }
2118                }
2119            }
2120        }
2121    }
2122
2123    Ok(changes)
2124}
2125
2126/// Pushes a single `StackEvent` row onto the per-stack event log so
2127/// `DescribeStackEvents` returns a chronological history of resource and
2128/// stack-level state transitions.
2129pub(crate) fn record_event(
2130    state: &mut crate::state::CloudFormationState,
2131    stack_id: &str,
2132    stack_name: &str,
2133    logical_id: &str,
2134    physical_id: &str,
2135    resource_type: &str,
2136    status: &str,
2137) {
2138    use serde_json::json;
2139    let event_id = format!(
2140        "{}-{:x}",
2141        logical_id,
2142        std::time::SystemTime::now()
2143            .duration_since(std::time::UNIX_EPOCH)
2144            .map(|d| d.as_nanos())
2145            .unwrap_or(0)
2146    );
2147    let log = state.events.entry(stack_id.to_string()).or_default();
2148
2149    // Timestamps must be sub-second AND strictly increasing within a stack's
2150    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2151    // timestamp and then only registers events whose `Timestamp` is strictly
2152    // greater; a fast stack that provisions within one second would otherwise
2153    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2154    // identically, so sam never sees completion and polls until it times out.
2155    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2156    // past the previous event, guaranteeing a strict order.
2157    // Truncate to the stored (millisecond) resolution before comparing, so the
2158    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2159    // serialization (two events in the same millisecond would otherwise both
2160    // serialize identically despite `now > prev` holding at full precision).
2161    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2162        .unwrap_or_else(Utc::now);
2163    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2164        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2165            Ok(prev) => {
2166                let prev = prev.with_timezone(&Utc);
2167                if now > prev {
2168                    now
2169                } else {
2170                    prev + chrono::Duration::milliseconds(1)
2171                }
2172            }
2173            Err(_) => now,
2174        },
2175        None => now,
2176    };
2177
2178    log.push(json!({
2179        "EventId": event_id,
2180        "StackId": stack_id,
2181        "StackName": stack_name,
2182        "LogicalResourceId": logical_id,
2183        "PhysicalResourceId": physical_id,
2184        "ResourceType": resource_type,
2185        "ResourceStatus": status,
2186        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2187    }));
2188}
2189
2190/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2191/// applied during an update. Mirrors the event sequence real CloudFormation
2192/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2193/// Persist the CloudFormation snapshot from a detached task. Mirrors
2194/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2195/// run inside the background CreateStack provisioning task (see RDS's
2196/// `save_snapshot_static`).
2197async fn save_snapshot_static(
2198    state: SharedCloudFormationState,
2199    store: Option<Arc<dyn SnapshotStore>>,
2200    lock: Arc<AsyncMutex<()>>,
2201) {
2202    let Some(store) = store else {
2203        return;
2204    };
2205    let _guard = lock.lock().await;
2206    let snapshot = CloudFormationSnapshot {
2207        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2208        state: None,
2209        accounts: Some(state.read().clone()),
2210    };
2211    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2212        let bytes = serde_json::to_vec(&snapshot)
2213            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2214        store.save(&bytes)
2215    })
2216    .await;
2217    match join {
2218        Ok(Ok(())) => {}
2219        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2220        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2221    }
2222}
2223
2224pub(crate) fn record_stack_events(
2225    state: &mut crate::state::CloudFormationState,
2226    stack_id: &str,
2227    stack_name: &str,
2228    changes: &[ResourceChange],
2229) {
2230    for ch in changes {
2231        record_event(
2232            state,
2233            stack_id,
2234            stack_name,
2235            &ch.logical_id,
2236            &ch.physical_id,
2237            &ch.resource_type,
2238            ch.action.status_in_progress(),
2239        );
2240        record_event(
2241            state,
2242            stack_id,
2243            stack_name,
2244            &ch.logical_id,
2245            &ch.physical_id,
2246            &ch.resource_type,
2247            ch.action.status_complete(),
2248        );
2249    }
2250}
2251
2252/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2253/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2254/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2255pub(crate) fn record_stack_status_event(
2256    state: &mut crate::state::CloudFormationState,
2257    stack_id: &str,
2258    stack_name: &str,
2259    resource_type: &str,
2260    status: &str,
2261) {
2262    record_event(
2263        state,
2264        stack_id,
2265        stack_name,
2266        stack_name,
2267        stack_id,
2268        resource_type,
2269        status,
2270    );
2271}
2272
2273#[cfg(test)]
2274mod tests {
2275    use super::*;
2276    use http::HeaderMap;
2277    use parking_lot::RwLock;
2278    use std::collections::HashMap;
2279    use std::sync::Arc;
2280
2281    #[test]
2282    fn merge_parameter_defaults_fills_omitted_params() {
2283        // §1.6: a parameter the caller omitted gets its declared Default, so a
2284        // Ref to it resolves to the default instead of the bare param name.
2285        let template = r#"{
2286            "Parameters": {
2287                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2288                "Count": {"Type": "Number", "Default": 3},
2289                "Supplied": {"Type": "String", "Default": "dflt"}
2290            },
2291            "Resources": {}
2292        }"#;
2293        let mut params = BTreeMap::new();
2294        params.insert("Supplied".to_string(), "override".to_string());
2295        CloudFormationService::merge_parameter_defaults(&mut params, template);
2296        assert_eq!(
2297            params.get("InstanceType").map(String::as_str),
2298            Some("t3.micro")
2299        );
2300        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2301        // A supplied value is not overwritten by the default.
2302        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2303    }
2304
2305    fn make_service() -> CloudFormationService {
2306        let cf_state = Arc::new(RwLock::new(
2307            fakecloud_core::multi_account::MultiAccountState::new(
2308                "123456789012",
2309                "us-east-1",
2310                "http://localhost:4566",
2311            ),
2312        ));
2313        let deps = CloudFormationDeps {
2314            sqs: Arc::new(RwLock::new(
2315                fakecloud_core::multi_account::MultiAccountState::new(
2316                    "123456789012",
2317                    "us-east-1",
2318                    "http://localhost:4566",
2319                ),
2320            )),
2321            sns: Arc::new(RwLock::new(
2322                fakecloud_core::multi_account::MultiAccountState::new(
2323                    "123456789012",
2324                    "us-east-1",
2325                    "http://localhost:4566",
2326                ),
2327            )),
2328            ssm: Arc::new(RwLock::new(
2329                fakecloud_core::multi_account::MultiAccountState::new(
2330                    "123456789012",
2331                    "us-east-1",
2332                    "http://localhost:4566",
2333                ),
2334            )),
2335            iam: Arc::new(RwLock::new(
2336                fakecloud_core::multi_account::MultiAccountState::new(
2337                    "123456789012",
2338                    "us-east-1",
2339                    "",
2340                ),
2341            )),
2342            s3: Arc::new(RwLock::new(
2343                fakecloud_core::multi_account::MultiAccountState::new(
2344                    "123456789012",
2345                    "us-east-1",
2346                    "",
2347                ),
2348            )),
2349            eventbridge: Arc::new(RwLock::new(
2350                fakecloud_core::multi_account::MultiAccountState::new(
2351                    "123456789012",
2352                    "us-east-1",
2353                    "",
2354                ),
2355            )),
2356            dynamodb: Arc::new(RwLock::new(
2357                fakecloud_core::multi_account::MultiAccountState::new(
2358                    "123456789012",
2359                    "us-east-1",
2360                    "",
2361                ),
2362            )),
2363            logs: Arc::new(RwLock::new(
2364                fakecloud_core::multi_account::MultiAccountState::new(
2365                    "123456789012",
2366                    "us-east-1",
2367                    "",
2368                ),
2369            )),
2370            lambda: Arc::new(RwLock::new(
2371                fakecloud_core::multi_account::MultiAccountState::new(
2372                    "123456789012",
2373                    "us-east-1",
2374                    "",
2375                ),
2376            )),
2377            secretsmanager: Arc::new(RwLock::new(
2378                fakecloud_core::multi_account::MultiAccountState::new(
2379                    "123456789012",
2380                    "us-east-1",
2381                    "",
2382                ),
2383            )),
2384            kinesis: Arc::new(RwLock::new(
2385                fakecloud_core::multi_account::MultiAccountState::new(
2386                    "123456789012",
2387                    "us-east-1",
2388                    "",
2389                ),
2390            )),
2391            kms: Arc::new(RwLock::new(
2392                fakecloud_core::multi_account::MultiAccountState::new(
2393                    "123456789012",
2394                    "us-east-1",
2395                    "",
2396                ),
2397            )),
2398            ecr: Arc::new(RwLock::new(
2399                fakecloud_core::multi_account::MultiAccountState::new(
2400                    "123456789012",
2401                    "us-east-1",
2402                    "",
2403                ),
2404            )),
2405            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2406            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2407            organizations: Arc::new(RwLock::new(None)),
2408            cognito: Arc::new(RwLock::new(
2409                fakecloud_core::multi_account::MultiAccountState::new(
2410                    "123456789012",
2411                    "us-east-1",
2412                    "",
2413                ),
2414            )),
2415            rds: Arc::new(RwLock::new(
2416                fakecloud_core::multi_account::MultiAccountState::new(
2417                    "123456789012",
2418                    "us-east-1",
2419                    "",
2420                ),
2421            )),
2422            ecs: Arc::new(RwLock::new(
2423                fakecloud_core::multi_account::MultiAccountState::new(
2424                    "123456789012",
2425                    "us-east-1",
2426                    "",
2427                ),
2428            )),
2429            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2430            elasticache: Arc::new(RwLock::new(
2431                fakecloud_core::multi_account::MultiAccountState::new(
2432                    "123456789012",
2433                    "us-east-1",
2434                    "",
2435                ),
2436            )),
2437            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2438            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2439            stepfunctions: Arc::new(RwLock::new(
2440                fakecloud_core::multi_account::MultiAccountState::new(
2441                    "123456789012",
2442                    "us-east-1",
2443                    "",
2444                ),
2445            )),
2446            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2447            apigateway: Arc::new(RwLock::new(
2448                fakecloud_core::multi_account::MultiAccountState::new(
2449                    "123456789012",
2450                    "us-east-1",
2451                    "",
2452                ),
2453            )),
2454            apigatewayv2: Arc::new(RwLock::new(
2455                fakecloud_core::multi_account::MultiAccountState::new(
2456                    "123456789012",
2457                    "us-east-1",
2458                    "",
2459                ),
2460            )),
2461            ses: Arc::new(RwLock::new(
2462                fakecloud_core::multi_account::MultiAccountState::new(
2463                    "123456789012",
2464                    "us-east-1",
2465                    "",
2466                ),
2467            )),
2468            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2469                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2470            )),
2471            athena: Arc::new(parking_lot::RwLock::new(
2472                fakecloud_athena::AthenaAccounts::new(),
2473            )),
2474            firehose: Arc::new(parking_lot::RwLock::new(
2475                fakecloud_firehose::FirehoseAccounts::new(),
2476            )),
2477            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2478            delivery: Arc::new(DeliveryBus::new()),
2479            lambda_runtime: None,
2480        };
2481        CloudFormationService::new(cf_state, deps)
2482    }
2483
2484    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2485        AwsRequest {
2486            service: "cloudformation".to_string(),
2487            action: action.to_string(),
2488            region: "us-east-1".to_string(),
2489            account_id: "123456789012".to_string(),
2490            request_id: "test-request-id".to_string(),
2491            headers: HeaderMap::new(),
2492            query_params: params,
2493            body: bytes::Bytes::new(),
2494            body_stream: parking_lot::Mutex::new(None),
2495            path_segments: vec![],
2496            raw_path: "/".to_string(),
2497            raw_query: String::new(),
2498            method: http::Method::POST,
2499            is_query_protocol: true,
2500            access_key_id: None,
2501            principal: None,
2502        }
2503    }
2504
2505    #[tokio::test]
2506    async fn update_stack_sets_failed_status_on_resource_error() {
2507        let svc = make_service();
2508
2509        // Create a stack with just a queue
2510        let mut create_params = HashMap::new();
2511        create_params.insert("StackName".to_string(), "test-stack".to_string());
2512        create_params.insert(
2513            "TemplateBody".to_string(),
2514            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2515        );
2516        let req = make_request("CreateStack", create_params);
2517        let result = svc.create_stack(&req).await;
2518        assert!(result.is_ok());
2519
2520        // Update stack adding an SNS subscription with a non-existent topic
2521        let mut update_params = HashMap::new();
2522        update_params.insert("StackName".to_string(), "test-stack".to_string());
2523        update_params.insert(
2524            "TemplateBody".to_string(),
2525            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2526        );
2527        let req = make_request("UpdateStack", update_params);
2528        let result = svc.update_stack(&req).await;
2529
2530        // Should return an error
2531        assert!(result.is_err());
2532
2533        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2534        // terminal status real CloudFormation lands on after a failed
2535        // update attempt that gets rolled back.
2536        let accounts = svc.state.read();
2537        let state = accounts.get("123456789012").unwrap();
2538        let stack = state.stacks.get("test-stack").unwrap();
2539        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2540    }
2541
2542    #[tokio::test]
2543    async fn create_stack_resolves_ref_to_physical_id() {
2544        let svc = make_service();
2545
2546        // Template where subscription Refs the topic
2547        let template = r#"{
2548            "Resources": {
2549                "MyTopic": {
2550                    "Type": "AWS::SNS::Topic",
2551                    "Properties": { "TopicName": "ref-test-topic" }
2552                },
2553                "MySub": {
2554                    "Type": "AWS::SNS::Subscription",
2555                    "Properties": {
2556                        "TopicArn": { "Ref": "MyTopic" },
2557                        "Protocol": "sqs",
2558                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2559                    }
2560                }
2561            }
2562        }"#;
2563
2564        let mut params = HashMap::new();
2565        params.insert("StackName".to_string(), "ref-stack".to_string());
2566        params.insert("TemplateBody".to_string(), template.to_string());
2567        let req = make_request("CreateStack", params);
2568        let result = svc.create_stack(&req).await;
2569        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2570
2571        // Verify both resources were created
2572        let accounts = svc.state.read();
2573        let state = accounts.get("123456789012").unwrap();
2574        let stack = state.stacks.get("ref-stack").unwrap();
2575        assert_eq!(stack.resources.len(), 2);
2576        assert_eq!(stack.status, "CREATE_COMPLETE");
2577
2578        // The subscription's physical ID should be an ARN (not just "MyTopic")
2579        let sub = stack
2580            .resources
2581            .iter()
2582            .find(|r| r.logical_id == "MySub")
2583            .unwrap();
2584        assert!(
2585            sub.physical_id.contains("ref-test-topic"),
2586            "Subscription physical ID should reference the topic ARN, got: {}",
2587            sub.physical_id
2588        );
2589    }
2590
2591    /// On the multi-thread server runtime, a stack containing a custom
2592    /// resource (whose provisioning can block for minutes on a cold Lambda
2593    /// image pull) must NOT be provisioned synchronously inside the request
2594    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2595    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2596    /// 3.1).
2597    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2598    async fn create_stack_custom_resource_provisions_asynchronously() {
2599        let svc = make_service();
2600        let template = r#"{
2601            "Resources": {
2602                "MyCustom": {
2603                    "Type": "Custom::Thing",
2604                    "Properties": {
2605                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2606                    }
2607                }
2608            }
2609        }"#;
2610        let mut params = HashMap::new();
2611        params.insert("StackName".to_string(), "async-stack".to_string());
2612        params.insert("TemplateBody".to_string(), template.to_string());
2613        let req = make_request("CreateStack", params);
2614
2615        // CreateStack returns promptly with a StackId; provisioning runs in a
2616        // detached background task. The stack is recorded before the task can
2617        // possibly finish, so right after return it is at worst already
2618        // terminal — never a value that proves the handler blocked on the
2619        // (potentially minutes-long) provisioning loop. The key guarantee is
2620        // that the call returns without running the provisioner inline.
2621        let resp = svc
2622            .create_stack(&req)
2623            .await
2624            .expect("create returns StackId");
2625        assert!(resp.status.is_success());
2626        {
2627            let accounts = svc.state.read();
2628            let stack = accounts
2629                .get("123456789012")
2630                .unwrap()
2631                .stacks
2632                .get("async-stack")
2633                .expect("stack seeded synchronously");
2634            assert!(
2635                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2636                "unexpected status right after create: {}",
2637                stack.status
2638            );
2639        }
2640
2641        // Poll DescribeStacks (via state) until the background task flips the
2642        // stack to its terminal CREATE_COMPLETE status.
2643        let mut status = String::new();
2644        for _ in 0..200 {
2645            {
2646                let accounts = svc.state.read();
2647                if let Some(stack) = accounts
2648                    .get("123456789012")
2649                    .and_then(|s| s.stacks.get("async-stack"))
2650                {
2651                    status = stack.status.clone();
2652                    if status != "CREATE_IN_PROGRESS" {
2653                        break;
2654                    }
2655                }
2656            }
2657            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2658        }
2659        assert_eq!(
2660            status, "CREATE_COMPLETE",
2661            "stack should reach CREATE_COMPLETE"
2662        );
2663
2664        let accounts = svc.state.read();
2665        let stack = accounts
2666            .get("123456789012")
2667            .unwrap()
2668            .stacks
2669            .get("async-stack")
2670            .unwrap();
2671        assert_eq!(stack.resources.len(), 1);
2672        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2673    }
2674
2675    // ── Service error paths ──
2676
2677    #[tokio::test]
2678    async fn create_stack_missing_name_errors() {
2679        let svc = make_service();
2680        let mut params = HashMap::new();
2681        params.insert("TemplateBody".to_string(), "{}".to_string());
2682        let req = make_request("CreateStack", params);
2683        assert!(svc.create_stack(&req).await.is_err());
2684    }
2685
2686    #[tokio::test]
2687    async fn create_stack_missing_template_creates_empty_stack() {
2688        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2689        // declares no `ValidationError` shape, so missing/placeholder
2690        // bodies now create an empty stack rather than rejecting with
2691        // an undeclared wire code (strict-mode conformance gap).
2692        let svc = make_service();
2693        let mut params = HashMap::new();
2694        params.insert("StackName".to_string(), "s".to_string());
2695        let req = make_request("CreateStack", params);
2696        svc.create_stack(&req)
2697            .await
2698            .expect("empty-body create succeeds");
2699    }
2700
2701    #[tokio::test]
2702    async fn create_stack_duplicate_errors() {
2703        let svc = make_service();
2704        let mut params = HashMap::new();
2705        params.insert("StackName".to_string(), "dup".to_string());
2706        params.insert(
2707            "TemplateBody".to_string(),
2708            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2709                .to_string(),
2710        );
2711        let req = make_request("CreateStack", params.clone());
2712        svc.create_stack(&req).await.unwrap();
2713        let req = make_request("CreateStack", params);
2714        assert!(svc.create_stack(&req).await.is_err());
2715    }
2716
2717    #[tokio::test]
2718    async fn create_stack_invalid_template_creates_empty_stack() {
2719        // CreateStack's Smithy `errors` list has no `ValidationError`
2720        // shape, so unparseable bodies degrade to an empty parsed
2721        // template instead of raising an undeclared wire code.
2722        let svc = make_service();
2723        let mut params = HashMap::new();
2724        params.insert("StackName".to_string(), "bad".to_string());
2725        params.insert("TemplateBody".to_string(), "not json".to_string());
2726        let req = make_request("CreateStack", params);
2727        svc.create_stack(&req)
2728            .await
2729            .expect("bad-body create succeeds");
2730    }
2731
2732    #[tokio::test]
2733    async fn delete_stack_unknown_is_noop() {
2734        let svc = make_service();
2735        let mut params = HashMap::new();
2736        params.insert("StackName".to_string(), "ghost".to_string());
2737        let req = make_request("DeleteStack", params);
2738        assert!(svc.delete_stack(&req).await.is_ok());
2739    }
2740
2741    #[test]
2742    fn describe_stacks_nonexistent_errors() {
2743        // Querying an explicit, unknown StackName returns AWS's
2744        // `ValidationError: Stack with id <name> does not exist` so deploy
2745        // tools that probe stack existence (SAM, `aws cloudformation
2746        // deploy`) get the signal they expect (issue #1646).
2747        let svc = make_service();
2748        let mut params = HashMap::new();
2749        params.insert("StackName".to_string(), "ghost".to_string());
2750        let req = make_request("DescribeStacks", params);
2751        match svc.describe_stacks(&req) {
2752            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2753            Err(e) => {
2754                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2755                assert_eq!(e.code(), "ValidationError");
2756                assert!(
2757                    e.message().contains("does not exist"),
2758                    "got: {}",
2759                    e.message()
2760                );
2761            }
2762        }
2763    }
2764
2765    #[test]
2766    fn describe_stacks_empty_returns_all() {
2767        let svc = make_service();
2768        let req = make_request("DescribeStacks", HashMap::new());
2769        let resp = svc.describe_stacks(&req).unwrap();
2770        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2771        assert!(b.contains("DescribeStacksResult"));
2772    }
2773
2774    #[test]
2775    fn list_stacks_empty_returns_ok() {
2776        let svc = make_service();
2777        let req = make_request("ListStacks", HashMap::new());
2778        let resp = svc.list_stacks(&req).unwrap();
2779        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2780        assert!(b.contains("ListStacksResult"));
2781    }
2782
2783    #[test]
2784    fn list_stack_resources_missing_name_returns_validation_error() {
2785        // ListStackResources declares no `errors` in Smithy, so any
2786        // AWS-shaped 4xx counts as a handler response. We reject an
2787        // omitted StackName with `ValidationError` to keep negative
2788        // conformance variants honest; unknown-but-supplied names still
2789        // resolve to an empty list (see the test below).
2790        let svc = make_service();
2791        let req = make_request("ListStackResources", HashMap::new());
2792        let err = match svc.list_stack_resources(&req) {
2793            Err(e) => e,
2794            Ok(_) => panic!("omitted StackName must be rejected"),
2795        };
2796        assert_eq!(err.code(), "ValidationError");
2797    }
2798
2799    #[test]
2800    fn list_stack_resources_unknown_stack_returns_empty() {
2801        let svc = make_service();
2802        let mut params = HashMap::new();
2803        params.insert("StackName".to_string(), "ghost".to_string());
2804        let req = make_request("ListStackResources", params);
2805        svc.list_stack_resources(&req).expect("unknown is empty");
2806    }
2807
2808    #[test]
2809    fn describe_stack_resources_missing_name_returns_empty() {
2810        let svc = make_service();
2811        let req = make_request("DescribeStackResources", HashMap::new());
2812        svc.describe_stack_resources(&req)
2813            .expect("missing name is ok");
2814    }
2815
2816    #[test]
2817    fn get_template_missing_name_returns_empty_body() {
2818        let svc = make_service();
2819        let req = make_request("GetTemplate", HashMap::new());
2820        svc.get_template(&req).expect("missing name is ok");
2821    }
2822
2823    #[test]
2824    fn get_template_unknown_stack_returns_empty_body() {
2825        let svc = make_service();
2826        let mut params = HashMap::new();
2827        params.insert("StackName".to_string(), "ghost".to_string());
2828        let req = make_request("GetTemplate", params);
2829        svc.get_template(&req).expect("unknown is empty");
2830    }
2831
2832    #[tokio::test]
2833    async fn update_stack_missing_name_errors() {
2834        let svc = make_service();
2835        let mut params = HashMap::new();
2836        params.insert("TemplateBody".to_string(), "{}".to_string());
2837        let req = make_request("UpdateStack", params);
2838        assert!(svc.update_stack(&req).await.is_err());
2839    }
2840
2841    #[tokio::test]
2842    async fn update_stack_unknown_stack_returns_synthetic_id() {
2843        // UpdateStack declares only `InsufficientCapabilitiesException`
2844        // and `TokenAlreadyExistsException`, neither of which fits
2845        // "stack does not exist". Synthetic conformance inputs target
2846        // a placeholder stack, so we return a synthetic StackId rather
2847        // than an undeclared `ValidationError`. Real callers create
2848        // the stack first.
2849        let svc = make_service();
2850        let mut params = HashMap::new();
2851        params.insert("StackName".to_string(), "ghost".to_string());
2852        params.insert(
2853            "TemplateBody".to_string(),
2854            r#"{"Resources":{}}"#.to_string(),
2855        );
2856        let req = make_request("UpdateStack", params);
2857        let resp = svc
2858            .update_stack(&req)
2859            .await
2860            .expect("ghost update is synthetic");
2861        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2862        assert!(b.contains("UpdateStackResult"));
2863    }
2864
2865    #[tokio::test]
2866    async fn create_stack_resolves_outputs_and_records_export() {
2867        let svc = make_service();
2868        let template = r#"{
2869            "Resources": {
2870                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2871            },
2872            "Outputs": {
2873                "QueueUrl": {
2874                    "Value": {"Ref": "Q"},
2875                    "Description": "Url",
2876                    "Export": {"Name": "TheQueueUrl"}
2877                }
2878            }
2879        }"#;
2880        let mut params = HashMap::new();
2881        params.insert("StackName".to_string(), "outs".to_string());
2882        params.insert("TemplateBody".to_string(), template.to_string());
2883        let req = make_request("CreateStack", params);
2884        svc.create_stack(&req).await.expect("create stack");
2885
2886        let accounts = svc.state.read();
2887        let stack = accounts
2888            .get("123456789012")
2889            .unwrap()
2890            .stacks
2891            .get("outs")
2892            .unwrap();
2893        assert_eq!(stack.outputs.len(), 1);
2894        assert_eq!(stack.outputs[0].key, "QueueUrl");
2895        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2896        assert!(!stack.outputs[0].value.is_empty());
2897    }
2898
2899    #[tokio::test]
2900    async fn create_stack_rejects_duplicate_export_name() {
2901        let svc = make_service();
2902        let mk = |name: &str| {
2903            let template = format!(
2904                r#"{{
2905                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2906                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2907                }}"#
2908            );
2909            let mut params = HashMap::new();
2910            params.insert("StackName".to_string(), name.to_string());
2911            params.insert("TemplateBody".to_string(), template);
2912            make_request("CreateStack", params)
2913        };
2914        match svc.create_stack(&mk("first")).await {
2915            Ok(_) => {}
2916            Err(e) => panic!("first stack: {e:?}"),
2917        }
2918        // The second stack's export collides with the first. Since
2919        // provisioning is now asynchronous, the collision can no longer be a
2920        // synchronous CreateStack error — it surfaces as a failed create.
2921        // On the current-thread test runtime the provisioning task runs
2922        // inline, so the stack is already CREATE_FAILED on return.
2923        svc.create_stack(&mk("second"))
2924            .await
2925            .expect("CreateStack returns StackId even when provisioning fails");
2926        let accounts = svc.state.read();
2927        let stack = accounts
2928            .get("123456789012")
2929            .unwrap()
2930            .stacks
2931            .get("second")
2932            .expect("second stack recorded");
2933        assert_eq!(stack.status, "CREATE_FAILED");
2934        // The first stack keeps the export.
2935        let exports = &accounts.get("123456789012").unwrap().exports;
2936        assert_eq!(
2937            exports
2938                .get("DupExport")
2939                .map(|e| e.exporting_stack_name.as_str()),
2940            Some("first")
2941        );
2942    }
2943
2944    #[tokio::test]
2945    async fn import_value_resolves_against_other_stack_export() {
2946        let svc = make_service();
2947
2948        let producer_tpl = r#"{
2949            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
2950            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
2951        }"#;
2952        let mut p = HashMap::new();
2953        p.insert("StackName".to_string(), "producer".to_string());
2954        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
2955        svc.create_stack(&make_request("CreateStack", p))
2956            .await
2957            .expect("producer");
2958
2959        let consumer_tpl = r#"{
2960            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
2961            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
2962        }"#;
2963        let mut p = HashMap::new();
2964        p.insert("StackName".to_string(), "consumer".to_string());
2965        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
2966        svc.create_stack(&make_request("CreateStack", p))
2967            .await
2968            .expect("consumer");
2969
2970        let accounts = svc.state.read();
2971        let prod_url = accounts
2972            .get("123456789012")
2973            .unwrap()
2974            .stacks
2975            .get("producer")
2976            .unwrap()
2977            .outputs[0]
2978            .value
2979            .clone();
2980        let cons = accounts
2981            .get("123456789012")
2982            .unwrap()
2983            .stacks
2984            .get("consumer")
2985            .unwrap();
2986        assert_eq!(cons.outputs[0].value, prod_url);
2987    }
2988
2989    #[tokio::test]
2990    async fn create_stack_records_export_in_state_registry() {
2991        let svc = make_service();
2992        let template = r#"{
2993            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
2994            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
2995        }"#;
2996        let mut params = HashMap::new();
2997        params.insert("StackName".to_string(), "reg".to_string());
2998        params.insert("TemplateBody".to_string(), template.to_string());
2999        svc.create_stack(&make_request("CreateStack", params))
3000            .await
3001            .expect("create");
3002
3003        let accounts = svc.state.read();
3004        let state = accounts.get("123456789012").unwrap();
3005        let export = state
3006            .exports
3007            .get("reg-url")
3008            .expect("export registered in state.exports");
3009        assert_eq!(export.exporting_stack_name, "reg");
3010        assert!(!export.value.is_empty());
3011        assert!(export.exporting_stack_id.contains("reg"));
3012    }
3013
3014    #[tokio::test]
3015    async fn import_value_with_unknown_export_errors() {
3016        let svc = make_service();
3017        let consumer_tpl = r#"{
3018            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3019                "QueueName": {"Fn::ImportValue":"missing-export"}
3020            }}}
3021        }"#;
3022        let mut p = HashMap::new();
3023        p.insert("StackName".to_string(), "bad-consumer".to_string());
3024        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3025        match svc.create_stack(&make_request("CreateStack", p)).await {
3026            Ok(_) => panic!("expected ValidationError for unknown export"),
3027            Err(e) => {
3028                let msg = format!("{e:?}");
3029                assert!(msg.contains("No export named missing-export"), "got {msg}");
3030            }
3031        }
3032    }
3033
3034    #[tokio::test]
3035    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3036        let svc = make_service();
3037
3038        let producer_tpl = r#"{
3039            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3040            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3041        }"#;
3042        let mut p = HashMap::new();
3043        p.insert("StackName".to_string(), "producer".to_string());
3044        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3045        svc.create_stack(&make_request("CreateStack", p))
3046            .await
3047            .expect("producer");
3048
3049        let consumer_tpl = r#"{
3050            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3051                "QueueName": "cons-q",
3052                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3053            }}}
3054        }"#;
3055        let mut p = HashMap::new();
3056        p.insert("StackName".to_string(), "consumer".to_string());
3057        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3058        svc.create_stack(&make_request("CreateStack", p))
3059            .await
3060            .expect("consumer");
3061
3062        // Producer delete must fail while consumer still imports.
3063        let mut p = HashMap::new();
3064        p.insert("StackName".to_string(), "producer".to_string());
3065        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3066            Ok(_) => panic!("delete must fail while imports exist"),
3067            Err(e) => {
3068                let msg = format!("{e:?}");
3069                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3070            }
3071        }
3072
3073        // Delete consumer first.
3074        let mut p = HashMap::new();
3075        p.insert("StackName".to_string(), "consumer".to_string());
3076        svc.delete_stack(&make_request("DeleteStack", p))
3077            .await
3078            .expect("consumer delete");
3079
3080        // Now producer delete succeeds.
3081        let mut p = HashMap::new();
3082        p.insert("StackName".to_string(), "producer".to_string());
3083        svc.delete_stack(&make_request("DeleteStack", p))
3084            .await
3085            .expect("producer delete after consumer gone");
3086
3087        let accounts = svc.state.read();
3088        let state = accounts.get("123456789012").unwrap();
3089        assert!(state.exports.is_empty(), "exports cleared after delete");
3090        assert!(state.imports.is_empty(), "imports cleared after delete");
3091    }
3092
3093    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3094
3095    use std::sync::atomic::{AtomicUsize, Ordering};
3096
3097    /// A snapshot hook that counts how many times it fires, standing in for a
3098    /// real service's whole-state persist.
3099    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3100        Arc::new(move || {
3101            let counter = counter.clone();
3102            Box::pin(async move {
3103                counter.fetch_add(1, Ordering::SeqCst);
3104            })
3105        })
3106    }
3107
3108    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3109        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3110        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3111            tmp.path().to_path_buf(),
3112            cache,
3113        ))
3114    }
3115
    // Shared template for the persistence tests: one stack that declares an
    // SQS queue and an SNS topic (both snapshot-backed) plus an S3 bucket
    // (persisted via the S3Store write-through). It deliberately contains no
    // Lambda resource: one test registers a "lambda" hook anyway and checks
    // that it never fires, which proves hooks are selected per service.
    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
    }}"#;
3124
3125    fn create_req(stack: &str) -> AwsRequest {
3126        let mut p = HashMap::new();
3127        p.insert("StackName".to_string(), stack.to_string());
3128        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3129        make_request("CreateStack", p)
3130    }
3131
3132    #[tokio::test]
3133    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3134        let tmp = tempfile::tempdir().unwrap();
3135        let store = disk_s3_store(&tmp);
3136        let counter = Arc::new(AtomicUsize::new(0));
3137        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3138            BTreeMap::new();
3139        hooks.insert("sqs", counting_hook(counter.clone()));
3140        hooks.insert("sns", counting_hook(counter.clone()));
3141        // Registered but not in the template -> must not fire.
3142        hooks.insert("lambda", counting_hook(counter.clone()));
3143        let svc = make_service()
3144            .with_s3_store(store.clone())
3145            .with_snapshot_hooks(hooks);
3146
3147        svc.create_stack(&create_req("probe")).await.unwrap();
3148
3149        // sqs + sns fired once each; lambda untouched.
3150        assert_eq!(counter.load(Ordering::SeqCst), 2);
3151        // The bucket was written through to the S3 store, not just the in-memory map.
3152        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3153        assert!(
3154            loaded.buckets.contains_key("cfn-bucket"),
3155            "CFN bucket should be persisted to the S3 store"
3156        );
3157    }
3158
3159    #[tokio::test]
3160    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3161        let tmp = tempfile::tempdir().unwrap();
3162        let store = disk_s3_store(&tmp);
3163        let counter = Arc::new(AtomicUsize::new(0));
3164        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3165            BTreeMap::new();
3166        hooks.insert("sqs", counting_hook(counter.clone()));
3167        hooks.insert("sns", counting_hook(counter.clone()));
3168        let svc = make_service()
3169            .with_s3_store(store.clone())
3170            .with_snapshot_hooks(hooks);
3171
3172        svc.create_stack(&create_req("probe")).await.unwrap();
3173        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3174
3175        let mut p = HashMap::new();
3176        p.insert("StackName".to_string(), "probe".to_string());
3177        svc.delete_stack(&make_request("DeleteStack", p))
3178            .await
3179            .unwrap();
3180
3181        // Delete fired the touched services again (sqs + sns).
3182        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3183        // And the CFN-deleted bucket is gone from the store, so it does not
3184        // reappear after a restart.
3185        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3186        assert!(
3187            !loaded.buckets.contains_key("cfn-bucket"),
3188            "CFN-deleted bucket should be removed from the S3 store"
3189        );
3190    }
3191
3192    #[tokio::test]
3193    async fn cfn_persist_skips_services_without_a_registered_hook() {
3194        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3195        // hooks must be silently skipped (no panic), and "sqs" fires once.
3196        let tmp = tempfile::tempdir().unwrap();
3197        let store = disk_s3_store(&tmp);
3198        let counter = Arc::new(AtomicUsize::new(0));
3199        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3200            BTreeMap::new();
3201        hooks.insert("sqs", counting_hook(counter.clone()));
3202        let svc = make_service()
3203            .with_s3_store(store.clone())
3204            .with_snapshot_hooks(hooks);
3205
3206        svc.create_stack(&create_req("probe")).await.unwrap();
3207        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3208    }
3209
3210    #[tokio::test]
3211    async fn cfn_update_persists_touched_services() {
3212        // Create with just SQS, then update to a template that adds SNS + a
3213        // bucket; the update must persist the services it touches.
3214        let tmp = tempfile::tempdir().unwrap();
3215        let store = disk_s3_store(&tmp);
3216        let counter = Arc::new(AtomicUsize::new(0));
3217        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3218            BTreeMap::new();
3219        hooks.insert("sqs", counting_hook(counter.clone()));
3220        hooks.insert("sns", counting_hook(counter.clone()));
3221        let svc = make_service()
3222            .with_s3_store(store.clone())
3223            .with_snapshot_hooks(hooks);
3224
3225        let mut create = HashMap::new();
3226        create.insert("StackName".to_string(), "upd".to_string());
3227        create.insert(
3228            "TemplateBody".to_string(),
3229            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3230                .to_string(),
3231        );
3232        svc.create_stack(&make_request("CreateStack", create))
3233            .await
3234            .unwrap();
3235        let after_create = counter.load(Ordering::SeqCst);
3236
3237        let mut update = HashMap::new();
3238        update.insert("StackName".to_string(), "upd".to_string());
3239        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3240        svc.update_stack(&make_request("UpdateStack", update))
3241            .await
3242            .unwrap();
3243
3244        // The update touched at least SNS (added); the hook count must grow.
3245        assert!(
3246            counter.load(Ordering::SeqCst) > after_create,
3247            "update should persist the services it touched"
3248        );
3249        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3250        assert!(loaded.buckets.contains_key("cfn-bucket"));
3251    }
3252
3253    #[tokio::test]
3254    async fn cfn_execute_change_set_persists_touched_services() {
3255        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3256        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3257        // write provisioned services through to disk the same way CreateStack
3258        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3259        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3260        let tmp = tempfile::tempdir().unwrap();
3261        let store = disk_s3_store(&tmp);
3262        let counter = Arc::new(AtomicUsize::new(0));
3263        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3264            BTreeMap::new();
3265        hooks.insert("sqs", counting_hook(counter.clone()));
3266        let svc = make_service()
3267            .with_s3_store(store.clone())
3268            .with_snapshot_hooks(hooks);
3269
3270        let mut create = HashMap::new();
3271        create.insert("StackName".to_string(), "cs-stack".to_string());
3272        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3273        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3274        create.insert(
3275            "TemplateBody".to_string(),
3276            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3277                .to_string(),
3278        );
3279        svc.handle(make_request("CreateChangeSet", create))
3280            .await
3281            .unwrap();
3282        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3283        let before = counter.load(Ordering::SeqCst);
3284
3285        let mut exec = HashMap::new();
3286        exec.insert("StackName".to_string(), "cs-stack".to_string());
3287        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3288        svc.handle(make_request("ExecuteChangeSet", exec))
3289            .await
3290            .unwrap();
3291
3292        assert!(
3293            counter.load(Ordering::SeqCst) > before,
3294            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3295             queue survives a restart"
3296        );
3297    }
3298
3299    #[test]
3300    fn service_key_for_type_maps_services_and_aliases() {
3301        // Direct service segments.
3302        assert_eq!(
3303            service_key_for_type("AWS::Lambda::Function"),
3304            Some("lambda")
3305        );
3306        assert_eq!(
3307            service_key_for_type("AWS::SecretsManager::Secret"),
3308            Some("secretsmanager")
3309        );
3310        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3311        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3312        assert_eq!(
3313            service_key_for_type("AWS::StepFunctions::StateMachine"),
3314            Some("stepfunctions")
3315        );
3316        // Namespace aliases that differ from the fakecloud service name.
3317        assert_eq!(
3318            service_key_for_type("AWS::Events::Rule"),
3319            Some("eventbridge")
3320        );
3321        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3322        assert_eq!(
3323            service_key_for_type("AWS::ElastiCache::CacheCluster"),
3324            Some("elasticache")
3325        );
3326        // S3 has no snapshot hook (it persists via the S3Store write-through).
3327        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3328        // Snapshot-backed services whose CFN namespace differs from the
3329        // fakecloud service name (these were missing from the map, #1766 class).
3330        assert_eq!(
3331            service_key_for_type("AWS::CertificateManager::Certificate"),
3332            Some("acm")
3333        );
3334        assert_eq!(
3335            service_key_for_type("AWS::ElasticLoadBalancingV2::LoadBalancer"),
3336            Some("elbv2")
3337        );
3338        assert_eq!(
3339            service_key_for_type("AWS::CloudFront::Distribution"),
3340            Some("cloudfront")
3341        );
3342        assert_eq!(
3343            service_key_for_type("AWS::Route53::HostedZone"),
3344            Some("route53")
3345        );
3346        assert_eq!(
3347            service_key_for_type("AWS::KinesisFirehose::DeliveryStream"),
3348            Some("firehose")
3349        );
3350        assert_eq!(service_key_for_type("AWS::Glue::Database"), Some("glue"));
3351        assert_eq!(service_key_for_type("AWS::WAFv2::WebACL"), Some("wafv2"));
3352        assert_eq!(
3353            service_key_for_type("AWS::Athena::WorkGroup"),
3354            Some("athena")
3355        );
3356        assert_eq!(
3357            service_key_for_type("AWS::Organizations::Organization"),
3358            Some("organizations")
3359        );
3360        // Malformed / non-AWS types.
3361        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3362        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3363        assert_eq!(service_key_for_type("AWS"), None);
3364        assert_eq!(service_key_for_type(""), None);
3365    }
3366
3367    #[tokio::test]
3368    async fn persist_touched_services_noop_with_empty_hooks() {
3369        // No registered hooks -> nothing to do, must not panic.
3370        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3371        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3372    }
3373
3374    #[tokio::test]
3375    async fn cfn_bucket_policy_write_through_create_update_delete() {
3376        let tmp = tempfile::tempdir().unwrap();
3377        let store = disk_s3_store(&tmp);
3378        let svc = make_service().with_s3_store(store.clone());
3379
3380        // Create a bucket + bucket policy.
3381        let mut create = HashMap::new();
3382        create.insert("StackName".to_string(), "pol".to_string());
3383        create.insert(
3384            "TemplateBody".to_string(),
3385            r#"{"Resources":{
3386                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3387                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3388            }}"#
3389            .to_string(),
3390        );
3391        svc.create_stack(&make_request("CreateStack", create))
3392            .await
3393            .unwrap();
3394        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3395        let policy = loaded.buckets["pol-bucket"]
3396            .subresources
3397            .get("policy.toml")
3398            .cloned()
3399            .expect("bucket policy persisted on create");
3400        assert!(policy.contains("s3:GetObject"));
3401
3402        // Update the policy document; the update must write through.
3403        let mut update = HashMap::new();
3404        update.insert("StackName".to_string(), "pol".to_string());
3405        update.insert(
3406            "TemplateBody".to_string(),
3407            r#"{"Resources":{
3408                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3409                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3410            }}"#
3411            .to_string(),
3412        );
3413        svc.update_stack(&make_request("UpdateStack", update))
3414            .await
3415            .unwrap();
3416        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3417        let policy = loaded.buckets["pol-bucket"]
3418            .subresources
3419            .get("policy.toml")
3420            .cloned()
3421            .expect("bucket policy still persisted after update");
3422        assert!(
3423            policy.contains("s3:PutObject"),
3424            "updated policy should be written through"
3425        );
3426
3427        // Delete the stack; the bucket (and its policy) must be removed from disk.
3428        let mut del = HashMap::new();
3429        del.insert("StackName".to_string(), "pol".to_string());
3430        svc.delete_stack(&make_request("DeleteStack", del))
3431            .await
3432            .unwrap();
3433        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3434        assert!(
3435            !loaded.buckets.contains_key("pol-bucket"),
3436            "CFN-deleted bucket and policy should be gone from the store"
3437        );
3438    }
3439}