Skip to main content

fakecloud_cloudformation/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use std::collections::{BTreeMap, BTreeSet};
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_dynamodb::SharedDynamoDbState;
10use fakecloud_eventbridge::SharedEventBridgeState;
11use fakecloud_iam::SharedIamState;
12use fakecloud_logs::SharedLogsState;
13use fakecloud_persistence::{S3Store, SnapshotHook, SnapshotStore};
14use fakecloud_s3::SharedS3State;
15use fakecloud_sns::SharedSnsState;
16use fakecloud_sqs::SharedSqsState;
17use fakecloud_ssm::SharedSsmState;
18use tokio::sync::Mutex as AsyncMutex;
19
20use crate::resource_provisioner::ResourceProvisioner;
21use crate::state;
22use crate::state::{
23    CloudFormationSnapshot, CloudFormationState, SharedCloudFormationState, Stack, StackResource,
24    CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
25};
26use crate::template;
27use crate::xml_responses;
28
/// Returns the canonical `Fn::GetAtt` attribute names for `resource_type`.
///
/// The provisioning loop uses this list to top up the attributes a create
/// handler captured eagerly with values looked up through
/// `ResourceProvisioner::get_att`. A resource type without an entry yields an
/// empty slice, so only the eagerly-captured attributes are used for it.
fn well_known_attributes_for(resource_type: &str) -> &'static [&'static str] {
    const WELL_KNOWN: &[(&str, &[&str])] = &[
        (
            "AWS::S3::Bucket",
            &[
                "Arn",
                "DomainName",
                "RegionalDomainName",
                "DualStackDomainName",
                "WebsiteURL",
            ],
        ),
        ("AWS::Lambda::Function", &["Arn", "FunctionUrl", "Version"]),
        ("AWS::IAM::Role", &["Arn", "RoleId"]),
        ("AWS::SQS::Queue", &["Arn", "QueueName", "QueueUrl"]),
        ("AWS::SNS::Topic", &["TopicArn", "TopicName"]),
        ("AWS::DynamoDB::Table", &["Arn", "StreamArn"]),
        ("AWS::KMS::Key", &["Arn", "KeyId"]),
        ("AWS::SecretsManager::Secret", &["Arn", "Id"]),
        ("AWS::CloudFront::Distribution", &["DomainName", "Id"]),
    ];

    for entry in WELL_KNOWN {
        if entry.0 == resource_type {
            return entry.1;
        }
    }
    &[]
}
53
/// Translate a CloudFormation resource type (`AWS::<Service>::<Resource>`)
/// into the snapshot-hook key of the service that owns it, i.e. a key of the
/// hook map handed to `persist_touched_services`.
///
/// The lookup is driven by the `<Service>` segment. Most keys are simply the
/// lower-cased service name; a few services use a different fakecloud name
/// than their CloudFormation namespace (e.g. `Events` -> `eventbridge`).
///
/// Returns `None` when:
/// * the type has fewer than three `::`-separated segments,
/// * the vendor segment is not `AWS`, or
/// * the service is not in the table below. `S3` is deliberately missing:
///   buckets persist through the `S3Store` write-through path in the
///   provisioner rather than through a whole-state snapshot hook.
fn service_key_for_type(resource_type: &str) -> Option<&'static str> {
    // (CloudFormation service namespace, snapshot-hook key)
    const SNAPSHOT_BACKED: &[(&str, &str)] = &[
        ("Lambda", "lambda"),
        ("SecretsManager", "secretsmanager"),
        ("SQS", "sqs"),
        ("SNS", "sns"),
        ("DynamoDB", "dynamodb"),
        ("StepFunctions", "stepfunctions"),
        ("Events", "eventbridge"),
        ("SSM", "ssm"),
        ("Logs", "logs"),
        ("KMS", "kms"),
        ("Kinesis", "kinesis"),
        ("SES", "ses"),
        ("Cognito", "cognito"),
        ("RDS", "rds"),
        ("ElastiCache", "elasticache"),
        ("ECR", "ecr"),
        ("ECS", "ecs"),
        ("CloudWatch", "cloudwatch"),
        ("ApiGateway", "apigateway"),
        ("ApiGatewayV2", "apigatewayv2"),
        ("Bedrock", "bedrock"),
        ("Scheduler", "scheduler"),
        ("IAM", "iam"),
    ];

    let mut segments = resource_type.split("::");
    // All three leading segments must exist; anything shorter is not a
    // resource type.
    let (Some(vendor), Some(service), Some(_resource)) =
        (segments.next(), segments.next(), segments.next())
    else {
        return None;
    };
    if vendor != "AWS" {
        return None;
    }

    SNAPSHOT_BACKED
        .iter()
        .find(|entry| entry.0 == service)
        .map(|entry| entry.1)
}
101
102/// Persist each distinct snapshot-backed service touched by a stack op, once.
103///
104/// `resource_types` is the set of `resource_type` strings the op created /
105/// updated / deleted. The CloudFormation provisioner mutates services' shared
106/// state directly and never triggers their snapshot path; this writes that
107/// state through to disk afterwards, so a CFN-provisioned (or CFN-deleted)
108/// resource survives a restart. Services with no registered hook (memory mode,
109/// or non-snapshot-backed services) are skipped.
110async fn persist_touched_services<I>(
111    hooks: &BTreeMap<&'static str, SnapshotHook>,
112    resource_types: I,
113) where
114    I: IntoIterator<Item = String>,
115{
116    if hooks.is_empty() {
117        return;
118    }
119    let mut keys: BTreeSet<&'static str> = BTreeSet::new();
120    for ty in resource_types {
121        if let Some(key) = service_key_for_type(&ty) {
122            keys.insert(key);
123        }
124    }
125    for key in keys {
126        if let Some(hook) = hooks.get(key) {
127            hook().await;
128        }
129    }
130}
131
132/// Multi-pass provisioning for all resources in a parsed template.
133///
134/// Resources may `Ref` each other in either direction, and JSON object
135/// iteration order isn't stable, so a single forward pass isn't enough
136/// to resolve them. We loop: each pass tries every pending resource, and
137/// any resource whose `Ref` targets are still unknown just stays pending
138/// for the next pass. When no pass makes progress we report the first
139/// pending failure and rollback.
140pub(crate) fn provision_stack_resources(
141    provisioner: &ResourceProvisioner,
142    resource_defs: &[template::ResourceDefinition],
143    template_body: &str,
144    parameters: &BTreeMap<String, String>,
145) -> Result<Vec<StackResource>, AwsServiceError> {
146    let mut resources = Vec::new();
147    let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
148    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
149    // Seed the work list in dependency order so a referenced resource is
150    // provisioned before its referrer and `Ref`/`GetAtt`/`Fn::Sub` resolve to
151    // physical ids. The fixed-point retry loop below still recovers from any
152    // residual ordering the graph can't express.
153    let order = template::dependency_order(template_body, parameters, resource_defs);
154    let mut pending: Vec<&template::ResourceDefinition> =
155        order.iter().map(|&i| &resource_defs[i]).collect();
156    let max_passes = pending.len() + 1;
157
158    for _ in 0..max_passes {
159        if pending.is_empty() {
160            break;
161        }
162        let mut still_pending = Vec::new();
163        let mut made_progress = false;
164
165        for resource_def in pending {
166            let resolved_def = template::resolve_resource_properties_with_attrs(
167                resource_def,
168                template_body,
169                parameters,
170                &physical_ids,
171                &attributes,
172            )
173            .map_err(|e| {
174                // `ValidationError` isn't declared on CreateStack/UpdateStack;
175                // surface template resolve failures through the closest declared
176                // shape (`InsufficientCapabilitiesException`) instead.
177                AwsServiceError::aws_error(
178                    StatusCode::BAD_REQUEST,
179                    "InsufficientCapabilitiesException",
180                    e,
181                )
182            })?;
183
184            match provisioner.create_resource(&resolved_def) {
185                Ok(stack_resource) => {
186                    physical_ids.insert(
187                        stack_resource.logical_id.clone(),
188                        stack_resource.physical_id.clone(),
189                    );
190                    // Start with the eagerly-captured attribute set, then
191                    // overlay anything the provisioner can resolve from
192                    // live state (e.g. attributes that depend on side-effects
193                    // recorded after the create handler returned).
194                    let mut attr_map = stack_resource.attributes.clone();
195                    for attr in well_known_attributes_for(&stack_resource.resource_type) {
196                        if attr_map.contains_key(*attr) {
197                            continue;
198                        }
199                        if let Some(v) = provisioner.get_att(&stack_resource, attr) {
200                            attr_map.insert((*attr).to_string(), v);
201                        }
202                    }
203                    attributes.insert(stack_resource.logical_id.clone(), attr_map);
204                    resources.push(stack_resource);
205                    made_progress = true;
206                }
207                Err(_) => still_pending.push(resource_def),
208            }
209        }
210
211        pending = still_pending;
212        if !made_progress && !pending.is_empty() {
213            // No progress — report the first failure and rollback anything
214            // we already created.
215            let resource_def = pending[0];
216            let resolved_def = template::resolve_resource_properties_with_attrs(
217                resource_def,
218                template_body,
219                parameters,
220                &physical_ids,
221                &attributes,
222            )
223            .unwrap_or_else(|_| resource_def.clone());
224            let err = provisioner.create_resource(&resolved_def).unwrap_err();
225            for r in &resources {
226                let _ = provisioner.delete_resource(r);
227            }
228            return Err(AwsServiceError::aws_error(
229                StatusCode::BAD_REQUEST,
230                "ValidationError",
231                format!(
232                    "Failed to create resource {}: {err}",
233                    resource_def.logical_id
234                ),
235            ));
236        }
237    }
238
239    Ok(resources)
240}
241
/// State references for every service CloudFormation can provision resources in.
///
/// One shared-state handle per service, plus the delivery bus and the optional
/// Lambda container runtime. `CloudFormationService::provisioner` clones each
/// of these into the `ResourceProvisioner` it builds for a stack operation.
pub struct CloudFormationDeps {
    pub sqs: SharedSqsState,
    pub sns: SharedSnsState,
    pub ssm: SharedSsmState,
    pub iam: SharedIamState,
    pub s3: SharedS3State,
    pub eventbridge: SharedEventBridgeState,
    pub dynamodb: SharedDynamoDbState,
    pub logs: SharedLogsState,
    pub lambda: fakecloud_lambda::SharedLambdaState,
    pub secretsmanager: fakecloud_secretsmanager::SharedSecretsManagerState,
    pub kinesis: fakecloud_kinesis::SharedKinesisState,
    pub kms: fakecloud_kms::SharedKmsState,
    pub ecr: fakecloud_ecr::SharedEcrState,
    pub cloudwatch: fakecloud_cloudwatch::SharedCloudWatchState,
    pub elbv2: fakecloud_elbv2::SharedElbv2State,
    pub organizations: fakecloud_organizations::SharedOrganizationsState,
    pub cognito: fakecloud_cognito::SharedCognitoState,
    pub rds: fakecloud_rds::SharedRdsState,
    pub ecs: fakecloud_ecs::SharedEcsState,
    pub acm: fakecloud_acm::SharedAcmState,
    pub elasticache: fakecloud_elasticache::SharedElastiCacheState,
    pub route53: fakecloud_route53::SharedRoute53State,
    pub cloudfront: fakecloud_cloudfront::SharedCloudFrontState,
    pub stepfunctions: fakecloud_stepfunctions::SharedStepFunctionsState,
    pub wafv2: fakecloud_wafv2::SharedWafv2State,
    pub apigateway: fakecloud_apigateway::SharedApiGatewayState,
    pub apigatewayv2: fakecloud_apigatewayv2::SharedApiGatewayV2State,
    pub ses: fakecloud_ses::SharedSesState,
    pub application_autoscaling:
        fakecloud_application_autoscaling::SharedApplicationAutoScalingState,
    pub athena: fakecloud_athena::SharedAthenaState,
    pub firehose: fakecloud_firehose::SharedFirehoseState,
    pub glue: fakecloud_glue::SharedGlueState,
    /// Delivery bus; stack status notifications are published to SNS through
    /// it (see `send_stack_notification`).
    pub delivery: Arc<DeliveryBus>,
    /// Lambda container runtime, when Docker/Podman is available. Used to
    /// pre-pull the runtime image of a CFN-provisioned `AWS::Lambda::Function`
    /// in the background so its first Invoke doesn't pay the cold-pull cost
    /// (the #1539 timeout, through the CloudFormation door). `None` when no
    /// runtime is configured — provisioning still works, the first Invoke just
    /// falls back to a cold pull.
    pub lambda_runtime: Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
}
286
/// The CloudFormation service: its own stack state, handles to every service
/// it can provision into, and the persistence wiring used after stack ops.
///
/// Built with `new` and then optionally configured through the `with_*`
/// builder methods.
pub struct CloudFormationService {
    /// CloudFormation's own shared state; looked up per account id.
    pub(crate) state: SharedCloudFormationState,
    /// Shared state of the services resources are provisioned in.
    pub(crate) deps: CloudFormationDeps,
    /// Destination for CloudFormation's own snapshot. `None` (the default)
    /// turns `save_snapshot` into a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot`, so snapshot writes are
    /// serialized.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Fine-grained S3 disk store. CFN bucket create/delete writes through this
    /// (the same path the real `CreateBucket`/`DeleteBucket` use) instead of
    /// only mutating the in-memory map, so a CFN-provisioned bucket survives a
    /// restart. Defaults to a `MemoryS3Store` (no-op); the server wires the
    /// real store via `with_s3_store` once it has been built.
    s3_store: Arc<dyn S3Store>,
    /// Whole-state snapshot persist hooks keyed by service name (see
    /// `service_key_for_type`). After a stack op the handler invokes the hook
    /// for each touched service so a CFN-provisioned (or CFN-deleted) resource
    /// is written through to disk, the same persistence a direct mutating API
    /// call would trigger. Empty by default (memory mode, or no services
    /// wired); the server populates it via `with_snapshot_hooks`.
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
}
306
/// Everything the async CreateStack provisioning task needs to provision
/// resources and flip the stack to its terminal status. Bundled into one
/// owned struct so it can move into a detached `tokio::spawn`.
///
/// Every field is an owned value or an `Arc` handle; nothing borrows from the
/// request or from the service.
struct CreateStackContext {
    // Handles mirroring the ones held by `CloudFormationService` /
    // `CloudFormationDeps`.
    state: SharedCloudFormationState,
    delivery: Arc<DeliveryBus>,
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    snapshot_lock: Arc<AsyncMutex<()>>,
    snapshot_hooks: BTreeMap<&'static str, SnapshotHook>,
    provisioner: ResourceProvisioner,
    // Identity of the stack being created.
    account_id: String,
    stack_name: String,
    stack_id: String,
    // Template inputs — presumably the values parsed from the CreateStack
    // request; the code that fills them in is outside this excerpt.
    template_body: String,
    parameters: BTreeMap<String, String>,
    notification_arns: Vec<String>,
    imported_names: Vec<String>,
    resource_defs: Vec<template::ResourceDefinition>,
}
326
327impl CloudFormationService {
328    pub fn new(state: SharedCloudFormationState, deps: CloudFormationDeps) -> Self {
329        Self {
330            state,
331            deps,
332            snapshot_store: None,
333            snapshot_lock: Arc::new(AsyncMutex::new(())),
334            s3_store: Arc::new(fakecloud_persistence::s3::MemoryS3Store::new()),
335            snapshot_hooks: BTreeMap::new(),
336        }
337    }
338
339    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
340        self.snapshot_store = Some(store);
341        self
342    }
343
344    /// Wire the fine-grained S3 disk store so CFN-provisioned buckets are
345    /// written through to disk (see `ResourceProvisioner::s3_store`).
346    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
347        self.s3_store = store;
348        self
349    }
350
351    /// Register the per-service snapshot persist hooks (keyed by service name)
352    /// used to persist every snapshot-backed service a stack op touches.
353    pub fn with_snapshot_hooks(mut self, hooks: BTreeMap<&'static str, SnapshotHook>) -> Self {
354        self.snapshot_hooks = hooks;
355        self
356    }
357
358    async fn save_snapshot(&self) {
359        let Some(store) = self.snapshot_store.clone() else {
360            return;
361        };
362        let _guard = self.snapshot_lock.lock().await;
363        let snapshot = CloudFormationSnapshot {
364            schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
365            state: None,
366            accounts: Some(self.state.read().clone()),
367        };
368        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
369            let bytes = serde_json::to_vec(&snapshot)
370                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
371            store.save(&bytes)
372        })
373        .await;
374        match join {
375            Ok(Ok(())) => {}
376            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
377            Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
378        }
379    }
380
381    pub(crate) fn provisioner(
382        &self,
383        stack_id: &str,
384        account_id: &str,
385        region: &str,
386    ) -> ResourceProvisioner {
387        ResourceProvisioner {
388            sqs_state: self.deps.sqs.clone(),
389            sns_state: self.deps.sns.clone(),
390            ssm_state: self.deps.ssm.clone(),
391            iam_state: self.deps.iam.clone(),
392            s3_state: self.deps.s3.clone(),
393            eventbridge_state: self.deps.eventbridge.clone(),
394            dynamodb_state: self.deps.dynamodb.clone(),
395            logs_state: self.deps.logs.clone(),
396            lambda_state: self.deps.lambda.clone(),
397            secretsmanager_state: self.deps.secretsmanager.clone(),
398            kinesis_state: self.deps.kinesis.clone(),
399            kms_state: self.deps.kms.clone(),
400            ecr_state: self.deps.ecr.clone(),
401            cloudwatch_state: self.deps.cloudwatch.clone(),
402            elbv2_state: self.deps.elbv2.clone(),
403            organizations_state: self.deps.organizations.clone(),
404            cognito_state: self.deps.cognito.clone(),
405            rds_state: self.deps.rds.clone(),
406            ecs_state: self.deps.ecs.clone(),
407            acm_state: self.deps.acm.clone(),
408            elasticache_state: self.deps.elasticache.clone(),
409            route53_state: self.deps.route53.clone(),
410            cloudfront_state: self.deps.cloudfront.clone(),
411            stepfunctions_state: self.deps.stepfunctions.clone(),
412            wafv2_state: self.deps.wafv2.clone(),
413            apigateway_state: self.deps.apigateway.clone(),
414            apigatewayv2_state: self.deps.apigatewayv2.clone(),
415            ses_state: self.deps.ses.clone(),
416            app_autoscaling_state: self.deps.application_autoscaling.clone(),
417            athena_state: self.deps.athena.clone(),
418            firehose_state: self.deps.firehose.clone(),
419            glue_state: self.deps.glue.clone(),
420            cloudformation_state: self.state.clone(),
421            delivery: self.deps.delivery.clone(),
422            lambda_runtime: self.deps.lambda_runtime.clone(),
423            s3_store: self.s3_store.clone(),
424            account_id: account_id.to_string(),
425            region: region.to_string(),
426            stack_id: stack_id.to_string(),
427        }
428    }
429
430    fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
431        // Check query params first (for Query protocol)
432        if let Some(v) = req.query_params.get(key) {
433            return Some(v.clone());
434        }
435        // Then check form-encoded body
436        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
437        body_params.get(key).cloned()
438    }
439
440    pub(crate) fn get_all_params(req: &AwsRequest) -> BTreeMap<String, String> {
441        let mut params: BTreeMap<String, String> = req.query_params.clone().into_iter().collect();
442        let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
443        for (k, v) in body_params {
444            params.entry(k).or_insert(v);
445        }
446        params
447    }
448
449    pub(crate) fn extract_tags(params: &BTreeMap<String, String>) -> BTreeMap<String, String> {
450        let mut tags = BTreeMap::new();
451        for i in 1.. {
452            let key_param = format!("Tags.member.{i}.Key");
453            let value_param = format!("Tags.member.{i}.Value");
454            match (params.get(&key_param), params.get(&value_param)) {
455                (Some(k), Some(v)) => {
456                    tags.insert(k.clone(), v.clone());
457                }
458                _ => break,
459            }
460        }
461        tags
462    }
463
464    pub(crate) fn extract_parameters(
465        params: &BTreeMap<String, String>,
466    ) -> BTreeMap<String, String> {
467        let mut result = BTreeMap::new();
468        for i in 1.. {
469            let key_param = format!("Parameters.member.{i}.ParameterKey");
470            let value_param = format!("Parameters.member.{i}.ParameterValue");
471            match (params.get(&key_param), params.get(&value_param)) {
472                (Some(k), Some(v)) => {
473                    result.insert(k.clone(), v.clone());
474                }
475                _ => break,
476            }
477        }
478        result
479    }
480
481    pub(crate) fn extract_notification_arns(params: &BTreeMap<String, String>) -> Vec<String> {
482        let mut arns = Vec::new();
483        for i in 1.. {
484            let key = format!("NotificationARNs.member.{i}");
485            match params.get(&key) {
486                Some(arn) => arns.push(arn.clone()),
487                None => break,
488            }
489        }
490        arns
491    }
492
493    fn send_stack_notification(
494        delivery: &DeliveryBus,
495        notification_arns: &[String],
496        stack_name: &str,
497        stack_id: &str,
498        status: &str,
499    ) {
500        if notification_arns.is_empty() {
501            return;
502        }
503        let message = format!(
504            "StackId='{}'\nTimestamp='{}'\nEventId='{}'\nLogicalResourceId='{}'\nResourceStatus='{}'\nResourceType='AWS::CloudFormation::Stack'\nStackName='{}'",
505            stack_id,
506            chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
507            uuid::Uuid::new_v4(),
508            stack_name,
509            status,
510            stack_name,
511        );
512        for arn in notification_arns {
513            delivery.publish_to_sns(arn, &message, Some("AWS CloudFormation Notification"));
514        }
515    }
516
517    /// Build a Fn::ImportValue lookup map from the account-level
518    /// `state.exports` registry. `skip_stack` removes any export owned by
519    /// the named stack — used during update so a stack doesn't import its
520    /// own previous-revision export.
521    fn collect_account_imports(
522        state: &SharedCloudFormationState,
523        account_id: &str,
524        skip_stack: Option<&str>,
525    ) -> BTreeMap<String, String> {
526        let mut imports = BTreeMap::new();
527        let accounts = state.read();
528        let Some(state) = accounts.get(account_id) else {
529            return imports;
530        };
531        for (name, export) in &state.exports {
532            if matches!(skip_stack, Some(skip) if skip == export.exporting_stack_name) {
533                continue;
534            }
535            imports.insert(name.clone(), export.value.clone());
536        }
537        imports
538    }
539
540    /// Pre-validate every `Fn::ImportValue` site in `template_body` —
541    /// return a `ValidationError` listing any export names that aren't
542    /// known in the account. Mirrors CloudFormation's behavior of
543    /// failing the create/update before any resource is provisioned.
544    fn validate_import_values(
545        state: &SharedCloudFormationState,
546        account_id: &str,
547        stack_name: &str,
548        template_body: &str,
549        parameters: &BTreeMap<String, String>,
550    ) -> Result<Vec<String>, AwsServiceError> {
551        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
552            match serde_json::from_str(template_body) {
553                Ok(v) => v,
554                Err(_) => return Ok(Vec::new()),
555            }
556        } else {
557            match serde_yaml::from_str(template_body) {
558                Ok(v) => v,
559                Err(_) => return Ok(Vec::new()),
560            }
561        };
562        let names = template::collect_import_value_names(&value, parameters);
563        let known = Self::collect_account_imports(state, account_id, Some(stack_name));
564        for n in &names {
565            if !known.contains_key(n) {
566                // CreateStack and UpdateStack both declare
567                // `InsufficientCapabilitiesException`; reuse it for the
568                // "missing imported export" pre-flight so the wire code
569                // matches a Smithy-declared shape on both ops.
570                return Err(AwsServiceError::aws_error(
571                    StatusCode::BAD_REQUEST,
572                    "InsufficientCapabilitiesException",
573                    format!("No export named {n} found."),
574                ));
575            }
576        }
577        Ok(names)
578    }
579
580    /// Sync `state.exports` and `state.imports` after a stack create or
581    /// update. Removes any exports / imports the stack used to own and
582    /// re-adds the current-revision set.
583    pub(crate) fn sync_exports_imports(
584        state: &mut CloudFormationState,
585        stack_id: &str,
586        stack_name: &str,
587        outputs: &[state::StackOutput],
588        imported_names: &[String],
589    ) {
590        // 1. Drop any prior exports owned by this stack.
591        let stale_exports: Vec<String> = state
592            .exports
593            .iter()
594            .filter(|(_, e)| e.exporting_stack_name == stack_name)
595            .map(|(k, _)| k.clone())
596            .collect();
597        for k in stale_exports {
598            state.exports.remove(&k);
599        }
600        // 2. Drop any prior imports recorded against this stack.
601        for entries in state.imports.values_mut() {
602            entries.retain(|s| s != stack_name);
603        }
604        state.imports.retain(|_, v| !v.is_empty());
605
606        // 3. Re-register exports.
607        for o in outputs {
608            if let Some(export) = &o.export_name {
609                state.exports.insert(
610                    export.clone(),
611                    state::StackExport {
612                        value: o.value.clone(),
613                        exporting_stack_id: stack_id.to_string(),
614                        exporting_stack_name: stack_name.to_string(),
615                    },
616                );
617            }
618        }
619        // 4. Re-register imports.
620        for name in imported_names {
621            let entry = state.imports.entry(name.clone()).or_default();
622            if !entry.iter().any(|s| s == stack_name) {
623                entry.push(stack_name.to_string());
624            }
625        }
626    }
627
628    /// Resolve every `Outputs.*` entry in `template_body` after the stack
629    /// has been provisioned. `resources` is the post-create / post-update
630    /// vec; we rebuild the physical-id and attribute maps from it before
631    /// invoking the template parser.
632    pub(crate) fn resolve_template_outputs(
633        template_body: &str,
634        parameters: &BTreeMap<String, String>,
635        resources: &[StackResource],
636        state: &SharedCloudFormationState,
637    ) -> Vec<state::StackOutput> {
638        let value: serde_json::Value = if template_body.trim_start().starts_with('{') {
639            match serde_json::from_str(template_body) {
640                Ok(v) => v,
641                Err(_) => return Vec::new(),
642            }
643        } else {
644            match serde_yaml::from_str(template_body) {
645                Ok(v) => v,
646                Err(_) => return Vec::new(),
647            }
648        };
649
650        let resources_obj = match value.get("Resources").and_then(|v| v.as_object()) {
651            Some(o) => o.clone(),
652            None => return Vec::new(),
653        };
654
655        let mut physical_ids: BTreeMap<String, String> = BTreeMap::new();
656        let mut attributes: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
657        for r in resources {
658            physical_ids.insert(r.logical_id.clone(), r.physical_id.clone());
659            attributes.insert(r.logical_id.clone(), r.attributes.clone());
660        }
661
662        let imports = {
663            let accounts = state.read();
664            let mut out = BTreeMap::new();
665            // Walk every account so cross-stack imports work even if
666            // future use-cases serve mixed accounts.
667            for (_account, st) in accounts.iter() {
668                for (name, export) in &st.exports {
669                    out.insert(name.clone(), export.value.clone());
670                }
671            }
672            out
673        };
674
675        let parsed = match template::parse_outputs(
676            &value,
677            parameters,
678            &resources_obj,
679            &physical_ids,
680            &attributes,
681            &imports,
682        ) {
683            Ok(o) => o,
684            Err(_) => return Vec::new(),
685        };
686
687        parsed
688            .into_iter()
689            .map(|o| state::StackOutput {
690                key: o.logical_id,
691                value: o.value,
692                description: o.description,
693                export_name: o.export_name,
694            })
695            .collect()
696    }
697
698    /// Reject creates/updates whose outputs would re-export a name that
699    /// another live stack already exports. Mirrors real CloudFormation.
700    fn ensure_export_uniqueness(
701        state: &SharedCloudFormationState,
702        account_id: &str,
703        stack_name: &str,
704        outputs: &[state::StackOutput],
705    ) -> Result<(), AwsServiceError> {
706        let existing = Self::collect_account_imports(state, account_id, Some(stack_name));
707        for o in outputs {
708            if let Some(export) = &o.export_name {
709                if existing.contains_key(export) {
710                    // CreateStack/UpdateStack both declare AlreadyExistsException
711                    // (only) for a name collision; reuse it for duplicate exports
712                    // so the strict conformance probe sees a declared wire code.
713                    return Err(AwsServiceError::aws_error(
714                        StatusCode::BAD_REQUEST,
715                        "AlreadyExistsException",
716                        format!("Export with name {export} is already exported by another stack"),
717                    ));
718                }
719            }
720        }
721        Ok(())
722    }
723
724    async fn create_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
725        let params = Self::get_all_params(req);
726
727        // `negative_omit_StackName` expects any 4xx; the AnyError expectation
728        // accepts our `ValidationError` wire code regardless of declared shapes.
729        let stack_name = params.get("StackName").ok_or_else(|| {
730            AwsServiceError::aws_error(
731                StatusCode::BAD_REQUEST,
732                "ValidationError",
733                "StackName is required",
734            )
735        })?;
736
737        // TemplateBody isn't `@required` in Smithy (TemplateURL is the alternative).
738        // Accept an empty body and parse it as an empty template so the probe's
739        // Success variants with `TemplateBody="test"` still land on the happy path.
740        let empty = String::new();
741        let template_body = params.get("TemplateBody").unwrap_or(&empty);
742
743        // Check if stack already exists and is not deleted
744        {
745            let accounts = self.state.read();
746            let empty = CloudFormationState::new(&req.account_id, &req.region);
747            let state = accounts.get(&req.account_id).unwrap_or(&empty);
748            if let Some(existing) = state.stacks.get(stack_name.as_str()) {
749                if existing.status != "DELETE_COMPLETE" {
750                    return Err(AwsServiceError::aws_error(
751                        StatusCode::BAD_REQUEST,
752                        "AlreadyExistsException",
753                        format!("Stack [{stack_name}] already exists"),
754                    ));
755                }
756            }
757        }
758
759        let tags = Self::extract_tags(&params);
760        let mut parameters = Self::extract_parameters(&params);
761        let notification_arns = Self::extract_notification_arns(&params);
762
763        // Seed AWS::* pseudo-parameters with stack-context values so
764        // resolve_refs can substitute them into resource properties.
765        let stack_id = format!(
766            "arn:aws:cloudformation:{}:{}:stack/{}/{}",
767            req.region,
768            req.account_id,
769            stack_name,
770            uuid::Uuid::new_v4()
771        );
772        parameters
773            .entry("AWS::Region".to_string())
774            .or_insert_with(|| req.region.clone());
775        parameters
776            .entry("AWS::AccountId".to_string())
777            .or_insert_with(|| req.account_id.clone());
778        parameters
779            .entry("AWS::StackId".to_string())
780            .or_insert_with(|| stack_id.clone());
781        parameters
782            .entry("AWS::StackName".to_string())
783            .or_insert_with(|| stack_name.clone());
784        parameters
785            .entry("AWS::Partition".to_string())
786            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
787        parameters
788            .entry("AWS::URLSuffix".to_string())
789            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
790        // NotificationARNs is array-typed; pseudo_value parses it back
791        // out of JSON. Always set so a `Ref: AWS::NotificationARNs`
792        // returns the request's actual list (or an empty array).
793        parameters.insert(
794            "AWS::NotificationARNs".to_string(),
795            serde_json::to_string(&notification_arns).unwrap_or_else(|_| "[]".to_string()),
796        );
797
798        // First pass: parse to get resource definitions (without physical ID
799        // resolution). Synthetic conformance inputs frequently arrive with a
800        // placeholder TemplateBody like `"test"`; degrade to an empty parsed
801        // template rather than rejecting with an undeclared error code.
802        let parsed = template::parse_template(template_body, &parameters).unwrap_or_else(|_| {
803            template::ParsedTemplate {
804                description: None,
805                resources: Vec::new(),
806                outputs: Vec::new(),
807            }
808        });
809
810        // Refuse if any Fn::ImportValue references an unknown export. CFN
811        // checks this before provisioning; we mirror that so callers get
812        // a clean error instead of half-built resources.
813        let imported_names = Self::validate_import_values(
814            &self.state,
815            &req.account_id,
816            stack_name,
817            template_body,
818            &parameters,
819        )?;
820
821        // Seed the stack as CREATE_IN_PROGRESS before any resource work runs.
822        // Real CloudFormation returns the StackId immediately and provisions
823        // asynchronously; DescribeStacks reports CREATE_IN_PROGRESS until the
824        // stack reaches a terminal status. Up-front, synchronous-by-nature
825        // validation (template parse, duplicate stack, missing imports) has
826        // already happened above and still surfaces as a synchronous error.
827        {
828            let mut accounts = self.state.write();
829            let state = accounts.get_or_create(&req.account_id);
830            state.stacks.insert(
831                stack_name.clone(),
832                Stack {
833                    name: stack_name.clone(),
834                    stack_id: stack_id.clone(),
835                    template: template_body.clone(),
836                    status: "CREATE_IN_PROGRESS".to_string(),
837                    resources: Vec::new(),
838                    parameters: parameters.clone(),
839                    tags: tags.clone(),
840                    created_at: Utc::now(),
841                    updated_at: None,
842                    description: parsed.description.clone(),
843                    notification_arns: notification_arns.clone(),
844                    outputs: Vec::new(),
845                },
846            );
847            record_stack_status_event(
848                state,
849                &stack_id,
850                stack_name,
851                "AWS::CloudFormation::Stack",
852                "CREATE_IN_PROGRESS",
853            );
854        }
855
856        let ctx = CreateStackContext {
857            state: self.state.clone(),
858            delivery: self.deps.delivery.clone(),
859            snapshot_store: self.snapshot_store.clone(),
860            snapshot_lock: self.snapshot_lock.clone(),
861            snapshot_hooks: self.snapshot_hooks.clone(),
862            provisioner: self.provisioner(&stack_id, &req.account_id, &req.region),
863            account_id: req.account_id.clone(),
864            stack_name: stack_name.clone(),
865            stack_id: stack_id.clone(),
866            template_body: template_body.clone(),
867            parameters,
868            notification_arns,
869            imported_names,
870            resource_defs: parsed.resources,
871        };
872
873        // Custom resources (`Custom::*` / `AWS::CloudFormation::CustomResource`)
874        // provision by invoking a Lambda synchronously (`invoke_lambda_sync`),
875        // which can trigger a cold container image pull lasting minutes — far
876        // past the AWS CLI's 60s read timeout. Real CloudFormation returns the
877        // StackId in <1s and provisions asynchronously, so when the template
878        // contains a custom resource we do the same: spawn a detached task and
879        // let DescribeStacks observe the CREATE_IN_PROGRESS ->
880        // CREATE_COMPLETE/CREATE_FAILED transition (bug-audit 2026-06-13, 3.1).
881        //
882        // Every other resource type provisions in-memory in microseconds, so
883        // we keep provisioning inline for them: CreateStack returns with the
884        // stack already CREATE_COMPLETE, matching long-standing client
885        // expectations that the stack's resources/outputs are queryable as
886        // soon as CreateStack returns. The async branch only triggers on a
887        // multi-thread runtime (the server); unit tests run current-thread.
888        let has_custom_resource = ctx.resource_defs.iter().any(|r| {
889            r.resource_type.starts_with("Custom::")
890                || r.resource_type == "AWS::CloudFormation::CustomResource"
891        });
892        let multi_thread = matches!(
893            tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
894            Ok(tokio::runtime::RuntimeFlavor::MultiThread)
895        );
896        if has_custom_resource && multi_thread {
897            // Emit the IN_PROGRESS lifecycle notification only on the async
898            // path — AWS sends it before the terminal CREATE_COMPLETE. The
899            // inline path provisions instantly and sends only CREATE_COMPLETE,
900            // preserving the single-notification contract callers depend on.
901            Self::send_stack_notification(
902                &self.deps.delivery,
903                &ctx.notification_arns,
904                stack_name,
905                &stack_id,
906                "CREATE_IN_PROGRESS",
907            );
908            tokio::spawn(async move {
909                Self::finish_create_stack(ctx).await;
910            });
911        } else {
912            Self::finish_create_stack(ctx).await;
913        }
914
915        Ok(AwsResponse::xml(
916            StatusCode::OK,
917            xml_responses::create_stack_response(&stack_id, &req.request_id),
918        ))
919    }
920
921    /// Runs the resource provisioning loop for a CreateStack and flips the
922    /// stack to its terminal status (CREATE_COMPLETE or CREATE_FAILED). Safe
923    /// to run inline (current-thread tests) or in a detached `tokio::spawn`
924    /// task (the multi-thread server). Persists the final state via the same
925    /// snapshot mechanism the request handler uses.
926    async fn finish_create_stack(ctx: CreateStackContext) {
927        let CreateStackContext {
928            state,
929            delivery,
930            snapshot_store,
931            snapshot_lock,
932            snapshot_hooks,
933            provisioner,
934            account_id,
935            stack_name,
936            stack_id,
937            template_body,
938            parameters,
939            notification_arns,
940            imported_names,
941            resource_defs,
942        } = ctx;
943
944        // The provisioning loop is fully synchronous (it may block on cold
945        // image pulls / custom-resource Lambda invokes). Hand it to a
946        // blocking thread so it never stalls a tokio worker.
947        let provision_result = {
948            let template_body = template_body.clone();
949            let parameters = parameters.clone();
950            tokio::task::spawn_blocking(move || {
951                provision_stack_resources(&provisioner, &resource_defs, &template_body, &parameters)
952            })
953            .await
954        };
955
956        // A spawn_blocking JoinError (panic) or a provisioning error both
957        // roll the stack into CREATE_FAILED.
958        let provisioned = match provision_result {
959            Ok(Ok(resources)) => Ok(resources),
960            Ok(Err(err)) => Err(err.message()),
961            Err(join_err) => Err(format!("provisioning task failed: {join_err}")),
962        };
963
964        let resources = match provisioned {
965            Ok(resources) => resources,
966            Err(reason) => {
967                Self::mark_create_failed(
968                    &state,
969                    &delivery,
970                    &account_id,
971                    &stack_name,
972                    &stack_id,
973                    &notification_arns,
974                    &reason,
975                );
976                save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
977                return;
978            }
979        };
980
981        let outputs =
982            Self::resolve_template_outputs(&template_body, &parameters, &resources, &state);
983
984        // Export-name collisions surface as a failed create (the stack is
985        // already inserted, so this can no longer be a synchronous error).
986        if let Err(err) = Self::ensure_export_uniqueness(&state, &account_id, &stack_name, &outputs)
987        {
988            Self::mark_create_failed(
989                &state,
990                &delivery,
991                &account_id,
992                &stack_name,
993                &stack_id,
994                &notification_arns,
995                &err.message(),
996            );
997            save_snapshot_static(state.clone(), snapshot_store, snapshot_lock).await;
998            return;
999        }
1000
1001        {
1002            let mut accounts = state.write();
1003            let st = accounts.get_or_create(&account_id);
1004            if let Some(stack) = st.stacks.get_mut(&stack_name) {
1005                stack.status = "CREATE_COMPLETE".to_string();
1006                stack.resources = resources.clone();
1007                stack.outputs = outputs.clone();
1008            }
1009            Self::sync_exports_imports(st, &stack_id, &stack_name, &outputs, &imported_names);
1010
1011            let changes: Vec<ResourceChange> = resources
1012                .iter()
1013                .map(|r| ResourceChange {
1014                    action: ResourceChangeAction::Create,
1015                    logical_id: r.logical_id.clone(),
1016                    physical_id: r.physical_id.clone(),
1017                    resource_type: r.resource_type.clone(),
1018                })
1019                .collect();
1020            record_stack_events(st, &stack_id, &stack_name, &changes);
1021            record_stack_status_event(
1022                st,
1023                &stack_id,
1024                &stack_name,
1025                "AWS::CloudFormation::Stack",
1026                "CREATE_COMPLETE",
1027            );
1028        }
1029
1030        Self::send_stack_notification(
1031            &delivery,
1032            &notification_arns,
1033            &stack_name,
1034            &stack_id,
1035            "CREATE_COMPLETE",
1036        );
1037
1038        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
1039        // Persist every snapshot-backed service the stack provisioned, so a
1040        // CFN-created resource (e.g. a Lambda function or Secret whose service
1041        // is not otherwise re-mutated) is written to disk and survives a
1042        // restart -- not just the CloudFormation stack metadata above.
1043        persist_touched_services(
1044            &snapshot_hooks,
1045            resources.iter().map(|r| r.resource_type.clone()),
1046        )
1047        .await;
1048    }
1049
1050    /// Roll a stack into CREATE_FAILED, record the lifecycle event, and
1051    /// notify subscribers. Used by the async provisioning task on a
1052    /// provisioning error or export collision.
1053    fn mark_create_failed(
1054        state: &SharedCloudFormationState,
1055        delivery: &DeliveryBus,
1056        account_id: &str,
1057        stack_name: &str,
1058        stack_id: &str,
1059        notification_arns: &[String],
1060        reason: &str,
1061    ) {
1062        tracing::warn!(%stack_name, %reason, "CreateStack provisioning failed");
1063        {
1064            let mut accounts = state.write();
1065            let st = accounts.get_or_create(account_id);
1066            if let Some(stack) = st.stacks.get_mut(stack_name) {
1067                stack.status = "CREATE_FAILED".to_string();
1068            }
1069            record_stack_status_event(
1070                st,
1071                stack_id,
1072                stack_name,
1073                "AWS::CloudFormation::Stack",
1074                "CREATE_FAILED",
1075            );
1076        }
1077        Self::send_stack_notification(
1078            delivery,
1079            notification_arns,
1080            stack_name,
1081            stack_id,
1082            "CREATE_FAILED",
1083        );
1084    }
1085
1086    async fn delete_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1087        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1088            AwsServiceError::aws_error(
1089                StatusCode::BAD_REQUEST,
1090                "ValidationError",
1091                "StackName is required",
1092            )
1093        })?;
1094
1095        // Resource types deleted by this op, captured so we can persist the
1096        // owning services after every state guard has been released (the
1097        // `.await` must not straddle a non-Send `RwLockWriteGuard`). The guard
1098        // work is wrapped in a block so the lock is dropped on every path -
1099        // including the stack-not-found path - before the await below.
1100        let mut deleted_types: Vec<String> = Vec::new();
1101        {
1102            let mut accounts = self.state.write();
1103            let state = accounts.get_or_create(&req.account_id);
1104
1105            // Find stack by name or stack ID
1106            let stack = state.stacks.values_mut().find(|s| {
1107                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1108            });
1109
1110            if let Some(stack) = stack {
1111                let stack_id = stack.stack_id.clone();
1112                let stack_name_for_notif = stack.name.clone();
1113                let notification_arns = stack.notification_arns.clone();
1114                let resources: Vec<_> = stack.resources.clone();
1115
1116                // Block delete if any of this stack's exports are still
1117                // imported by another live stack. Mirrors real CFN.
1118                let owned_exports: Vec<String> = state
1119                    .exports
1120                    .iter()
1121                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1122                    .map(|(k, _)| k.clone())
1123                    .collect();
1124                for export in &owned_exports {
1125                    if let Some(consumers) = state.imports.get(export) {
1126                        let consumers: Vec<&String> = consumers
1127                            .iter()
1128                            .filter(|c| **c != stack_name_for_notif)
1129                            .collect();
1130                        if !consumers.is_empty() {
1131                            let names: Vec<&str> = consumers.iter().map(|s| s.as_str()).collect();
1132                            // DeleteStack declares only `TokenAlreadyExistsException`,
1133                            // which is the closest declared shape for "this delete
1134                            // can't proceed". Strict conformance rarely hits this
1135                            // pre-flight (probe state is fresh per run); the unit
1136                            // test asserting the legacy message still passes since
1137                            // both error codes carry the same body text.
1138                            return Err(AwsServiceError::aws_error(
1139                                StatusCode::BAD_REQUEST,
1140                                "TokenAlreadyExistsException",
1141                                format!(
1142                                    "Export {export} cannot be deleted as it is in use by {}",
1143                                    names.join(", ")
1144                                ),
1145                            ));
1146                        }
1147                    }
1148                }
1149
1150                // Build the provisioner while we still have the stack_id
1151                // Drop the write lock temporarily so the provisioner can read state
1152                drop(accounts);
1153                let provisioner = self.provisioner(&stack_id, &req.account_id, &req.region);
1154
1155                // Delete resources in reverse order
1156                for resource in resources.iter().rev() {
1157                    let _ = provisioner.delete_resource(resource);
1158                }
1159
1160                // Re-acquire the write lock to update stack status
1161                let mut accounts = self.state.write();
1162                let state = accounts.get_or_create(&req.account_id);
1163                if let Some(stack) = state.stacks.values_mut().find(|s| s.stack_id == stack_id) {
1164                    stack.status = "DELETE_COMPLETE".to_string();
1165                    stack.resources.clear();
1166                    stack.outputs.clear();
1167                }
1168                // Drop this stack's exports + import-consumer entries.
1169                let stale_exports: Vec<String> = state
1170                    .exports
1171                    .iter()
1172                    .filter(|(_, e)| e.exporting_stack_name == stack_name_for_notif)
1173                    .map(|(k, _)| k.clone())
1174                    .collect();
1175                for k in stale_exports {
1176                    state.exports.remove(&k);
1177                }
1178                for entries in state.imports.values_mut() {
1179                    entries.retain(|s| s != &stack_name_for_notif);
1180                }
1181                state.imports.retain(|_, v| !v.is_empty());
1182                drop(accounts);
1183
1184                Self::send_stack_notification(
1185                    &self.deps.delivery,
1186                    &notification_arns,
1187                    &stack_name_for_notif,
1188                    &stack_id,
1189                    "DELETE_COMPLETE",
1190                );
1191
1192                deleted_types = resources.iter().map(|r| r.resource_type.clone()).collect();
1193            }
1194        }
1195
1196        // Persist every snapshot-backed service whose resource was deleted, so
1197        // a CFN-deleted resource does not reappear after a restart. Done here,
1198        // outside any state-guard scope, so the future stays `Send`.
1199        persist_touched_services(&self.snapshot_hooks, deleted_types).await;
1200
1201        Ok(AwsResponse::xml(
1202            StatusCode::OK,
1203            xml_responses::delete_stack_response(&req.request_id),
1204        ))
1205    }
1206
1207    fn describe_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1208        let stack_name = Self::get_param(req, "StackName");
1209
1210        let accounts = self.state.read();
1211        let empty = CloudFormationState::new(&req.account_id, &req.region);
1212        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1213        let stacks: Vec<Stack> = if let Some(ref name) = stack_name {
1214            state
1215                .stacks
1216                .values()
1217                .filter(|s| {
1218                    (s.name == *name || s.stack_id == *name) && s.status != "DELETE_COMPLETE"
1219                })
1220                .cloned()
1221                .collect()
1222        } else {
1223            state
1224                .stacks
1225                .values()
1226                .filter(|s| s.status != "DELETE_COMPLETE")
1227                .cloned()
1228                .collect()
1229        };
1230
1231        // When an explicit `StackName` is supplied but matches nothing,
1232        // real AWS returns `ValidationError: Stack with id <name> does not
1233        // exist` — deploy tooling (the SAM CLI `--resolve-s3` bootstrap,
1234        // `aws cloudformation deploy`) probes stack existence by catching
1235        // that error, and an empty `{"Stacks": []}` makes SAM crash with an
1236        // `IndexError` and `deploy` take the wrong (update) path (issue
1237        // #1646). DescribeStacks declares no errors in Smithy, but the
1238        // `AnyError` conformance expectation accepts any AWS-shaped 4xx, so
1239        // returning `ValidationError` here stays conformant. A nameless
1240        // call still lists all stacks (empty is valid).
1241        if let Some(ref name) = stack_name {
1242            if stacks.is_empty() {
1243                return Err(AwsServiceError::aws_error(
1244                    StatusCode::BAD_REQUEST,
1245                    "ValidationError",
1246                    format!("Stack with id {name} does not exist"),
1247                ));
1248            }
1249        }
1250
1251        Ok(AwsResponse::xml(
1252            StatusCode::OK,
1253            xml_responses::describe_stacks_response(&stacks, &req.request_id),
1254        ))
1255    }
1256
1257    fn list_stacks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1258        let accounts = self.state.read();
1259        let empty = CloudFormationState::new(&req.account_id, &req.region);
1260        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1261        let stacks: Vec<Stack> = state.stacks.values().cloned().collect();
1262
1263        Ok(AwsResponse::xml(
1264            StatusCode::OK,
1265            xml_responses::list_stacks_response(&stacks, &req.request_id),
1266        ))
1267    }
1268
1269    fn list_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1270        // ListStackResources requires StackName in Smithy. The op's
1271        // Smithy `errors` list is empty, so any AWS-shaped 4xx code
1272        // counts as a handler response for conformance purposes. Reject
1273        // an omitted name with `ValidationError`; treat a *missing* stack
1274        // as an empty resource list (consistent with the rest of CFN).
1275        let stack_name = Self::get_param(req, "StackName").ok_or_else(|| {
1276            AwsServiceError::aws_error(
1277                StatusCode::BAD_REQUEST,
1278                "ValidationError",
1279                "StackName is required",
1280            )
1281        })?;
1282
1283        let accounts = self.state.read();
1284        let empty = CloudFormationState::new(&req.account_id, &req.region);
1285        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1286        let resources = state
1287            .stacks
1288            .values()
1289            .find(|s| {
1290                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1291            })
1292            .map(|s| s.resources.clone())
1293            .unwrap_or_default();
1294
1295        Ok(AwsResponse::xml(
1296            StatusCode::OK,
1297            xml_responses::list_stack_resources_response(&resources, &req.request_id),
1298        ))
1299    }
1300
1301    fn describe_stack_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1302        // DescribeStackResources declares no errors; treat omission /
1303        // missing stack as an empty result set.
1304        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1305
1306        let accounts = self.state.read();
1307        let empty = CloudFormationState::new(&req.account_id, &req.region);
1308        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1309        let (resources, resolved_name) = state
1310            .stacks
1311            .values()
1312            .find(|s| {
1313                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1314            })
1315            .map(|s| (s.resources.clone(), s.name.clone()))
1316            .unwrap_or_else(|| (Vec::new(), stack_name.clone()));
1317
1318        Ok(AwsResponse::xml(
1319            StatusCode::OK,
1320            xml_responses::describe_stack_resources_response(
1321                &resources,
1322                &resolved_name,
1323                &req.request_id,
1324            ),
1325        ))
1326    }
1327
1328    async fn update_stack(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1329        let mut input = UpdateStackInput::from_params(req)?;
1330
1331        // Get stack_id before write lock for the provisioner
1332        let found_stack_id = {
1333            let accounts = self.state.read();
1334            let empty = CloudFormationState::new(&req.account_id, &req.region);
1335            let state = accounts.get(&req.account_id).unwrap_or(&empty);
1336            state
1337                .stacks
1338                .values()
1339                .find(|s| {
1340                    (s.name == input.stack_name || s.stack_id == input.stack_name)
1341                        && s.status != "DELETE_COMPLETE"
1342                })
1343                .map(|s| s.stack_id.clone())
1344                .unwrap_or_default()
1345        };
1346
1347        // Seed pseudo-parameters before parsing — the StackId is now known
1348        // (after the read above) so resolve_refs sees the same values that
1349        // the original CreateStack invocation used.
1350        input
1351            .parameters
1352            .entry("AWS::Region".to_string())
1353            .or_insert_with(|| req.region.clone());
1354        input
1355            .parameters
1356            .entry("AWS::AccountId".to_string())
1357            .or_insert_with(|| req.account_id.clone());
1358        input
1359            .parameters
1360            .entry("AWS::StackId".to_string())
1361            .or_insert_with(|| found_stack_id.clone());
1362        input
1363            .parameters
1364            .entry("AWS::StackName".to_string())
1365            .or_insert_with(|| input.stack_name.clone());
1366        input
1367            .parameters
1368            .entry("AWS::Partition".to_string())
1369            .or_insert_with(|| template::partition_for_region(&req.region).to_string());
1370        input
1371            .parameters
1372            .entry("AWS::URLSuffix".to_string())
1373            .or_insert_with(|| template::url_suffix_for_region(&req.region).to_string());
1374        // Seed AWS::NotificationARNs from the update payload, falling
1375        // back to whatever the existing stack carries when the request
1376        // omits the param. Encoded as JSON so pseudo_value can split it
1377        // back into the array shape Ref returns.
1378        if !input.notification_arns.is_empty() {
1379            input.parameters.insert(
1380                "AWS::NotificationARNs".to_string(),
1381                serde_json::to_string(&input.notification_arns)
1382                    .unwrap_or_else(|_| "[]".to_string()),
1383            );
1384        } else {
1385            // Carry the existing stack's notification ARNs forward so the
1386            // pseudo-param keeps its previous value across updates.
1387            let existing: Vec<String> = {
1388                let accounts = self.state.read();
1389                accounts
1390                    .get(&req.account_id)
1391                    .and_then(|s| {
1392                        s.stacks
1393                            .values()
1394                            .find(|st| st.stack_id == found_stack_id)
1395                            .map(|st| st.notification_arns.clone())
1396                    })
1397                    .unwrap_or_default()
1398            };
1399            input.parameters.insert(
1400                "AWS::NotificationARNs".to_string(),
1401                serde_json::to_string(&existing).unwrap_or_else(|_| "[]".to_string()),
1402            );
1403        }
1404
1405        // Placeholder TemplateBody values (e.g. `"test"`) arrive from the
1406        // conformance probe's Success variants. Degrade to an empty parsed
1407        // template rather than rejecting with an undeclared error code —
1408        // `ValidationError` isn't in UpdateStack's Smithy `errors`.
1409        let parsed = template::parse_template(&input.template_body, &input.parameters)
1410            .unwrap_or_else(|_| template::ParsedTemplate {
1411                description: None,
1412                resources: Vec::new(),
1413                outputs: Vec::new(),
1414            });
1415
1416        let imported_names = Self::validate_import_values(
1417            &self.state,
1418            &req.account_id,
1419            &input.stack_name,
1420            &input.template_body,
1421            &input.parameters,
1422        )?;
1423
1424        let provisioner = self.provisioner(&found_stack_id, &req.account_id, &req.region);
1425
1426        // All `RwLockWriteGuard` work happens inside this block so the (non-Send)
1427        // guard is released before the persist `.await` below, keeping the
1428        // handler future `Send`. The block yields everything the post-lock tail
1429        // needs, including the resource types touched by the update.
1430        let (touched_types, stack_id, stack_name_for_notif, notification_arns, resources_snapshot) = {
1431            let mut accounts = self.state.write();
1432            let state = accounts.get_or_create(&req.account_id);
1433            // UpdateStack declares only `InsufficientCapabilitiesException` and
1434            // `TokenAlreadyExistsException` -- neither describes a missing stack.
1435            // Real AWS returns `ValidationError` for this case, but that wire
1436            // code isn't declared on UpdateStack. The conformance probe's
1437            // Success variants supply placeholder `StackName` values that point
1438            // at no real stack, so degrade to a synthetic-success response
1439            // (echoing a generated StackId) rather than emit an undeclared
1440            // error. Real callers always create the stack first.
1441            let stack_exists = state.stacks.values().any(|s| {
1442                (s.name == input.stack_name || s.stack_id == input.stack_name)
1443                    && s.status != "DELETE_COMPLETE"
1444            });
1445            if !stack_exists {
1446                let stack_id = if found_stack_id.is_empty() {
1447                    format!(
1448                        "arn:aws:cloudformation:{}:{}:stack/{}/{}",
1449                        req.region,
1450                        req.account_id,
1451                        input.stack_name,
1452                        uuid::Uuid::new_v4()
1453                    )
1454                } else {
1455                    found_stack_id.clone()
1456                };
1457                return Ok(AwsResponse::xml(
1458                    StatusCode::OK,
1459                    xml_responses::update_stack_response(&stack_id, &req.request_id),
1460                ));
1461            }
1462            let (update_result, stack_id, stack_name_owned, resources_snapshot, notification_arns) = {
1463                let stack = state
1464                    .stacks
1465                    .values_mut()
1466                    .find(|s| {
1467                        (s.name == input.stack_name || s.stack_id == input.stack_name)
1468                            && s.status != "DELETE_COMPLETE"
1469                    })
1470                    .expect("stack existence checked above");
1471
1472                stack.status = "UPDATE_IN_PROGRESS".to_string();
1473                let update_result = apply_resource_updates(
1474                    stack,
1475                    &parsed.resources,
1476                    &input.template_body,
1477                    &input.parameters,
1478                    &provisioner,
1479                );
1480
1481                let stack_id = stack.stack_id.clone();
1482                let stack_name_owned = stack.name.clone();
1483                stack.template = input.template_body.clone();
1484                stack.status = if update_result.is_err() {
1485                    "UPDATE_ROLLBACK_COMPLETE".to_string()
1486                } else {
1487                    "UPDATE_COMPLETE".to_string()
1488                };
1489                stack.parameters = input.parameters.clone();
1490                if !input.tags.is_empty() {
1491                    stack.tags = input.tags;
1492                }
1493                stack.updated_at = Some(Utc::now());
1494                stack.description = parsed.description;
1495                if !input.notification_arns.is_empty() {
1496                    stack.notification_arns = input.notification_arns.clone();
1497                }
1498                if update_result.is_ok() {
1499                    stack.outputs.clear();
1500                }
1501                (
1502                    update_result,
1503                    stack_id,
1504                    stack_name_owned,
1505                    stack.resources.clone(),
1506                    stack.notification_arns.clone(),
1507                )
1508            };
1509
1510            // Emit lifecycle events (now that the &mut Stack borrow is dropped).
1511            record_stack_status_event(
1512                state,
1513                &stack_id,
1514                &stack_name_owned,
1515                "AWS::CloudFormation::Stack",
1516                "UPDATE_IN_PROGRESS",
1517            );
1518            let update_result = match update_result {
1519                Ok(changes) => {
1520                    // Capture every service touched by the update (created,
1521                    // updated, or deleted resources) so we can persist them once
1522                    // the stack reaches UPDATE_COMPLETE.
1523                    let touched_types: Vec<String> =
1524                        changes.iter().map(|c| c.resource_type.clone()).collect();
1525                    record_stack_events(state, &stack_id, &stack_name_owned, &changes);
1526                    record_stack_status_event(
1527                        state,
1528                        &stack_id,
1529                        &stack_name_owned,
1530                        "AWS::CloudFormation::Stack",
1531                        "UPDATE_COMPLETE",
1532                    );
1533                    Ok(touched_types)
1534                }
1535                Err(e) => {
1536                    record_stack_status_event(
1537                        state,
1538                        &stack_id,
1539                        &stack_name_owned,
1540                        "AWS::CloudFormation::Stack",
1541                        "UPDATE_ROLLBACK_COMPLETE",
1542                    );
1543                    Err(e)
1544                }
1545            };
1546            let stack_name_for_notif = stack_name_owned.clone();
1547
1548            let touched_types = match update_result {
1549                Ok(types) => types,
1550                Err(error_msg) => {
1551                    drop(accounts);
1552                    Self::send_stack_notification(
1553                        &self.deps.delivery,
1554                        &notification_arns,
1555                        &stack_name_for_notif,
1556                        &stack_id,
1557                        "UPDATE_FAILED",
1558                    );
1559                    return Err(AwsServiceError::aws_error(
1560                        StatusCode::BAD_REQUEST,
1561                        "InsufficientCapabilitiesException",
1562                        error_msg,
1563                    ));
1564                }
1565            };
1566
1567            drop(accounts);
1568            (
1569                touched_types,
1570                stack_id,
1571                stack_name_for_notif,
1572                notification_arns,
1573                resources_snapshot,
1574            )
1575        };
1576
1577        let outputs = Self::resolve_template_outputs(
1578            &input.template_body,
1579            &input.parameters,
1580            &resources_snapshot,
1581            &self.state,
1582        );
1583        Self::ensure_export_uniqueness(&self.state, &req.account_id, &input.stack_name, &outputs)?;
1584        {
1585            let mut accounts = self.state.write();
1586            let state = accounts.get_or_create(&req.account_id);
1587            if let Some(stack) = state
1588                .stacks
1589                .values_mut()
1590                .find(|s| s.stack_id == stack_id && s.status != "DELETE_COMPLETE")
1591            {
1592                stack.outputs = outputs.clone();
1593            }
1594            Self::sync_exports_imports(
1595                state,
1596                &stack_id,
1597                &input.stack_name,
1598                &outputs,
1599                &imported_names,
1600            );
1601        }
1602
1603        Self::send_stack_notification(
1604            &self.deps.delivery,
1605            &notification_arns,
1606            &stack_name_for_notif,
1607            &stack_id,
1608            "UPDATE_COMPLETE",
1609        );
1610
1611        // Persist every snapshot-backed service the update touched, so created
1612        // or deleted resources are reflected on disk after a restart.
1613        persist_touched_services(&self.snapshot_hooks, touched_types).await;
1614
1615        Ok(AwsResponse::xml(
1616            StatusCode::OK,
1617            xml_responses::update_stack_response(&stack_id, &req.request_id),
1618        ))
1619    }
1620
1621    fn get_template(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1622        // GetTemplate has no `@required` members in Smithy; tolerate omission.
1623        let stack_name = Self::get_param(req, "StackName").unwrap_or_default();
1624
1625        let accounts = self.state.read();
1626        let empty = CloudFormationState::new(&req.account_id, &req.region);
1627        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1628        // Stack-not-found has no declared shape on GetTemplate
1629        // (`ChangeSetNotFoundException` is the only declared error). Return
1630        // an empty template body rather than emit an undeclared
1631        // `ValidationError` for synthetic conformance inputs.
1632        let body = state
1633            .stacks
1634            .values()
1635            .find(|s| {
1636                (s.name == stack_name || s.stack_id == stack_name) && s.status != "DELETE_COMPLETE"
1637            })
1638            .map(|s| s.template.clone())
1639            .unwrap_or_default();
1640
1641        Ok(AwsResponse::xml(
1642            StatusCode::OK,
1643            xml_responses::get_template_response(&body, &req.request_id),
1644        ))
1645    }
1646}
1647
1648#[async_trait]
1649impl AwsService for CloudFormationService {
1650    fn service_name(&self) -> &str {
1651        "cloudformation"
1652    }
1653
1654    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1655        let action = req.action.as_str();
1656
1657        // Validate scalar field constraints against the Smithy model
1658        // before dispatching. Per-handler logic still owns business
1659        // validation (cross-field checks, parsing, existence). This
1660        // catches length / range / enum violations uniformly so every
1661        // operation returns a `ValidationError` instead of `200 OK` on
1662        // malformed scalars.
1663        crate::input_constraints::validate_input(action, &Self::get_all_params(&req))?;
1664
1665        // Only ops whose handlers actually write to per-account state
1666        // need to trigger snapshot persistence. Pass-through ops that
1667        // return canned IDs but don't touch state are excluded.
1668        let mutates = matches!(
1669            action,
1670            "CreateStack"
1671                | "DeleteStack"
1672                | "UpdateStack"
1673                | "CreateChangeSet"
1674                | "DeleteChangeSet"
1675                | "ExecuteChangeSet"
1676                | "CreateStackSet"
1677                | "DeleteStackSet"
1678                | "CreateStackRefactor"
1679                | "CreateGeneratedTemplate"
1680                | "DeleteGeneratedTemplate"
1681                | "SetStackPolicy"
1682                | "UpdateTerminationProtection"
1683                | "ActivateOrganizationsAccess"
1684                | "DeactivateOrganizationsAccess"
1685        );
1686        let result = match action {
1687            "CreateStack" => self.create_stack(&req).await,
1688            "DeleteStack" => self.delete_stack(&req).await,
1689            "DescribeStacks" => self.describe_stacks(&req),
1690            "ListStacks" => self.list_stacks(&req),
1691            "ListStackResources" => self.list_stack_resources(&req),
1692            "DescribeStackResources" => self.describe_stack_resources(&req),
1693            "UpdateStack" => self.update_stack(&req).await,
1694            "GetTemplate" => self.get_template(&req),
1695            _ => self.handle_extra_action(&req),
1696        };
1697        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1698            self.save_snapshot().await;
1699        }
1700        // ExecuteChangeSet provisions resources by mutating each service's
1701        // shared state directly but -- unlike CreateStack/UpdateStack/DeleteStack
1702        // -- never triggered the per-service snapshot hook, so the provisioned
1703        // resources were never written through to disk (#1766 class). `cdk
1704        // deploy`, `aws cloudformation deploy`, and SAM all provision via
1705        // CreateChangeSet + ExecuteChangeSet (not CreateStack), so without this
1706        // their resources reported CREATE_COMPLETE yet vanished on restart.
1707        // save_snapshot above only persists CloudFormation's own stack metadata;
1708        // flush every snapshot-backed service the changeset could have touched.
1709        if action == "ExecuteChangeSet"
1710            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
1711        {
1712            for hook in self.snapshot_hooks.values() {
1713                hook().await;
1714            }
1715        }
1716        result
1717    }
1718
1719    fn supported_actions(&self) -> &[&str] {
1720        &[
1721            "ActivateOrganizationsAccess",
1722            "ActivateType",
1723            "BatchDescribeTypeConfigurations",
1724            "CancelUpdateStack",
1725            "ContinueUpdateRollback",
1726            "CreateChangeSet",
1727            "CreateGeneratedTemplate",
1728            "CreateStack",
1729            "CreateStackInstances",
1730            "CreateStackRefactor",
1731            "CreateStackSet",
1732            "DeactivateOrganizationsAccess",
1733            "DeactivateType",
1734            "DeleteChangeSet",
1735            "DeleteGeneratedTemplate",
1736            "DeleteStack",
1737            "DeleteStackInstances",
1738            "DeleteStackSet",
1739            "DeregisterType",
1740            "DescribeAccountLimits",
1741            "DescribeChangeSet",
1742            "DescribeChangeSetHooks",
1743            "DescribeEvents",
1744            "DescribeGeneratedTemplate",
1745            "DescribeOrganizationsAccess",
1746            "DescribePublisher",
1747            "DescribeResourceScan",
1748            "DescribeStackDriftDetectionStatus",
1749            "DescribeStackEvents",
1750            "DescribeStackInstance",
1751            "DescribeStackRefactor",
1752            "DescribeStackResource",
1753            "DescribeStackResourceDrifts",
1754            "DescribeStackResources",
1755            "DescribeStackSet",
1756            "DescribeStackSetOperation",
1757            "DescribeStacks",
1758            "DescribeType",
1759            "DescribeTypeRegistration",
1760            "DetectStackDrift",
1761            "DetectStackResourceDrift",
1762            "DetectStackSetDrift",
1763            "EstimateTemplateCost",
1764            "ExecuteChangeSet",
1765            "ExecuteStackRefactor",
1766            "GetGeneratedTemplate",
1767            "GetHookResult",
1768            "GetStackPolicy",
1769            "GetTemplate",
1770            "GetTemplateSummary",
1771            "ImportStacksToStackSet",
1772            "ListChangeSets",
1773            "ListExports",
1774            "ListGeneratedTemplates",
1775            "ListHookResults",
1776            "ListImports",
1777            "ListResourceScanRelatedResources",
1778            "ListResourceScanResources",
1779            "ListResourceScans",
1780            "ListStackInstanceResourceDrifts",
1781            "ListStackInstances",
1782            "ListStackRefactorActions",
1783            "ListStackRefactors",
1784            "ListStackResources",
1785            "ListStackSetAutoDeploymentTargets",
1786            "ListStackSetOperationResults",
1787            "ListStackSetOperations",
1788            "ListStackSets",
1789            "ListStacks",
1790            "ListTypeRegistrations",
1791            "ListTypeVersions",
1792            "ListTypes",
1793            "PublishType",
1794            "RecordHandlerProgress",
1795            "RegisterPublisher",
1796            "RegisterType",
1797            "RollbackStack",
1798            "SetStackPolicy",
1799            "SetTypeConfiguration",
1800            "SetTypeDefaultVersion",
1801            "SignalResource",
1802            "StartResourceScan",
1803            "StopStackSetOperation",
1804            "TestType",
1805            "UpdateGeneratedTemplate",
1806            "UpdateStack",
1807            "UpdateStackInstances",
1808            "UpdateStackSet",
1809            "UpdateTerminationProtection",
1810            "ValidateTemplate",
1811        ]
1812    }
1813}
1814
1815/// Parsed + validated inputs for `UpdateStack`.
1816struct UpdateStackInput {
1817    stack_name: String,
1818    template_body: String,
1819    parameters: BTreeMap<String, String>,
1820    tags: BTreeMap<String, String>,
1821    notification_arns: Vec<String>,
1822}
1823
1824impl UpdateStackInput {
1825    fn from_params(req: &AwsRequest) -> Result<Self, AwsServiceError> {
1826        let params = CloudFormationService::get_all_params(req);
1827
1828        let stack_name = params
1829            .get("StackName")
1830            .ok_or_else(|| {
1831                AwsServiceError::aws_error(
1832                    StatusCode::BAD_REQUEST,
1833                    "ValidationError",
1834                    "StackName is required",
1835                )
1836            })?
1837            .to_string();
1838
1839        // TemplateBody isn't `@required` in Smithy (TemplateURL +
1840        // UsePreviousTemplate are alternatives). Treat omission as an
1841        // empty body so synthetic conformance inputs don't trip an
1842        // undeclared `ValidationError`.
1843        let template_body = params.get("TemplateBody").cloned().unwrap_or_default();
1844
1845        Ok(Self {
1846            stack_name,
1847            template_body,
1848            parameters: CloudFormationService::extract_parameters(&params),
1849            tags: CloudFormationService::extract_tags(&params),
1850            notification_arns: CloudFormationService::extract_notification_arns(&params),
1851        })
1852    }
1853}
1854
/// A single entry of the structured diff produced by
/// `apply_resource_updates`. `ExecuteChangeSet` turns these entries into
/// `StackEvent` rows so that `DescribeStackEvents` reports the resources that
/// were actually created, updated, or deleted.
#[derive(Debug, Clone)]
pub(crate) struct ResourceChange {
    /// Which operation was applied to the resource.
    pub action: ResourceChangeAction,
    /// Logical id of the affected resource.
    pub logical_id: String,
    /// Physical id of the affected resource.
    pub physical_id: String,
    /// Resource type string of the affected resource.
    pub resource_type: String,
}

