Skip to main content

fakecloud_cloudformation/
resource_provisioner.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9    AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
/// Holds references to all service states so CloudFormation can provision resources.
///
/// The per-service create/delete methods of this type take a write lock on the
/// relevant `*_state` handle while they insert or remove a resource.
pub struct ResourceProvisioner {
    /// SQS state: queues keyed by queue URL, plus a queue-name to URL index.
    pub sqs_state: SharedSqsState,
    /// SNS state: topics keyed by topic ARN, subscriptions keyed by subscription ARN.
    pub sns_state: SharedSnsState,
    /// SSM state: parameters keyed by parameter name.
    pub ssm_state: SharedSsmState,
    /// IAM state: roles keyed by role name, managed policies keyed by policy ARN.
    pub iam_state: SharedIamState,
    /// S3 state: buckets keyed by bucket name.
    pub s3_state: SharedS3State,
    /// EventBridge state: rules keyed by `(event bus name, rule name)`.
    pub eventbridge_state: SharedEventBridgeState,
    /// DynamoDB state: tables keyed by table name.
    pub dynamodb_state: SharedDynamoDbState,
    /// CloudWatch Logs state: log groups keyed by log group name.
    pub logs_state: SharedLogsState,
    /// Delivery bus used to invoke the Lambda behind a custom resource's `ServiceToken`.
    pub delivery: Arc<DeliveryBus>,
    /// Account ID of the owning stack.
    /// NOTE(review): the provisioning methods build ARNs from each service state's
    /// own `account_id` rather than this field — confirm where this one is read.
    pub account_id: String,
    /// Region of the owning stack.
    /// NOTE(review): likewise not read by the provisioning methods — confirm usage.
    pub region: String,
    /// Stack identifier sent as `StackId` in custom resource request events.
    pub stack_id: String,
}
37
38impl ResourceProvisioner {
39    /// Create a resource and return the StackResource with physical ID.
40    pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41        let result = match resource.resource_type.as_str() {
42            "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43            "AWS::SNS::Topic" => self.create_sns_topic(resource),
44            "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45            "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46            "AWS::IAM::Role" => self.create_iam_role(resource),
47            "AWS::IAM::Policy" => self.create_iam_policy(resource),
48            "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49            "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50            "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51            "AWS::Logs::LogGroup" => self.create_log_group(resource),
52            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53                self.create_custom_resource(resource)
54            }
55            other => Err(format!("Unsupported resource type: {other}")),
56        };
57
58        let is_custom = resource.resource_type.starts_with("Custom::")
59            || resource.resource_type == "AWS::CloudFormation::CustomResource";
60        let service_token = if is_custom {
61            resource
62                .properties
63                .get("ServiceToken")
64                .and_then(|v| v.as_str())
65                .map(|s| s.to_string())
66        } else {
67            None
68        };
69
70        result.map(|physical_id| StackResource {
71            logical_id: resource.logical_id.clone(),
72            physical_id,
73            resource_type: resource.resource_type.clone(),
74            status: "CREATE_COMPLETE".to_string(),
75            service_token,
76        })
77    }
78
79    /// Delete a previously created resource.
80    pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81        match resource.resource_type.as_str() {
82            "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83            "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84            "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85            "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86            "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87            "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88            "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89            "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90            "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91            "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93                self.delete_custom_resource(resource)
94            }
95            other => Err(format!("Unsupported resource type: {other}")),
96        }
97    }
98
99    // --- SQS ---
100
101    fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102        let props = &resource.properties;
103        let queue_name = props
104            .get("QueueName")
105            .and_then(|v| v.as_str())
106            .unwrap_or(&resource.logical_id);
107
108        let mut state = self.sqs_state.write();
109        let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110        let arn = format!(
111            "arn:aws:sqs:{}:{}:{}",
112            state.region, state.account_id, queue_name
113        );
114
115        let is_fifo = queue_name.ends_with(".fifo");
116        let mut attributes = HashMap::new();
117        if let Some(obj) = props.as_object() {
118            for (k, v) in obj {
119                if k != "QueueName" {
120                    if let Some(s) = v.as_str() {
121                        attributes.insert(k.clone(), s.to_string());
122                    } else if let Some(n) = v.as_i64() {
123                        attributes.insert(k.clone(), n.to_string());
124                    }
125                }
126            }
127        }
128
129        let queue = SqsQueue {
130            queue_name: queue_name.to_string(),
131            queue_url: queue_url.clone(),
132            arn,
133            created_at: Utc::now(),
134            messages: std::collections::VecDeque::new(),
135            inflight: Vec::new(),
136            attributes,
137            is_fifo,
138            dedup_cache: HashMap::new(),
139            redrive_policy: None,
140            tags: HashMap::new(),
141            next_sequence_number: 0,
142            permission_labels: Vec::new(),
143            receipt_handle_map: HashMap::new(),
144        };
145
146        state
147            .name_to_url
148            .insert(queue_name.to_string(), queue_url.clone());
149        state.queues.insert(queue_url.clone(), queue);
150
151        Ok(queue_url)
152    }
153
154    fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155        let mut state = self.sqs_state.write();
156        if let Some(queue) = state.queues.remove(physical_id) {
157            state.name_to_url.remove(&queue.queue_name);
158        }
159        Ok(())
160    }
161
162    // --- SNS ---
163
164    fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165        let props = &resource.properties;
166        let topic_name = props
167            .get("TopicName")
168            .and_then(|v| v.as_str())
169            .unwrap_or(&resource.logical_id);
170
171        let mut state = self.sns_state.write();
172        let topic_arn = format!(
173            "arn:aws:sns:{}:{}:{}",
174            state.region, state.account_id, topic_name
175        );
176
177        let topic = SnsTopic {
178            topic_arn: topic_arn.clone(),
179            name: topic_name.to_string(),
180            attributes: HashMap::new(),
181            tags: Vec::new(),
182            is_fifo: topic_name.ends_with(".fifo"),
183            created_at: Utc::now(),
184        };
185
186        state.topics.insert(topic_arn.clone(), topic);
187        Ok(topic_arn)
188    }
189
190    fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191        let mut state = self.sns_state.write();
192        state.topics.remove(physical_id);
193        // Also remove subscriptions for this topic
194        state
195            .subscriptions
196            .retain(|_, sub| sub.topic_arn != physical_id);
197        Ok(())
198    }
199
200    // --- SNS Subscription ---
201
202    fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203        let props = &resource.properties;
204        let topic_arn = props
205            .get("TopicArn")
206            .and_then(|v| v.as_str())
207            .ok_or("SNS Subscription requires TopicArn")?;
208        let protocol = props
209            .get("Protocol")
210            .and_then(|v| v.as_str())
211            .ok_or("SNS Subscription requires Protocol")?;
212        let endpoint = props
213            .get("Endpoint")
214            .and_then(|v| v.as_str())
215            .ok_or("SNS Subscription requires Endpoint")?;
216
217        let mut state = self.sns_state.write();
218
219        // Validate that the topic exists
220        if !state.topics.contains_key(topic_arn) {
221            return Err(format!("Topic ARN does not exist: {topic_arn}"));
222        }
223
224        let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226        let subscription = SnsSubscription {
227            subscription_arn: sub_arn.clone(),
228            topic_arn: topic_arn.to_string(),
229            protocol: protocol.to_string(),
230            endpoint: endpoint.to_string(),
231            owner: state.account_id.clone(),
232            attributes: HashMap::new(),
233            confirmed: true,
234            confirmation_token: None,
235        };
236
237        state.subscriptions.insert(sub_arn.clone(), subscription);
238        Ok(sub_arn)
239    }
240
241    fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242        let mut state = self.sns_state.write();
243        state.subscriptions.remove(physical_id);
244        Ok(())
245    }
246
247    // --- SSM ---
248
249    fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250        let props = &resource.properties;
251        let name = props
252            .get("Name")
253            .and_then(|v| v.as_str())
254            .ok_or("SSM Parameter requires Name")?;
255        let value = props
256            .get("Value")
257            .and_then(|v| v.as_str())
258            .ok_or("SSM Parameter requires Value")?;
259        let param_type = props
260            .get("Type")
261            .and_then(|v| v.as_str())
262            .unwrap_or("String");
263
264        let mut state = self.ssm_state.write();
265        let arn = format!(
266            "arn:aws:ssm:{}:{}:parameter{}",
267            state.region,
268            state.account_id,
269            if name.starts_with('/') {
270                name.to_string()
271            } else {
272                format!("/{name}")
273            }
274        );
275
276        let parameter = SsmParameter {
277            name: name.to_string(),
278            value: value.to_string(),
279            param_type: param_type.to_string(),
280            version: 1,
281            arn: arn.clone(),
282            last_modified: Utc::now(),
283            history: Vec::new(),
284            tags: HashMap::new(),
285            labels: HashMap::new(),
286            description: props
287                .get("Description")
288                .and_then(|v| v.as_str())
289                .map(|s| s.to_string()),
290            allowed_pattern: None,
291            key_id: None,
292            data_type: "text".to_string(),
293            tier: "Standard".to_string(),
294            policies: None,
295        };
296
297        state.parameters.insert(name.to_string(), parameter);
298        Ok(name.to_string())
299    }
300
301    fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302        let mut state = self.ssm_state.write();
303        state.parameters.remove(physical_id);
304        Ok(())
305    }
306
307    // --- IAM Role ---
308
309    fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310        let props = &resource.properties;
311        let role_name = props
312            .get("RoleName")
313            .and_then(|v| v.as_str())
314            .unwrap_or(&resource.logical_id);
315
316        let assume_role_policy = props
317            .get("AssumeRolePolicyDocument")
318            .map(|v| {
319                if v.is_string() {
320                    v.as_str().unwrap().to_string()
321                } else {
322                    serde_json::to_string(v).unwrap_or_default()
323                }
324            })
325            .unwrap_or_default();
326
327        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329        let mut state = self.iam_state.write();
330        let role_id = format!(
331            "FKIA{}",
332            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333        );
334        let arn = format!(
335            "arn:aws:iam::{}:role{}{}",
336            state.account_id,
337            if path == "/" { "/" } else { path },
338            role_name
339        );
340
341        let role = IamRole {
342            role_name: role_name.to_string(),
343            role_id,
344            arn: arn.clone(),
345            path: path.to_string(),
346            assume_role_policy_document: assume_role_policy,
347            created_at: Utc::now(),
348            description: props
349                .get("Description")
350                .and_then(|v| v.as_str())
351                .map(|s| s.to_string()),
352            max_session_duration: 3600,
353            tags: Vec::new(),
354            permissions_boundary: None,
355        };
356
357        state.roles.insert(role_name.to_string(), role);
358        Ok(arn)
359    }
360
361    fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362        let mut state = self.iam_state.write();
363        // physical_id is the ARN; find the role name
364        let role_name = state
365            .roles
366            .iter()
367            .find(|(_, r)| r.arn == physical_id)
368            .map(|(name, _)| name.clone());
369        if let Some(name) = role_name {
370            state.roles.remove(&name);
371            state.role_policies.remove(&name);
372            state.role_inline_policies.remove(&name);
373        }
374        Ok(())
375    }
376
377    // --- IAM Policy ---
378
379    fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380        let props = &resource.properties;
381        let policy_name = props
382            .get("PolicyName")
383            .and_then(|v| v.as_str())
384            .unwrap_or(&resource.logical_id);
385
386        let policy_document = props
387            .get("PolicyDocument")
388            .map(|v| {
389                if v.is_string() {
390                    v.as_str().unwrap().to_string()
391                } else {
392                    serde_json::to_string(v).unwrap_or_default()
393                }
394            })
395            .unwrap_or_default();
396
397        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399        let mut state = self.iam_state.write();
400        let policy_id = format!(
401            "FSIA{}",
402            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403        );
404        let arn = format!(
405            "arn:aws:iam::{}:policy{}{}",
406            state.account_id,
407            if path == "/" { "/" } else { path },
408            policy_name
409        );
410
411        let now = Utc::now();
412        let policy = IamPolicy {
413            policy_name: policy_name.to_string(),
414            policy_id,
415            arn: arn.clone(),
416            path: path.to_string(),
417            description: props
418                .get("Description")
419                .and_then(|v| v.as_str())
420                .unwrap_or("")
421                .to_string(),
422            created_at: now,
423            tags: Vec::new(),
424            default_version_id: "v1".to_string(),
425            versions: vec![PolicyVersion {
426                version_id: "v1".to_string(),
427                document: policy_document,
428                is_default: true,
429                created_at: now,
430            }],
431            next_version_num: 2,
432            attachment_count: 0,
433        };
434
435        state.policies.insert(arn.clone(), policy);
436        Ok(arn)
437    }
438
439    fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440        let mut state = self.iam_state.write();
441        state.policies.remove(physical_id);
442        Ok(())
443    }
444
445    // --- S3 ---
446
447    fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448        let props = &resource.properties;
449        let bucket_name = props
450            .get("BucketName")
451            .and_then(|v| v.as_str())
452            .unwrap_or(&resource.logical_id);
453
454        let mut state = self.s3_state.write();
455        let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456        state.buckets.insert(bucket_name.to_string(), bucket);
457        Ok(bucket_name.to_string())
458    }
459
460    fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461        let mut state = self.s3_state.write();
462        state.buckets.remove(physical_id);
463        Ok(())
464    }
465
466    // --- EventBridge ---
467
468    fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469        let props = &resource.properties;
470        let rule_name = props
471            .get("Name")
472            .and_then(|v| v.as_str())
473            .unwrap_or(&resource.logical_id);
474        let event_bus_name = props
475            .get("EventBusName")
476            .and_then(|v| v.as_str())
477            .unwrap_or("default");
478
479        let mut state = self.eventbridge_state.write();
480
481        // Validate that the event bus exists
482        if !state.buses.contains_key(event_bus_name) {
483            return Err(format!("Event bus does not exist: {event_bus_name}"));
484        }
485
486        let arn = if event_bus_name == "default" {
487            format!(
488                "arn:aws:events:{}:{}:rule/{}",
489                state.region, state.account_id, rule_name
490            )
491        } else {
492            format!(
493                "arn:aws:events:{}:{}:rule/{}/{}",
494                state.region, state.account_id, event_bus_name, rule_name
495            )
496        };
497
498        let rule = EventRule {
499            name: rule_name.to_string(),
500            arn: arn.clone(),
501            event_bus_name: event_bus_name.to_string(),
502            event_pattern: props.get("EventPattern").map(|v| {
503                if v.is_string() {
504                    v.as_str().unwrap().to_string()
505                } else {
506                    serde_json::to_string(v).unwrap_or_default()
507                }
508            }),
509            schedule_expression: props
510                .get("ScheduleExpression")
511                .and_then(|v| v.as_str())
512                .map(|s| s.to_string()),
513            state: props
514                .get("State")
515                .and_then(|v| v.as_str())
516                .unwrap_or("ENABLED")
517                .to_string(),
518            description: props
519                .get("Description")
520                .and_then(|v| v.as_str())
521                .map(|s| s.to_string()),
522            role_arn: props
523                .get("RoleArn")
524                .and_then(|v| v.as_str())
525                .map(|s| s.to_string()),
526            managed_by: None,
527            created_by: None,
528            targets: Vec::new(),
529            tags: HashMap::new(),
530            last_fired: None,
531        };
532
533        state
534            .rules
535            .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536        Ok(arn)
537    }
538
539    fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540        let mut state = self.eventbridge_state.write();
541        // physical_id is the ARN; find the rule key
542        let key = state
543            .rules
544            .iter()
545            .find(|(_, r)| r.arn == physical_id)
546            .map(|(k, _)| k.clone());
547        if let Some(k) = key {
548            state.rules.remove(&k);
549        }
550        Ok(())
551    }
552
553    // --- DynamoDB ---
554
555    fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556        let props = &resource.properties;
557        let table_name = props
558            .get("TableName")
559            .and_then(|v| v.as_str())
560            .unwrap_or(&resource.logical_id);
561
562        let mut key_schema = Vec::new();
563        if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564            for item in ks {
565                let attr_name = item
566                    .get("AttributeName")
567                    .and_then(|v| v.as_str())
568                    .unwrap_or("")
569                    .to_string();
570                let key_type = item
571                    .get("KeyType")
572                    .and_then(|v| v.as_str())
573                    .unwrap_or("HASH")
574                    .to_string();
575                key_schema.push(KeySchemaElement {
576                    attribute_name: attr_name,
577                    key_type,
578                });
579            }
580        }
581
582        let mut attribute_definitions = Vec::new();
583        if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584            for item in defs {
585                let attr_name = item
586                    .get("AttributeName")
587                    .and_then(|v| v.as_str())
588                    .unwrap_or("")
589                    .to_string();
590                let attr_type = item
591                    .get("AttributeType")
592                    .and_then(|v| v.as_str())
593                    .unwrap_or("S")
594                    .to_string();
595                attribute_definitions.push(AttributeDefinition {
596                    attribute_name: attr_name,
597                    attribute_type: attr_type,
598                });
599            }
600        }
601
602        let billing_mode = props
603            .get("BillingMode")
604            .and_then(|v| v.as_str())
605            .unwrap_or("PAY_PER_REQUEST")
606            .to_string();
607
608        let provisioned_throughput = if billing_mode == "PROVISIONED" {
609            if let Some(pt) = props.get("ProvisionedThroughput") {
610                ProvisionedThroughput {
611                    read_capacity_units: pt
612                        .get("ReadCapacityUnits")
613                        .and_then(|v| v.as_i64())
614                        .unwrap_or(5),
615                    write_capacity_units: pt
616                        .get("WriteCapacityUnits")
617                        .and_then(|v| v.as_i64())
618                        .unwrap_or(5),
619                }
620            } else {
621                ProvisionedThroughput {
622                    read_capacity_units: 5,
623                    write_capacity_units: 5,
624                }
625            }
626        } else {
627            ProvisionedThroughput {
628                read_capacity_units: 0,
629                write_capacity_units: 0,
630            }
631        };
632
633        let mut state = self.dynamodb_state.write();
634        let arn = format!(
635            "arn:aws:dynamodb:{}:{}:table/{}",
636            state.region, state.account_id, table_name
637        );
638
639        let table = DynamoTable {
640            name: table_name.to_string(),
641            arn: arn.clone(),
642            key_schema,
643            attribute_definitions,
644            provisioned_throughput,
645            items: Vec::new(),
646            gsi: Vec::new(),
647            lsi: Vec::new(),
648            tags: HashMap::new(),
649            created_at: Utc::now(),
650            status: "ACTIVE".to_string(),
651            item_count: 0,
652            size_bytes: 0,
653            billing_mode,
654            ttl_attribute: None,
655            ttl_enabled: false,
656            resource_policy: None,
657            pitr_enabled: false,
658            kinesis_destinations: Vec::new(),
659            contributor_insights_status: "DISABLED".to_string(),
660            contributor_insights_counters: HashMap::new(),
661            stream_enabled: false,
662            stream_view_type: None,
663            stream_arn: None,
664            stream_records: Arc::new(RwLock::new(Vec::new())),
665            sse_type: None,
666            sse_kms_key_arn: None,
667        };
668
669        state.tables.insert(table_name.to_string(), table);
670        Ok(arn)
671    }
672
673    fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
674        let mut state = self.dynamodb_state.write();
675        // physical_id is the ARN; find the table name
676        let table_name = state
677            .tables
678            .iter()
679            .find(|(_, t)| t.arn == physical_id)
680            .map(|(name, _)| name.clone());
681        if let Some(name) = table_name {
682            state.tables.remove(&name);
683        }
684        Ok(())
685    }
686
687    // --- CloudWatch Logs ---
688
689    fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
690        let props = &resource.properties;
691        let log_group_name = props
692            .get("LogGroupName")
693            .and_then(|v| v.as_str())
694            .unwrap_or(&resource.logical_id);
695
696        let retention_in_days = props
697            .get("RetentionInDays")
698            .and_then(|v| v.as_i64())
699            .map(|v| v as i32);
700
701        let mut state = self.logs_state.write();
702        let arn = format!(
703            "arn:aws:logs:{}:{}:log-group:{}:*",
704            state.region, state.account_id, log_group_name
705        );
706
707        let log_group = fakecloud_logs::state::LogGroup {
708            name: log_group_name.to_string(),
709            arn: arn.clone(),
710            creation_time: Utc::now().timestamp_millis(),
711            retention_in_days,
712            kms_key_id: None,
713            stored_bytes: 0,
714            log_streams: HashMap::new(),
715            tags: HashMap::new(),
716            subscription_filters: Vec::new(),
717            data_protection_policy: None,
718            index_policies: Vec::new(),
719            transformer: None,
720            deletion_protection: false,
721        };
722
723        state
724            .log_groups
725            .insert(log_group_name.to_string(), log_group);
726        Ok(arn)
727    }
728
729    fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
730        let mut state = self.logs_state.write();
731        // physical_id is the ARN; find the log group name
732        let name = state
733            .log_groups
734            .iter()
735            .find(|(_, g)| g.arn == physical_id)
736            .map(|(name, _)| name.clone());
737        if let Some(name) = name {
738            state.log_groups.remove(&name);
739        }
740        Ok(())
741    }
742
743    // --- Custom Resources ---
744
745    /// Invoke a Lambda function synchronously via the delivery bus.
746    fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
747        let delivery = self.delivery.clone();
748        let function_arn = function_arn.to_string();
749        let payload = payload.to_string();
750        std::thread::scope(|s| {
751            s.spawn(|| {
752                let rt = tokio::runtime::Builder::new_current_thread()
753                    .enable_all()
754                    .build()
755                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
756                rt.block_on(async {
757                    match delivery.invoke_lambda(&function_arn, &payload).await {
758                        Some(Ok(_)) => {
759                            tracing::info!(
760                                "Custom resource Lambda {} invoked successfully",
761                                function_arn
762                            );
763                            Ok(())
764                        }
765                        Some(Err(e)) => {
766                            tracing::warn!(
767                                "Custom resource Lambda {} invocation failed: {e}",
768                                function_arn
769                            );
770                            Err(format!("Lambda invocation failed: {e}"))
771                        }
772                        None => {
773                            tracing::warn!(
774                                "No Lambda delivery configured; skipping custom resource invocation for {}",
775                                function_arn
776                            );
777                            Ok(())
778                        }
779                    }
780                })
781            })
782            .join()
783            .map_err(|_| "Lambda invocation thread panicked".to_string())?
784        })
785    }
786
787    fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
788        let props = &resource.properties;
789        let service_token = props
790            .get("ServiceToken")
791            .and_then(|v| v.as_str())
792            .ok_or("Custom resource requires ServiceToken property")?;
793
794        let request_id = Uuid::new_v4().to_string();
795
796        // Build the CloudFormation custom resource event
797        let event = serde_json::json!({
798            "RequestType": "Create",
799            "ServiceToken": service_token,
800            "StackId": self.stack_id,
801            "RequestId": request_id,
802            "ResourceType": resource.resource_type,
803            "LogicalResourceId": resource.logical_id,
804            "ResourceProperties": props,
805        });
806
807        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
808        self.invoke_lambda_sync(service_token, &payload)?;
809
810        // Physical resource ID: use a generated ID (the Lambda could return one,
811        // but for simplicity we generate one here).
812        let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
813        Ok(physical_id)
814    }
815
816    fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
817        let service_token = match &resource.service_token {
818            Some(token) => token.clone(),
819            None => {
820                // No ServiceToken stored — nothing to invoke
821                return Ok(());
822            }
823        };
824
825        let request_id = Uuid::new_v4().to_string();
826
827        let event = serde_json::json!({
828            "RequestType": "Delete",
829            "ServiceToken": service_token,
830            "StackId": self.stack_id,
831            "RequestId": request_id,
832            "ResourceType": resource.resource_type,
833            "LogicalResourceId": resource.logical_id,
834            "PhysicalResourceId": resource.physical_id,
835        });
836
837        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
838
839        // Best-effort: don't fail stack deletion if Lambda invocation fails
840        if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
841            tracing::warn!(
842                "Custom resource delete Lambda invocation failed for {}: {e}",
843                resource.logical_id
844            );
845        }
846        Ok(())
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use parking_lot::RwLock;
854
855    fn make_provisioner() -> ResourceProvisioner {
856        ResourceProvisioner {
857            sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
858                "123456789012",
859                "us-east-1",
860                "http://localhost:4566",
861            ))),
862            sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
863                "123456789012",
864                "us-east-1",
865                "http://localhost:4566",
866            ))),
867            ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
868                "123456789012",
869                "us-east-1",
870            ))),
871            iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
872                "123456789012",
873            ))),
874            s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
875                "123456789012",
876                "us-east-1",
877            ))),
878            eventbridge_state: Arc::new(RwLock::new(
879                fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
880            )),
881            dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
882                "123456789012",
883                "us-east-1",
884            ))),
885            logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
886                "123456789012",
887                "us-east-1",
888            ))),
889            delivery: Arc::new(DeliveryBus::new()),
890            account_id: "123456789012".to_string(),
891            region: "us-east-1".to_string(),
892            stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
893        }
894    }
895
896    fn make_resource(
897        resource_type: &str,
898        logical_id: &str,
899        props: serde_json::Value,
900    ) -> ResourceDefinition {
901        ResourceDefinition {
902            logical_id: logical_id.to_string(),
903            resource_type: resource_type.to_string(),
904            properties: props,
905        }
906    }
907
908    #[test]
909    fn sns_subscription_rejects_nonexistent_topic() {
910        let prov = make_provisioner();
911        let resource = make_resource(
912            "AWS::SNS::Subscription",
913            "MySub",
914            serde_json::json!({
915                "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
916                "Protocol": "sqs",
917                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
918            }),
919        );
920        let result = prov.create_resource(&resource);
921        assert!(result.is_err());
922        assert!(result.unwrap_err().contains("does not exist"));
923    }
924
925    #[test]
926    fn sns_subscription_succeeds_when_topic_exists() {
927        let prov = make_provisioner();
928        // First create the topic
929        let topic = make_resource(
930            "AWS::SNS::Topic",
931            "MyTopic",
932            serde_json::json!({ "TopicName": "my-topic" }),
933        );
934        let topic_result = prov.create_resource(&topic);
935        assert!(topic_result.is_ok());
936        let topic_arn = topic_result.unwrap().physical_id;
937
938        // Now create subscription referencing that topic
939        let sub = make_resource(
940            "AWS::SNS::Subscription",
941            "MySub",
942            serde_json::json!({
943                "TopicArn": topic_arn,
944                "Protocol": "sqs",
945                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
946            }),
947        );
948        let result = prov.create_resource(&sub);
949        assert!(result.is_ok());
950    }
951
952    #[test]
953    fn eventbridge_rule_arn_default_bus_omits_bus_name() {
954        let prov = make_provisioner();
955        let resource = make_resource(
956            "AWS::Events::Rule",
957            "MyRule",
958            serde_json::json!({
959                "Name": "my-rule",
960                "ScheduleExpression": "rate(1 hour)"
961            }),
962        );
963        let result = prov.create_resource(&resource).unwrap();
964        // For default bus, ARN should be rule/<name> without /default/
965        assert_eq!(
966            result.physical_id,
967            "arn:aws:events:us-east-1:123456789012:rule/my-rule"
968        );
969        assert!(!result.physical_id.contains("rule/default/"));
970    }
971
972    #[test]
973    fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
974        let prov = make_provisioner();
975        // Create a custom bus first
976        {
977            let mut state = prov.eventbridge_state.write();
978            state.buses.insert(
979                "custom-bus".to_string(),
980                fakecloud_eventbridge::state::EventBus {
981                    name: "custom-bus".to_string(),
982                    arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
983                    policy: None,
984                    creation_time: Utc::now(),
985                    last_modified_time: Utc::now(),
986                    description: None,
987                    kms_key_identifier: None,
988                    dead_letter_config: None,
989                    tags: HashMap::new(),
990                },
991            );
992        }
993        let resource = make_resource(
994            "AWS::Events::Rule",
995            "MyRule",
996            serde_json::json!({
997                "Name": "my-rule",
998                "EventBusName": "custom-bus",
999                "ScheduleExpression": "rate(1 hour)"
1000            }),
1001        );
1002        let result = prov.create_resource(&resource).unwrap();
1003        assert_eq!(
1004            result.physical_id,
1005            "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1006        );
1007    }
1008
1009    #[test]
1010    fn eventbridge_rule_rejects_nonexistent_bus() {
1011        let prov = make_provisioner();
1012        let resource = make_resource(
1013            "AWS::Events::Rule",
1014            "MyRule",
1015            serde_json::json!({
1016                "Name": "my-rule",
1017                "EventBusName": "nonexistent-bus",
1018                "ScheduleExpression": "rate(1 hour)"
1019            }),
1020        );
1021        let result = prov.create_resource(&resource);
1022        assert!(result.is_err());
1023        assert!(result.unwrap_err().contains("does not exist"));
1024    }
1025
1026    #[test]
1027    fn custom_resource_requires_service_token() {
1028        let prov = make_provisioner();
1029        let resource = make_resource(
1030            "Custom::MyResource",
1031            "MyCustom",
1032            serde_json::json!({
1033                "Foo": "bar"
1034            }),
1035        );
1036        let result = prov.create_resource(&resource);
1037        assert!(result.is_err());
1038        assert!(
1039            result.unwrap_err().contains("ServiceToken"),
1040            "Should require ServiceToken property"
1041        );
1042    }
1043
1044    #[test]
1045    fn custom_resource_succeeds_without_lambda_delivery() {
1046        // When no Lambda delivery is configured, custom resource creation
1047        // should still succeed (the invocation is silently skipped).
1048        let prov = make_provisioner();
1049        let resource = make_resource(
1050            "Custom::MyResource",
1051            "MyCustom",
1052            serde_json::json!({
1053                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1054                "Foo": "bar"
1055            }),
1056        );
1057        let result = prov.create_resource(&resource);
1058        assert!(result.is_ok());
1059        let sr = result.unwrap();
1060        assert_eq!(sr.logical_id, "MyCustom");
1061        assert_eq!(sr.resource_type, "Custom::MyResource");
1062        assert!(sr.physical_id.starts_with("MyCustom-"));
1063    }
1064
1065    #[test]
1066    fn cloudformation_custom_resource_type_succeeds() {
1067        let prov = make_provisioner();
1068        let resource = make_resource(
1069            "AWS::CloudFormation::CustomResource",
1070            "MyCustom2",
1071            serde_json::json!({
1072                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1073                "Key": "value"
1074            }),
1075        );
1076        let result = prov.create_resource(&resource);
1077        assert!(result.is_ok());
1078        let sr = result.unwrap();
1079        assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1080    }
1081}