Skip to main content

fakecloud_cloudformation/
resource_provisioner.rs

1use chrono::Utc;
2use std::collections::HashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6use fakecloud_core::delivery::DeliveryBus;
7use fakecloud_dynamodb::state::{
8    AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
9};
10use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
11use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
12use fakecloud_logs::state::SharedLogsState;
13use fakecloud_s3::state::{S3Bucket, SharedS3State};
14use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
15use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
16use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
17
18use crate::state::StackResource;
19use crate::template::ResourceDefinition;
20
/// Holds references to all service states so CloudFormation can provision resources.
///
/// Each `*_state` field is a shared handle that the provisioning methods lock
/// with `.write()` in order to insert or remove the resources described by a
/// template.
pub struct ResourceProvisioner {
    /// SQS state; queues are inserted into / removed from it.
    pub sqs_state: SharedSqsState,
    /// SNS state; holds both topics and subscriptions.
    pub sns_state: SharedSnsState,
    /// SSM state; holds parameters keyed by name.
    pub ssm_state: SharedSsmState,
    /// IAM state; holds roles and managed policies.
    pub iam_state: SharedIamState,
    /// S3 state; holds buckets keyed by bucket name.
    pub s3_state: SharedS3State,
    /// EventBridge state; holds event buses and rules.
    pub eventbridge_state: SharedEventBridgeState,
    /// DynamoDB state; holds tables keyed by table name.
    pub dynamodb_state: SharedDynamoDbState,
    /// CloudWatch Logs state; holds log groups keyed by name.
    pub logs_state: SharedLogsState,
    /// Delivery bus used to invoke the Lambda behind a custom resource.
    pub delivery: Arc<DeliveryBus>,
    /// Account ID of the owning stack.
    /// NOTE(review): the provisioning methods visible here read account/region
    /// from the individual service states rather than from these fields.
    pub account_id: String,
    /// Region of the owning stack (see the note on `account_id`).
    pub region: String,
    /// Stack identifier sent as `StackId` in custom resource events.
    pub stack_id: String,
}
36
37impl ResourceProvisioner {
38    /// Create a resource and return the StackResource with physical ID.
39    pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
40        let result = match resource.resource_type.as_str() {
41            "AWS::SQS::Queue" => self.create_sqs_queue(resource),
42            "AWS::SNS::Topic" => self.create_sns_topic(resource),
43            "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
44            "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
45            "AWS::IAM::Role" => self.create_iam_role(resource),
46            "AWS::IAM::Policy" => self.create_iam_policy(resource),
47            "AWS::S3::Bucket" => self.create_s3_bucket(resource),
48            "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
49            "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
50            "AWS::Logs::LogGroup" => self.create_log_group(resource),
51            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
52                self.create_custom_resource(resource)
53            }
54            other => Err(format!("Unsupported resource type: {other}")),
55        };
56
57        let is_custom = resource.resource_type.starts_with("Custom::")
58            || resource.resource_type == "AWS::CloudFormation::CustomResource";
59        let service_token = if is_custom {
60            resource
61                .properties
62                .get("ServiceToken")
63                .and_then(|v| v.as_str())
64                .map(|s| s.to_string())
65        } else {
66            None
67        };
68
69        result.map(|physical_id| StackResource {
70            logical_id: resource.logical_id.clone(),
71            physical_id,
72            resource_type: resource.resource_type.clone(),
73            status: "CREATE_COMPLETE".to_string(),
74            service_token,
75        })
76    }
77
78    /// Delete a previously created resource.
79    pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
80        match resource.resource_type.as_str() {
81            "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
82            "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
83            "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
84            "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
85            "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
86            "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
87            "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
88            "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
89            "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
90            "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
91            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
92                self.delete_custom_resource(resource)
93            }
94            other => Err(format!("Unsupported resource type: {other}")),
95        }
96    }
97
98    // --- SQS ---
99
100    fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
101        let props = &resource.properties;
102        let queue_name = props
103            .get("QueueName")
104            .and_then(|v| v.as_str())
105            .unwrap_or(&resource.logical_id);
106
107        let mut state = self.sqs_state.write();
108        let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
109        let arn = format!(
110            "arn:aws:sqs:{}:{}:{}",
111            state.region, state.account_id, queue_name
112        );
113
114        let is_fifo = queue_name.ends_with(".fifo");
115        let mut attributes = HashMap::new();
116        if let Some(obj) = props.as_object() {
117            for (k, v) in obj {
118                if k != "QueueName" {
119                    if let Some(s) = v.as_str() {
120                        attributes.insert(k.clone(), s.to_string());
121                    } else if let Some(n) = v.as_i64() {
122                        attributes.insert(k.clone(), n.to_string());
123                    }
124                }
125            }
126        }
127
128        let queue = SqsQueue {
129            queue_name: queue_name.to_string(),
130            queue_url: queue_url.clone(),
131            arn,
132            created_at: Utc::now(),
133            messages: std::collections::VecDeque::new(),
134            inflight: Vec::new(),
135            attributes,
136            is_fifo,
137            dedup_cache: HashMap::new(),
138            redrive_policy: None,
139            tags: HashMap::new(),
140            next_sequence_number: 0,
141            permission_labels: Vec::new(),
142            receipt_handle_map: HashMap::new(),
143        };
144
145        state
146            .name_to_url
147            .insert(queue_name.to_string(), queue_url.clone());
148        state.queues.insert(queue_url.clone(), queue);
149
150        Ok(queue_url)
151    }
152
153    fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
154        let mut state = self.sqs_state.write();
155        if let Some(queue) = state.queues.remove(physical_id) {
156            state.name_to_url.remove(&queue.queue_name);
157        }
158        Ok(())
159    }
160
161    // --- SNS ---
162
163    fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
164        let props = &resource.properties;
165        let topic_name = props
166            .get("TopicName")
167            .and_then(|v| v.as_str())
168            .unwrap_or(&resource.logical_id);
169
170        let mut state = self.sns_state.write();
171        let topic_arn = format!(
172            "arn:aws:sns:{}:{}:{}",
173            state.region, state.account_id, topic_name
174        );
175
176        let topic = SnsTopic {
177            topic_arn: topic_arn.clone(),
178            name: topic_name.to_string(),
179            attributes: HashMap::new(),
180            tags: Vec::new(),
181            is_fifo: topic_name.ends_with(".fifo"),
182            created_at: Utc::now(),
183        };
184
185        state.topics.insert(topic_arn.clone(), topic);
186        Ok(topic_arn)
187    }
188
189    fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
190        let mut state = self.sns_state.write();
191        state.topics.remove(physical_id);
192        // Also remove subscriptions for this topic
193        state
194            .subscriptions
195            .retain(|_, sub| sub.topic_arn != physical_id);
196        Ok(())
197    }
198
199    // --- SNS Subscription ---
200
201    fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
202        let props = &resource.properties;
203        let topic_arn = props
204            .get("TopicArn")
205            .and_then(|v| v.as_str())
206            .ok_or("SNS Subscription requires TopicArn")?;
207        let protocol = props
208            .get("Protocol")
209            .and_then(|v| v.as_str())
210            .ok_or("SNS Subscription requires Protocol")?;
211        let endpoint = props
212            .get("Endpoint")
213            .and_then(|v| v.as_str())
214            .ok_or("SNS Subscription requires Endpoint")?;
215
216        let mut state = self.sns_state.write();
217
218        // Validate that the topic exists
219        if !state.topics.contains_key(topic_arn) {
220            return Err(format!("Topic ARN does not exist: {topic_arn}"));
221        }
222
223        let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
224
225        let subscription = SnsSubscription {
226            subscription_arn: sub_arn.clone(),
227            topic_arn: topic_arn.to_string(),
228            protocol: protocol.to_string(),
229            endpoint: endpoint.to_string(),
230            owner: state.account_id.clone(),
231            attributes: HashMap::new(),
232            confirmed: true,
233            confirmation_token: None,
234        };
235
236        state.subscriptions.insert(sub_arn.clone(), subscription);
237        Ok(sub_arn)
238    }
239
240    fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
241        let mut state = self.sns_state.write();
242        state.subscriptions.remove(physical_id);
243        Ok(())
244    }
245
246    // --- SSM ---
247
248    fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
249        let props = &resource.properties;
250        let name = props
251            .get("Name")
252            .and_then(|v| v.as_str())
253            .ok_or("SSM Parameter requires Name")?;
254        let value = props
255            .get("Value")
256            .and_then(|v| v.as_str())
257            .ok_or("SSM Parameter requires Value")?;
258        let param_type = props
259            .get("Type")
260            .and_then(|v| v.as_str())
261            .unwrap_or("String");
262
263        let mut state = self.ssm_state.write();
264        let arn = format!(
265            "arn:aws:ssm:{}:{}:parameter{}",
266            state.region,
267            state.account_id,
268            if name.starts_with('/') {
269                name.to_string()
270            } else {
271                format!("/{name}")
272            }
273        );
274
275        let parameter = SsmParameter {
276            name: name.to_string(),
277            value: value.to_string(),
278            param_type: param_type.to_string(),
279            version: 1,
280            arn: arn.clone(),
281            last_modified: Utc::now(),
282            history: Vec::new(),
283            tags: HashMap::new(),
284            labels: HashMap::new(),
285            description: props
286                .get("Description")
287                .and_then(|v| v.as_str())
288                .map(|s| s.to_string()),
289            allowed_pattern: None,
290            key_id: None,
291            data_type: "text".to_string(),
292            tier: "Standard".to_string(),
293            policies: None,
294        };
295
296        state.parameters.insert(name.to_string(), parameter);
297        Ok(name.to_string())
298    }
299
300    fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
301        let mut state = self.ssm_state.write();
302        state.parameters.remove(physical_id);
303        Ok(())
304    }
305
306    // --- IAM Role ---
307
308    fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
309        let props = &resource.properties;
310        let role_name = props
311            .get("RoleName")
312            .and_then(|v| v.as_str())
313            .unwrap_or(&resource.logical_id);
314
315        let assume_role_policy = props
316            .get("AssumeRolePolicyDocument")
317            .map(|v| {
318                if v.is_string() {
319                    v.as_str().unwrap().to_string()
320                } else {
321                    serde_json::to_string(v).unwrap_or_default()
322                }
323            })
324            .unwrap_or_default();
325
326        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
327
328        let mut state = self.iam_state.write();
329        let role_id = format!(
330            "FKIA{}",
331            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
332        );
333        let arn = format!(
334            "arn:aws:iam::{}:role{}{}",
335            state.account_id,
336            if path == "/" { "/" } else { path },
337            role_name
338        );
339
340        let role = IamRole {
341            role_name: role_name.to_string(),
342            role_id,
343            arn: arn.clone(),
344            path: path.to_string(),
345            assume_role_policy_document: assume_role_policy,
346            created_at: Utc::now(),
347            description: props
348                .get("Description")
349                .and_then(|v| v.as_str())
350                .map(|s| s.to_string()),
351            max_session_duration: 3600,
352            tags: Vec::new(),
353            permissions_boundary: None,
354        };
355
356        state.roles.insert(role_name.to_string(), role);
357        Ok(arn)
358    }
359
360    fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
361        let mut state = self.iam_state.write();
362        // physical_id is the ARN; find the role name
363        let role_name = state
364            .roles
365            .iter()
366            .find(|(_, r)| r.arn == physical_id)
367            .map(|(name, _)| name.clone());
368        if let Some(name) = role_name {
369            state.roles.remove(&name);
370            state.role_policies.remove(&name);
371            state.role_inline_policies.remove(&name);
372        }
373        Ok(())
374    }
375
376    // --- IAM Policy ---
377
378    fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
379        let props = &resource.properties;
380        let policy_name = props
381            .get("PolicyName")
382            .and_then(|v| v.as_str())
383            .unwrap_or(&resource.logical_id);
384
385        let policy_document = props
386            .get("PolicyDocument")
387            .map(|v| {
388                if v.is_string() {
389                    v.as_str().unwrap().to_string()
390                } else {
391                    serde_json::to_string(v).unwrap_or_default()
392                }
393            })
394            .unwrap_or_default();
395
396        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
397
398        let mut state = self.iam_state.write();
399        let policy_id = format!(
400            "FSIA{}",
401            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
402        );
403        let arn = format!(
404            "arn:aws:iam::{}:policy{}{}",
405            state.account_id,
406            if path == "/" { "/" } else { path },
407            policy_name
408        );
409
410        let now = Utc::now();
411        let policy = IamPolicy {
412            policy_name: policy_name.to_string(),
413            policy_id,
414            arn: arn.clone(),
415            path: path.to_string(),
416            description: props
417                .get("Description")
418                .and_then(|v| v.as_str())
419                .unwrap_or("")
420                .to_string(),
421            created_at: now,
422            tags: Vec::new(),
423            default_version_id: "v1".to_string(),
424            versions: vec![PolicyVersion {
425                version_id: "v1".to_string(),
426                document: policy_document,
427                is_default: true,
428                created_at: now,
429            }],
430            next_version_num: 2,
431            attachment_count: 0,
432        };
433
434        state.policies.insert(arn.clone(), policy);
435        Ok(arn)
436    }
437
438    fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
439        let mut state = self.iam_state.write();
440        state.policies.remove(physical_id);
441        Ok(())
442    }
443
444    // --- S3 ---
445
446    fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
447        let props = &resource.properties;
448        let bucket_name = props
449            .get("BucketName")
450            .and_then(|v| v.as_str())
451            .unwrap_or(&resource.logical_id);
452
453        let mut state = self.s3_state.write();
454        let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
455        state.buckets.insert(bucket_name.to_string(), bucket);
456        Ok(bucket_name.to_string())
457    }
458
459    fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
460        let mut state = self.s3_state.write();
461        state.buckets.remove(physical_id);
462        Ok(())
463    }
464
465    // --- EventBridge ---
466
467    fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
468        let props = &resource.properties;
469        let rule_name = props
470            .get("Name")
471            .and_then(|v| v.as_str())
472            .unwrap_or(&resource.logical_id);
473        let event_bus_name = props
474            .get("EventBusName")
475            .and_then(|v| v.as_str())
476            .unwrap_or("default");
477
478        let mut state = self.eventbridge_state.write();
479
480        // Validate that the event bus exists
481        if !state.buses.contains_key(event_bus_name) {
482            return Err(format!("Event bus does not exist: {event_bus_name}"));
483        }
484
485        let arn = if event_bus_name == "default" {
486            format!(
487                "arn:aws:events:{}:{}:rule/{}",
488                state.region, state.account_id, rule_name
489            )
490        } else {
491            format!(
492                "arn:aws:events:{}:{}:rule/{}/{}",
493                state.region, state.account_id, event_bus_name, rule_name
494            )
495        };
496
497        let rule = EventRule {
498            name: rule_name.to_string(),
499            arn: arn.clone(),
500            event_bus_name: event_bus_name.to_string(),
501            event_pattern: props.get("EventPattern").map(|v| {
502                if v.is_string() {
503                    v.as_str().unwrap().to_string()
504                } else {
505                    serde_json::to_string(v).unwrap_or_default()
506                }
507            }),
508            schedule_expression: props
509                .get("ScheduleExpression")
510                .and_then(|v| v.as_str())
511                .map(|s| s.to_string()),
512            state: props
513                .get("State")
514                .and_then(|v| v.as_str())
515                .unwrap_or("ENABLED")
516                .to_string(),
517            description: props
518                .get("Description")
519                .and_then(|v| v.as_str())
520                .map(|s| s.to_string()),
521            role_arn: props
522                .get("RoleArn")
523                .and_then(|v| v.as_str())
524                .map(|s| s.to_string()),
525            managed_by: None,
526            created_by: None,
527            targets: Vec::new(),
528            tags: HashMap::new(),
529            last_fired: None,
530        };
531
532        state
533            .rules
534            .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
535        Ok(arn)
536    }
537
538    fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
539        let mut state = self.eventbridge_state.write();
540        // physical_id is the ARN; find the rule key
541        let key = state
542            .rules
543            .iter()
544            .find(|(_, r)| r.arn == physical_id)
545            .map(|(k, _)| k.clone());
546        if let Some(k) = key {
547            state.rules.remove(&k);
548        }
549        Ok(())
550    }
551
552    // --- DynamoDB ---
553
554    fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
555        let props = &resource.properties;
556        let table_name = props
557            .get("TableName")
558            .and_then(|v| v.as_str())
559            .unwrap_or(&resource.logical_id);
560
561        let mut key_schema = Vec::new();
562        if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
563            for item in ks {
564                let attr_name = item
565                    .get("AttributeName")
566                    .and_then(|v| v.as_str())
567                    .unwrap_or("")
568                    .to_string();
569                let key_type = item
570                    .get("KeyType")
571                    .and_then(|v| v.as_str())
572                    .unwrap_or("HASH")
573                    .to_string();
574                key_schema.push(KeySchemaElement {
575                    attribute_name: attr_name,
576                    key_type,
577                });
578            }
579        }
580
581        let mut attribute_definitions = Vec::new();
582        if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
583            for item in defs {
584                let attr_name = item
585                    .get("AttributeName")
586                    .and_then(|v| v.as_str())
587                    .unwrap_or("")
588                    .to_string();
589                let attr_type = item
590                    .get("AttributeType")
591                    .and_then(|v| v.as_str())
592                    .unwrap_or("S")
593                    .to_string();
594                attribute_definitions.push(AttributeDefinition {
595                    attribute_name: attr_name,
596                    attribute_type: attr_type,
597                });
598            }
599        }
600
601        let billing_mode = props
602            .get("BillingMode")
603            .and_then(|v| v.as_str())
604            .unwrap_or("PAY_PER_REQUEST")
605            .to_string();
606
607        let provisioned_throughput = if billing_mode == "PROVISIONED" {
608            if let Some(pt) = props.get("ProvisionedThroughput") {
609                ProvisionedThroughput {
610                    read_capacity_units: pt
611                        .get("ReadCapacityUnits")
612                        .and_then(|v| v.as_i64())
613                        .unwrap_or(5),
614                    write_capacity_units: pt
615                        .get("WriteCapacityUnits")
616                        .and_then(|v| v.as_i64())
617                        .unwrap_or(5),
618                }
619            } else {
620                ProvisionedThroughput {
621                    read_capacity_units: 5,
622                    write_capacity_units: 5,
623                }
624            }
625        } else {
626            ProvisionedThroughput {
627                read_capacity_units: 0,
628                write_capacity_units: 0,
629            }
630        };
631
632        let mut state = self.dynamodb_state.write();
633        let arn = format!(
634            "arn:aws:dynamodb:{}:{}:table/{}",
635            state.region, state.account_id, table_name
636        );
637
638        let table = DynamoTable {
639            name: table_name.to_string(),
640            arn: arn.clone(),
641            key_schema,
642            attribute_definitions,
643            provisioned_throughput,
644            items: Vec::new(),
645            gsi: Vec::new(),
646            lsi: Vec::new(),
647            tags: HashMap::new(),
648            created_at: Utc::now(),
649            status: "ACTIVE".to_string(),
650            item_count: 0,
651            size_bytes: 0,
652            billing_mode,
653            ttl_attribute: None,
654            ttl_enabled: false,
655            resource_policy: None,
656            pitr_enabled: false,
657            kinesis_destinations: Vec::new(),
658            contributor_insights_status: "DISABLED".to_string(),
659            contributor_insights_counters: HashMap::new(),
660        };
661
662        state.tables.insert(table_name.to_string(), table);
663        Ok(arn)
664    }
665
666    fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
667        let mut state = self.dynamodb_state.write();
668        // physical_id is the ARN; find the table name
669        let table_name = state
670            .tables
671            .iter()
672            .find(|(_, t)| t.arn == physical_id)
673            .map(|(name, _)| name.clone());
674        if let Some(name) = table_name {
675            state.tables.remove(&name);
676        }
677        Ok(())
678    }
679
680    // --- CloudWatch Logs ---
681
682    fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
683        let props = &resource.properties;
684        let log_group_name = props
685            .get("LogGroupName")
686            .and_then(|v| v.as_str())
687            .unwrap_or(&resource.logical_id);
688
689        let retention_in_days = props
690            .get("RetentionInDays")
691            .and_then(|v| v.as_i64())
692            .map(|v| v as i32);
693
694        let mut state = self.logs_state.write();
695        let arn = format!(
696            "arn:aws:logs:{}:{}:log-group:{}:*",
697            state.region, state.account_id, log_group_name
698        );
699
700        let log_group = fakecloud_logs::state::LogGroup {
701            name: log_group_name.to_string(),
702            arn: arn.clone(),
703            creation_time: Utc::now().timestamp_millis(),
704            retention_in_days,
705            kms_key_id: None,
706            stored_bytes: 0,
707            log_streams: HashMap::new(),
708            tags: HashMap::new(),
709            subscription_filters: Vec::new(),
710            data_protection_policy: None,
711            index_policies: Vec::new(),
712            transformer: None,
713            deletion_protection: false,
714        };
715
716        state
717            .log_groups
718            .insert(log_group_name.to_string(), log_group);
719        Ok(arn)
720    }
721
722    fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
723        let mut state = self.logs_state.write();
724        // physical_id is the ARN; find the log group name
725        let name = state
726            .log_groups
727            .iter()
728            .find(|(_, g)| g.arn == physical_id)
729            .map(|(name, _)| name.clone());
730        if let Some(name) = name {
731            state.log_groups.remove(&name);
732        }
733        Ok(())
734    }
735
736    // --- Custom Resources ---
737
738    /// Invoke a Lambda function synchronously via the delivery bus.
739    fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
740        let delivery = self.delivery.clone();
741        let function_arn = function_arn.to_string();
742        let payload = payload.to_string();
743        std::thread::scope(|s| {
744            s.spawn(|| {
745                let rt = tokio::runtime::Builder::new_current_thread()
746                    .enable_all()
747                    .build()
748                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
749                rt.block_on(async {
750                    match delivery.invoke_lambda(&function_arn, &payload).await {
751                        Some(Ok(_)) => {
752                            tracing::info!(
753                                "Custom resource Lambda {} invoked successfully",
754                                function_arn
755                            );
756                            Ok(())
757                        }
758                        Some(Err(e)) => {
759                            tracing::warn!(
760                                "Custom resource Lambda {} invocation failed: {e}",
761                                function_arn
762                            );
763                            Err(format!("Lambda invocation failed: {e}"))
764                        }
765                        None => {
766                            tracing::warn!(
767                                "No Lambda delivery configured; skipping custom resource invocation for {}",
768                                function_arn
769                            );
770                            Ok(())
771                        }
772                    }
773                })
774            })
775            .join()
776            .map_err(|_| "Lambda invocation thread panicked".to_string())?
777        })
778    }
779
780    fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
781        let props = &resource.properties;
782        let service_token = props
783            .get("ServiceToken")
784            .and_then(|v| v.as_str())
785            .ok_or("Custom resource requires ServiceToken property")?;
786
787        let request_id = Uuid::new_v4().to_string();
788
789        // Build the CloudFormation custom resource event
790        let event = serde_json::json!({
791            "RequestType": "Create",
792            "ServiceToken": service_token,
793            "StackId": self.stack_id,
794            "RequestId": request_id,
795            "ResourceType": resource.resource_type,
796            "LogicalResourceId": resource.logical_id,
797            "ResourceProperties": props,
798        });
799
800        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
801        self.invoke_lambda_sync(service_token, &payload)?;
802
803        // Physical resource ID: use a generated ID (the Lambda could return one,
804        // but for simplicity we generate one here).
805        let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
806        Ok(physical_id)
807    }
808
809    fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
810        let service_token = match &resource.service_token {
811            Some(token) => token.clone(),
812            None => {
813                // No ServiceToken stored — nothing to invoke
814                return Ok(());
815            }
816        };
817
818        let request_id = Uuid::new_v4().to_string();
819
820        let event = serde_json::json!({
821            "RequestType": "Delete",
822            "ServiceToken": service_token,
823            "StackId": self.stack_id,
824            "RequestId": request_id,
825            "ResourceType": resource.resource_type,
826            "LogicalResourceId": resource.logical_id,
827            "PhysicalResourceId": resource.physical_id,
828        });
829
830        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
831
832        // Best-effort: don't fail stack deletion if Lambda invocation fails
833        if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
834            tracing::warn!(
835                "Custom resource delete Lambda invocation failed for {}: {e}",
836                resource.logical_id
837            );
838        }
839        Ok(())
840    }
841}
842
843#[cfg(test)]
844mod tests {
845    use super::*;
846    use parking_lot::RwLock;
847
848    fn make_provisioner() -> ResourceProvisioner {
849        ResourceProvisioner {
850            sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
851                "123456789012",
852                "us-east-1",
853                "http://localhost:4566",
854            ))),
855            sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
856                "123456789012",
857                "us-east-1",
858                "http://localhost:4566",
859            ))),
860            ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
861                "123456789012",
862                "us-east-1",
863            ))),
864            iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
865                "123456789012",
866            ))),
867            s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
868                "123456789012",
869                "us-east-1",
870            ))),
871            eventbridge_state: Arc::new(RwLock::new(
872                fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
873            )),
874            dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
875                "123456789012",
876                "us-east-1",
877            ))),
878            logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
879                "123456789012",
880                "us-east-1",
881            ))),
882            delivery: Arc::new(DeliveryBus::new()),
883            account_id: "123456789012".to_string(),
884            region: "us-east-1".to_string(),
885            stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
886        }
887    }
888
889    fn make_resource(
890        resource_type: &str,
891        logical_id: &str,
892        props: serde_json::Value,
893    ) -> ResourceDefinition {
894        ResourceDefinition {
895            logical_id: logical_id.to_string(),
896            resource_type: resource_type.to_string(),
897            properties: props,
898        }
899    }
900
901    #[test]
902    fn sns_subscription_rejects_nonexistent_topic() {
903        let prov = make_provisioner();
904        let resource = make_resource(
905            "AWS::SNS::Subscription",
906            "MySub",
907            serde_json::json!({
908                "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
909                "Protocol": "sqs",
910                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
911            }),
912        );
913        let result = prov.create_resource(&resource);
914        assert!(result.is_err());
915        assert!(result.unwrap_err().contains("does not exist"));
916    }
917
918    #[test]
919    fn sns_subscription_succeeds_when_topic_exists() {
920        let prov = make_provisioner();
921        // First create the topic
922        let topic = make_resource(
923            "AWS::SNS::Topic",
924            "MyTopic",
925            serde_json::json!({ "TopicName": "my-topic" }),
926        );
927        let topic_result = prov.create_resource(&topic);
928        assert!(topic_result.is_ok());
929        let topic_arn = topic_result.unwrap().physical_id;
930
931        // Now create subscription referencing that topic
932        let sub = make_resource(
933            "AWS::SNS::Subscription",
934            "MySub",
935            serde_json::json!({
936                "TopicArn": topic_arn,
937                "Protocol": "sqs",
938                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
939            }),
940        );
941        let result = prov.create_resource(&sub);
942        assert!(result.is_ok());
943    }
944
945    #[test]
946    fn eventbridge_rule_arn_default_bus_omits_bus_name() {
947        let prov = make_provisioner();
948        let resource = make_resource(
949            "AWS::Events::Rule",
950            "MyRule",
951            serde_json::json!({
952                "Name": "my-rule",
953                "ScheduleExpression": "rate(1 hour)"
954            }),
955        );
956        let result = prov.create_resource(&resource).unwrap();
957        // For default bus, ARN should be rule/<name> without /default/
958        assert_eq!(
959            result.physical_id,
960            "arn:aws:events:us-east-1:123456789012:rule/my-rule"
961        );
962        assert!(!result.physical_id.contains("rule/default/"));
963    }
964
965    #[test]
966    fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
967        let prov = make_provisioner();
968        // Create a custom bus first
969        {
970            let mut state = prov.eventbridge_state.write();
971            state.buses.insert(
972                "custom-bus".to_string(),
973                fakecloud_eventbridge::state::EventBus {
974                    name: "custom-bus".to_string(),
975                    arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
976                    policy: None,
977                    creation_time: Utc::now(),
978                    last_modified_time: Utc::now(),
979                    description: None,
980                    kms_key_identifier: None,
981                    dead_letter_config: None,
982                    tags: HashMap::new(),
983                },
984            );
985        }
986        let resource = make_resource(
987            "AWS::Events::Rule",
988            "MyRule",
989            serde_json::json!({
990                "Name": "my-rule",
991                "EventBusName": "custom-bus",
992                "ScheduleExpression": "rate(1 hour)"
993            }),
994        );
995        let result = prov.create_resource(&resource).unwrap();
996        assert_eq!(
997            result.physical_id,
998            "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
999        );
1000    }
1001
1002    #[test]
1003    fn eventbridge_rule_rejects_nonexistent_bus() {
1004        let prov = make_provisioner();
1005        let resource = make_resource(
1006            "AWS::Events::Rule",
1007            "MyRule",
1008            serde_json::json!({
1009                "Name": "my-rule",
1010                "EventBusName": "nonexistent-bus",
1011                "ScheduleExpression": "rate(1 hour)"
1012            }),
1013        );
1014        let result = prov.create_resource(&resource);
1015        assert!(result.is_err());
1016        assert!(result.unwrap_err().contains("does not exist"));
1017    }
1018
1019    #[test]
1020    fn custom_resource_requires_service_token() {
1021        let prov = make_provisioner();
1022        let resource = make_resource(
1023            "Custom::MyResource",
1024            "MyCustom",
1025            serde_json::json!({
1026                "Foo": "bar"
1027            }),
1028        );
1029        let result = prov.create_resource(&resource);
1030        assert!(result.is_err());
1031        assert!(
1032            result.unwrap_err().contains("ServiceToken"),
1033            "Should require ServiceToken property"
1034        );
1035    }
1036
1037    #[test]
1038    fn custom_resource_succeeds_without_lambda_delivery() {
1039        // When no Lambda delivery is configured, custom resource creation
1040        // should still succeed (the invocation is silently skipped).
1041        let prov = make_provisioner();
1042        let resource = make_resource(
1043            "Custom::MyResource",
1044            "MyCustom",
1045            serde_json::json!({
1046                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1047                "Foo": "bar"
1048            }),
1049        );
1050        let result = prov.create_resource(&resource);
1051        assert!(result.is_ok());
1052        let sr = result.unwrap();
1053        assert_eq!(sr.logical_id, "MyCustom");
1054        assert_eq!(sr.resource_type, "Custom::MyResource");
1055        assert!(sr.physical_id.starts_with("MyCustom-"));
1056    }
1057
1058    #[test]
1059    fn cloudformation_custom_resource_type_succeeds() {
1060        let prov = make_provisioner();
1061        let resource = make_resource(
1062            "AWS::CloudFormation::CustomResource",
1063            "MyCustom2",
1064            serde_json::json!({
1065                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1066                "Key": "value"
1067            }),
1068        );
1069        let result = prov.create_resource(&resource);
1070        assert!(result.is_ok());
1071        let sr = result.unwrap();
1072        assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1073    }
1074}