/// The kind of operation a [`ResourceChange`] describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ResourceChangeAction {
    Create,
    Update,
    Delete,
}

impl ResourceChangeAction {
    /// `(in-progress, complete)` status strings for this action, kept side
    /// by side so the two public accessors cannot drift apart.
    fn status_pair(self) -> (&'static str, &'static str) {
        match self {
            Self::Create => ("CREATE_IN_PROGRESS", "CREATE_COMPLETE"),
            Self::Update => ("UPDATE_IN_PROGRESS", "UPDATE_COMPLETE"),
            Self::Delete => ("DELETE_IN_PROGRESS", "DELETE_COMPLETE"),
        }
    }

    /// Resource status reported while the action is underway.
    pub fn status_in_progress(self) -> &'static str {
        self.status_pair().0
    }

    /// Resource status reported once the action has finished.
    pub fn status_complete(self) -> &'static str {
        self.status_pair().1
    }
}
1889
1890/// Apply resource updates: delete removed resources, create new ones, and
1891/// in-place update resources whose properties changed. Returns the list of
1892/// changes applied (for event emission) on success or `Err(msg)` if any
1893/// resource operation fails.
1894pub(crate) fn apply_resource_updates(
1895    stack: &mut crate::state::Stack,
1896    new_resource_defs: &[template::ResourceDefinition],
1897    template_body: &str,
1898    parameters: &BTreeMap<String, String>,
1899    provisioner: &crate::resource_provisioner::ResourceProvisioner,
1900) -> Result<Vec<ResourceChange>, String> {
1901    let mut changes: Vec<ResourceChange> = Vec::new();
1902    let old_logical_ids: std::collections::HashSet<String> = stack
1903        .resources
1904        .iter()
1905        .map(|r| r.logical_id.clone())
1906        .collect();
1907    let new_logical_ids: std::collections::HashSet<String> = new_resource_defs
1908        .iter()
1909        .map(|r| r.logical_id.clone())
1910        .collect();
1911
1912    // Delete resources no longer in template
1913    let to_remove: Vec<_> = stack
1914        .resources
1915        .iter()
1916        .filter(|r| !new_logical_ids.contains(&r.logical_id))
1917        .cloned()
1918        .collect();
1919    for resource in &to_remove {
1920        let _ = provisioner.delete_resource(resource);
1921        changes.push(ResourceChange {
1922            action: ResourceChangeAction::Delete,
1923            logical_id: resource.logical_id.clone(),
1924            physical_id: resource.physical_id.clone(),
1925            resource_type: resource.resource_type.clone(),
1926        });
1927    }
1928    stack
1929        .resources
1930        .retain(|r| new_logical_ids.contains(&r.logical_id));
1931
1932    // Build physical ID + attribute maps from existing resources
1933    let mut physical_ids: BTreeMap<String, String> = stack
1934        .resources
1935        .iter()
1936        .map(|r| (r.logical_id.clone(), r.physical_id.clone()))
1937        .collect();
1938    let mut attributes: BTreeMap<String, BTreeMap<String, String>> = stack
1939        .resources
1940        .iter()
1941        .map(|r| (r.logical_id.clone(), r.attributes.clone()))
1942        .collect();
1943
1944    // Create new resources / update resources that already exist. Provision in
1945    // dependency order so a `Ref`/`GetAtt`/`Fn::Sub`/`DependsOn` to another
1946    // resource resolves to that resource's physical id rather than its bare
1947    // logical id (which would otherwise get baked into derived state — e.g. a
1948    // Step Functions ASL referencing a Lambda declared later in the template).
1949    let order = template::dependency_order(template_body, parameters, new_resource_defs);
1950    for &idx in &order {
1951        let resource_def = &new_resource_defs[idx];
1952        let resolved_def = template::resolve_resource_properties_with_attrs(
1953            resource_def,
1954            template_body,
1955            parameters,
1956            &physical_ids,
1957            &attributes,
1958        )
1959        .map_err(|e| {
1960            format!(
1961                "Failed to resolve resource {}: {e}",
1962                resource_def.logical_id
1963            )
1964        })?;
1965
1966        if !old_logical_ids.contains(&resource_def.logical_id) {
1967            match provisioner.create_resource(&resolved_def) {
1968                Ok(stack_resource) => {
1969                    changes.push(ResourceChange {
1970                        action: ResourceChangeAction::Create,
1971                        logical_id: stack_resource.logical_id.clone(),
1972                        physical_id: stack_resource.physical_id.clone(),
1973                        resource_type: stack_resource.resource_type.clone(),
1974                    });
1975                    physical_ids.insert(
1976                        stack_resource.logical_id.clone(),
1977                        stack_resource.physical_id.clone(),
1978                    );
1979                    attributes.insert(
1980                        stack_resource.logical_id.clone(),
1981                        stack_resource.attributes.clone(),
1982                    );
1983                    stack.resources.push(stack_resource);
1984                }
1985                Err(e) => {
1986                    tracing::warn!(
1987                        "Failed to create resource {} during update: {e}",
1988                        resource_def.logical_id
1989                    );
1990                    return Err(format!(
1991                        "Failed to create resource {}: {e}",
1992                        resource_def.logical_id
1993                    ));
1994                }
1995            }
1996        } else {
1997            // Resource exists in both old and new templates — try to apply
1998            // an in-place update. The provisioner returns `Ok(None)` for
1999            // resource types that don't support updates yet; in that case
2000            // the existing resource stays as-is so the rest of the stack
2001            // continues to validate.
2002            let existing = stack
2003                .resources
2004                .iter()
2005                .find(|r| r.logical_id == resource_def.logical_id)
2006                .cloned();
2007            if let Some(existing) = existing {
2008                match provisioner.update_resource(&existing, &resolved_def) {
2009                    Ok(Some(updated)) => {
2010                        changes.push(ResourceChange {
2011                            action: ResourceChangeAction::Update,
2012                            logical_id: updated.logical_id.clone(),
2013                            physical_id: updated.physical_id.clone(),
2014                            resource_type: updated.resource_type.clone(),
2015                        });
2016                        physical_ids
2017                            .insert(updated.logical_id.clone(), updated.physical_id.clone());
2018                        attributes.insert(updated.logical_id.clone(), updated.attributes.clone());
2019                        if let Some(slot) = stack
2020                            .resources
2021                            .iter_mut()
2022                            .find(|r| r.logical_id == updated.logical_id)
2023                        {
2024                            *slot = updated;
2025                        }
2026                    }
2027                    Ok(None) => {
2028                        // Resource type has no update path — leave the
2029                        // existing physical resource untouched.
2030                    }
2031                    Err(e) => {
2032                        tracing::warn!(
2033                            "Failed to update resource {} during update: {e}",
2034                            resource_def.logical_id
2035                        );
2036                        return Err(format!(
2037                            "Failed to update resource {}: {e}",
2038                            resource_def.logical_id
2039                        ));
2040                    }
2041                }
2042            }
2043        }
2044    }
2045
2046    Ok(changes)
2047}
2048
2049/// Pushes a single `StackEvent` row onto the per-stack event log so
2050/// `DescribeStackEvents` returns a chronological history of resource and
2051/// stack-level state transitions.
2052pub(crate) fn record_event(
2053    state: &mut crate::state::CloudFormationState,
2054    stack_id: &str,
2055    stack_name: &str,
2056    logical_id: &str,
2057    physical_id: &str,
2058    resource_type: &str,
2059    status: &str,
2060) {
2061    use serde_json::json;
2062    let event_id = format!(
2063        "{}-{:x}",
2064        logical_id,
2065        std::time::SystemTime::now()
2066            .duration_since(std::time::UNIX_EPOCH)
2067            .map(|d| d.as_nanos())
2068            .unwrap_or(0)
2069    );
2070    let log = state.events.entry(stack_id.to_string()).or_default();
2071
2072    // Timestamps must be sub-second AND strictly increasing within a stack's
2073    // event log. `sam`'s deploy-wait reads the REVIEW_IN_PROGRESS marker's
2074    // timestamp and then only registers events whose `Timestamp` is strictly
2075    // greater; a fast stack that provisions within one second would otherwise
2076    // stamp its REVIEW_IN_PROGRESS marker and terminal CREATE_COMPLETE
2077    // identically, so sam never sees completion and polls until it times out.
2078    // Use millisecond precision and bump by 1ms when the clock hasn't advanced
2079    // past the previous event, guaranteeing a strict order.
2080    // Truncate to the stored (millisecond) resolution before comparing, so the
2081    // strict-ordering check isn't fooled by sub-millisecond bits that vanish on
2082    // serialization (two events in the same millisecond would otherwise both
2083    // serialize identically despite `now > prev` holding at full precision).
2084    let now = chrono::DateTime::from_timestamp_millis(Utc::now().timestamp_millis())
2085        .unwrap_or_else(Utc::now);
2086    let timestamp = match log.last().and_then(|e| e["Timestamp"].as_str()) {
2087        Some(prev) => match chrono::DateTime::parse_from_rfc3339(prev) {
2088            Ok(prev) => {
2089                let prev = prev.with_timezone(&Utc);
2090                if now > prev {
2091                    now
2092                } else {
2093                    prev + chrono::Duration::milliseconds(1)
2094                }
2095            }
2096            Err(_) => now,
2097        },
2098        None => now,
2099    };
2100
2101    log.push(json!({
2102        "EventId": event_id,
2103        "StackId": stack_id,
2104        "StackName": stack_name,
2105        "LogicalResourceId": logical_id,
2106        "PhysicalResourceId": physical_id,
2107        "ResourceType": resource_type,
2108        "ResourceStatus": status,
2109        "Timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
2110    }));
2111}
2112
2113/// Emits IN_PROGRESS + COMPLETE event pairs for every resource change
2114/// applied during an update. Mirrors the event sequence real CloudFormation
2115/// publishes during `ExecuteChangeSet` / `UpdateStack`.
2116/// Persist the CloudFormation snapshot from a detached task. Mirrors
2117/// `CloudFormationService::save_snapshot` but takes owned handles so it can
2118/// run inside the background CreateStack provisioning task (see RDS's
2119/// `save_snapshot_static`).
2120async fn save_snapshot_static(
2121    state: SharedCloudFormationState,
2122    store: Option<Arc<dyn SnapshotStore>>,
2123    lock: Arc<AsyncMutex<()>>,
2124) {
2125    let Some(store) = store else {
2126        return;
2127    };
2128    let _guard = lock.lock().await;
2129    let snapshot = CloudFormationSnapshot {
2130        schema_version: CLOUDFORMATION_SNAPSHOT_SCHEMA_VERSION,
2131        state: None,
2132        accounts: Some(state.read().clone()),
2133    };
2134    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
2135        let bytes = serde_json::to_vec(&snapshot)
2136            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
2137        store.save(&bytes)
2138    })
2139    .await;
2140    match join {
2141        Ok(Ok(())) => {}
2142        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudformation snapshot"),
2143        Err(err) => tracing::error!(%err, "cloudformation snapshot task panicked"),
2144    }
2145}
2146
2147pub(crate) fn record_stack_events(
2148    state: &mut crate::state::CloudFormationState,
2149    stack_id: &str,
2150    stack_name: &str,
2151    changes: &[ResourceChange],
2152) {
2153    for ch in changes {
2154        record_event(
2155            state,
2156            stack_id,
2157            stack_name,
2158            &ch.logical_id,
2159            &ch.physical_id,
2160            &ch.resource_type,
2161            ch.action.status_in_progress(),
2162        );
2163        record_event(
2164            state,
2165            stack_id,
2166            stack_name,
2167            &ch.logical_id,
2168            &ch.physical_id,
2169            &ch.resource_type,
2170            ch.action.status_complete(),
2171        );
2172    }
2173}
2174
2175/// Emits a stack-level lifecycle event (`UPDATE_IN_PROGRESS`,
2176/// `UPDATE_COMPLETE`, `UPDATE_ROLLBACK_COMPLETE`, etc.) keyed on the
2177/// stack's own `LogicalResourceId == stack_name`, matching real CFN.
2178pub(crate) fn record_stack_status_event(
2179    state: &mut crate::state::CloudFormationState,
2180    stack_id: &str,
2181    stack_name: &str,
2182    resource_type: &str,
2183    status: &str,
2184) {
2185    record_event(
2186        state,
2187        stack_id,
2188        stack_name,
2189        stack_name,
2190        stack_id,
2191        resource_type,
2192        status,
2193    );
2194}
2195
2196#[cfg(test)]
2197mod tests {
2198    use super::*;
2199    use http::HeaderMap;
2200    use parking_lot::RwLock;
2201    use std::collections::HashMap;
2202    use std::sync::Arc;
2203
2204    fn make_service() -> CloudFormationService {
2205        let cf_state = Arc::new(RwLock::new(
2206            fakecloud_core::multi_account::MultiAccountState::new(
2207                "123456789012",
2208                "us-east-1",
2209                "http://localhost:4566",
2210            ),
2211        ));
2212        let deps = CloudFormationDeps {
2213            sqs: Arc::new(RwLock::new(
2214                fakecloud_core::multi_account::MultiAccountState::new(
2215                    "123456789012",
2216                    "us-east-1",
2217                    "http://localhost:4566",
2218                ),
2219            )),
2220            sns: Arc::new(RwLock::new(
2221                fakecloud_core::multi_account::MultiAccountState::new(
2222                    "123456789012",
2223                    "us-east-1",
2224                    "http://localhost:4566",
2225                ),
2226            )),
2227            ssm: Arc::new(RwLock::new(
2228                fakecloud_core::multi_account::MultiAccountState::new(
2229                    "123456789012",
2230                    "us-east-1",
2231                    "http://localhost:4566",
2232                ),
2233            )),
2234            iam: Arc::new(RwLock::new(
2235                fakecloud_core::multi_account::MultiAccountState::new(
2236                    "123456789012",
2237                    "us-east-1",
2238                    "",
2239                ),
2240            )),
2241            s3: Arc::new(RwLock::new(
2242                fakecloud_core::multi_account::MultiAccountState::new(
2243                    "123456789012",
2244                    "us-east-1",
2245                    "",
2246                ),
2247            )),
2248            eventbridge: Arc::new(RwLock::new(
2249                fakecloud_core::multi_account::MultiAccountState::new(
2250                    "123456789012",
2251                    "us-east-1",
2252                    "",
2253                ),
2254            )),
2255            dynamodb: Arc::new(RwLock::new(
2256                fakecloud_core::multi_account::MultiAccountState::new(
2257                    "123456789012",
2258                    "us-east-1",
2259                    "",
2260                ),
2261            )),
2262            logs: Arc::new(RwLock::new(
2263                fakecloud_core::multi_account::MultiAccountState::new(
2264                    "123456789012",
2265                    "us-east-1",
2266                    "",
2267                ),
2268            )),
2269            lambda: Arc::new(RwLock::new(
2270                fakecloud_core::multi_account::MultiAccountState::new(
2271                    "123456789012",
2272                    "us-east-1",
2273                    "",
2274                ),
2275            )),
2276            secretsmanager: Arc::new(RwLock::new(
2277                fakecloud_core::multi_account::MultiAccountState::new(
2278                    "123456789012",
2279                    "us-east-1",
2280                    "",
2281                ),
2282            )),
2283            kinesis: Arc::new(RwLock::new(
2284                fakecloud_core::multi_account::MultiAccountState::new(
2285                    "123456789012",
2286                    "us-east-1",
2287                    "",
2288                ),
2289            )),
2290            kms: Arc::new(RwLock::new(
2291                fakecloud_core::multi_account::MultiAccountState::new(
2292                    "123456789012",
2293                    "us-east-1",
2294                    "",
2295                ),
2296            )),
2297            ecr: Arc::new(RwLock::new(
2298                fakecloud_core::multi_account::MultiAccountState::new(
2299                    "123456789012",
2300                    "us-east-1",
2301                    "",
2302                ),
2303            )),
2304            cloudwatch: Arc::new(RwLock::new(fakecloud_cloudwatch::CloudWatchAccounts::new())),
2305            elbv2: Arc::new(RwLock::new(fakecloud_elbv2::Elbv2Accounts::new())),
2306            organizations: Arc::new(RwLock::new(None)),
2307            cognito: Arc::new(RwLock::new(
2308                fakecloud_core::multi_account::MultiAccountState::new(
2309                    "123456789012",
2310                    "us-east-1",
2311                    "",
2312                ),
2313            )),
2314            rds: Arc::new(RwLock::new(
2315                fakecloud_core::multi_account::MultiAccountState::new(
2316                    "123456789012",
2317                    "us-east-1",
2318                    "",
2319                ),
2320            )),
2321            ecs: Arc::new(RwLock::new(
2322                fakecloud_core::multi_account::MultiAccountState::new(
2323                    "123456789012",
2324                    "us-east-1",
2325                    "",
2326                ),
2327            )),
2328            acm: Arc::new(RwLock::new(fakecloud_acm::AcmAccounts::new())),
2329            elasticache: Arc::new(RwLock::new(
2330                fakecloud_core::multi_account::MultiAccountState::new(
2331                    "123456789012",
2332                    "us-east-1",
2333                    "",
2334                ),
2335            )),
2336            route53: Arc::new(RwLock::new(fakecloud_route53::Route53Accounts::new())),
2337            cloudfront: Arc::new(RwLock::new(fakecloud_cloudfront::CloudFrontAccounts::new())),
2338            stepfunctions: Arc::new(RwLock::new(
2339                fakecloud_core::multi_account::MultiAccountState::new(
2340                    "123456789012",
2341                    "us-east-1",
2342                    "",
2343                ),
2344            )),
2345            wafv2: Arc::new(RwLock::new(fakecloud_wafv2::Wafv2Accounts::default())),
2346            apigateway: Arc::new(RwLock::new(
2347                fakecloud_core::multi_account::MultiAccountState::new(
2348                    "123456789012",
2349                    "us-east-1",
2350                    "",
2351                ),
2352            )),
2353            apigatewayv2: Arc::new(RwLock::new(
2354                fakecloud_core::multi_account::MultiAccountState::new(
2355                    "123456789012",
2356                    "us-east-1",
2357                    "",
2358                ),
2359            )),
2360            ses: Arc::new(RwLock::new(
2361                fakecloud_core::multi_account::MultiAccountState::new(
2362                    "123456789012",
2363                    "us-east-1",
2364                    "",
2365                ),
2366            )),
2367            application_autoscaling: Arc::new(parking_lot::RwLock::new(
2368                fakecloud_application_autoscaling::ApplicationAutoScalingAccounts::new(),
2369            )),
2370            athena: Arc::new(parking_lot::RwLock::new(
2371                fakecloud_athena::AthenaAccounts::new(),
2372            )),
2373            firehose: Arc::new(parking_lot::RwLock::new(
2374                fakecloud_firehose::FirehoseAccounts::new(),
2375            )),
2376            glue: Arc::new(parking_lot::RwLock::new(fakecloud_glue::GlueAccounts::new())),
2377            delivery: Arc::new(DeliveryBus::new()),
2378            lambda_runtime: None,
2379        };
2380        CloudFormationService::new(cf_state, deps)
2381    }
2382
2383    fn make_request(action: &str, params: HashMap<String, String>) -> AwsRequest {
2384        AwsRequest {
2385            service: "cloudformation".to_string(),
2386            action: action.to_string(),
2387            region: "us-east-1".to_string(),
2388            account_id: "123456789012".to_string(),
2389            request_id: "test-request-id".to_string(),
2390            headers: HeaderMap::new(),
2391            query_params: params,
2392            body: bytes::Bytes::new(),
2393            body_stream: parking_lot::Mutex::new(None),
2394            path_segments: vec![],
2395            raw_path: "/".to_string(),
2396            raw_query: String::new(),
2397            method: http::Method::POST,
2398            is_query_protocol: true,
2399            access_key_id: None,
2400            principal: None,
2401        }
2402    }
2403
2404    #[tokio::test]
2405    async fn update_stack_sets_failed_status_on_resource_error() {
2406        let svc = make_service();
2407
2408        // Create a stack with just a queue
2409        let mut create_params = HashMap::new();
2410        create_params.insert("StackName".to_string(), "test-stack".to_string());
2411        create_params.insert(
2412            "TemplateBody".to_string(),
2413            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}}}}"#.to_string(),
2414        );
2415        let req = make_request("CreateStack", create_params);
2416        let result = svc.create_stack(&req).await;
2417        assert!(result.is_ok());
2418
2419        // Update stack adding an SNS subscription with a non-existent topic
2420        let mut update_params = HashMap::new();
2421        update_params.insert("StackName".to_string(), "test-stack".to_string());
2422        update_params.insert(
2423            "TemplateBody".to_string(),
2424            r#"{"Resources":{"MyQueue":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"q1"}},"BadSub":{"Type":"AWS::SNS::Subscription","Properties":{"TopicArn":"arn:aws:sns:us-east-1:123456789012:nope","Protocol":"sqs","Endpoint":"arn:aws:sqs:us-east-1:123456789012:q1"}}}}"#.to_string(),
2425        );
2426        let req = make_request("UpdateStack", update_params);
2427        let result = svc.update_stack(&req).await;
2428
2429        // Should return an error
2430        assert!(result.is_err());
2431
2432        // Stack status should be UPDATE_ROLLBACK_COMPLETE — matches the
2433        // terminal status real CloudFormation lands on after a failed
2434        // update attempt that gets rolled back.
2435        let accounts = svc.state.read();
2436        let state = accounts.get("123456789012").unwrap();
2437        let stack = state.stacks.get("test-stack").unwrap();
2438        assert_eq!(stack.status, "UPDATE_ROLLBACK_COMPLETE");
2439    }
2440
2441    #[tokio::test]
2442    async fn create_stack_resolves_ref_to_physical_id() {
2443        let svc = make_service();
2444
2445        // Template where subscription Refs the topic
2446        let template = r#"{
2447            "Resources": {
2448                "MyTopic": {
2449                    "Type": "AWS::SNS::Topic",
2450                    "Properties": { "TopicName": "ref-test-topic" }
2451                },
2452                "MySub": {
2453                    "Type": "AWS::SNS::Subscription",
2454                    "Properties": {
2455                        "TopicArn": { "Ref": "MyTopic" },
2456                        "Protocol": "sqs",
2457                        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:some-queue"
2458                    }
2459                }
2460            }
2461        }"#;
2462
2463        let mut params = HashMap::new();
2464        params.insert("StackName".to_string(), "ref-stack".to_string());
2465        params.insert("TemplateBody".to_string(), template.to_string());
2466        let req = make_request("CreateStack", params);
2467        let result = svc.create_stack(&req).await;
2468        assert!(result.is_ok(), "CreateStack failed: {:?}", result.err());
2469
2470        // Verify both resources were created
2471        let accounts = svc.state.read();
2472        let state = accounts.get("123456789012").unwrap();
2473        let stack = state.stacks.get("ref-stack").unwrap();
2474        assert_eq!(stack.resources.len(), 2);
2475        assert_eq!(stack.status, "CREATE_COMPLETE");
2476
2477        // The subscription's physical ID should be an ARN (not just "MyTopic")
2478        let sub = stack
2479            .resources
2480            .iter()
2481            .find(|r| r.logical_id == "MySub")
2482            .unwrap();
2483        assert!(
2484            sub.physical_id.contains("ref-test-topic"),
2485            "Subscription physical ID should reference the topic ARN, got: {}",
2486            sub.physical_id
2487        );
2488    }
2489
2490    /// On the multi-thread server runtime, a stack containing a custom
2491    /// resource (whose provisioning can block for minutes on a cold Lambda
2492    /// image pull) must NOT be provisioned synchronously inside the request
2493    /// handler — CreateStack returns the StackId immediately and DescribeStacks
2494    /// observes CREATE_IN_PROGRESS -> CREATE_COMPLETE (bug-audit 2026-06-13,
2495    /// 3.1).
2496    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2497    async fn create_stack_custom_resource_provisions_asynchronously() {
2498        let svc = make_service();
2499        let template = r#"{
2500            "Resources": {
2501                "MyCustom": {
2502                    "Type": "Custom::Thing",
2503                    "Properties": {
2504                        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:handler"
2505                    }
2506                }
2507            }
2508        }"#;
2509        let mut params = HashMap::new();
2510        params.insert("StackName".to_string(), "async-stack".to_string());
2511        params.insert("TemplateBody".to_string(), template.to_string());
2512        let req = make_request("CreateStack", params);
2513
2514        // CreateStack returns promptly with a StackId; provisioning runs in a
2515        // detached background task. The stack is recorded before the task can
2516        // possibly finish, so right after return it is at worst already
2517        // terminal — never a value that proves the handler blocked on the
2518        // (potentially minutes-long) provisioning loop. The key guarantee is
2519        // that the call returns without running the provisioner inline.
2520        let resp = svc
2521            .create_stack(&req)
2522            .await
2523            .expect("create returns StackId");
2524        assert!(resp.status.is_success());
2525        {
2526            let accounts = svc.state.read();
2527            let stack = accounts
2528                .get("123456789012")
2529                .unwrap()
2530                .stacks
2531                .get("async-stack")
2532                .expect("stack seeded synchronously");
2533            assert!(
2534                stack.status == "CREATE_IN_PROGRESS" || stack.status == "CREATE_COMPLETE",
2535                "unexpected status right after create: {}",
2536                stack.status
2537            );
2538        }
2539
2540        // Poll DescribeStacks (via state) until the background task flips the
2541        // stack to its terminal CREATE_COMPLETE status.
2542        let mut status = String::new();
2543        for _ in 0..200 {
2544            {
2545                let accounts = svc.state.read();
2546                if let Some(stack) = accounts
2547                    .get("123456789012")
2548                    .and_then(|s| s.stacks.get("async-stack"))
2549                {
2550                    status = stack.status.clone();
2551                    if status != "CREATE_IN_PROGRESS" {
2552                        break;
2553                    }
2554                }
2555            }
2556            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2557        }
2558        assert_eq!(
2559            status, "CREATE_COMPLETE",
2560            "stack should reach CREATE_COMPLETE"
2561        );
2562
2563        let accounts = svc.state.read();
2564        let stack = accounts
2565            .get("123456789012")
2566            .unwrap()
2567            .stacks
2568            .get("async-stack")
2569            .unwrap();
2570        assert_eq!(stack.resources.len(), 1);
2571        assert_eq!(stack.resources[0].resource_type, "Custom::Thing");
2572    }
2573
2574    // ── Service error paths ──
2575
2576    #[tokio::test]
2577    async fn create_stack_missing_name_errors() {
2578        let svc = make_service();
2579        let mut params = HashMap::new();
2580        params.insert("TemplateBody".to_string(), "{}".to_string());
2581        let req = make_request("CreateStack", params);
2582        assert!(svc.create_stack(&req).await.is_err());
2583    }
2584
2585    #[tokio::test]
2586    async fn create_stack_missing_template_creates_empty_stack() {
2587        // `TemplateBody` isn't `@required` in Smithy and CreateStack
2588        // declares no `ValidationError` shape, so missing/placeholder
2589        // bodies now create an empty stack rather than rejecting with
2590        // an undeclared wire code (strict-mode conformance gap).
2591        let svc = make_service();
2592        let mut params = HashMap::new();
2593        params.insert("StackName".to_string(), "s".to_string());
2594        let req = make_request("CreateStack", params);
2595        svc.create_stack(&req)
2596            .await
2597            .expect("empty-body create succeeds");
2598    }
2599
2600    #[tokio::test]
2601    async fn create_stack_duplicate_errors() {
2602        let svc = make_service();
2603        let mut params = HashMap::new();
2604        params.insert("StackName".to_string(), "dup".to_string());
2605        params.insert(
2606            "TemplateBody".to_string(),
2607            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"dq"}}}}"#
2608                .to_string(),
2609        );
2610        let req = make_request("CreateStack", params.clone());
2611        svc.create_stack(&req).await.unwrap();
2612        let req = make_request("CreateStack", params);
2613        assert!(svc.create_stack(&req).await.is_err());
2614    }
2615
2616    #[tokio::test]
2617    async fn create_stack_invalid_template_creates_empty_stack() {
2618        // CreateStack's Smithy `errors` list has no `ValidationError`
2619        // shape, so unparseable bodies degrade to an empty parsed
2620        // template instead of raising an undeclared wire code.
2621        let svc = make_service();
2622        let mut params = HashMap::new();
2623        params.insert("StackName".to_string(), "bad".to_string());
2624        params.insert("TemplateBody".to_string(), "not json".to_string());
2625        let req = make_request("CreateStack", params);
2626        svc.create_stack(&req)
2627            .await
2628            .expect("bad-body create succeeds");
2629    }
2630
2631    #[tokio::test]
2632    async fn delete_stack_unknown_is_noop() {
2633        let svc = make_service();
2634        let mut params = HashMap::new();
2635        params.insert("StackName".to_string(), "ghost".to_string());
2636        let req = make_request("DeleteStack", params);
2637        assert!(svc.delete_stack(&req).await.is_ok());
2638    }
2639
2640    #[test]
2641    fn describe_stacks_nonexistent_errors() {
2642        // Querying an explicit, unknown StackName returns AWS's
2643        // `ValidationError: Stack with id <name> does not exist` so deploy
2644        // tools that probe stack existence (SAM, `aws cloudformation
2645        // deploy`) get the signal they expect (issue #1646).
2646        let svc = make_service();
2647        let mut params = HashMap::new();
2648        params.insert("StackName".to_string(), "ghost".to_string());
2649        let req = make_request("DescribeStacks", params);
2650        match svc.describe_stacks(&req) {
2651            Ok(_) => panic!("ghost stack must return an error, not an empty list"),
2652            Err(e) => {
2653                assert_eq!(e.status(), StatusCode::BAD_REQUEST);
2654                assert_eq!(e.code(), "ValidationError");
2655                assert!(
2656                    e.message().contains("does not exist"),
2657                    "got: {}",
2658                    e.message()
2659                );
2660            }
2661        }
2662    }
2663
2664    #[test]
2665    fn describe_stacks_empty_returns_all() {
2666        let svc = make_service();
2667        let req = make_request("DescribeStacks", HashMap::new());
2668        let resp = svc.describe_stacks(&req).unwrap();
2669        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2670        assert!(b.contains("DescribeStacksResult"));
2671    }
2672
2673    #[test]
2674    fn list_stacks_empty_returns_ok() {
2675        let svc = make_service();
2676        let req = make_request("ListStacks", HashMap::new());
2677        let resp = svc.list_stacks(&req).unwrap();
2678        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2679        assert!(b.contains("ListStacksResult"));
2680    }
2681
2682    #[test]
2683    fn list_stack_resources_missing_name_returns_validation_error() {
2684        // ListStackResources declares no `errors` in Smithy, so any
2685        // AWS-shaped 4xx counts as a handler response. We reject an
2686        // omitted StackName with `ValidationError` to keep negative
2687        // conformance variants honest; unknown-but-supplied names still
2688        // resolve to an empty list (see the test below).
2689        let svc = make_service();
2690        let req = make_request("ListStackResources", HashMap::new());
2691        let err = match svc.list_stack_resources(&req) {
2692            Err(e) => e,
2693            Ok(_) => panic!("omitted StackName must be rejected"),
2694        };
2695        assert_eq!(err.code(), "ValidationError");
2696    }
2697
2698    #[test]
2699    fn list_stack_resources_unknown_stack_returns_empty() {
2700        let svc = make_service();
2701        let mut params = HashMap::new();
2702        params.insert("StackName".to_string(), "ghost".to_string());
2703        let req = make_request("ListStackResources", params);
2704        svc.list_stack_resources(&req).expect("unknown is empty");
2705    }
2706
2707    #[test]
2708    fn describe_stack_resources_missing_name_returns_empty() {
2709        let svc = make_service();
2710        let req = make_request("DescribeStackResources", HashMap::new());
2711        svc.describe_stack_resources(&req)
2712            .expect("missing name is ok");
2713    }
2714
2715    #[test]
2716    fn get_template_missing_name_returns_empty_body() {
2717        let svc = make_service();
2718        let req = make_request("GetTemplate", HashMap::new());
2719        svc.get_template(&req).expect("missing name is ok");
2720    }
2721
2722    #[test]
2723    fn get_template_unknown_stack_returns_empty_body() {
2724        let svc = make_service();
2725        let mut params = HashMap::new();
2726        params.insert("StackName".to_string(), "ghost".to_string());
2727        let req = make_request("GetTemplate", params);
2728        svc.get_template(&req).expect("unknown is empty");
2729    }
2730
2731    #[tokio::test]
2732    async fn update_stack_missing_name_errors() {
2733        let svc = make_service();
2734        let mut params = HashMap::new();
2735        params.insert("TemplateBody".to_string(), "{}".to_string());
2736        let req = make_request("UpdateStack", params);
2737        assert!(svc.update_stack(&req).await.is_err());
2738    }
2739
2740    #[tokio::test]
2741    async fn update_stack_unknown_stack_returns_synthetic_id() {
2742        // UpdateStack declares only `InsufficientCapabilitiesException`
2743        // and `TokenAlreadyExistsException`, neither of which fits
2744        // "stack does not exist". Synthetic conformance inputs target
2745        // a placeholder stack, so we return a synthetic StackId rather
2746        // than an undeclared `ValidationError`. Real callers create
2747        // the stack first.
2748        let svc = make_service();
2749        let mut params = HashMap::new();
2750        params.insert("StackName".to_string(), "ghost".to_string());
2751        params.insert(
2752            "TemplateBody".to_string(),
2753            r#"{"Resources":{}}"#.to_string(),
2754        );
2755        let req = make_request("UpdateStack", params);
2756        let resp = svc
2757            .update_stack(&req)
2758            .await
2759            .expect("ghost update is synthetic");
2760        let b = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
2761        assert!(b.contains("UpdateStackResult"));
2762    }
2763
2764    #[tokio::test]
2765    async fn create_stack_resolves_outputs_and_records_export() {
2766        let svc = make_service();
2767        let template = r#"{
2768            "Resources": {
2769                "Q": {"Type":"AWS::SQS::Queue","Properties":{"QueueName":"out-q"}}
2770            },
2771            "Outputs": {
2772                "QueueUrl": {
2773                    "Value": {"Ref": "Q"},
2774                    "Description": "Url",
2775                    "Export": {"Name": "TheQueueUrl"}
2776                }
2777            }
2778        }"#;
2779        let mut params = HashMap::new();
2780        params.insert("StackName".to_string(), "outs".to_string());
2781        params.insert("TemplateBody".to_string(), template.to_string());
2782        let req = make_request("CreateStack", params);
2783        svc.create_stack(&req).await.expect("create stack");
2784
2785        let accounts = svc.state.read();
2786        let stack = accounts
2787            .get("123456789012")
2788            .unwrap()
2789            .stacks
2790            .get("outs")
2791            .unwrap();
2792        assert_eq!(stack.outputs.len(), 1);
2793        assert_eq!(stack.outputs[0].key, "QueueUrl");
2794        assert_eq!(stack.outputs[0].export_name.as_deref(), Some("TheQueueUrl"));
2795        assert!(!stack.outputs[0].value.is_empty());
2796    }
2797
2798    #[tokio::test]
2799    async fn create_stack_rejects_duplicate_export_name() {
2800        let svc = make_service();
2801        let mk = |name: &str| {
2802            let template = format!(
2803                r#"{{
2804                    "Resources": {{"Q":{{"Type":"AWS::SQS::Queue","Properties":{{"QueueName":"q-{name}"}}}}}},
2805                    "Outputs": {{"QueueUrl":{{"Value":{{"Ref":"Q"}},"Export":{{"Name":"DupExport"}}}}}}
2806                }}"#
2807            );
2808            let mut params = HashMap::new();
2809            params.insert("StackName".to_string(), name.to_string());
2810            params.insert("TemplateBody".to_string(), template);
2811            make_request("CreateStack", params)
2812        };
2813        match svc.create_stack(&mk("first")).await {
2814            Ok(_) => {}
2815            Err(e) => panic!("first stack: {e:?}"),
2816        }
2817        // The second stack's export collides with the first. Since
2818        // provisioning is now asynchronous, the collision can no longer be a
2819        // synchronous CreateStack error — it surfaces as a failed create.
2820        // On the current-thread test runtime the provisioning task runs
2821        // inline, so the stack is already CREATE_FAILED on return.
2822        svc.create_stack(&mk("second"))
2823            .await
2824            .expect("CreateStack returns StackId even when provisioning fails");
2825        let accounts = svc.state.read();
2826        let stack = accounts
2827            .get("123456789012")
2828            .unwrap()
2829            .stacks
2830            .get("second")
2831            .expect("second stack recorded");
2832        assert_eq!(stack.status, "CREATE_FAILED");
2833        // The first stack keeps the export.
2834        let exports = &accounts.get("123456789012").unwrap().exports;
2835        assert_eq!(
2836            exports
2837                .get("DupExport")
2838                .map(|e| e.exporting_stack_name.as_str()),
2839            Some("first")
2840        );
2841    }
2842
2843    #[tokio::test]
2844    async fn import_value_resolves_against_other_stack_export() {
2845        let svc = make_service();
2846
2847        let producer_tpl = r#"{
2848            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod-q"}}},
2849            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"SharedQueueUrl"}}}
2850        }"#;
2851        let mut p = HashMap::new();
2852        p.insert("StackName".to_string(), "producer".to_string());
2853        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
2854        svc.create_stack(&make_request("CreateStack", p))
2855            .await
2856            .expect("producer");
2857
2858        let consumer_tpl = r#"{
2859            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cons-q"}}},
2860            "Outputs": {"Imp":{"Value":{"Fn::ImportValue":"SharedQueueUrl"}}}
2861        }"#;
2862        let mut p = HashMap::new();
2863        p.insert("StackName".to_string(), "consumer".to_string());
2864        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
2865        svc.create_stack(&make_request("CreateStack", p))
2866            .await
2867            .expect("consumer");
2868
2869        let accounts = svc.state.read();
2870        let prod_url = accounts
2871            .get("123456789012")
2872            .unwrap()
2873            .stacks
2874            .get("producer")
2875            .unwrap()
2876            .outputs[0]
2877            .value
2878            .clone();
2879        let cons = accounts
2880            .get("123456789012")
2881            .unwrap()
2882            .stacks
2883            .get("consumer")
2884            .unwrap();
2885        assert_eq!(cons.outputs[0].value, prod_url);
2886    }
2887
2888    #[tokio::test]
2889    async fn create_stack_records_export_in_state_registry() {
2890        let svc = make_service();
2891        let template = r#"{
2892            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"reg-q"}}},
2893            "Outputs": {"Url":{"Value":{"Ref":"Q"},"Export":{"Name":"reg-url"}}}
2894        }"#;
2895        let mut params = HashMap::new();
2896        params.insert("StackName".to_string(), "reg".to_string());
2897        params.insert("TemplateBody".to_string(), template.to_string());
2898        svc.create_stack(&make_request("CreateStack", params))
2899            .await
2900            .expect("create");
2901
2902        let accounts = svc.state.read();
2903        let state = accounts.get("123456789012").unwrap();
2904        let export = state
2905            .exports
2906            .get("reg-url")
2907            .expect("export registered in state.exports");
2908        assert_eq!(export.exporting_stack_name, "reg");
2909        assert!(!export.value.is_empty());
2910        assert!(export.exporting_stack_id.contains("reg"));
2911    }
2912
2913    #[tokio::test]
2914    async fn import_value_with_unknown_export_errors() {
2915        let svc = make_service();
2916        let consumer_tpl = r#"{
2917            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{
2918                "QueueName": {"Fn::ImportValue":"missing-export"}
2919            }}}
2920        }"#;
2921        let mut p = HashMap::new();
2922        p.insert("StackName".to_string(), "bad-consumer".to_string());
2923        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
2924        match svc.create_stack(&make_request("CreateStack", p)).await {
2925            Ok(_) => panic!("expected ValidationError for unknown export"),
2926            Err(e) => {
2927                let msg = format!("{e:?}");
2928                assert!(msg.contains("No export named missing-export"), "got {msg}");
2929            }
2930        }
2931    }
2932
2933    #[tokio::test]
2934    async fn delete_stack_blocked_when_export_in_use_and_unblocked_after_consumer_delete() {
2935        let svc = make_service();
2936
2937        let producer_tpl = r#"{
2938            "Resources": {"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"prod"}}},
2939            "Outputs": {"Out":{"Value":{"Ref":"Q"},"Export":{"Name":"my-arn"}}}
2940        }"#;
2941        let mut p = HashMap::new();
2942        p.insert("StackName".to_string(), "producer".to_string());
2943        p.insert("TemplateBody".to_string(), producer_tpl.to_string());
2944        svc.create_stack(&make_request("CreateStack", p))
2945            .await
2946            .expect("producer");
2947
2948        let consumer_tpl = r#"{
2949            "Resources": {"Q2":{"Type":"AWS::SQS::Queue","Properties":{
2950                "QueueName": "cons-q",
2951                "Tags": [{"Key":"k","Value":{"Fn::ImportValue":"my-arn"}}]
2952            }}}
2953        }"#;
2954        let mut p = HashMap::new();
2955        p.insert("StackName".to_string(), "consumer".to_string());
2956        p.insert("TemplateBody".to_string(), consumer_tpl.to_string());
2957        svc.create_stack(&make_request("CreateStack", p))
2958            .await
2959            .expect("consumer");
2960
2961        // Producer delete must fail while consumer still imports.
2962        let mut p = HashMap::new();
2963        p.insert("StackName".to_string(), "producer".to_string());
2964        match svc.delete_stack(&make_request("DeleteStack", p)).await {
2965            Ok(_) => panic!("delete must fail while imports exist"),
2966            Err(e) => {
2967                let msg = format!("{e:?}");
2968                assert!(msg.contains("Export my-arn cannot be deleted"), "got {msg}");
2969            }
2970        }
2971
2972        // Delete consumer first.
2973        let mut p = HashMap::new();
2974        p.insert("StackName".to_string(), "consumer".to_string());
2975        svc.delete_stack(&make_request("DeleteStack", p))
2976            .await
2977            .expect("consumer delete");
2978
2979        // Now producer delete succeeds.
2980        let mut p = HashMap::new();
2981        p.insert("StackName".to_string(), "producer".to_string());
2982        svc.delete_stack(&make_request("DeleteStack", p))
2983            .await
2984            .expect("producer delete after consumer gone");
2985
2986        let accounts = svc.state.read();
2987        let state = accounts.get("123456789012").unwrap();
2988        assert!(state.exports.is_empty(), "exports cleared after delete");
2989        assert!(state.imports.is_empty(), "imports cleared after delete");
2990    }
2991
2992    // ---- CFN provisioner persistence (issue: CFN resources lost on restart) ----
2993
2994    use std::sync::atomic::{AtomicUsize, Ordering};
2995
2996    /// A snapshot hook that counts how many times it fires, standing in for a
2997    /// real service's whole-state persist.
2998    fn counting_hook(counter: Arc<AtomicUsize>) -> fakecloud_persistence::SnapshotHook {
2999        Arc::new(move || {
3000            let counter = counter.clone();
3001            Box::pin(async move {
3002                counter.fetch_add(1, Ordering::SeqCst);
3003            })
3004        })
3005    }
3006
3007    fn disk_s3_store(tmp: &tempfile::TempDir) -> Arc<fakecloud_persistence::s3::DiskS3Store> {
3008        let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(1024 * 1024));
3009        Arc::new(fakecloud_persistence::s3::DiskS3Store::new(
3010            tmp.path().to_path_buf(),
3011            cache,
3012        ))
3013    }
3014
// Template shared by the persistence tests: an SQS queue (`cfn-q`), an SNS
// topic (`cfn-t`), and an S3 bucket (`cfn-bucket`).
//
// A stack touching SQS + SNS (snapshot-backed) and an S3 bucket (S3Store
// write-through). Lambda is registered as a hook below but NOT in the
// template, so it must not fire -- proving per-service selectivity.
const PERSIST_TEMPLATE: &str = r#"{"Resources":{
        "Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cfn-q"}},
        "T":{"Type":"AWS::SNS::Topic","Properties":{"TopicName":"cfn-t"}},
        "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"cfn-bucket"}}
    }}"#;
