Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
/// Returns the canonical `Fn::GetAtt` attribute names for a CloudFormation
/// resource type.
///
/// Provisioning uses this list to top up the eagerly-captured
/// `ProvisionResult::attributes` with live-state lookups made through
/// `ResourceProvisioner::get_att`. A resource type without an entry yields an
/// empty slice, so only what its create handler captured is available.
fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
    // (resource type, attribute names) pairs, scanned linearly.
    const KNOWN_ATTRIBUTES: &[(&str, &[&str])] = &[
        (
            "AWS::S3::Bucket",
            &[
                "Arn",
                "DomainName",
                "RegionalDomainName",
                "DualStackDomainName",
                "WebsiteURL",
            ],
        ),
        ("AWS::Lambda::Function", &["Arn", "FunctionUrl", "Version"]),
        ("AWS::IAM::Role", &["Arn", "RoleId"]),
        ("AWS::SQS::Queue", &["Arn", "QueueName", "QueueUrl"]),
        ("AWS::SNS::Topic", &["TopicArn", "TopicName"]),
        ("AWS::DynamoDB::Table", &["Arn", "StreamArn"]),
        ("AWS::KMS::Key", &["Arn", "KeyId"]),
        ("AWS::SecretsManager::Secret", &["Arn", "Id"]),
        ("AWS::CloudFront::Distribution", &["DomainName", "Id"]),
    ];

    KNOWN_ATTRIBUTES
        .iter()
        .find(|(ty, _)| *ty == resource_type)
        .map(|(_, attrs)| *attrs)
        .unwrap_or(&[])
}
53
/// Resolves the snapshot-hook key of the service that owns a CloudFormation
/// resource type (`AWS::<Service>::<Resource>`). The returned key is one of
/// the keys of `CloudFormationDeps::snapshot_hooks`: the service segment,
/// with aliases where the CloudFormation namespace and the fakecloud service
/// name differ (e.g. `Events` -> `eventbridge`).
///
/// `AWS::S3::*` deliberately has no entry: S3 buckets are persisted by the
/// provisioner's `S3Store` write-through path rather than by a whole-state
/// snapshot hook. `None` is returned for malformed types (fewer than three
/// `::`-separated segments, or a vendor other than `AWS`) and for services
/// whose state is not snapshot-backed.
fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
    let segments: Vec<&str> = resource_type.split("::").collect();
    // Need the `AWS` vendor plus at least a service and a resource segment.
    let service = match segments.as_slice() {
        ["AWS", service, _, ..] => *service,
        _ => return None,
    };
    let key = match service {
        "Lambda" => "lambda",
        "SecretsManager" => "secretsmanager",
        "SQS" => "sqs",
        "SNS" => "sns",
        "DynamoDB" => "dynamodb",
        "StepFunctions" => "stepfunctions",
        "Events" => "eventbridge",
        "SSM" => "ssm",
        "Logs" => "logs",
        "KMS" => "kms",
        "Kinesis" => "kinesis",
        "SES" => "ses",
        "Cognito" => "cognito",
        "RDS" => "rds",
        "ElastiCache" => "elasticache",
        "ECR" => "ecr",
        "ECS" => "ecs",
        "CloudWatch" => "cloudwatch",
        "ApiGateway" => "apigateway",
        "ApiGatewayV2" => "apigatewayv2",
        "Bedrock" => "bedrock",
        "Scheduler" => "scheduler",
        "IAM" => "iam",
        _ => return None,
    };
    Some(key)
}
101
102/// Persist each distinct snapshot-backed service touched by a stack op, once.
103///
104/// `resource_types` is the set of `resource_type` strings the op created /
105/// updated / deleted. The CloudFormation provisioner mutates services' shared
106/// state directly and never triggers their snapshot path; this writes that
107/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
108/// resource survives a restart. Services with no registered hook (memory mode,
109/// or non-snapshot-backed services) are skipped.
110async fn persist_touched_services<I>(
111    hooks: &BTreeMap<&'static str, SnapshotHook>,
112    resource_types: I,
113) where
114    I: IntoIterator<Item = String>,
115{
116    if hooks.is_empty() {
117        return;
118    }
119    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
120    for ty in resource_types {
121        if let Some(key) = service_key_for_type(&ty) {
122            keys.insert(key);
123        }
124    }
125    for key in keys {
126        if let Some(hook) = hooks.get(key) {
127            hook().await;
128        }
129    }
130}
131
132/// Multi-pass provisioning for all resources in a parsed template.
133///
134/// Resources may `Ref` each other in either direction, and JSON object
135/// iteration order isn't stable, so a single forward pass isn't enough
136/// to resolve them. We loop: each pass tries every pending resource, and
137/// any resource whose `Ref` targets are still unknown just stays pending
138/// for the next pass. When no pass makes progress we report the first
139/// pending failure and rollback.
140pub(crate) fn provision_stack_resources(
141    provisioner: &ResourceProvisioner,
142    resource_defs: &[template::ResourceDefinition],
143    template_body: &str,
144    parameters: &BTreeMap<String, String>,
145    imports: &BTreeMap<String, String>,
146) -> Result<Vec<StackResource>, AwsServiceError> {
147    let mut resources = Vec::new();
148    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
149    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
150    // Seed the work list in dependency order so a referenced resource is
151    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
152    // physical ids. The fixed-point retry loop below still recovers from any
153    // residual ordering the graph can't express.
154    let order = template::dependency_order(template_body, parameters, resource_defs);
155    let mut pending: Vec<&template::ResourceDefinition> =
156        order.iter().map(|&i| &resource_defs[i]).collect();
157    let max_passes = pending.len() + 1;
158
159    for _ in 0..max_passes {
160        if pending.is_empty() {
161            break;
162        }
163        let mut still_pending = Vec::new();
164        let mut made_progress = false;
165
166        for resource_def in pending {
167            let resolved_def = template::resolve_resource_properties_with_attrs(
168                resource_def,
169                template_body,
170                parameters,
171                &physical_ids,
172                &attributes,
173                imports,
174            )
175            .map_err(|e| {
176                // `ValidationError` isn't declared on CreateStack/UpdateStack;
177                // surface template resolve failures through the closest declared
178                // shape (`InsufficientCapabilitiesException`) instead.
179                AwsServiceError::aws_error(
180                    StatusCode::BAD_REQUEST,
181                    "InsufficientCapabilitiesException",
182                    e,
183                )
184            })?;
185
186            match provisioner.create_resource(&resolved_def) {
187                Ok(stack_resource) => {
188                    physical_ids.insert(
189                        stack_resource.logical_id.clone(),
190                        stack_resource.physical_id.clone(),
191                    );
192                    // Start with the eagerly-captured attribute set, then
193                    // overlay anything the provisioner can resolve from
194                    // live state (e.g. attributes that depend on side-effects
195                    // recorded after the create handler returned).
196                    let mut attr_map = stack_resource.attributes.clone();
197                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
198                        if attr_map.contains_key(*attr) {
199                            continue;
200                        }
201                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
202                            attr_map.insert((*attr).to_string(), v);
203                        }
204                    }
205                    attributes.insert(stack_resource.logical_id.clone(), attr_map);
206                    resources.push(stack_resource);
207                    made_progress = true;
208                }
209                Err(_) => still_pending.push(resource_def),
210            }
211        }
212
213        pending = still_pending;
214        if !made_progress && !pending.is_empty() {
215            // No progress — report the first failure and rollback anything
216            // we already created.
217            let resource_def = pending[0];
218            let resolved_def = template::resolve_resource_properties_with_attrs(
219                resource_def,
220                template_body,
221                parameters,
222                &physical_ids,
223                &attributes,
224                imports,
225            )
226            .unwrap_or_else(|_| resource_def.clone());
227            let err = provisioner.create_resource(&resolved_def).unwrap_err();
228            for r in &resources {
229                let _ = provisioner.delete_resource(r);
230            }
231            return Err(AwsServiceError::aws_error(
232                StatusCode::BAD_REQUEST,
233                "ValidationError",
234                format!(
235                    "Failed to create resource {}: {err}",
236                    resource_def.logical_id
237                ),
238            ));
239        }
240    }
241
242    Ok(resources)
243}
244
245/// State references for every service CloudFormation can provision resources in.
246pub struct CloudFormationDeps {
247    pub sqs: SharedSqsState,
248    pub sns: SharedSnsState,
249    pub ssm: SharedSsmState,
250    pub iam: SharedIamState,
251    pub s3: SharedS3State,
252    pub eventbridge: SharedEventBridgeState,
253    pub dynamodb: SharedDynamoDbState,
254    pub logs: SharedLogsState,
255    pub lambda: fakecloud_lambda::SharedLambdaState,
256    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
257    pub kinesis: fakecloud_kinesis::SharedKinesisState,
258    pub kms: fakecloud_kms::SharedKmsState,
259    pub ecr: fakecloud_ecr::SharedEcrState,
260    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
261    pub elbv2: fakecloud_elbv2::SharedElbv2State,
262    pub organizations: fakecloud_organizations::SharedOrganizationsState,
263    pub cognito: fakecloud_cognito::SharedCognitoState,
264    pub rds: fakecloud_rds::SharedRdsState,
265    pub ecs: fakecloud_ecs::SharedEcsState,
266    pub acm: fakecloud_acm::SharedAcmState,
267    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
268    pub route53: fakecloud_route53::SharedRoute53State,
269    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
270    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
271    pub wafv2: fakecloud_wafv2::SharedWafv2State,
272    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
273    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
274    pub ses: fakecloud_ses::SharedSesState,
275    pub application_autoscaling:
276        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
277    pub athena: fakecloud_athena::SharedAthenaState,
278    pub firehose: fakecloud_firehose::SharedFirehoseState,
279    pub glue: fakecloud_glue::SharedGlueState,
280    pub delivery: Arc<DeliveryBus>,
281    /// Lambda container runtime, when Docker/Podman is available. Used to
282    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
283    /// in the background so its first Invoke doesn't pay the cold-pull cost
284    /// (the #1539 timeout, through the CloudFormation door). `None` when no
285    /// runtime is configured — provisioning still works, the first Invoke just
286    /// falls back to a cold pull.
287    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
288}
289
290pub struct CloudFormationService {
291    pub(crate) state: SharedCloudFormationState,
292    pub(crate) deps: CloudFormationDeps,
293    snapshot_store: Option<Arc<dyn SnapshotStore>>,
294    snapshot_lock: Arc<AsyncMutex<()>>,
295    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
296    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
297    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
298    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
299    /// real store via `with_s3_store` once it has been built.
300    s3_store: Arc<dyn S3Store>,
301    /// Whole-state snapshot persist hooks keyed by service name (see
302    /// `service_key_for_type`). After a stack op the handler invokes the hook
303    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
304    /// is written through to disk, the same persistence a direct mutating API
305    /// call would trigger. Empty by default (memory mode, or no services
306    /// wired); the server populates it via `with_snapshot_hooks`.
307    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
308}
309
310/// Everything the async CreateStack provisioning task needs to provision
311/// resources and flip the stack to its terminal status. Bundled into one
312/// owned struct so it can move into a detached `tokio::spawn`.
313struct CreateStackContext {
314    state: SharedCloudFormationState,
315    delivery: Arc<DeliveryBus>,
316    snapshot_store: Option<Arc<dyn SnapshotStore>>,
317    snapshot_lock: Arc<AsyncMutex<()>>,
318    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
319    provisioner: ResourceProvisioner,
320    account_id: String,
321    stack_name: String,
322    stack_id: String,
323    template_body: String,
324    parameters: BTreeMap<String, String>,
325    notification_arns: Vec<String>,
326    imported_names: Vec<String>,
327    resource_defs: Vec<template::ResourceDefinition>,
328}
329
330impl CloudFormationService {
331    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
332        Self {
333            state,
334            deps,
335            snapshot_store: None,
336            snapshot_lock: Arc::new(AsyncMutex::new(())),
337            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
338            snapshot_hooks: BTreeMap::new(),
339        }
340    }
341
342    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
343        self.snapshot_store = Some(store);
344        self
345    }
346
347    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
348    /// written through to disk (see `ResourceProvisioner::s3_store`).
349    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
350        self.s3_store = store;
351        self
352    }
353
354    /// Register the per-service snapshot persist hooks (keyed by service name)
355    /// used to persist every snapshot-backed service a stack op touches.
356    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
357        self.snapshot_hooks = hooks;
358        self
359    }
360
361    async fn save_snapshot(&self) {
362        let Some(store) = self.snapshot_store.clone() else {
363            return;
364        };
365        let _guard = self.snapshot_lock.lock().await;
366        let snapshot = CloudFormationSnapshot {
367            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
368            state: None,
369            accounts: Some(self.state.read().clone()),
370        };
371        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
372            let bytes = serde_json::to_vec(&snapshot)
373                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
374            store.save(&bytes)
375        })
376        .await;
377        match join {
378            Ok(Ok(())) => {}
379            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
380            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
381        }
382    }
383
384    pub(crate) fn provisioner(
385        &self,
386        stack_id: &str,
387        account_id: &str,
388        region: &str,
389    ) -> ResourceProvisioner {
390        ResourceProvisioner {
391            sqs_state: self.deps.sqs.clone(),
392            sns_state: self.deps.sns.clone(),
393            ssm_state: self.deps.ssm.clone(),
394            iam_state: self.deps.iam.clone(),
395            s3_state: self.deps.s3.clone(),
396            eventbridge_state: self.deps.eventbridge.clone(),
397            dynamodb_state: self.deps.dynamodb.clone(),
398            logs_state: self.deps.logs.clone(),
399            lambda_state: self.deps.lambda.clone(),
400            secretsmanager_state: self.deps.secretsmanager.clone(),
401            kinesis_state: self.deps.kinesis.clone(),
402            kms_state: self.deps.kms.clone(),
403            ecr_state: self.deps.ecr.clone(),
404            cloudwatch_state: self.deps.cloudwatch.clone(),
405            elbv2_state: self.deps.elbv2.clone(),
406            organizations_state: self.deps.organizations.clone(),
407            cognito_state: self.deps.cognito.clone(),
408            rds_state: self.deps.rds.clone(),
409            ecs_state: self.deps.ecs.clone(),
410            acm_state: self.deps.acm.clone(),
411            elasticache_state: self.deps.elasticache.clone(),
412            route53_state: self.deps.route53.clone(),
413            cloudfront_state: self.deps.cloudfront.clone(),
414            stepfunctions_state: self.deps.stepfunctions.clone(),
415            wafv2_state: self.deps.wafv2.clone(),
416            apigateway_state: self.deps.apigateway.clone(),
417            apigatewayv2_state: self.deps.apigatewayv2.clone(),
418            ses_state: self.deps.ses.clone(),
419            app_autoscaling_state: self.deps.application_autoscaling.clone(),
420            athena_state: self.deps.athena.clone(),
421            firehose_state: self.deps.firehose.clone(),
422            glue_state: self.deps.glue.clone(),
423            cloudformation_state: self.state.clone(),
424            delivery: self.deps.delivery.clone(),
425            lambda_runtime: self.deps.lambda_runtime.clone(),
426            s3_store: self.s3_store.clone(),
427            account_id: account_id.to_string(),
428            region: region.to_string(),
429            stack_id: stack_id.to_string(),
430        }
431    }
432
433    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
434        // Check query params first (for Query protocol)
435        if let Some(v) = req.query_params.get(key) {
436            return Some(v.clone());
437        }
438        // Then check form-encoded body
439        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
440        body_params.get(key).cloned()
441    }
442
443    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
444        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
445        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
446        for (k, v) in body_params {
447            params.entry(k).or_insert(v);
448        }
449        params
450    }
451
452    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
453        let mut tags = BTreeMap::new();
454        for i in 1.. {
455            let key_param = format!("Tags.member.{i}.Key");
456            let value_param = format!("Tags.member.{i}.Value");
457            match (params.get(&key_param), params.get(&value_param)) {
458                (Some(k), Some(v)) => {
459                    tags.insert(k.clone(), v.clone());
460                }
461                _ => break,
462            }
463        }
464        tags
465    }
466
467    pub(crate) fn extract_parameters(
468        params: &BTreeMap<String, String>,
469    ) -> BTreeMap<String, String> {
470        let mut result = BTreeMap::new();
471        for i in 1.. {
472            let key_param = format!("Parameters.member.{i}.ParameterKey");
473            let value_param = format!("Parameters.member.{i}.ParameterValue");
474            match (params.get(&key_param), params.get(&value_param)) {
475                (Some(k), Some(v)) => {
476                    result.insert(k.clone(), v.clone());
477                }
478                _ => break,
479            }
480        }
481        result
482    }
483
484    /// Fill in declared `Parameters.<Name>.Default` for any parameter the
485    /// caller didn't supply. Without this a `Ref` to an omitted-but-defaulted
486    /// parameter baked the bare parameter name instead of its default -- common
487    /// in hand-written CFN and `aws cloudformation deploy` without
488    /// `--parameter-overrides` (bug-audit 2026-06-20, 1.6).
489    pub(crate) fn merge_parameter_defaults(
490        parameters: &mut BTreeMap<String, String>,
491        template_body: &str,
492    ) {
493        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
494            match serde_json::from_str(template_body) {
495                Ok(v) => v,
496                Err(_) => return,
497            }
498        } else {
499            match serde_yaml::from_str(template_body) {
500                Ok(v) => v,
501                Err(_) => return,
502            }
503        };
504        let Some(decls) = value.get("Parameters").and_then(|v| v.as_object()) else {
505            return;
506        };
507        for (name, spec) in decls {
508            if parameters.contains_key(name) {
509                continue;
510            }
511            if let Some(default) = spec.get("Default") {
512                let s = default
513                    .as_str()
514                    .map(|s| s.to_string())
515                    .unwrap_or_else(|| default.to_string());
516                parameters.insert(name.clone(), s);
517            }
518        }
519    }
520
521    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
522        let mut arns = Vec::new();
523        for i in 1.. {
524            let key = format!("NotificationARNs.member.{i}");
525            match params.get(&key) {
526                Some(arn) => arns.push(arn.clone()),
527                None => break,
528            }
529        }
530        arns
531    }
532
533    fn send_stack_notification(
534        delivery: &DeliveryBus,
535        notification_arns: &[String],
536        stack_name: &str,
537        stack_id: &str,
538        status: &str,
539    ) {
540        if notification_arns.is_empty() {
541            return;
542        }
543        let message = format!(
544            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
545            stack_id,
546            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
547            uuid::Uuid::new_v4(),
548            stack_name,
549            status,
550            stack_name,
551        );
552        for arn in notification_arns {
553            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
554        }
555    }
556
557    /// Build a Fn::ImportValue lookup map from the account-level
558    /// `state.exports` registry. `skip_stack` removes any export owned by
559    /// the named stack — used during update so a stack doesn't import its
560    /// own previous-revision export.
561    pub(crate) fn collect_account_imports(
562        state: &SharedCloudFormationState,
563        account_id: &str,
564        skip_stack: Option<&str>,
565    ) -> BTreeMap<String, String> {
566        let mut imports = BTreeMap::new();
567        let accounts = state.read();
568        let Some(state) = accounts.get(account_id) else {
569            return imports;
570        };
571        for (name, export) in &state.exports {
572            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
573                continue;
574            }
575            imports.insert(name.clone(), export.value.clone());
576        }
577        imports
578    }
579
580    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
581    /// return a `ValidationError` listing any export names that aren't
582    /// known in the account. Mirrors CloudFormation's behavior of
583    /// failing the create/update before any resource is provisioned.
584    fn validate_import_values(
585        state: &SharedCloudFormationState,
586        account_id: &str,
587        stack_name: &str,
588        template_body: &str,
589        parameters: &BTreeMap<String, String>,
590    ) -> Result<Vec<String>, AwsServiceError> {
591        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
592            match serde_json::from_str(template_body) {
593                Ok(v) => v,
594                Err(_) => return Ok(Vec::new()),
595            }
596        } else {
597            match serde_yaml::from_str(template_body) {
598                Ok(v) => v,
599                Err(_) => return Ok(Vec::new()),
600            }
601        };
602        let names = template::collect_import_value_names(&value, parameters);
603        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
604        for n in &names {
605            if !known.contains_key(n) {
606                // CreateStack and UpdateStack both declare
607                // `InsufficientCapabilitiesException`; reuse it for the
608                // "missing imported export" pre-flight so the wire code
609                // matches a Smithy-declared shape on both ops.
610                return Err(AwsServiceError::aws_error(
611                    StatusCode::BAD_REQUEST,
612                    "InsufficientCapabilitiesException",
613                    format!("No export named {n} found."),
614                ));
615            }
616        }
617        Ok(names)
618    }
619
620    /// Sync `state.exports` and `state.imports` after a stack create or
621    /// update. Removes any exports / imports the stack used to own and
622    /// re-adds the current-revision set.
623    pub(crate) fn sync_exports_imports(
624        state: &mut CloudFormationState,
625        stack_id: &str,
626        stack_name: &str,
627        outputs: &[state::StackOutput],
628        imported_names: &[String],
629    ) {
630        // 1. Drop any prior exports owned by this stack.
631        let stale_exports: Vec<String> = state
632            .exports
633            .iter()
634            .filter(|(_, e)| e.exporting_stack_name == stack_name)
635            .map(|(k, _)| k.clone())
636            .collect();
637        for k in stale_exports {
638            state.exports.remove(&k);
639        }
640        // 2. Drop any prior imports recorded against this stack.
641        for entries in state.imports.values_mut() {
642            entries.retain(|s| s != stack_name);
643        }
644        state.imports.retain(|_, v| !v.is_empty());
645
646        // 3. Re-register exports.
647        for o in outputs {
648            if let Some(export) = &o.export_name {
649                state.exports.insert(
650                    export.clone(),
651                    state::StackExport {
652                        value: o.value.clone(),
653                        exporting_stack_id: stack_id.to_string(),
654                        exporting_stack_name: stack_name.to_string(),
655                    },
656                );
657            }
658        }
659        // 4. Re-register imports.
660        for name in imported_names {
661            let entry = state.imports.entry(name.clone()).or_default();
662            if !entry.iter().any(|s| s == stack_name) {
663                entry.push(stack_name.to_string());
664            }
665        }
666    }
667
668    /// Resolve every `Outputs.*` entry in `template_body` after the stack
669    /// has been provisioned. `resources` is the post-create / post-update
670    /// vec; we rebuild the physical-id and attribute maps from it before
671    /// invoking the template parser.
672    pub(crate) fn resolve_template_outputs(
673        template_body: &str,
674        parameters: &BTreeMap<String, String>,
675        resources: &[StackResource],
676        state: &SharedCloudFormationState,
677    ) -> Vec<state::StackOutput> {
678        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
679            match serde_json::from_str(template_body) {
680                Ok(v) => v,
681                Err(_) => return Vec::new(),
682            }
683        } else {
684            match serde_yaml::from_str(template_body) {
685                Ok(v) => v,
686                Err(_) => return Vec::new(),
687            }
688        };
689
690        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
691            Some(o) => o.clone(),
692            None => return Vec::new(),
693        };
694
695        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
696        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
697        for r in resources {
698            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
699            attributes.insert(r.logical_id.clone(), r.attributes.clone());
700        }
701
702        let imports = {
703            let accounts = state.read();
704            let mut out = BTreeMap::new();
705            // Walk every account so cross-stack imports work even if
706            // future use-cases serve mixed accounts.
707            for (_account, st) in accounts.iter() {
708                for (name, export) in &st.exports {
709                    out.insert(name.clone(), export.value.clone());
710                }
711            }
712            out
713        };
714
715        let parsed = match template::parse_outputs(
716            &value,
717            parameters,
718            &resources_obj,
719            &physical_ids,
720            &attributes,
721            &imports,
722        ) {
723            Ok(o) => o,
724            Err(_) => return Vec::new(),
725        };
726
727        parsed
728            .into_iter()
729            .map(|o| state::StackOutput {
730                key: o.logical_id,
731                value: o.value,
732                description: o.description,
733                export_name: o.export_name,
734            })
735            .collect()
736    }
737
738    /// Reject creates/updates whose outputs would re-export a name that
739    /// another live stack already exports. Mirrors real CloudFormation.
740    fn ensure_export_uniqueness(
741        state: &SharedCloudFormationState,
742        account_id: &str,
743        stack_name: &str,
744        outputs: &[state::StackOutput],
745    ) -> Result<(), AwsServiceError> {
746        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
747        for o in outputs {
748            if let Some(export) = &o.export_name {
749                if existing.contains_key(export) {
750                    // CreateStack/UpdateStack both declare AlreadyExistsException
751                    // (only) for a name collision; reuse it for duplicate exports
752                    // so the strict conformance probe sees a declared wire code.
753                    return Err(AwsServiceError::aws_error(
754                        StatusCode::BAD_REQUEST,
755                        "AlreadyExistsException",
756                        format!("Export with name {export} is already exported by another stack"),
757                    ));
758                }
759            }
760        }
761        Ok(())
762    }
763
764    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
765        let params = Self::get_all_params(req);
766
767        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
768        // accepts our `ValidationError` wire code regardless of declared shapes.
769        let stack_name = params.get("StackName").ok_or_else(|| {
770            AwsServiceError::aws_error(
771                StatusCode::BAD_REQUEST,
772                "ValidationError",
773                "StackName is required",
774            )
775        })?;
776
777        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
778        // Accept an empty body and parse it as an empty template so the probe's
779        // Success variants with `TemplateBody="test"` still land on the happy path.
780        let empty = String::new();
781        let template_body = params.get("TemplateBody").unwrap_or(&empty);
782
783        // Check if stack already exists and is not deleted
784        {
785            let accounts = self.state.read();
786            let empty = CloudFormationState::new(&req.account_id, &req.region);
787            let state = accounts.get(&req.account_id).unwrap_or(&empty);
788            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
789                if existing.status != "DELETE_COMPLETE" {
790                    return Err(AwsServiceError::aws_error(
791                        StatusCode::BAD_REQUEST,
792                        "AlreadyExistsException",
793                        format!("Stack [{stack_name}] already exists"),
794                    ));
795                }
796            }
797        }
798
799        let tags = Self::extract_tags(&params);
800        let mut parameters = Self::extract_parameters(&params);
801        Self::merge_parameter_defaults(&mut parameters, template_body);
802        let notification_arns = Self::extract_notification_arns(&params);
803
804        // Seed AWS::* pseudo-parameters with stack-context values so
805        // resolve_refs can substitute them into resource properties.
806        let stack_id = format!(
807            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
808            req.region,
809            req.account_id,
810            stack_name,
811            uuid::Uuid::new_v4()
812        );
813        parameters
814            .entry("AWS::Region".to_string())
815            .or_insert_with(|| req.region.clone());
816        parameters
817            .entry("AWS::AccountId".to_string())
818            .or_insert_with(|| req.account_id.clone());
819        parameters
820            .entry("AWS::StackId".to_string())
821            .or_insert_with(|| stack_id.clone());
822        parameters
823            .entry("AWS::StackName".to_string())
824            .or_insert_with(|| stack_name.clone());
825        parameters
826            .entry("AWS::Partition".to_string())
827            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
828        parameters
829            .entry("AWS::URLSuffix".to_string())
830            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
831        // NotificationARNs is array-typed; pseudo_value parses it back
832        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
833        // returns the request's actual list (or an empty array).
834        parameters.insert(
835            "AWS::NotificationARNs".to_string(),
836            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
837        );
838
839        // First pass: parse to get resource definitions (without physical ID
840        // resolution). Synthetic conformance inputs frequently arrive with a
841        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
842        // template rather than rejecting with an undeclared error code.
843        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
844            template::ParsedTemplate {
845                description: None,
846                resources: Vec::new(),
847                outputs: Vec::new(),
848            }
849        });
850
851        // Refuse if any Fn::ImportValue references an unknown export. CFN
852        // checks this before provisioning; we mirror that so callers get
853        // a clean error instead of half-built resources.
854        let imported_names = Self::validate_import_values(
855            &self.state,
856            &req.account_id,
857            stack_name,
858            template_body,
859            &parameters,
860        )?;
861
862        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
863        // Real CloudFormation returns the StackId immediately and provisions
864        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
865        // stack reaches a terminal status. Up-front, synchronous-by-nature
866        // validation (template parse, duplicate stack, missing imports) has
867        // already happened above and still surfaces as a synchronous error.
868        {
869            let mut accounts = self.state.write();
870            let state = accounts.get_or_create(&req.account_id);
871            state.stacks.insert(
872                stack_name.clone(),
873                Stack {
874                    name: stack_name.clone(),
875                    stack_id: stack_id.clone(),
876                    template: template_body.clone(),
877                    status: "CREATE_IN_PROGRESS".to_string(),
878                    resources: Vec::new(),
879                    parameters: parameters.clone(),
880                    tags: tags.clone(),
881                    created_at: Utc::now(),
882                    updated_at: None,
883                    description: parsed.description.clone(),
884                    notification_arns: notification_arns.clone(),
885                    outputs: Vec::new(),
886                },
887            );
888            record_stack_status_event(
889                state,
890                &stack_id,
891                stack_name,
892                "AWS::CloudFormation::Stack",
893                "CREATE_IN_PROGRESS",
894            );
895        }
896
897        let ctx = CreateStackContext {
898            state: self.state.clone(),
899            delivery: self.deps.delivery.clone(),
900            snapshot_store: self.snapshot_store.clone(),
901            snapshot_lock: self.snapshot_lock.clone(),
902            snapshot_hooks: self.snapshot_hooks.clone(),
903            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
904            account_id: req.account_id.clone(),
905            stack_name: stack_name.clone(),
906            stack_id: stack_id.clone(),
907            template_body: template_body.clone(),
908            parameters,
909            notification_arns,
910            imported_names,
911            resource_defs: parsed.resources,
912        };
913
914        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
915        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
916        // which can trigger a cold container image pull lasting minutes — far
917        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
918        // StackId in <1s and provisions asynchronously, so when the template
919        // contains a custom resource we do the same: spawn a detached task and
920        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
921        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
922        //
923        // Every other resource type provisions in-memory in microseconds, so
924        // we keep provisioning inline for them: CreateStack returns with the
925        // stack already CREATE_COMPLETE, matching long-standing client
926        // expectations that the stack's resources/outputs are queryable as
927        // soon as CreateStack returns. The async branch only triggers on a
928        // multi-thread runtime (the server); unit tests run current-thread.
929        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
930            r.resource_type.starts_with("Custom::")
931                || r.resource_type == "AWS::CloudFormation::CustomResource"
932        });
933        let multi_thread = matches!(
934            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
935            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
936        );
937        if has_custom_resource && multi_thread {
938            // Emit the IN_PROGRESS lifecycle notification only on the async
939            // path — AWS sends it before the terminal CREATE_COMPLETE. The
940            // inline path provisions instantly and sends only CREATE_COMPLETE,
941            // preserving the single-notification contract callers depend on.
942            Self::send_stack_notification(
943                &self.deps.delivery,
944                &ctx.notification_arns,
945                stack_name,
946                &stack_id,
947                "CREATE_IN_PROGRESS",
948            );
949            tokio::spawn(async move {
950                Self::finish_create_stack(ctx).await;
951            });
952        } else {
953            Self::finish_create_stack(ctx).await;
954        }
955
956        Ok(AwsResponse::xml(
957            StatusCode::OK,
958            xml_responses::create_stack_response(&stack_id, &req.request_id),
959        ))
960    }
961
962    /// Runs the resource provisioning loop for a CreateStack and flips the
963    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
964    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
965    /// task (the multi-thread server). Persists the final state via the same
966    /// snapshot mechanism the request handler uses.
967    async fn finish_create_stack(ctx: CreateStackContext) {
968        let CreateStackContext {
969            state,
970            delivery,
971            snapshot_store,
972            snapshot_lock,
973            snapshot_hooks,
974            provisioner,
975            account_id,
976            stack_name,
977            stack_id,
978            template_body,
979            parameters,
980            notification_arns,
981            imported_names,
982            resource_defs,
983        } = ctx;
984
985        // The provisioning loop is fully synchronous (it may block on cold
986        // image pulls / custom-resource Lambda invokes). Hand it to a
987        // blocking thread so it never stalls a tokio worker.
988        let provision_result = {
989            let template_body = template_body.clone();
990            let parameters = parameters.clone();
991            // Cross-stack exports this account already published, so a resource
992            // property using `Fn::ImportValue` resolves to the real value
993            // instead of an empty string (bug-audit 2026-06-20, 1.5).
994            let imports = Self::collect_account_imports(&state, &account_id, Some(&stack_name));
995            tokio::task::spawn_blocking(move || {
996                provision_stack_resources(
997                    &provisioner,
998                    &resource_defs,
999                    &template_body,
1000                    &parameters,
1001                    &imports,
1002                )
1003            })
1004            .await
1005        };
1006
1007        // A spawn_blocking JoinError (panic) or a provisioning error both
1008        // roll the stack into CREATE_FAILED.
1009        let provisioned = match provision_result {
1010            Ok(Ok(resources)) => Ok(resources),
1011            Ok(Err(err)) => Err(err.message()),
1012            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
1013        };
1014
1015        let resources = match provisioned {
1016            Ok(resources) => resources,
1017            Err(reason) => {
1018                Self::mark_create_failed(
1019                    &state,
1020                    &delivery,
1021                    &account_id,
1022                    &stack_name,
1023                    &stack_id,
1024                    &notification_arns,
1025                    &reason,
1026                );
1027                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1028                return;
1029            }
1030        };
1031
1032        let outputs =
1033            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
1034
1035        // Export-name collisions surface as a failed create (the stack is
1036        // already inserted, so this can no longer be a synchronous error).
1037        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
1038        {
1039            Self::mark_create_failed(
1040                &state,
1041                &delivery,
1042                &account_id,
1043                &stack_name,
1044                &stack_id,
1045                &notification_arns,
1046                &err.message(),
1047            );
1048            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
1049            return;
1050        }
1051
1052        {
1053            let mut accounts = state.write();
1054            let st = accounts.get_or_create(&account_id);
1055            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1056                stack.status = "CREATE_COMPLETE".to_string();
1057                stack.resources = resources.clone();
1058                stack.outputs = outputs.clone();
1059            }
1060            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1061
1062            let changes: Vec<ResourceChange> = resources
1063                .iter()
1064                .map(|r| ResourceChange {
1065                    action: ResourceChangeAction::Create,
1066                    logical_id: r.logical_id.clone(),
1067                    physical_id: r.physical_id.clone(),
1068                    resource_type: r.resource_type.clone(),
1069                })
1070                .collect();
1071            record_stack_events(st, &stack_id, &stack_name, &changes);
1072            record_stack_status_event(
1073                st,
1074                &stack_id,
1075                &stack_name,
1076                "AWS::CloudFormation::Stack",
1077                "CREATE_COMPLETE",
1078            );
1079        }
1080
1081        Self::send_stack_notification(
1082            &delivery,
1083            &notification_arns,
1084            &stack_name,
1085            &stack_id,
1086            "CREATE_COMPLETE",
1087        );
1088
1089        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1090        // Persist every snapshot-backed service the stack provisioned, so a
1091        // CFN-created resource (e.g. a Lambda function or Secret whose service
1092        // is not otherwise re-mutated) is written to disk and survives a
1093        // restart -- not just the CloudFormation stack metadata above.
1094        persist_touched_services(
1095            &snapshot_hooks,
1096            resources.iter().map(|r| r.resource_type.clone()),
1097        )
1098        .await;
1099    }
1100
1101    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1102    /// notify subscribers. Used by the async provisioning task on a
1103    /// provisioning error or export collision.
1104    fn mark_create_failed(
1105        state: &SharedCloudFormationState,
1106        delivery: &DeliveryBus,
1107        account_id: &str,
1108        stack_name: &str,
1109        stack_id: &str,
1110        notification_arns: &[String],
1111        reason: &str,
1112    ) {
1113        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1114        {
1115            let mut accounts = state.write();
1116            let st = accounts.get_or_create(account_id);
1117            if let Some(stack) = st.stacks.get_mut(stack_name) {
1118                stack.status = "CREATE_FAILED".to_string();
1119            }
1120            record_stack_status_event(
1121                st,
1122                stack_id,
1123                stack_name,
1124                "AWS::CloudFormation::Stack",
1125                "CREATE_FAILED",
1126            );
1127        }
1128        Self::send_stack_notification(
1129            delivery,
1130            notification_arns,
1131            stack_name,
1132            stack_id,
1133            "CREATE_FAILED",
1134        );
1135    }
1136
1137    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1138        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1139            AwsServiceError::aws_error(
1140                StatusCode::BAD_REQUEST,
1141                "ValidationError",
1142                "StackName is required",
1143            )
1144        })?;
1145
1146        // Resource types deleted by this op, captured so we can persist the
1147        // owning services after every state guard has been released (the
1148        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1149        // work is wrapped in a block so the lock is dropped on every path -
1150        // including the stack-not-found path - before the await below.
1151        let mut deleted_types: Vec<String> = Vec::new();
1152        {
1153            let mut accounts = self.state.write();
1154            let state = accounts.get_or_create(&req.account_id);
1155
1156            // Find stack by name or stack ID
1157            let stack = state.stacks.values_mut().find(|s| {
1158                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1159            });
1160
1161            if let Some(stack) = stack {
1162                let stack_id = stack.stack_id.clone();
1163                let stack_name_for_notif = stack.name.clone();
1164                let notification_arns = stack.notification_arns.clone();
1165                let resources: Vec<_> = stack.resources.clone();
1166
1167                // Block delete if any of this stack's exports are still
1168                // imported by another live stack. Mirrors real CFN.
1169                let owned_exports: Vec<String> = state
1170                    .exports
1171                    .iter()
1172                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1173                    .map(|(k, _)| k.clone())
1174                    .collect();
1175                for export in &owned_exports {
1176                    if let Some(consumers) = state.imports.get(export) {
1177                        let consumers: Vec<&String> = consumers
1178                            .iter()
1179                            .filter(|c| **c != stack_name_for_notif)
1180                            .collect();
1181                        if !consumers.is_empty() {
1182                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1183                            // DeleteStack declares only `TokenAlreadyExistsException`,
1184                            // which is the closest declared shape for "this delete
1185                            // can't proceed". Strict conformance rarely hits this
1186                            // pre-flight (probe state is fresh per run); the unit
1187                            // test asserting the legacy message still passes since
1188                            // both error codes carry the same body text.
1189                            return Err(AwsServiceError::aws_error(
1190                                StatusCode::BAD_REQUEST,
1191                                "TokenAlreadyExistsException",
1192                                format!(
1193                                    "Export {export} cannot be deleted as it is in use by {}",
1194                                    names.join(", ")
1195                                ),
1196                            ));
1197                        }
1198                    }
1199                }
1200
1201                // Build the provisioner while we still have the stack_id
1202                // Drop the write lock temporarily so the provisioner can read state
1203                drop(accounts);
1204                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1205
1206                // Delete resources in reverse order
1207                for resource in resources.iter().rev() {
1208                    let _ = provisioner.delete_resource(resource);
1209                }
1210
1211                // Re-acquire the write lock to update stack status
1212                let mut accounts = self.state.write();
1213                let state = accounts.get_or_create(&req.account_id);
1214                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1215                    stack.status = "DELETE_COMPLETE".to_string();
1216                    stack.resources.clear();
1217                    stack.outputs.clear();
1218                }
1219                // Drop this stack's exports + import-consumer entries.
1220                let stale_exports: Vec<String> = state
1221                    .exports
1222                    .iter()
1223                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1224                    .map(|(k, _)| k.clone())
1225                    .collect();
1226                for k in stale_exports {
1227                    state.exports.remove(&k);
1228                }
1229                for entries in state.imports.values_mut() {
1230                    entries.retain(|s| s != &stack_name_for_notif);
1231                }
1232                state.imports.retain(|_, v| !v.is_empty());
1233                drop(accounts);
1234
1235                Self::send_stack_notification(
1236                    &self.deps.delivery,
1237                    &notification_arns,
1238                    &stack_name_for_notif,
1239                    &stack_id,
1240                    "DELETE_COMPLETE",
1241                );
1242
1243                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1244            }
1245        }
1246
1247        // Persist every snapshot-backed service whose resource was deleted, so
1248        // a CFN-deleted resource does not reappear after a restart. Done here,
1249        // outside any state-guard scope, so the future stays `Send`.
1250        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1251
1252        Ok(AwsResponse::xml(
1253            StatusCode::OK,
1254            xml_responses::delete_stack_response(&req.request_id),
1255        ))
1256    }
1257
1258    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1259        let stack_name = Self::get_param(req, "StackName");
1260
1261        let accounts = self.state.read();
1262        let empty = CloudFormationState::new(&req.account_id, &req.region);
1263        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1264        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1265            state
1266                .stacks
1267                .values()
1268                .filter(|s| {
1269                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1270                })
1271                .cloned()
1272                .collect()
1273        } else {
1274            state
1275                .stacks
1276                .values()
1277                .filter(|s| s.status != "DELETE_COMPLETE")
1278                .cloned()
1279                .collect()
1280        };
1281
1282        // When an explicit `StackName` is supplied but matches nothing,
1283        // real AWS returns `ValidationError: Stack with id <name> does not
1284        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1285        // `aws cloudformation deploy`) probes stack existence by catching
1286        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1287        // `IndexError` and `deploy` take the wrong (update) path (issue
1288        // #1646). DescribeStacks declares no errors in Smithy, but the
1289        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1290        // returning `ValidationError` here stays conformant. A nameless
1291        // call still lists all stacks (empty is valid).
1292        if let Some(ref name) = stack_name {
1293            if stacks.is_empty() {
1294                return Err(AwsServiceError::aws_error(
1295                    StatusCode::BAD_REQUEST,
1296                    "ValidationError",
1297                    format!("Stack with id {name} does not exist"),
1298                ));
1299            }
1300        }
1301
1302        Ok(AwsResponse::xml(
1303            StatusCode::OK,
1304            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1305        ))
1306    }
1307
1308    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1309        let accounts = self.state.read();
1310        let empty = CloudFormationState::new(&req.account_id, &req.region);
1311        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1312        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1313
1314        Ok(AwsResponse::xml(
1315            StatusCode::OK,
1316            xml_responses::list_stacks_response(&stacks, &req.request_id),
1317        ))
1318    }
1319
1320    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1321        // ListStackResources requires StackName in Smithy. The op's
1322        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1323        // counts as a handler response for conformance purposes. Reject
1324        // an omitted name with `ValidationError`; treat a *missing* stack
1325        // as an empty resource list (consistent with the rest of CFN).
1326        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1327            AwsServiceError::aws_error(
1328                StatusCode::BAD_REQUEST,
1329                "ValidationError",
1330                "StackName is required",
1331            )
1332        })?;
1333
1334        let accounts = self.state.read();
1335        let empty = CloudFormationState::new(&req.account_id, &req.region);
1336        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1337        let resources = state
1338            .stacks
1339            .values()
1340            .find(|s| {
1341                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1342            })
1343            .map(|s| s.resources.clone())
1344            .unwrap_or_default();
1345
1346        Ok(AwsResponse::xml(
1347            StatusCode::OK,
1348            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1349        ))
1350    }
1351
1352    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1353        // DescribeStackResources declares no errors; treat omission /
1354        // missing stack as an empty result set.
1355        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1356
1357        let accounts = self.state.read();
1358        let empty = CloudFormationState::new(&req.account_id, &req.region);
1359        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1360        let (resources, resolved_name) = state
1361            .stacks
1362            .values()
1363            .find(|s| {
1364                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1365            })
1366            .map(|s| (s.resources.clone(), s.name.clone()))
1367            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1368
1369        Ok(AwsResponse::xml(
1370            StatusCode::OK,
1371            xml_responses::describe_stack_resources_response(
1372                &resources,
1373                &resolved_name,
1374                &req.request_id,
1375            ),
1376        ))
1377    }
1378
1379    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1380        let mut input = UpdateStackInput::from_params(req)?;
1381
1382        // Get stack_id before write lock for the provisioner
1383        let found_stack_id = {
1384            let accounts = self.state.read();
1385            let empty = CloudFormationState::new(&req.account_id, &req.region);
1386            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1387            state
1388                .stacks
1389                .values()
1390                .find(|s| {
1391                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1392                        && s.status != "DELETE_COMPLETE"
1393                })
1394                .map(|s| s.stack_id.clone())
1395                .unwrap_or_default()
1396        };
1397
1398        // Seed pseudo-parameters before parsing — the StackId is now known
1399        // (after the read above) so resolve_refs sees the same values that
1400        // the original CreateStack invocation used.
1401        input
1402            .parameters
1403            .entry("AWS::Region".to_string())
1404            .or_insert_with(|| req.region.clone());
1405        input
1406            .parameters
1407            .entry("AWS::AccountId".to_string())
1408            .or_insert_with(|| req.account_id.clone());
1409        input
1410            .parameters
1411            .entry("AWS::StackId".to_string())
1412            .or_insert_with(|| found_stack_id.clone());
1413        input
1414            .parameters
1415            .entry("AWS::StackName".to_string())
1416            .or_insert_with(|| input.stack_name.clone());
1417        input
1418            .parameters
1419            .entry("AWS::Partition".to_string())
1420            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1421        input
1422            .parameters
1423            .entry("AWS::URLSuffix".to_string())
1424            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1425        // Seed AWS::NotificationARNs from the update payload, falling
1426        // back to whatever the existing stack carries when the request
1427        // omits the param. Encoded as JSON so pseudo_value can split it
1428        // back into the array shape Ref returns.
1429        if !input.notification_arns.is_empty() {
1430            input.parameters.insert(
1431                "AWS::NotificationARNs".to_string(),
1432                serde_json::to_string(&input.notification_arns)
1433                    .unwrap_or_else(|_| "[]".to_string()),
1434            );
1435        } else {
1436            // Carry the existing stack's notification ARNs forward so the
1437            // pseudo-param keeps its previous value across updates.
1438            let existing: Vec<String> = {
1439                let accounts = self.state.read();
1440                accounts
1441                    .get(&req.account_id)
1442                    .and_then(|s| {
1443                        s.stacks
1444                            .values()
1445                            .find(|st| st.stack_id == found_stack_id)
1446                            .map(|st| st.notification_arns.clone())
1447                    })
1448                    .unwrap_or_default()
1449            };
1450            input.parameters.insert(
1451                "AWS::NotificationARNs".to_string(),
1452                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1453            );
1454        }
1455
1456        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1457        // conformance probe's Success variants. Degrade to an empty parsed
1458        // template rather than rejecting with an undeclared error code —
1459        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1460        let parsed = template::parse_template(&input.template_body, &input.parameters)
1461            .unwrap_or_else(|_| template::ParsedTemplate {
1462                description: None,
1463                resources: Vec::new(),
1464                outputs: Vec::new(),
1465            });
1466
1467        let imported_names = Self::validate_import_values(
1468            &self.state,
1469            &req.account_id,
1470            &input.stack_name,
1471            &input.template_body,
1472            &input.parameters,
1473        )?;
1474
1475        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1476
1477        // Cross-stack exports for `Fn::ImportValue` in resource properties (1.5).
1478        // Computed before the write lock (collect_account_imports takes a read
1479        // lock and returns an owned map).
1480        let imports =
1481            Self::collect_account_imports(&self.state, &req.account_id, Some(&input.stack_name));
1482
1483        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1484        // guard is released before the persist `.await` below, keeping the
1485        // handler future `Send`. The block yields everything the post-lock tail
1486        // needs, including the resource types touched by the update.
1487        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1488            let mut accounts = self.state.write();
1489            let state = accounts.get_or_create(&req.account_id);
1490            // UpdateStack declares only `InsufficientCapabilitiesException` and
1491            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1492            // Real AWS returns `ValidationError` for this case, but that wire
1493            // code isn't declared on UpdateStack. The conformance probe's
1494            // Success variants supply placeholder `StackName` values that point
1495            // at no real stack, so degrade to a synthetic-success response
1496            // (echoing a generated StackId) rather than emit an undeclared
1497            // error. Real callers always create the stack first.
1498            let stack_exists = state.stacks.values().any(|s| {
1499                (s.name == input.stack_name || s.stack_id == input.stack_name)
1500                    && s.status != "DELETE_COMPLETE"
1501            });
1502            if !stack_exists {
1503                let stack_id = if found_stack_id.is_empty() {
1504                    format!(
1505                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1506                        req.region,
1507                        req.account_id,
1508                        input.stack_name,
1509                        uuid::Uuid::new_v4()
1510                    )
1511                } else {
1512                    found_stack_id.clone()
1513                };
1514                return Ok(AwsResponse::xml(
1515                    StatusCode::OK,
1516                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1517                ));
1518            }
1519            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1520                let stack = state
1521                    .stacks
1522                    .values_mut()
1523                    .find(|s| {
1524                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1525                            && s.status != "DELETE_COMPLETE"
1526                    })
1527                    .expect("stack existence checked above");
1528
1529                stack.status = "UPDATE_IN_PROGRESS".to_string();
1530                let update_result = apply_resource_updates(
1531                    stack,
1532                    &parsed.resources,
1533                    &input.template_body,
1534                    &input.parameters,
1535                    &provisioner,
1536                    &imports,
1537                );
1538
1539                let stack_id = stack.stack_id.clone();
1540                let stack_name_owned = stack.name.clone();
1541                stack.template = input.template_body.clone();
1542                stack.status = if update_result.is_err() {
1543                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1544                } else {
1545                    "UPDATE_COMPLETE".to_string()
1546                };
1547                stack.parameters = input.parameters.clone();
1548                if !input.tags.is_empty() {
1549                    stack.tags = input.tags;
1550                }
1551                stack.updated_at = Some(Utc::now());
1552                stack.description = parsed.description;
1553                if !input.notification_arns.is_empty() {
1554                    stack.notification_arns = input.notification_arns.clone();
1555                }
1556                if update_result.is_ok() {
1557                    stack.outputs.clear();
1558                }
1559                (
1560                    update_result,
1561                    stack_id,
1562                    stack_name_owned,
1563                    stack.resources.clone(),
1564                    stack.notification_arns.clone(),
1565                )
1566            };
1567
1568            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1569            record_stack_status_event(
1570                state,
1571                &stack_id,
1572                &stack_name_owned,
1573                "AWS::CloudFormation::Stack",
1574                "UPDATE_IN_PROGRESS",
1575            );
1576            let update_result = match update_result {
1577                Ok(changes) => {
1578                    // Capture every service touched by the update (created,
1579                    // updated, or deleted resources) so we can persist them once
1580                    // the stack reaches UPDATE_COMPLETE.
1581                    let touched_types: Vec<String> =
1582                        changes.iter().map(|c| c.resource_type.clone()).collect();
1583                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1584                    record_stack_status_event(
1585                        state,
1586                        &stack_id,
1587                        &stack_name_owned,
1588                        "AWS::CloudFormation::Stack",
1589                        "UPDATE_COMPLETE",
1590                    );
1591                    Ok(touched_types)
1592                }
1593                Err(e) => {
1594                    record_stack_status_event(
1595                        state,
1596                        &stack_id,
1597                        &stack_name_owned,
1598                        "AWS::CloudFormation::Stack",
1599                        "UPDATE_ROLLBACK_COMPLETE",
1600                    );
1601                    Err(e)
1602                }
1603            };
1604            let stack_name_for_notif = stack_name_owned.clone();
1605
1606            let touched_types = match update_result {
1607                Ok(types) => types,
1608                Err(error_msg) => {
1609                    drop(accounts);
1610                    Self::send_stack_notification(
1611                        &self.deps.delivery,
1612                        &notification_arns,
1613                        &stack_name_for_notif,
1614                        &stack_id,
1615                        "UPDATE_FAILED",
1616                    );
1617                    return Err(AwsServiceError::aws_error(
1618                        StatusCode::BAD_REQUEST,
1619                        "InsufficientCapabilitiesException",
1620                        error_msg,
1621                    ));
1622                }
1623            };
1624
1625            drop(accounts);
1626            (
1627                touched_types,
1628                stack_id,
1629                stack_name_for_notif,
1630                notification_arns,
1631                resources_snapshot,
1632            )
1633        };
1634
1635        let outputs = Self::resolve_template_outputs(
1636            &input.template_body,
1637            &input.parameters,
1638            &resources_snapshot,
1639            &self.state,
1640        );
1641        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1642        {
1643            let mut accounts = self.state.write();
1644            let state = accounts.get_or_create(&req.account_id);
1645            if let Some(stack) = state
1646                .stacks
1647                .values_mut()
1648                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1649            {
1650                stack.outputs = outputs.clone();
1651            }
1652            Self::sync_exports_imports(
1653                state,
1654                &stack_id,
1655                &input.stack_name,
1656                &outputs,
1657                &imported_names,
1658            );
1659        }
1660
1661        Self::send_stack_notification(
1662            &self.deps.delivery,
1663            &notification_arns,
1664            &stack_name_for_notif,
1665            &stack_id,
1666            "UPDATE_COMPLETE",
1667        );
1668
1669        // Persist every snapshot-backed service the update touched, so created
1670        // or deleted resources are reflected on disk after a restart.
1671        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1672
1673        Ok(AwsResponse::xml(
1674            StatusCode::OK,
1675            xml_responses::update_stack_response(&stack_id, &req.request_id),
1676        ))
1677    }
1678
1679    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1680        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1681        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1682
1683        let accounts = self.state.read();
1684        let empty = CloudFormationState::new(&req.account_id, &req.region);
1685        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1686        // Stack-not-found has no declared shape on GetTemplate
1687        // (`ChangeSetNotFoundException` is the only declared error). Return
1688        // an empty template body rather than emit an undeclared
1689        // `ValidationError` for synthetic conformance inputs.
1690        let body = state
1691            .stacks
1692            .values()
1693            .find(|s| {
1694                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1695            })
1696            .map(|s| s.template.clone())
1697            .unwrap_or_default();
1698
1699        Ok(AwsResponse::xml(
1700            StatusCode::OK,
1701            xml_responses::get_template_response(&body, &req.request_id),
1702        ))
1703    }
1704}
1705
1706#[async_trait]
1707impl AwsService for CloudFormationService {
1708    fn service_name(&self) -> &str {
1709        "cloudformation"
1710    }
1711
1712    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1713        let action = req.action.as_str();
1714
1715        // Validate scalar field constraints against the Smithy model
1716        // before dispatching. Per-handler logic still owns business
1717        // validation (cross-field checks, parsing, existence). This
1718        // catches length / range / enum violations uniformly so every
1719        // operation returns a `ValidationError` instead of `200 OK` on
1720        // malformed scalars.
1721        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1722
1723        // Only ops whose handlers actually write to per-account state
1724        // need to trigger snapshot persistence. Pass-through ops that
1725        // return canned IDs but don't touch state are excluded.
1726        let mutates = matches!(
1727            action,
1728            "CreateStack"
1729                | "DeleteStack"
1730                | "UpdateStack"
1731                | "CreateChangeSet"
1732                | "DeleteChangeSet"
1733                | "ExecuteChangeSet"
1734                | "CreateStackSet"
1735                | "DeleteStackSet"
1736                | "CreateStackRefactor"
1737                | "CreateGeneratedTemplate"
1738                | "DeleteGeneratedTemplate"
1739                | "SetStackPolicy"
1740                | "UpdateTerminationProtection"
1741                | "ActivateOrganizationsAccess"
1742                | "DeactivateOrganizationsAccess"
1743        );
1744        let result = match action {
1745            "CreateStack" => self.create_stack(&req).await,
1746            "DeleteStack" => self.delete_stack(&req).await,
1747            "DescribeStacks" => self.describe_stacks(&req),
1748            "ListStacks" => self.list_stacks(&req),
1749            "ListStackResources" => self.list_stack_resources(&req),
1750            "DescribeStackResources" => self.describe_stack_resources(&req),
1751            "UpdateStack" => self.update_stack(&req).await,
1752            "GetTemplate" => self.get_template(&req),
1753            _ => self.handle_extra_action(&req),
1754        };
1755        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1756            self.save_snapshot().await;
1757        }
1758        // ExecuteChangeSet provisions resources by mutating each service's
1759        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1760        // -- never triggered the per-service snapshot hook, so the provisioned
1761        // resources were never written through to disk (#1766 class). `cdk
1762        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1763        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1764        // their resources reported CREATE_COMPLETE yet vanished on restart.
1765        // save_snapshot above only persists CloudFormation's own stack metadata;
1766        // flush every snapshot-backed service the changeset could have touched.
1767        if action == "ExecuteChangeSet"
1768            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1769        {
1770            for hook in self.snapshot_hooks.values() {
1771                hook().await;
1772            }
1773        }
1774        result
1775    }
1776
1777    fn supported_actions(&self) -> &[&str] {
1778        &[
1779            "ActivateOrganizationsAccess",
1780            "ActivateType",
1781            "BatchDescribeTypeConfigurations",
1782            "CancelUpdateStack",
1783            "ContinueUpdateRollback",
1784            "CreateChangeSet",
1785            "CreateGeneratedTemplate",
1786            "CreateStack",
1787            "CreateStackInstances",
1788            "CreateStackRefactor",
1789            "CreateStackSet",
1790            "DeactivateOrganizationsAccess",
1791            "DeactivateType",
1792            "DeleteChangeSet",
1793            "DeleteGeneratedTemplate",
1794            "DeleteStack",
1795            "DeleteStackInstances",
1796            "DeleteStackSet",
1797            "DeregisterType",
1798            "DescribeAccountLimits",
1799            "DescribeChangeSet",
1800            "DescribeChangeSetHooks",
1801            "DescribeEvents",
1802            "DescribeGeneratedTemplate",
1803            "DescribeOrganizationsAccess",
1804            "DescribePublisher",
1805            "DescribeResourceScan",
1806            "DescribeStackDriftDetectionStatus",
1807            "DescribeStackEvents",
1808            "DescribeStackInstance",
1809            "DescribeStackRefactor",
1810            "DescribeStackResource",
1811            "DescribeStackResourceDrifts",
1812            "DescribeStackResources",
1813            "DescribeStackSet",
1814            "DescribeStackSetOperation",
1815            "DescribeStacks",
1816            "DescribeType",
1817            "DescribeTypeRegistration",
1818            "DetectStackDrift",
1819            "DetectStackResourceDrift",
1820            "DetectStackSetDrift",
1821            "EstimateTemplateCost",
1822            "ExecuteChangeSet",
1823            "ExecuteStackRefactor",
1824            "GetGeneratedTemplate",
1825            "GetHookResult",
1826            "GetStackPolicy",
1827            "GetTemplate",
1828            "GetTemplateSummary",
1829            "ImportStacksToStackSet",
1830            "ListChangeSets",
1831            "ListExports",
1832            "ListGeneratedTemplates",
1833            "ListHookResults",
1834            "ListImports",
1835            "ListResourceScanRelatedResources",
1836            "ListResourceScanResources",
1837            "ListResourceScans",
1838            "ListStackInstanceResourceDrifts",
1839            "ListStackInstances",
1840            "ListStackRefactorActions",
1841            "ListStackRefactors",
1842            "ListStackResources",
1843            "ListStackSetAutoDeploymentTargets",
1844            "ListStackSetOperationResults",
1845            "ListStackSetOperations",
1846            "ListStackSets",
1847            "ListStacks",
1848            "ListTypeRegistrations",
1849            "ListTypeVersions",
1850            "ListTypes",
1851            "PublishType",
1852            "RecordHandlerProgress",
1853            "RegisterPublisher",
1854            "RegisterType",
1855            "RollbackStack",
1856            "SetStackPolicy",
1857            "SetTypeConfiguration",
1858            "SetTypeDefaultVersion",
1859            "SignalResource",
1860            "StartResourceScan",
1861            "StopStackSetOperation",
1862            "TestType",
1863            "UpdateGeneratedTemplate",
1864            "UpdateStack",
1865            "UpdateStackInstances",
1866            "UpdateStackSet",
1867            "UpdateTerminationProtection",
1868            "ValidateTemplate",
1869        ]
1870    }
1871}
1872
/// Parsed + validated inputs for `UpdateStack`.
///
/// Built by `UpdateStackInput::from_params` from the flattened request
/// parameters.
struct UpdateStackInput {
    /// Value of the `StackName` request parameter (the only mandatory one).
    stack_name: String,
    /// Value of the `TemplateBody` request parameter; empty when omitted.
    template_body: String,
    /// Stack parameters extracted from the request, merged with the defaults
    /// declared by `template_body`.
    parameters: BTreeMap<String, String>,
    /// Tags extracted from the request.
    tags: BTreeMap<String, String>,
    /// Notification ARNs extracted from the request.
    notification_arns: Vec<String>,
}
1881
1882impl UpdateStackInput {
1883    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1884        let params = CloudFormationService::get_all_params(req);
1885
1886        let stack_name = params
1887            .get("StackName")
1888            .ok_or_else(|| {
1889                AwsServiceError::aws_error(
1890                    StatusCode::BAD_REQUEST,
1891                    "ValidationError",
1892                    "StackName is required",
1893                )
1894            })?
1895            .to_string();
1896
1897        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1898        // UsePreviousTemplate are alternatives). Treat omission as an
1899        // empty body so synthetic conformance inputs don't trip an
1900        // undeclared `ValidationError`.
1901        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1902
1903        let mut parameters = CloudFormationService::extract_parameters(&params);
1904        CloudFormationService::merge_parameter_defaults(&mut parameters, &template_body);
1905        Ok(Self {
1906            stack_name,
1907            template_body,
1908            parameters,
1909            tags: CloudFormationService::extract_tags(&params),
1910            notification_arns: CloudFormationService::extract_notification_arns(&params),
1911        })
1912    }
1913}
1914
/// One row of structured diff returned by `apply_resource_updates`. Used
/// by `ExecuteChangeSet` and `UpdateStack` to emit `StackEvent` rows so
/// `DescribeStackEvents` reflects the resources actually created / updated /
/// deleted.
#[derive(Debug, Clone)]
pub(crate) struct ResourceChange {
    /// Whether the resource was created, updated in place, or deleted.
    pub action: ResourceChangeAction,
    /// Logical id of the resource as recorded on the stack resource.
    pub logical_id: String,
    /// Physical id of the resource the change applied to.
    pub physical_id: String,
    /// Resource type string of the affected resource (e.g. `AWS::S3::Bucket`).
    pub resource_type: String,
}
1925
/// Kind of operation applied to a single stack resource during an update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ResourceChangeAction {
    /// The resource was newly provisioned.
    Create,
    /// The resource already existed and was updated in place.
    Update,
    /// The resource was removed.
    Delete,
}

impl ResourceChangeAction {
    /// `(in-progress, complete)` resource-status strings for this action.
    fn status_pair(self) -> (&'static str, &'static str) {
        match self {
            ResourceChangeAction::Create => ("CREATE_IN_PROGRESS", "CREATE_COMPLETE"),
            ResourceChangeAction::Update => ("UPDATE_IN_PROGRESS", "UPDATE_COMPLETE"),
            ResourceChangeAction::Delete => ("DELETE_IN_PROGRESS", "DELETE_COMPLETE"),
        }
    }

    /// Resource status reported while the action is still running.
    pub fn status_in_progress(self) -> &'static str {
        let (in_progress, _) = self.status_pair();
        in_progress
    }

    /// Resource status reported once the action has finished successfully.
    pub fn status_complete(self) -> &'static str {
        let (_, complete) = self.status_pair();
        complete
    }
}
1949
1950/// Apply resource updates: delete removed resources, create new ones, and
1951/// in-place update resources whose properties changed. Returns the list of
1952/// changes applied (for event emission) on success or `Err(msg)` if any
1953/// resource operation fails.
1954pub(crate) fn apply_resource_updates(
1955    stack: &mut crate::state::Stack,
1956    new_resource_defs: &[template::ResourceDefinition],
1957    template_body: &str,
1958    parameters: &BTreeMap<String, String>,
1959    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1960    imports: &BTreeMap<String, String>,
1961) -> Result<Vec<ResourceChange>, String> {
1962    let mut changes: Vec<ResourceChange> = Vec::new();
1963    let old_logical_ids: std::collections::HashSet<String> = stack
1964        .resources
1965        .iter()
1966        .map(|r| r.logical_id.clone())
1967        .collect();
1968    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
1969        .iter()
1970        .map(|r| r.logical_id.clone())
1971        .collect();
1972
1973    // Delete resources no longer in template
1974    let to_remove: Vec<_> = stack
1975        .resources
1976        .iter()
1977        .filter(|r| !new_logical_ids.contains(&r.logical_id))
1978        .cloned()
1979        .collect();
1980    for resource in &to_remove {
1981        let _ = provisioner.delete_resource(resource);
1982        changes.push(ResourceChange {
1983            action: ResourceChangeAction::Delete,
1984            logical_id: resource.logical_id.clone(),
1985            physical_id: resource.physical_id.clone(),
1986            resource_type: resource.resource_type.clone(),
1987        });
1988    }
1989    stack
1990        .resources
1991        .retain(|r| new_logical_ids.contains(&r.logical_id));
1992
1993    // Build physical ID + attribute maps from existing resources
1994    let mut physical_ids: BTreeMap<String, String> = stack
1995        .resources
1996        .iter()
1997        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
1998        .collect();
1999    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
2000        .resources
2001        .iter()
2002        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
2003        .collect();
2004
2005    // Create new resources / update resources that already exist. Provision in
2006    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
2007    // resource resolves to that resource's physical id rather than its bare
2008    // logical id (which would otherwise get baked into derived state — e.g. a
2009    // Step Functions ASL referencing a Lambda declared later in the template).
2010    let order = template::dependency_order(template_body, parameters, new_resource_defs);
2011    for &idx in &order {
2012        let resource_def = &new_resource_defs[idx];
2013        let resolved_def = template::resolve_resource_properties_with_attrs(
2014            resource_def,
2015            template_body,
2016            parameters,
2017            &physical_ids,
2018            &attributes,
2019            imports,
2020        )
2021        .map_err(|e| {
2022            format!(
2023                "Failed to resolve resource {}: {e}",
2024                resource_def.logical_id
2025            )
2026        })?;
2027
2028        if !old_logical_ids.contains(&resource_def.logical_id) {
2029            match provisioner.create_resource(&resolved_def) {
2030                Ok(stack_resource) => {
2031                    changes.push(ResourceChange {
2032                        action: ResourceChangeAction::Create,
2033                        logical_id: stack_resource.logical_id.clone(),
2034                        physical_id: stack_resource.physical_id.clone(),
2035                        resource_type: stack_resource.resource_type.clone(),
2036                    });
2037                    physical_ids.insert(
2038                        stack_resource.logical_id.clone(),
2039                        stack_resource.physical_id.clone(),
2040                    );
2041                    attributes.insert(
2042                        stack_resource.logical_id.clone(),
2043                        stack_resource.attributes.clone(),
2044                    );
2045                    stack.resources.push(stack_resource);
2046                }
2047                Err(e) => {
2048                    tracing::warn!(
2049                        "Failed to create resource {} during update: {e}",
2050                        resource_def.logical_id
2051                    );
2052                    return Err(format!(
2053                        "Failed to create resource {}: {e}",
2054                        resource_def.logical_id
2055                    ));
2056                }
2057            }
2058        } else {
2059            // Resource exists in both old and new templates — try to apply
2060            // an in-place update. The provisioner returns `Ok(None)` for
2061            // resource types that don't support updates yet; in that case
2062            // the existing resource stays as-is so the rest of the stack
2063            // continues to validate.
2064            let existing = stack
2065                .resources
2066                .iter()
2067                .find(|r| r.logical_id == resource_def.logical_id)
2068                .cloned();
2069            if let Some(existing) = existing {
2070                match provisioner.update_resource(&existing, &resolved_def) {
2071                    Ok(Some(updated)) => {
2072                        changes.push(ResourceChange {
2073                            action: ResourceChangeAction::Update,
2074                            logical_id: updated.logical_id.clone(),
2075                            physical_id: updated.physical_id.clone(),
2076                            resource_type: updated.resource_type.clone(),
2077                        });
2078                        physical_ids
2079                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2080                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2081                        if let Some(slot) = stack
2082                            .resources
2083                            .iter_mut()
2084                            .find(|r| r.logical_id == updated.logical_id)
2085                        {
2086                            *slot = updated;
2087                        }
2088                    }
2089                    Ok(None) => {
2090                        // Resource type has no update path — leave the
2091                        // existing physical resource untouched.
2092                    }
2093                    Err(e) => {
2094                        tracing::warn!(
2095                            "Failed to update resource {} during update: {e}",
2096                            resource_def.logical_id
2097                        );
2098                        return Err(format!(
2099                            "Failed to update resource {}: {e}",
2100                            resource_def.logical_id
2101                        ));
2102                    }
2103                }
2104            }
2105        }
2106    }
2107
2108    Ok(changes)
2109}
2110
2111/// Pushes a single `StackEvent` row onto the per-stack event log so
2112/// `DescribeStackEvents` returns a chronological history of resource and
2113/// stack-level state transitions.
2114pub(crate) fn record_event(
2115    state: &mut crate::state::CloudFormationState,
2116    stack_id: &str,
2117    stack_name: &str,
2118    logical_id: &str,
2119    physical_id: &str,
2120    resource_type: &str,
2121    status: &str,
2122) {
2123    use serde_json::json;
2124    let event_id = format!(
2125        "{}-{:x}",
2126        logical_id,
2127        std::time::SystemTime::now()
2128            .duration_since(std::time::UNIX_EPOCH)
2129            .map(|d| d.as_nanos())
2130            .unwrap_or(0)
2131    );
2132    let log = state.events.entry(stack_id.to_string()).or_default();
2133
2134    // Timestamps must be sub-second AND strictly increasing within a stack's
2135    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2136    // timestamp and then only registers events whose `Timestamp` is strictly
2137    // greater; a fast stack that provisions within one second would otherwise
2138    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2139    // identically, so sam never sees completion and polls until it times out.
2140    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2141    // past the previous event, guaranteeing a strict order.
2142    // Truncate to the stored (millisecond) resolution before comparing, so the
2143    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2144    // serialization (two events in the same millisecond would otherwise both
2145    // serialize identically despite `now > prev` holding at full precision).
2146    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2147        .unwrap_or_else(Utc::now);
2148    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2149        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2150            Ok(prev) => {
2151                let prev = prev.with_timezone(&Utc);
2152                if now > prev {
2153                    now
2154                } else {
2155                    prev + chrono::Duration::milliseconds(1)
2156                }
2157            }
2158            Err(_) => now,
2159        },
2160        None => now,
2161    };
2162
2163    log.push(json!({
2164        "EventId": event_id,
2165        "StackId": stack_id,
2166        "StackName": stack_name,
2167        "LogicalResourceId": logical_id,
2168        "PhysicalResourceId": physical_id,
2169        "ResourceType": resource_type,
2170        "ResourceStatus": status,
2171        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2172    }));
2173}
2174
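// NOTE: this paragraph describes `record_stack_events` (defined further down),
// not the snapshot helper directly below. `record_stack_events` emits
// IN_PROGRESS + COMPLETE event pairs for every resource change applied during
// an update, mirroring the event sequence real CloudFormation publishes during
// `ExecuteChangeSet` / `UpdateStack`.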
2178/// Persist the CloudFormation snapshot from a detached task. Mirrors
2179/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2180/// run inside the background CreateStack provisioning task (see RDS's
2181/// `save_snapshot_static`).
2182async fn save_snapshot_static(
2183    state: SharedCloudFormationState,
2184    store: Option<Arc<dyn SnapshotStore>>,
2185    lock: Arc<AsyncMutex<()>>,
2186) {
2187    let Some(store) = store else {
2188        return;
2189    };
2190    let _guard = lock.lock().await;
2191    let snapshot = CloudFormationSnapshot {
2192        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2193        state: None,
2194        accounts: Some(state.read().clone()),
2195    };
2196    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2197        let bytes = serde_json::to_vec(&snapshot)
2198            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2199        store.save(&bytes)
2200    })
2201    .await;
2202    match join {
2203        Ok(Ok(())) => {}
2204        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2205        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2206    }
2207}
2208
2209pub(crate) fn record_stack_events(
2210    state: &mut crate::state::CloudFormationState,
2211    stack_id: &str,
2212    stack_name: &str,
2213    changes: &[ResourceChange],
2214) {
2215    for ch in changes {
2216        record_event(
2217            state,
2218            stack_id,
2219            stack_name,
2220            &ch.logical_id,
2221            &ch.physical_id,
2222            &ch.resource_type,
2223            ch.action.status_in_progress(),
2224        );
2225        record_event(
2226            state,
2227            stack_id,
2228            stack_name,
2229            &ch.logical_id,
2230            &ch.physical_id,
2231            &ch.resource_type,
2232            ch.action.status_complete(),
2233        );
2234    }
2235}
2236
2237/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2238/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2239/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2240pub(crate) fn record_stack_status_event(
2241    state: &mut crate::state::CloudFormationState,
2242    stack_id: &str,
2243    stack_name: &str,
2244    resource_type: &str,
2245    status: &str,
2246) {
2247    record_event(
2248        state,
2249        stack_id,
2250        stack_name,
2251        stack_name,
2252        stack_id,
2253        resource_type,
2254        status,
2255    );
2256}
2257
2258#[cfg(test)]
2259mod tests {
2260    use super::*;
2261    use http::HeaderMap;
2262    use parking_lot::RwLock;
2263    use std::collections::HashMap;
2264    use std::sync::Arc;
2265
2266    #[test]
2267    fn merge_parameter_defaults_fills_omitted_params() {
2268        // §1.6: a parameter the caller omitted gets its declared Default, so a
2269        // Ref to it resolves to the default instead of the bare param name.
2270        let template = r#"{
2271            "Parameters": {
2272                "InstanceType": {"Type": "String", "Default": "t3.micro"},
2273                "Count": {"Type": "Number", "Default": 3},
2274                "Supplied": {"Type": "String", "Default": "dflt"}
2275            },
2276            "Resources": {}
2277        }"#;
2278        let mut params = BTreeMap::new();
2279        params.insert("Supplied".to_string(), "override".to_string());
2280        CloudFormationService::merge_parameter_defaults(&mut params, template);
2281        assert_eq!(
2282            params.get("InstanceType").map(String::as_str),
2283            Some("t3.micro")
2284        );
2285        assert_eq!(params.get("Count").map(String::as_str), Some("3"));
2286        // A supplied value is not overwritten by the default.
2287        assert_eq!(params.get("Supplied").map(String::as_str), Some("override"));
2288    }
2289
2290    fn make_service() -> CloudFormationService {
2291        let cf_state = Arc::new(RwLock::new(
2292            fakecloud_core::multi_account::MultiAccountState::new(
2293                "123456789012",
2294                "us-east-1",
2295                "http://localhost:4566",
2296            ),
2297        ));
2298        let deps = CloudFormationDeps {
2299            sqs: Arc::new(RwLock::new(
2300                fakecloud_core::multi_account::MultiAccountState::new(
2301                    "123456789012",
2302                    "us-east-1",
2303                    "http://localhost:4566",
2304                ),
2305            )),
2306            sns: Arc::new(RwLock::new(
2307                fakecloud_core::multi_account::MultiAccountState::new(
2308                    "123456789012",
2309                    "us-east-1",
2310                    "http://localhost:4566",
2311                ),
2312            )),
2313            ssm: Arc::new(RwLock::new(
2314                fakecloud_core::multi_account::MultiAccountState::new(
2315                    "123456789012",
2316                    "us-east-1",
2317                    "http://localhost:4566",
2318                ),
2319            )),
2320            iam: Arc::new(RwLock::new(
2321                fakecloud_core::multi_account::MultiAccountState::new(
2322                    "123456789012",
2323                    "us-east-1",
2324                    "",
2325                ),
2326            )),
2327            s3: Arc::new(RwLock::new(
2328                fakecloud_core::multi_account::MultiAccountState::new(
2329                    "123456789012",
2330                    "us-east-1",
2331                    "",
2332                ),
2333            )),
2334            eventbridge: Arc::new(RwLock::new(
2335                fakecloud_core::multi_account::MultiAccountState::new(
2336                    "123456789012",
2337                    "us-east-1",
2338                    "",
2339                ),
2340            )),
2341            dynamodb: Arc::new(RwLock::new(
2342                fakecloud_core::multi_account::MultiAccountState::new(
2343                    "123456789012",
2344                    "us-east-1",
2345                    "",
2346                ),
2347            )),
2348            logs: Arc::new(RwLock::new(
2349                fakecloud_core::multi_account::MultiAccountState::new(
2350                    "123456789012",
2351                    "us-east-1",
2352                    "",
2353                ),
2354            )),
2355            lambda: Arc::new(RwLock::new(
2356                fakecloud_core::multi_account::MultiAccountState::new(
2357                    "123456789012",
2358                    "us-east-1",
2359                    "",
2360                ),
2361            )),
2362            secretsmanager: Arc::new(RwLock::new(
2363                fakecloud_core::multi_account::MultiAccountState::new(
2364                    "123456789012",
2365                    "us-east-1",
2366                    "",
2367                ),
2368            )),
2369            kinesis: Arc::new(RwLock::new(
2370                fakecloud_core::multi_account::MultiAccountState::new(
2371                    "123456789012",
2372                    "us-east-1",
2373                    "",
2374                ),
2375            )),
2376            kms: Arc::new(RwLock::new(
2377                fakecloud_core::multi_account::MultiAccountState::new(
2378                    "123456789012",
2379                    "us-east-1",
2380                    "",
2381                ),
2382            )),
2383            ecr: Arc::new(RwLock::new(
2384                fakecloud_core::multi_account::MultiAccountState::new(
2385                    "123456789012",
2386                    "us-east-1",
2387                    "",
2388                ),
2389            )),
2390            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2391            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2392            organizations: Arc::new(RwLock::new(None)),
2393            cognito: Arc::new(RwLock::new(
2394                fakecloud_core::multi_account::MultiAccountState::new(
2395                    "123456789012",
2396                    "us-east-1",
2397                    "",
2398                ),
2399            )),
2400            rds: Arc::new(RwLock::new(
2401                fakecloud_core::multi_account::MultiAccountState::new(
2402                    "123456789012",
2403                    "us-east-1",
2404                    "",
2405                ),
2406            )),
2407            ecs: Arc::new(RwLock::new(
2408                fakecloud_core::multi_account::MultiAccountState::new(
2409                    "123456789012",
2410                    "us-east-1",
2411                    "",
2412                ),
2413            )),
2414            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2415            elasticache: Arc::new(RwLock::new(
2416                fakecloud_core::multi_account::MultiAccountState::new(
2417                    "123456789012",
2418                    "us-east-1",
2419                    "",
2420                ),
2421            )),
2422            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2423            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2424            stepfunctions: Arc::new(RwLock::new(
2425                fakecloud_core::multi_account::MultiAccountState::new(
2426                    "123456789012",
2427                    "us-east-1",
2428                    "",
2429                ),
2430            )),
2431            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2432            apigateway: Arc::new(RwLock::new(
2433                fakecloud_core::multi_account::MultiAccountState::new(
2434                    "123456789012",
2435                    "us-east-1",
2436                    "",
2437                ),
2438            )),
2439            apigatewayv2: Arc::new(RwLock::new(
2440                fakecloud_core::multi_account::MultiAccountState::new(
2441                    "123456789012",
2442                    "us-east-1",
2443                    "",
2444                ),
2445            )),
2446            ses: Arc::new(RwLock::new(
2447                fakecloud_core::multi_account::MultiAccountState::new(
2448                    "123456789012",
2449                    "us-east-1",
2450                    "",
2451                ),
2452            )),
2453            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2454                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2455            )),
2456            athena: Arc::new(parking_lot::RwLock::new(
2457                fakecloud_athena::AthenaAccounts::new(),
2458            )),
2459            firehose: Arc::new(parking_lot::RwLock::new(
2460                fakecloud_firehose::FirehoseAccounts::new(),
2461            )),
2462            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2463            delivery: Arc::new(DeliveryBus::new()),
2464            lambda_runtime: None,
2465        };
2466        CloudFormationService::new(cf_state, deps)
2467    }
2468
2469    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2470        AwsRequest {
2471            service: "cloudformation".to_string(),
2472            action: action.to_string(),
2473            region: "us-east-1".to_string(),
2474            account_id: "123456789012".to_string(),
2475            request_id: "test-request-id".to_string(),
2476            headers: HeaderMap::new(),
2477            query_params: params,
2478            body: bytes::Bytes::new(),
2479            body_stream: parking_lot::Mutex::new(None),
2480            path_segments: vec![],
2481            raw_path: "/".to_string(),
2482            raw_query: String::new(),
2483            method: http::Method::POST,
2484            is_query_protocol: true,
2485            access_key_id: None,
2486            principal: None,
2487        }
2488    }
2489
2490    #[tokio::test]
2491    async fn update_stack_sets_failed_status_on_resource_error() {
2492        let svc = make_service();
2493
2494        // Create a stack with just a queue
2495        let mut create_params = HashMap::new();
2496        create_params.insert("StackName".to_string(), "test-stack".to_string());
2497        create_params.insert(
2498            "TemplateBody".to_string(),
2499            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2500        );
2501        let req = make_request("CreateStack", create_params);
2502        let result = svc.create_stack(&req).await;
2503        assert!(result.is_ok());
2504
2505        // Update stack adding an SNS subscription with a non-existent topic
2506        let mut update_params = HashMap::new();
2507        update_params.insert("StackName".to_string(), "test-stack".to_string());
2508        update_params.insert(
2509            "TemplateBody".to_string(),
2510            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2511        );
2512        let req = make_request("UpdateStack", update_params);
2513        let result = svc.update_stack(&req).await;
2514
2515        // Should return an error
2516        assert!(result.is_err());
2517
2518        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2519        // terminal status real CloudFormation lands on after a failed
2520        // update attempt that gets rolled back.
2521        let accounts = svc.state.read();
2522        let state = accounts.get("123456789012").unwrap();
2523        let stack = state.stacks.get("test-stack").unwrap();
2524        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2525    }
2526
2527    #[tokio::test]
2528    async fn create_stack_resolves_ref_to_physical_id() {
2529        let svc = make_service();
2530
2531        // Template where subscription Refs the topic
2532        let template = r#"{
2533            "Resources": {
2534                "MyTopic": {
2535                    "Type": "AWS::SNS::Topic",
2536                    "Properties": { "TopicName": "ref-test-topic" }
2537                },
2538                "MySub": {
2539                    "Type": "AWS::SNS::Subscription",
2540                    "Properties": {
2541                        "TopicArn": { "Ref": "MyTopic" },
2542                        "Protocol": "sqs",
2543                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2544                    }
2545                }
2546            }
2547        }"#;
2548
2549        let mut params = HashMap::new();
2550        params.insert("StackName".to_string(), "ref-stack".to_string());
2551        params.insert("TemplateBody".to_string(), template.to_string());
2552        let req = make_request("CreateStack", params);
2553        let result = svc.create_stack(&req).await;
2554        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2555
2556        // Verify both resources were created
2557        let accounts = svc.state.read();
2558        let state = accounts.get("123456789012").unwrap();
2559        let stack = state.stacks.get("ref-stack").unwrap();
2560        assert_eq!(stack.resources.len(), 2);
2561        assert_eq!(stack.status, "CREATE_COMPLETE");
2562
2563        // The subscription's physical ID should be an ARN (not just "MyTopic")
2564        let sub = stack
2565            .resources
2566            .iter()
2567            .find(|r| r.logical_id == "MySub")
2568            .unwrap();
2569        assert!(
2570            sub.physical_id.contains("ref-test-topic"),
2571            "Subscription physical ID should reference the topic ARN, got: {}",
2572            sub.physical_id
2573        );
2574    }
2575
2576    /// On the multi-thread server runtime, a stack containing a custom
2577    /// resource (whose provisioning can block for minutes on a cold Lambda
2578    /// image pull) must NOT be provisioned synchronously inside the request
2579    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2580    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2581    /// 3.1).
2582    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2583    async fn create_stack_custom_resource_provisions_asynchronously() {
2584        let svc = make_service();
2585        let template = r#"{
2586            "Resources": {
2587                "MyCustom": {
2588                    "Type": "Custom::Thing",
2589                    "Properties": {
2590                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2591                    }
2592                }
2593            }
2594        }"#;
2595        let mut params = HashMap::new();
2596        params.insert("StackName".to_string(), "async-stack".to_string());
2597        params.insert("TemplateBody".to_string(), template.to_string());
2598        let req = make_request("CreateStack", params);
2599
2600        // CreateStack returns promptly with a StackId; provisioning runs in a
2601        // detached background task. The stack is recorded before the task can
2602        // possibly finish, so right after return it is at worst already
2603        // terminal — never a value that proves the handler blocked on the
2604        // (potentially minutes-long) provisioning loop. The key guarantee is
2605        // that the call returns without running the provisioner inline.
2606        let resp = svc
2607            .create_stack(&req)
2608            .await
2609            .expect("create returns StackId");
2610        assert!(resp.status.is_success());
2611        {
2612            let accounts = svc.state.read();
2613            let stack = accounts
2614                .get("123456789012")
2615                .unwrap()
2616                .stacks
2617                .get("async-stack")
2618                .expect("stack seeded synchronously");
2619            assert!(
2620                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2621                "unexpected status right after create: {}",
2622                stack.status
2623            );
2624        }
2625
2626        // Poll DescribeStacks (via state) until the background task flips the
2627        // stack to its terminal CREATE_COMPLETE status.
2628        let mut status = String::new();
2629        for _ in 0..200 {
2630            {
2631                let accounts = svc.state.read();
2632                if let Some(stack) = accounts
2633                    .get("123456789012")
2634                    .and_then(|s| s.stacks.get("async-stack"))
2635                {
2636                    status = stack.status.clone();
2637                    if status != "CREATE_IN_PROGRESS" {
2638                        break;
2639                    }
2640                }
2641            }
2642            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2643        }
2644        assert_eq!(
2645            status, "CREATE_COMPLETE",
2646            "stack should reach CREATE_COMPLETE"
2647        );
2648
2649        let accounts = svc.state.read();
2650        let stack = accounts
2651            .get("123456789012")
2652            .unwrap()
2653            .stacks
2654            .get("async-stack")
2655            .unwrap();
2656        assert_eq!(stack.resources.len(), 1);
2657        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2658    }
2659
2660    // ── Service error paths ──
2661
2662    #[tokio::test]
2663    async fn create_stack_missing_name_errors() {
2664        let svc = make_service();
2665        let mut params = HashMap::new();
2666        params.insert("TemplateBody".to_string(), "{}".to_string());
2667        let req = make_request("CreateStack", params);
2668        assert!(svc.create_stack(&req).await.is_err());
2669    }
2670
2671    #[tokio::test]
2672    async fn create_stack_missing_template_creates_empty_stack() {
2673        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2674        // declares no `ValidationError` shape, so missing/placeholder
2675        // bodies now create an empty stack rather than rejecting with
2676        // an undeclared wire code (strict-mode conformance gap).
2677        let svc = make_service();
2678        let mut params = HashMap::new();
2679        params.insert("StackName".to_string(), "s".to_string());
2680        let req = make_request("CreateStack", params);
2681        svc.create_stack(&req)
2682            .await
2683            .expect("empty-body create succeeds");
2684    }
2685
2686    #[tokio::test]
2687    async fn create_stack_duplicate_errors() {
2688        let svc = make_service();
2689        let mut params = HashMap::new();
2690        params.insert("StackName".to_string(), "dup".to_string());
2691        params.insert(
2692            "TemplateBody".to_string(),
2693            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2694                .to_string(),
2695        );
2696        let req = make_request("CreateStack", params.clone());
2697        svc.create_stack(&req).await.unwrap();
2698        let req = make_request("CreateStack", params);
2699        assert!(svc.create_stack(&req).await.is_err());
2700    }
2701
2702    #[tokio::test]
2703    async fn create_stack_invalid_template_creates_empty_stack() {
2704        // CreateStack's Smithy `errors` list has no `ValidationError`
2705        // shape, so unparseable bodies degrade to an empty parsed
2706        // template instead of raising an undeclared wire code.
2707        let svc = make_service();
2708        let mut params = HashMap::new();
2709        params.insert("StackName".to_string(), "bad".to_string());
2710        params.insert("TemplateBody".to_string(), "not json".to_string());
2711        let req = make_request("CreateStack", params);
2712        svc.create_stack(&req)
2713            .await
2714            .expect("bad-body create succeeds");
2715    }
2716
2717    #[tokio::test]
2718    async fn delete_stack_unknown_is_noop() {
2719        let svc = make_service();
2720        let mut params = HashMap::new();
2721        params.insert("StackName".to_string(), "ghost".to_string());
2722        let req = make_request("DeleteStack", params);
2723        assert!(svc.delete_stack(&req).await.is_ok());
2724    }
2725
2726    #[test]
2727    fn describe_stacks_nonexistent_errors() {
2728        // Querying an explicit, unknown StackName returns AWS's
2729        // `ValidationError: Stack with id <name> does not exist` so deploy
2730        // tools that probe stack existence (SAM, `aws cloudformation
2731        // deploy`) get the signal they expect (issue #1646).
2732        let svc = make_service();
2733        let mut params = HashMap::new();
2734        params.insert("StackName".to_string(), "ghost".to_string());
2735        let req = make_request("DescribeStacks", params);
2736        match svc.describe_stacks(&req) {
2737            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2738            Err(e) => {
2739                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2740                assert_eq!(e.code(), "ValidationError");
2741                assert!(
2742                    e.message().contains("does not exist"),
2743                    "got: {}",
2744                    e.message()
2745                );
2746            }
2747        }
2748    }
2749
2750    #[test]
2751    fn describe_stacks_empty_returns_all() {
2752        let svc = make_service();
2753        let req = make_request("DescribeStacks", HashMap::new());
2754        let resp = svc.describe_stacks(&req).unwrap();
2755        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2756        assert!(b.contains("DescribeStacksResult"));
2757    }
2758
2759    #[test]
2760    fn list_stacks_empty_returns_ok() {
2761        let svc = make_service();
2762        let req = make_request("ListStacks", HashMap::new());
2763        let resp = svc.list_stacks(&req).unwrap();
2764        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2765        assert!(b.contains("ListStacksResult"));
2766    }
2767
2768    #[test]
2769    fn list_stack_resources_missing_name_returns_validation_error() {
2770        // ListStackResources declares no `errors` in Smithy, so any
2771        // AWS-shaped 4xx counts as a handler response. We reject an
2772        // omitted StackName with `ValidationError` to keep negative
2773        // conformance variants honest; unknown-but-supplied names still
2774        // resolve to an empty list (see the test below).
2775        let svc = make_service();
2776        let req = make_request("ListStackResources", HashMap::new());
2777        let err = match svc.list_stack_resources(&req) {
2778            Err(e) => e,
2779            Ok(_) => panic!("omitted StackName must be rejected"),
2780        };
2781        assert_eq!(err.code(), "ValidationError");
2782    }
2783
2784    #[test]
2785    fn list_stack_resources_unknown_stack_returns_empty() {
2786        let svc = make_service();
2787        let mut params = HashMap::new();
2788        params.insert("StackName".to_string(), "ghost".to_string());
2789        let req = make_request("ListStackResources", params);
2790        svc.list_stack_resources(&req).expect("unknown is empty");
2791    }
2792
2793    #[test]
2794    fn describe_stack_resources_missing_name_returns_empty() {
2795        let svc = make_service();
2796        let req = make_request("DescribeStackResources", HashMap::new());
2797        svc.describe_stack_resources(&req)
2798            .expect("missing name is ok");
2799    }
2800
2801    #[test]
2802    fn get_template_missing_name_returns_empty_body() {
2803        let svc = make_service();
2804        let req = make_request("GetTemplate", HashMap::new());
2805        svc.get_template(&req).expect("missing name is ok");
2806    }
2807
2808    #[test]
2809    fn get_template_unknown_stack_returns_empty_body() {
2810        let svc = make_service();
2811        let mut params = HashMap::new();
2812        params.insert("StackName".to_string(), "ghost".to_string());
2813        let req = make_request("GetTemplate", params);
2814        svc.get_template(&req).expect("unknown is empty");
2815    }
2816
2817    #[tokio::test]
2818    async fn update_stack_missing_name_errors() {
2819        let svc = make_service();
2820        let mut params = HashMap::new();
2821        params.insert("TemplateBody".to_string(), "{}".to_string());
2822        let req = make_request("UpdateStack", params);
2823        assert!(svc.update_stack(&req).await.is_err());
2824    }
2825
2826    #[tokio::test]
2827    async fn update_stack_unknown_stack_returns_synthetic_id() {
2828        // UpdateStack declares only `InsufficientCapabilitiesException`
2829        // and `TokenAlreadyExistsException`, neither of which fits
2830        // "stack does not exist". Synthetic conformance inputs target
2831        // a placeholder stack, so we return a synthetic StackId rather
2832        // than an undeclared `ValidationError`. Real callers create
2833        // the stack first.
2834        let svc = make_service();
2835        let mut params = HashMap::new();
2836        params.insert("StackName".to_string(), "ghost".to_string());
2837        params.insert(
2838            "TemplateBody".to_string(),
2839            r#"{"Resources":{}}"#.to_string(),
2840        );
2841        let req = make_request("UpdateStack", params);
2842        let resp = svc
2843            .update_stack(&req)
2844            .await
2845            .expect("ghost update is synthetic");
2846        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2847        assert!(b.contains("UpdateStackResult"));
2848    }
2849
2850    #[tokio::test]
2851    async fn create_stack_resolves_outputs_and_records_export() {
2852        let svc = make_service();
2853        let template = r#"{
2854            "Resources": {
2855                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2856            },
2857            "Outputs": {
2858                "QueueUrl": {
2859                    "Value": {"Ref": "Q"},
2860                    "Description": "Url",
2861                    "Export": {"Name": "TheQueueUrl"}
2862                }
2863            }
2864        }"#;
2865        let mut params = HashMap::new();
2866        params.insert("StackName".to_string(), "outs".to_string());
2867        params.insert("TemplateBody".to_string(), template.to_string());
2868        let req = make_request("CreateStack", params);
2869        svc.create_stack(&req).await.expect("create stack");
2870
2871        let accounts = svc.state.read();
2872        let stack = accounts
2873            .get("123456789012")
2874            .unwrap()
2875            .stacks
2876            .get("outs")
2877            .unwrap();
2878        assert_eq!(stack.outputs.len(), 1);
2879        assert_eq!(stack.outputs[0].key, "QueueUrl");
2880        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2881        assert!(!stack.outputs[0].value.is_empty());
2882    }
2883
2884    #[tokio::test]
2885    async fn create_stack_rejects_duplicate_export_name() {
2886        let svc = make_service();
2887        let mk = |name: &str| {
2888            let template = format!(
2889                r#"{{
2890                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2891                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2892                }}"#
2893            );
2894            let mut params = HashMap::new();
2895            params.insert("StackName".to_string(), name.to_string());
2896            params.insert("TemplateBody".to_string(), template);
2897            make_request("CreateStack", params)
2898        };
2899        match svc.create_stack(&mk("first")).await {
2900            Ok(_) => {}
2901            Err(e) => panic!("first stack: {e:?}"),
2902        }
2903        // The second stack's export collides with the first. Since
2904        // provisioning is now asynchronous, the collision can no longer be a
2905        // synchronous CreateStack error — it surfaces as a failed create.
2906        // On the current-thread test runtime the provisioning task runs
2907        // inline, so the stack is already CREATE_FAILED on return.
2908        svc.create_stack(&mk("second"))
2909            .await
2910            .expect("CreateStack returns StackId even when provisioning fails");
2911        let accounts = svc.state.read();
2912        let stack = accounts
2913            .get("123456789012")
2914            .unwrap()
2915            .stacks
2916            .get("second")
2917            .expect("second stack recorded");
2918        assert_eq!(stack.status, "CREATE_FAILED");
2919        // The first stack keeps the export.
2920        let exports = &accounts.get("123456789012").unwrap().exports;
2921        assert_eq!(
2922            exports
2923                .get("DupExport")
2924                .map(|e| e.exporting_stack_name.as_str()),
2925            Some("first")
2926        );
2927    }
2928
2929    #[tokio::test]
2930    async fn import_value_resolves_against_other_stack_export() {
2931        let svc = make_service();
2932
2933        let producer_tpl = r#"{
2934            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
2935            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
2936        }"#;
2937        let mut p = HashMap::new();
2938        p.insert("StackName".to_string(), "producer".to_string());
2939        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
2940        svc.create_stack(&make_request("CreateStack", p))
2941            .await
2942            .expect("producer");
2943
2944        let consumer_tpl = r#"{
2945            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
2946            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
2947        }"#;
2948        let mut p = HashMap::new();
2949        p.insert("StackName".to_string(), "consumer".to_string());
2950        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
2951        svc.create_stack(&make_request("CreateStack", p))
2952            .await
2953            .expect("consumer");
2954
2955        let accounts = svc.state.read();
2956        let prod_url = accounts
2957            .get("123456789012")
2958            .unwrap()
2959            .stacks
2960            .get("producer")
2961            .unwrap()
2962            .outputs[0]
2963            .value
2964            .clone();
2965        let cons = accounts
2966            .get("123456789012")
2967            .unwrap()
2968            .stacks
2969            .get("consumer")
2970            .unwrap();
2971        assert_eq!(cons.outputs[0].value, prod_url);
2972    }
2973
2974    #[tokio::test]
2975    async fn create_stack_records_export_in_state_registry() {
2976        let svc = make_service();
2977        let template = r#"{
2978            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
2979            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
2980        }"#;
2981        let mut params = HashMap::new();
2982        params.insert("StackName".to_string(), "reg".to_string());
2983        params.insert("TemplateBody".to_string(), template.to_string());
2984        svc.create_stack(&make_request("CreateStack", params))
2985            .await
2986            .expect("create");
2987
2988        let accounts = svc.state.read();
2989        let state = accounts.get("123456789012").unwrap();
2990        let export = state
2991            .exports
2992            .get("reg-url")
2993            .expect("export registered in state.exports");
2994        assert_eq!(export.exporting_stack_name, "reg");
2995        assert!(!export.value.is_empty());
2996        assert!(export.exporting_stack_id.contains("reg"));
2997    }
2998
2999    #[tokio::test]
3000    async fn import_value_with_unknown_export_errors() {
3001        let svc = make_service();
3002        let consumer_tpl = r#"{
3003            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
3004                "QueueName": {"Fn::ImportValue":"missing-export"}
3005            }}}
3006        }"#;
3007        let mut p = HashMap::new();
3008        p.insert("StackName".to_string(), "bad-consumer".to_string());
3009        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3010        match svc.create_stack(&make_request("CreateStack", p)).await {
3011            Ok(_) => panic!("expected ValidationError for unknown export"),
3012            Err(e) => {
3013                let msg = format!("{e:?}");
3014                assert!(msg.contains("No export named missing-export"), "got {msg}");
3015            }
3016        }
3017    }
3018
3019    #[tokio::test]
3020    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
3021        let svc = make_service();
3022
3023        let producer_tpl = r#"{
3024            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
3025            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
3026        }"#;
3027        let mut p = HashMap::new();
3028        p.insert("StackName".to_string(), "producer".to_string());
3029        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
3030        svc.create_stack(&make_request("CreateStack", p))
3031            .await
3032            .expect("producer");
3033
3034        let consumer_tpl = r#"{
3035            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
3036                "QueueName": "cons-q",
3037                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
3038            }}}
3039        }"#;
3040        let mut p = HashMap::new();
3041        p.insert("StackName".to_string(), "consumer".to_string());
3042        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
3043        svc.create_stack(&make_request("CreateStack", p))
3044            .await
3045            .expect("consumer");
3046
3047        // Producer delete must fail while consumer still imports.
3048        let mut p = HashMap::new();
3049        p.insert("StackName".to_string(), "producer".to_string());
3050        match svc.delete_stack(&make_request("DeleteStack", p)).await {
3051            Ok(_) => panic!("delete must fail while imports exist"),
3052            Err(e) => {
3053                let msg = format!("{e:?}");
3054                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
3055            }
3056        }
3057
3058        // Delete consumer first.
3059        let mut p = HashMap::new();
3060        p.insert("StackName".to_string(), "consumer".to_string());
3061        svc.delete_stack(&make_request("DeleteStack", p))
3062            .await
3063            .expect("consumer delete");
3064
3065        // Now producer delete succeeds.
3066        let mut p = HashMap::new();
3067        p.insert("StackName".to_string(), "producer".to_string());
3068        svc.delete_stack(&make_request("DeleteStack", p))
3069            .await
3070            .expect("producer delete after consumer gone");
3071
3072        let accounts = svc.state.read();
3073        let state = accounts.get("123456789012").unwrap();
3074        assert!(state.exports.is_empty(), "exports cleared after delete");
3075        assert!(state.imports.is_empty(), "imports cleared after delete");
3076    }
3077
3078    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
3079
3080    use std::sync::atomic::{AtomicUsize, Ordering};
3081
3082    /// A snapshot hook that counts how many times it fires, standing in for a
3083    /// real service's whole-state persist.
3084    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
3085        Arc::new(move || {
3086            let counter = counter.clone();
3087            Box::pin(async move {
3088                counter.fetch_add(1, Ordering::SeqCst);
3089            })
3090        })
3091    }
3092
3093    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3094        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3095        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3096            tmp.path().to_path_buf(),
3097            cache,
3098        ))
3099    }
3100
    /// Template shared by the persistence tests: one SQS queue and one SNS
    /// topic (snapshot-backed services) plus one S3 bucket (persisted through
    /// the S3Store write-through). It deliberately contains no Lambda
    /// resource, so a test that also registers a "lambda" hook can check that
    /// hooks only fire for services the stack actually touches.
    const PERSIST_TEMPLATE: &str = r#"{"Resources":{
        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
    }}"#;
3109
3110    fn create_req(stack: &str) -> AwsRequest {
3111        let mut p = HashMap::new();
3112        p.insert("StackName".to_string(), stack.to_string());
3113        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3114        make_request("CreateStack", p)
3115    }
3116
3117    #[tokio::test]
3118    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3119        let tmp = tempfile::tempdir().unwrap();
3120        let store = disk_s3_store(&tmp);
3121        let counter = Arc::new(AtomicUsize::new(0));
3122        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3123            BTreeMap::new();
3124        hooks.insert("sqs", counting_hook(counter.clone()));
3125        hooks.insert("sns", counting_hook(counter.clone()));
3126        // Registered but not in the template -> must not fire.
3127        hooks.insert("lambda", counting_hook(counter.clone()));
3128        let svc = make_service()
3129            .with_s3_store(store.clone())
3130            .with_snapshot_hooks(hooks);
3131
3132        svc.create_stack(&create_req("probe")).await.unwrap();
3133
3134        // sqs + sns fired once each; lambda untouched.
3135        assert_eq!(counter.load(Ordering::SeqCst), 2);
3136        // The bucket was written through to the S3 store, not just the in-memory map.
3137        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3138        assert!(
3139            loaded.buckets.contains_key("cfn-bucket"),
3140            "CFN bucket should be persisted to the S3 store"
3141        );
3142    }
3143
3144    #[tokio::test]
3145    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3146        let tmp = tempfile::tempdir().unwrap();
3147        let store = disk_s3_store(&tmp);
3148        let counter = Arc::new(AtomicUsize::new(0));
3149        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3150            BTreeMap::new();
3151        hooks.insert("sqs", counting_hook(counter.clone()));
3152        hooks.insert("sns", counting_hook(counter.clone()));
3153        let svc = make_service()
3154            .with_s3_store(store.clone())
3155            .with_snapshot_hooks(hooks);
3156
3157        svc.create_stack(&create_req("probe")).await.unwrap();
3158        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3159
3160        let mut p = HashMap::new();
3161        p.insert("StackName".to_string(), "probe".to_string());
3162        svc.delete_stack(&make_request("DeleteStack", p))
3163            .await
3164            .unwrap();
3165
3166        // Delete fired the touched services again (sqs + sns).
3167        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3168        // And the CFN-deleted bucket is gone from the store, so it does not
3169        // reappear after a restart.
3170        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3171        assert!(
3172            !loaded.buckets.contains_key("cfn-bucket"),
3173            "CFN-deleted bucket should be removed from the S3 store"
3174        );
3175    }
3176
3177    #[tokio::test]
3178    async fn cfn_persist_skips_services_without_a_registered_hook() {
3179        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3180        // hooks must be silently skipped (no panic), and "sqs" fires once.
3181        let tmp = tempfile::tempdir().unwrap();
3182        let store = disk_s3_store(&tmp);
3183        let counter = Arc::new(AtomicUsize::new(0));
3184        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3185            BTreeMap::new();
3186        hooks.insert("sqs", counting_hook(counter.clone()));
3187        let svc = make_service()
3188            .with_s3_store(store.clone())
3189            .with_snapshot_hooks(hooks);
3190
3191        svc.create_stack(&create_req("probe")).await.unwrap();
3192        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3193    }
3194
3195    #[tokio::test]
3196    async fn cfn_update_persists_touched_services() {
3197        // Create with just SQS, then update to a template that adds SNS + a
3198        // bucket; the update must persist the services it touches.
3199        let tmp = tempfile::tempdir().unwrap();
3200        let store = disk_s3_store(&tmp);
3201        let counter = Arc::new(AtomicUsize::new(0));
3202        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3203            BTreeMap::new();
3204        hooks.insert("sqs", counting_hook(counter.clone()));
3205        hooks.insert("sns", counting_hook(counter.clone()));
3206        let svc = make_service()
3207            .with_s3_store(store.clone())
3208            .with_snapshot_hooks(hooks);
3209
3210        let mut create = HashMap::new();
3211        create.insert("StackName".to_string(), "upd".to_string());
3212        create.insert(
3213            "TemplateBody".to_string(),
3214            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3215                .to_string(),
3216        );
3217        svc.create_stack(&make_request("CreateStack", create))
3218            .await
3219            .unwrap();
3220        let after_create = counter.load(Ordering::SeqCst);
3221
3222        let mut update = HashMap::new();
3223        update.insert("StackName".to_string(), "upd".to_string());
3224        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3225        svc.update_stack(&make_request("UpdateStack", update))
3226            .await
3227            .unwrap();
3228
3229        // The update touched at least SNS (added); the hook count must grow.
3230        assert!(
3231            counter.load(Ordering::SeqCst) > after_create,
3232            "update should persist the services it touched"
3233        );
3234        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3235        assert!(loaded.buckets.contains_key("cfn-bucket"));
3236    }
3237
3238    #[tokio::test]
3239    async fn cfn_execute_change_set_persists_touched_services() {
3240        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3241        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3242        // write provisioned services through to disk the same way CreateStack
3243        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3244        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3245        let tmp = tempfile::tempdir().unwrap();
3246        let store = disk_s3_store(&tmp);
3247        let counter = Arc::new(AtomicUsize::new(0));
3248        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3249            BTreeMap::new();
3250        hooks.insert("sqs", counting_hook(counter.clone()));
3251        let svc = make_service()
3252            .with_s3_store(store.clone())
3253            .with_snapshot_hooks(hooks);
3254
3255        let mut create = HashMap::new();
3256        create.insert("StackName".to_string(), "cs-stack".to_string());
3257        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3258        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3259        create.insert(
3260            "TemplateBody".to_string(),
3261            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3262                .to_string(),
3263        );
3264        svc.handle(make_request("CreateChangeSet", create))
3265            .await
3266            .unwrap();
3267        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3268        let before = counter.load(Ordering::SeqCst);
3269
3270        let mut exec = HashMap::new();
3271        exec.insert("StackName".to_string(), "cs-stack".to_string());
3272        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3273        svc.handle(make_request("ExecuteChangeSet", exec))
3274            .await
3275            .unwrap();
3276
3277        assert!(
3278            counter.load(Ordering::SeqCst) > before,
3279            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3280             queue survives a restart"
3281        );
3282    }
3283
3284    #[test]
3285    fn service_key_for_type_maps_services_and_aliases() {
3286        // Direct service segments.
3287        assert_eq!(
3288            service_key_for_type("AWS::Lambda::Function"),
3289            Some("lambda")
3290        );
3291        assert_eq!(
3292            service_key_for_type("AWS::SecretsManager::Secret"),
3293            Some("secretsmanager")
3294        );
3295        assert_eq!(service_key_for_type("AWS::SQS::Queue"), Some("sqs"));
3296        assert_eq!(service_key_for_type("AWS::IAM::Role"), Some("iam"));
3297        assert_eq!(
3298            service_key_for_type("AWS::StepFunctions::StateMachine"),
3299            Some("stepfunctions")
3300        );
3301        // Namespace aliases that differ from the fakecloud service name.
3302        assert_eq!(
3303            service_key_for_type("AWS::Events::Rule"),
3304            Some("eventbridge")
3305        );
3306        assert_eq!(service_key_for_type("AWS::Logs::LogGroup"), Some("logs"));
3307        // S3 has no snapshot hook (it persists via the S3Store write-through).
3308        assert_eq!(service_key_for_type("AWS::S3::Bucket"), None);
3309        // Non-snapshot-backed services.
3310        assert_eq!(
3311            service_key_for_type("AWS::CertificateManager::Certificate"),
3312            None
3313        );
3314        // Malformed / non-AWS types.
3315        assert_eq!(service_key_for_type("AWS::Lambda"), None);
3316        assert_eq!(service_key_for_type("Custom::Thing::Resource"), None);
3317        assert_eq!(service_key_for_type("AWS"), None);
3318        assert_eq!(service_key_for_type(""), None);
3319    }
3320
3321    #[tokio::test]
3322    async fn persist_touched_services_noop_with_empty_hooks() {
3323        // No registered hooks -> nothing to do, must not panic.
3324        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3325        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3326    }
3327
3328    #[tokio::test]
3329    async fn cfn_bucket_policy_write_through_create_update_delete() {
3330        let tmp = tempfile::tempdir().unwrap();
3331        let store = disk_s3_store(&tmp);
3332        let svc = make_service().with_s3_store(store.clone());
3333
3334        // Create a bucket + bucket policy.
3335        let mut create = HashMap::new();
3336        create.insert("StackName".to_string(), "pol".to_string());
3337        create.insert(
3338            "TemplateBody".to_string(),
3339            r#"{"Resources":{
3340                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3341                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3342            }}"#
3343            .to_string(),
3344        );
3345        svc.create_stack(&make_request("CreateStack", create))
3346            .await
3347            .unwrap();
3348        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3349        let policy = loaded.buckets["pol-bucket"]
3350            .subresources
3351            .get("policy.toml")
3352            .cloned()
3353            .expect("bucket policy persisted on create");
3354        assert!(policy.contains("s3:GetObject"));
3355
3356        // Update the policy document; the update must write through.
3357        let mut update = HashMap::new();
3358        update.insert("StackName".to_string(), "pol".to_string());
3359        update.insert(
3360            "TemplateBody".to_string(),
3361            r#"{"Resources":{
3362                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3363                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3364            }}"#
3365            .to_string(),
3366        );
3367        svc.update_stack(&make_request("UpdateStack", update))
3368            .await
3369            .unwrap();
3370        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3371        let policy = loaded.buckets["pol-bucket"]
3372            .subresources
3373            .get("policy.toml")
3374            .cloned()
3375            .expect("bucket policy still persisted after update");
3376        assert!(
3377            policy.contains("s3:PutObject"),
3378            "updated policy should be written through"
3379        );
3380
3381        // Delete the stack; the bucket (and its policy) must be removed from disk.
3382        let mut del = HashMap::new();
3383        del.insert("StackName".to_string(), "pol".to_string());
3384        svc.delete_stack(&make_request("DeleteStack", del))
3385            .await
3386            .unwrap();
3387        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3388        assert!(
3389            !loaded.buckets.contains_key("pol-bucket"),
3390            "CFN-deleted bucket and policy should be gone from the store"
3391        );
3392    }
3393}