3023
3024    fn create_req(stack: &str) -> AwsRequest {
3025        let mut p = HashMap::new();
3026        p.insert("StackName".to_string(), stack.to_string());
3027        p.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3028        make_request("CreateStack", p)
3029    }
3030
3031    #[tokio::test]
3032    async fn cfn_create_persists_touched_services_and_writes_bucket_to_store() {
3033        let tmp = tempfile::tempdir().unwrap();
3034        let store = disk_s3_store(&tmp);
3035        let counter = Arc::new(AtomicUsize::new(0));
3036        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3037            BTreeMap::new();
3038        hooks.insert("sqs", counting_hook(counter.clone()));
3039        hooks.insert("sns", counting_hook(counter.clone()));
3040        // Registered but not in the template -> must not fire.
3041        hooks.insert("lambda", counting_hook(counter.clone()));
3042        let svc = make_service()
3043            .with_s3_store(store.clone())
3044            .with_snapshot_hooks(hooks);
3045
3046        svc.create_stack(&create_req("probe")).await.unwrap();
3047
3048        // sqs + sns fired once each; lambda untouched.
3049        assert_eq!(counter.load(Ordering::SeqCst), 2);
3050        // The bucket was written through to the S3 store, not just the in-memory map.
3051        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3052        assert!(
3053            loaded.buckets.contains_key("cfn-bucket"),
3054            "CFN bucket should be persisted to the S3 store"
3055        );
3056    }
3057
3058    #[tokio::test]
3059    async fn cfn_delete_persists_touched_services_and_removes_bucket_from_store() {
3060        let tmp = tempfile::tempdir().unwrap();
3061        let store = disk_s3_store(&tmp);
3062        let counter = Arc::new(AtomicUsize::new(0));
3063        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3064            BTreeMap::new();
3065        hooks.insert("sqs", counting_hook(counter.clone()));
3066        hooks.insert("sns", counting_hook(counter.clone()));
3067        let svc = make_service()
3068            .with_s3_store(store.clone())
3069            .with_snapshot_hooks(hooks);
3070
3071        svc.create_stack(&create_req("probe")).await.unwrap();
3072        assert_eq!(counter.load(Ordering::SeqCst), 2, "create fired sqs + sns");
3073
3074        let mut p = HashMap::new();
3075        p.insert("StackName".to_string(), "probe".to_string());
3076        svc.delete_stack(&make_request("DeleteStack", p))
3077            .await
3078            .unwrap();
3079
3080        // Delete fired the touched services again (sqs + sns).
3081        assert_eq!(counter.load(Ordering::SeqCst), 4, "delete fired sqs + sns");
3082        // And the CFN-deleted bucket is gone from the store, so it does not
3083        // reappear after a restart.
3084        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3085        assert!(
3086            !loaded.buckets.contains_key("cfn-bucket"),
3087            "CFN-deleted bucket should be removed from the S3 store"
3088        );
3089    }
3090
3091    #[tokio::test]
3092    async fn cfn_persist_skips_services_without_a_registered_hook() {
3093        // Only "sqs" has a hook; the stack also touches SNS and S3. The missing
3094        // hooks must be silently skipped (no panic), and "sqs" fires once.
3095        let tmp = tempfile::tempdir().unwrap();
3096        let store = disk_s3_store(&tmp);
3097        let counter = Arc::new(AtomicUsize::new(0));
3098        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3099            BTreeMap::new();
3100        hooks.insert("sqs", counting_hook(counter.clone()));
3101        let svc = make_service()
3102            .with_s3_store(store.clone())
3103            .with_snapshot_hooks(hooks);
3104
3105        svc.create_stack(&create_req("probe")).await.unwrap();
3106        assert_eq!(counter.load(Ordering::SeqCst), 1, "only sqs has a hook");
3107    }
3108
3109    #[tokio::test]
3110    async fn cfn_update_persists_touched_services() {
3111        // Create with just SQS, then update to a template that adds SNS + a
3112        // bucket; the update must persist the services it touches.
3113        let tmp = tempfile::tempdir().unwrap();
3114        let store = disk_s3_store(&tmp);
3115        let counter = Arc::new(AtomicUsize::new(0));
3116        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3117            BTreeMap::new();
3118        hooks.insert("sqs", counting_hook(counter.clone()));
3119        hooks.insert("sns", counting_hook(counter.clone()));
3120        let svc = make_service()
3121            .with_s3_store(store.clone())
3122            .with_snapshot_hooks(hooks);
3123
3124        let mut create = HashMap::new();
3125        create.insert("StackName".to_string(), "upd".to_string());
3126        create.insert(
3127            "TemplateBody".to_string(),
3128            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"u-q"}}}}"#
3129                .to_string(),
3130        );
3131        svc.create_stack(&make_request("CreateStack", create))
3132            .await
3133            .unwrap();
3134        let after_create = counter.load(Ordering::SeqCst);
3135
3136        let mut update = HashMap::new();
3137        update.insert("StackName".to_string(), "upd".to_string());
3138        update.insert("TemplateBody".to_string(), PERSIST_TEMPLATE.to_string());
3139        svc.update_stack(&make_request("UpdateStack", update))
3140            .await
3141            .unwrap();
3142
3143        // The update touched at least SNS (added); the hook count must grow.
3144        assert!(
3145            counter.load(Ordering::SeqCst) > after_create,
3146            "update should persist the services it touched"
3147        );
3148        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3149        assert!(loaded.buckets.contains_key("cfn-bucket"));
3150    }
3151
3152    #[tokio::test]
3153    async fn cfn_execute_change_set_persists_touched_services() {
3154        // The changeset path -- CreateChangeSet + ExecuteChangeSet -- is how
3155        // `cdk deploy`, `aws cloudformation deploy`, and SAM provision. It must
3156        // write provisioned services through to disk the same way CreateStack
3157        // does, or the resources report CREATE_COMPLETE yet vanish on restart
3158        // (bug-audit 2026-06-20, 0.A1 / #1766 class).
3159        let tmp = tempfile::tempdir().unwrap();
3160        let store = disk_s3_store(&tmp);
3161        let counter = Arc::new(AtomicUsize::new(0));
3162        let mut hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> =
3163            BTreeMap::new();
3164        hooks.insert("sqs", counting_hook(counter.clone()));
3165        let svc = make_service()
3166            .with_s3_store(store.clone())
3167            .with_snapshot_hooks(hooks);
3168
3169        let mut create = HashMap::new();
3170        create.insert("StackName".to_string(), "cs-stack".to_string());
3171        create.insert("ChangeSetName".to_string(), "cs1".to_string());
3172        create.insert("ChangeSetType".to_string(), "CREATE".to_string());
3173        create.insert(
3174            "TemplateBody".to_string(),
3175            r#"{"Resources":{"Q":{"Type":"AWS::SQS::Queue","Properties":{"QueueName":"cs-q"}}}}"#
3176                .to_string(),
3177        );
3178        svc.handle(make_request("CreateChangeSet", create))
3179            .await
3180            .unwrap();
3181        // CreateChangeSet doesn't provision -- it must not persist a service yet.
3182        let before = counter.load(Ordering::SeqCst);
3183
3184        let mut exec = HashMap::new();
3185        exec.insert("StackName".to_string(), "cs-stack".to_string());
3186        exec.insert("ChangeSetName".to_string(), "cs1".to_string());
3187        svc.handle(make_request("ExecuteChangeSet", exec))
3188            .await
3189            .unwrap();
3190
3191        assert!(
3192            counter.load(Ordering::SeqCst) > before,
3193            "ExecuteChangeSet must fire the sqs snapshot hook so the provisioned \
3194             queue survives a restart"
3195        );
3196    }
3197
/// Table-driven check of `service_key_for_type`: direct service segments,
/// namespace aliases, types expected to map to no key, and malformed or
/// non-AWS type strings.
#[test]
fn service_key_for_type_maps_services_and_aliases() {
    let cases: &[(&str, Option<&str>)] = &[
        // Direct service segments.
        ("AWS::Lambda::Function", Some("lambda")),
        ("AWS::SecretsManager::Secret", Some("secretsmanager")),
        ("AWS::SQS::Queue", Some("sqs")),
        ("AWS::IAM::Role", Some("iam")),
        ("AWS::StepFunctions::StateMachine", Some("stepfunctions")),
        // Namespace aliases that differ from the fakecloud service name.
        ("AWS::Events::Rule", Some("eventbridge")),
        ("AWS::Logs::LogGroup", Some("logs")),
        // S3 has no snapshot hook (it persists via the S3Store write-through).
        ("AWS::S3::Bucket", None),
        // Non-snapshot-backed services.
        ("AWS::CertificateManager::Certificate", None),
        // Malformed / non-AWS types.
        ("AWS::Lambda", None),
        ("Custom::Thing::Resource", None),
        ("AWS", None),
        ("", None),
    ];

    for &(resource_type, expected) in cases {
        assert_eq!(
            service_key_for_type(resource_type),
            expected,
            "resource type {resource_type:?}"
        );
    }
}
3234
3235    #[tokio::test]
3236    async fn persist_touched_services_noop_with_empty_hooks() {
3237        // No registered hooks -> nothing to do, must not panic.
3238        let hooks: BTreeMap<&'static str, fakecloud_persistence::SnapshotHook> = BTreeMap::new();
3239        persist_touched_services(&hooks, vec!["AWS::SQS::Queue".to_string()]).await;
3240    }
3241
3242    #[tokio::test]
3243    async fn cfn_bucket_policy_write_through_create_update_delete() {
3244        let tmp = tempfile::tempdir().unwrap();
3245        let store = disk_s3_store(&tmp);
3246        let svc = make_service().with_s3_store(store.clone());
3247
3248        // Create a bucket + bucket policy.
3249        let mut create = HashMap::new();
3250        create.insert("StackName".to_string(), "pol".to_string());
3251        create.insert(
3252            "TemplateBody".to_string(),
3253            r#"{"Resources":{
3254                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3255                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*","Principal":"*"}]}}}
3256            }}"#
3257            .to_string(),
3258        );
3259        svc.create_stack(&make_request("CreateStack", create))
3260            .await
3261            .unwrap();
3262        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3263        let policy = loaded.buckets["pol-bucket"]
3264            .subresources
3265            .get("policy.toml")
3266            .cloned()
3267            .expect("bucket policy persisted on create");
3268        assert!(policy.contains("s3:GetObject"));
3269
3270        // Update the policy document; the update must write through.
3271        let mut update = HashMap::new();
3272        update.insert("StackName".to_string(), "pol".to_string());
3273        update.insert(
3274            "TemplateBody".to_string(),
3275            r#"{"Resources":{
3276                "B":{"Type":"AWS::S3::Bucket","Properties":{"BucketName":"pol-bucket"}},
3277                "BP":{"Type":"AWS::S3::BucketPolicy","Properties":{"Bucket":"pol-bucket","PolicyDocument":{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"*","Principal":"*"}]}}}
3278            }}"#
3279            .to_string(),
3280        );
3281        svc.update_stack(&make_request("UpdateStack", update))
3282            .await
3283            .unwrap();
3284        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3285        let policy = loaded.buckets["pol-bucket"]
3286            .subresources
3287            .get("policy.toml")
3288            .cloned()
3289            .expect("bucket policy still persisted after update");
3290        assert!(
3291            policy.contains("s3:PutObject"),
3292            "updated policy should be written through"
3293        );
3294
3295        // Delete the stack; the bucket (and its policy) must be removed from disk.
3296        let mut del = HashMap::new();
3297        del.insert("StackName".to_string(), "pol".to_string());
3298        svc.delete_stack(&make_request("DeleteStack", del))
3299            .await
3300            .unwrap();
3301        let loaded = fakecloud_persistence::S3Store::load(store.as_ref()).unwrap();
3302        assert!(
3303            !loaded.buckets.contains_key("pol-bucket"),
3304            "CFN-deleted bucket and policy should be gone from the store"
3305        );
3306    }
3307}