Skip to main content

fakecloud_cloudformation/
resource_provisioner.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9    AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
22/// Holds references to all service states so CloudFormation can provision resources.
23pub struct ResourceProvisioner {
24    pub sqs_state: SharedSqsState,
25    pub sns_state: SharedSnsState,
26    pub ssm_state: SharedSsmState,
27    pub iam_state: SharedIamState,
28    pub s3_state: SharedS3State,
29    pub eventbridge_state: SharedEventBridgeState,
30    pub dynamodb_state: SharedDynamoDbState,
31    pub logs_state: SharedLogsState,
32    pub delivery: Arc<DeliveryBus>,
33    pub account_id: String,
34    pub region: String,
35    pub stack_id: String,
36}
37
38impl ResourceProvisioner {
39    /// Create a resource and return the StackResource with physical ID.
40    pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41        let result = match resource.resource_type.as_str() {
42            "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43            "AWS::SNS::Topic" => self.create_sns_topic(resource),
44            "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45            "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46            "AWS::IAM::Role" => self.create_iam_role(resource),
47            "AWS::IAM::Policy" => self.create_iam_policy(resource),
48            "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49            "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50            "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51            "AWS::Logs::LogGroup" => self.create_log_group(resource),
52            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53                self.create_custom_resource(resource)
54            }
55            other => Err(format!("Unsupported resource type: {other}")),
56        };
57
58        let is_custom = resource.resource_type.starts_with("Custom::")
59            || resource.resource_type == "AWS::CloudFormation::CustomResource";
60        let service_token = if is_custom {
61            resource
62                .properties
63                .get("ServiceToken")
64                .and_then(|v| v.as_str())
65                .map(|s| s.to_string())
66        } else {
67            None
68        };
69
70        result.map(|physical_id| StackResource {
71            logical_id: resource.logical_id.clone(),
72            physical_id,
73            resource_type: resource.resource_type.clone(),
74            status: "CREATE_COMPLETE".to_string(),
75            service_token,
76        })
77    }
78
79    /// Delete a previously created resource.
80    pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81        match resource.resource_type.as_str() {
82            "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83            "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84            "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85            "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86            "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87            "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88            "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89            "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90            "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91            "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93                self.delete_custom_resource(resource)
94            }
95            other => Err(format!("Unsupported resource type: {other}")),
96        }
97    }
98
99    // --- SQS ---
100
101    fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102        let props = &resource.properties;
103        let queue_name = props
104            .get("QueueName")
105            .and_then(|v| v.as_str())
106            .unwrap_or(&resource.logical_id);
107
108        let mut state = self.sqs_state.write();
109        let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110        let arn = format!(
111            "arn:aws:sqs:{}:{}:{}",
112            state.region, state.account_id, queue_name
113        );
114
115        let is_fifo = queue_name.ends_with(".fifo");
116        let mut attributes = HashMap::new();
117        if let Some(obj) = props.as_object() {
118            for (k, v) in obj {
119                if k != "QueueName" {
120                    if let Some(s) = v.as_str() {
121                        attributes.insert(k.clone(), s.to_string());
122                    } else if let Some(n) = v.as_i64() {
123                        attributes.insert(k.clone(), n.to_string());
124                    }
125                }
126            }
127        }
128
129        let queue = SqsQueue {
130            queue_name: queue_name.to_string(),
131            queue_url: queue_url.clone(),
132            arn,
133            created_at: Utc::now(),
134            messages: std::collections::VecDeque::new(),
135            inflight: Vec::new(),
136            attributes,
137            is_fifo,
138            dedup_cache: HashMap::new(),
139            redrive_policy: None,
140            tags: HashMap::new(),
141            next_sequence_number: 0,
142            permission_labels: Vec::new(),
143            receipt_handle_map: HashMap::new(),
144        };
145
146        state
147            .name_to_url
148            .insert(queue_name.to_string(), queue_url.clone());
149        state.queues.insert(queue_url.clone(), queue);
150
151        Ok(queue_url)
152    }
153
154    fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155        let mut state = self.sqs_state.write();
156        if let Some(queue) = state.queues.remove(physical_id) {
157            state.name_to_url.remove(&queue.queue_name);
158        }
159        Ok(())
160    }
161
162    // --- SNS ---
163
164    fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165        let props = &resource.properties;
166        let topic_name = props
167            .get("TopicName")
168            .and_then(|v| v.as_str())
169            .unwrap_or(&resource.logical_id);
170
171        let mut state = self.sns_state.write();
172        let topic_arn = format!(
173            "arn:aws:sns:{}:{}:{}",
174            state.region, state.account_id, topic_name
175        );
176
177        let topic = SnsTopic {
178            topic_arn: topic_arn.clone(),
179            name: topic_name.to_string(),
180            attributes: HashMap::new(),
181            tags: Vec::new(),
182            is_fifo: topic_name.ends_with(".fifo"),
183            created_at: Utc::now(),
184        };
185
186        state.topics.insert(topic_arn.clone(), topic);
187        Ok(topic_arn)
188    }
189
190    fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191        let mut state = self.sns_state.write();
192        state.topics.remove(physical_id);
193        // Also remove subscriptions for this topic
194        state
195            .subscriptions
196            .retain(|_, sub| sub.topic_arn != physical_id);
197        Ok(())
198    }
199
200    // --- SNS Subscription ---
201
202    fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203        let props = &resource.properties;
204        let topic_arn = props
205            .get("TopicArn")
206            .and_then(|v| v.as_str())
207            .ok_or("SNS Subscription requires TopicArn")?;
208        let protocol = props
209            .get("Protocol")
210            .and_then(|v| v.as_str())
211            .ok_or("SNS Subscription requires Protocol")?;
212        let endpoint = props
213            .get("Endpoint")
214            .and_then(|v| v.as_str())
215            .ok_or("SNS Subscription requires Endpoint")?;
216
217        let mut state = self.sns_state.write();
218
219        // Validate that the topic exists
220        if !state.topics.contains_key(topic_arn) {
221            return Err(format!("Topic ARN does not exist: {topic_arn}"));
222        }
223
224        let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226        let subscription = SnsSubscription {
227            subscription_arn: sub_arn.clone(),
228            topic_arn: topic_arn.to_string(),
229            protocol: protocol.to_string(),
230            endpoint: endpoint.to_string(),
231            owner: state.account_id.clone(),
232            attributes: HashMap::new(),
233            confirmed: true,
234            confirmation_token: None,
235        };
236
237        state.subscriptions.insert(sub_arn.clone(), subscription);
238        Ok(sub_arn)
239    }
240
241    fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242        let mut state = self.sns_state.write();
243        state.subscriptions.remove(physical_id);
244        Ok(())
245    }
246
247    // --- SSM ---
248
249    fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250        let props = &resource.properties;
251        let name = props
252            .get("Name")
253            .and_then(|v| v.as_str())
254            .ok_or("SSM Parameter requires Name")?;
255        let value = props
256            .get("Value")
257            .and_then(|v| v.as_str())
258            .ok_or("SSM Parameter requires Value")?;
259        let param_type = props
260            .get("Type")
261            .and_then(|v| v.as_str())
262            .unwrap_or("String");
263
264        let mut state = self.ssm_state.write();
265        let arn = format!(
266            "arn:aws:ssm:{}:{}:parameter{}",
267            state.region,
268            state.account_id,
269            if name.starts_with('/') {
270                name.to_string()
271            } else {
272                format!("/{name}")
273            }
274        );
275
276        let parameter = SsmParameter {
277            name: name.to_string(),
278            value: value.to_string(),
279            param_type: param_type.to_string(),
280            version: 1,
281            arn: arn.clone(),
282            last_modified: Utc::now(),
283            history: Vec::new(),
284            tags: HashMap::new(),
285            labels: HashMap::new(),
286            description: props
287                .get("Description")
288                .and_then(|v| v.as_str())
289                .map(|s| s.to_string()),
290            allowed_pattern: None,
291            key_id: None,
292            data_type: "text".to_string(),
293            tier: "Standard".to_string(),
294            policies: None,
295        };
296
297        state.parameters.insert(name.to_string(), parameter);
298        Ok(name.to_string())
299    }
300
301    fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302        let mut state = self.ssm_state.write();
303        state.parameters.remove(physical_id);
304        Ok(())
305    }
306
307    // --- IAM Role ---
308
309    fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310        let props = &resource.properties;
311        let role_name = props
312            .get("RoleName")
313            .and_then(|v| v.as_str())
314            .unwrap_or(&resource.logical_id);
315
316        let assume_role_policy = props
317            .get("AssumeRolePolicyDocument")
318            .map(|v| {
319                if v.is_string() {
320                    v.as_str().unwrap().to_string()
321                } else {
322                    serde_json::to_string(v).unwrap_or_default()
323                }
324            })
325            .unwrap_or_default();
326
327        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329        let mut state = self.iam_state.write();
330        let role_id = format!(
331            "FKIA{}",
332            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333        );
334        let arn = format!(
335            "arn:aws:iam::{}:role{}{}",
336            state.account_id,
337            if path == "/" { "/" } else { path },
338            role_name
339        );
340
341        let role = IamRole {
342            role_name: role_name.to_string(),
343            role_id,
344            arn: arn.clone(),
345            path: path.to_string(),
346            assume_role_policy_document: assume_role_policy,
347            created_at: Utc::now(),
348            description: props
349                .get("Description")
350                .and_then(|v| v.as_str())
351                .map(|s| s.to_string()),
352            max_session_duration: 3600,
353            tags: Vec::new(),
354            permissions_boundary: None,
355        };
356
357        state.roles.insert(role_name.to_string(), role);
358        Ok(arn)
359    }
360
361    fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362        let mut state = self.iam_state.write();
363        // physical_id is the ARN; find the role name
364        let role_name = state
365            .roles
366            .iter()
367            .find(|(_, r)| r.arn == physical_id)
368            .map(|(name, _)| name.clone());
369        if let Some(name) = role_name {
370            state.roles.remove(&name);
371            state.role_policies.remove(&name);
372            state.role_inline_policies.remove(&name);
373        }
374        Ok(())
375    }
376
377    // --- IAM Policy ---
378
379    fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380        let props = &resource.properties;
381        let policy_name = props
382            .get("PolicyName")
383            .and_then(|v| v.as_str())
384            .unwrap_or(&resource.logical_id);
385
386        let policy_document = props
387            .get("PolicyDocument")
388            .map(|v| {
389                if v.is_string() {
390                    v.as_str().unwrap().to_string()
391                } else {
392                    serde_json::to_string(v).unwrap_or_default()
393                }
394            })
395            .unwrap_or_default();
396
397        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399        let mut state = self.iam_state.write();
400        let policy_id = format!(
401            "FSIA{}",
402            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403        );
404        let arn = format!(
405            "arn:aws:iam::{}:policy{}{}",
406            state.account_id,
407            if path == "/" { "/" } else { path },
408            policy_name
409        );
410
411        let now = Utc::now();
412        let policy = IamPolicy {
413            policy_name: policy_name.to_string(),
414            policy_id,
415            arn: arn.clone(),
416            path: path.to_string(),
417            description: props
418                .get("Description")
419                .and_then(|v| v.as_str())
420                .unwrap_or("")
421                .to_string(),
422            created_at: now,
423            tags: Vec::new(),
424            default_version_id: "v1".to_string(),
425            versions: vec![PolicyVersion {
426                version_id: "v1".to_string(),
427                document: policy_document,
428                is_default: true,
429                created_at: now,
430            }],
431            next_version_num: 2,
432            attachment_count: 0,
433        };
434
435        state.policies.insert(arn.clone(), policy);
436        Ok(arn)
437    }
438
439    fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440        let mut state = self.iam_state.write();
441        state.policies.remove(physical_id);
442        Ok(())
443    }
444
445    // --- S3 ---
446
447    fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448        let props = &resource.properties;
449        let bucket_name = props
450            .get("BucketName")
451            .and_then(|v| v.as_str())
452            .unwrap_or(&resource.logical_id);
453
454        let mut state = self.s3_state.write();
455        let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456        state.buckets.insert(bucket_name.to_string(), bucket);
457        Ok(bucket_name.to_string())
458    }
459
460    fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461        let mut state = self.s3_state.write();
462        state.buckets.remove(physical_id);
463        Ok(())
464    }
465
466    // --- EventBridge ---
467
468    fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469        let props = &resource.properties;
470        let rule_name = props
471            .get("Name")
472            .and_then(|v| v.as_str())
473            .unwrap_or(&resource.logical_id);
474        let event_bus_name = props
475            .get("EventBusName")
476            .and_then(|v| v.as_str())
477            .unwrap_or("default");
478
479        let mut state = self.eventbridge_state.write();
480
481        // Validate that the event bus exists
482        if !state.buses.contains_key(event_bus_name) {
483            return Err(format!("Event bus does not exist: {event_bus_name}"));
484        }
485
486        let arn = if event_bus_name == "default" {
487            format!(
488                "arn:aws:events:{}:{}:rule/{}",
489                state.region, state.account_id, rule_name
490            )
491        } else {
492            format!(
493                "arn:aws:events:{}:{}:rule/{}/{}",
494                state.region, state.account_id, event_bus_name, rule_name
495            )
496        };
497
498        let rule = EventRule {
499            name: rule_name.to_string(),
500            arn: arn.clone(),
501            event_bus_name: event_bus_name.to_string(),
502            event_pattern: props.get("EventPattern").map(|v| {
503                if v.is_string() {
504                    v.as_str().unwrap().to_string()
505                } else {
506                    serde_json::to_string(v).unwrap_or_default()
507                }
508            }),
509            schedule_expression: props
510                .get("ScheduleExpression")
511                .and_then(|v| v.as_str())
512                .map(|s| s.to_string()),
513            state: props
514                .get("State")
515                .and_then(|v| v.as_str())
516                .unwrap_or("ENABLED")
517                .to_string(),
518            description: props
519                .get("Description")
520                .and_then(|v| v.as_str())
521                .map(|s| s.to_string()),
522            role_arn: props
523                .get("RoleArn")
524                .and_then(|v| v.as_str())
525                .map(|s| s.to_string()),
526            managed_by: None,
527            created_by: None,
528            targets: Vec::new(),
529            tags: HashMap::new(),
530            last_fired: None,
531        };
532
533        state
534            .rules
535            .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536        Ok(arn)
537    }
538
539    fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540        let mut state = self.eventbridge_state.write();
541        // physical_id is the ARN; find the rule key
542        let key = state
543            .rules
544            .iter()
545            .find(|(_, r)| r.arn == physical_id)
546            .map(|(k, _)| k.clone());
547        if let Some(k) = key {
548            state.rules.remove(&k);
549        }
550        Ok(())
551    }
552
553    // --- DynamoDB ---
554
555    fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556        let props = &resource.properties;
557        let table_name = props
558            .get("TableName")
559            .and_then(|v| v.as_str())
560            .unwrap_or(&resource.logical_id);
561
562        let mut key_schema = Vec::new();
563        if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564            for item in ks {
565                let attr_name = item
566                    .get("AttributeName")
567                    .and_then(|v| v.as_str())
568                    .unwrap_or("")
569                    .to_string();
570                let key_type = item
571                    .get("KeyType")
572                    .and_then(|v| v.as_str())
573                    .unwrap_or("HASH")
574                    .to_string();
575                key_schema.push(KeySchemaElement {
576                    attribute_name: attr_name,
577                    key_type,
578                });
579            }
580        }
581
582        let mut attribute_definitions = Vec::new();
583        if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584            for item in defs {
585                let attr_name = item
586                    .get("AttributeName")
587                    .and_then(|v| v.as_str())
588                    .unwrap_or("")
589                    .to_string();
590                let attr_type = item
591                    .get("AttributeType")
592                    .and_then(|v| v.as_str())
593                    .unwrap_or("S")
594                    .to_string();
595                attribute_definitions.push(AttributeDefinition {
596                    attribute_name: attr_name,
597                    attribute_type: attr_type,
598                });
599            }
600        }
601
602        let billing_mode = props
603            .get("BillingMode")
604            .and_then(|v| v.as_str())
605            .unwrap_or("PAY_PER_REQUEST")
606            .to_string();
607
608        let provisioned_throughput = if billing_mode == "PROVISIONED" {
609            if let Some(pt) = props.get("ProvisionedThroughput") {
610                ProvisionedThroughput {
611                    read_capacity_units: pt
612                        .get("ReadCapacityUnits")
613                        .and_then(|v| v.as_i64())
614                        .unwrap_or(5),
615                    write_capacity_units: pt
616                        .get("WriteCapacityUnits")
617                        .and_then(|v| v.as_i64())
618                        .unwrap_or(5),
619                }
620            } else {
621                ProvisionedThroughput {
622                    read_capacity_units: 5,
623                    write_capacity_units: 5,
624                }
625            }
626        } else {
627            ProvisionedThroughput {
628                read_capacity_units: 0,
629                write_capacity_units: 0,
630            }
631        };
632
633        // Parse StreamSpecification from CloudFormation properties
634        let (stream_enabled, stream_view_type) =
635            if let Some(stream_spec) = props.get("StreamSpecification") {
636                let view_type = stream_spec
637                    .get("StreamViewType")
638                    .and_then(|v| v.as_str())
639                    .map(|s| s.to_string());
640                let enabled = stream_spec
641                    .get("StreamEnabled")
642                    .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643                    // If StreamViewType is set, treat streams as enabled even if StreamEnabled is missing
644                    .unwrap_or(view_type.is_some());
645                (enabled, view_type)
646            } else {
647                (false, None)
648            };
649
650        let mut state = self.dynamodb_state.write();
651        let arn = format!(
652            "arn:aws:dynamodb:{}:{}:table/{}",
653            state.region, state.account_id, table_name
654        );
655
656        let stream_arn = if stream_enabled {
657            Some(format!(
658                "{}/stream/{}",
659                arn,
660                Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
661            ))
662        } else {
663            None
664        };
665
666        let table = DynamoTable {
667            name: table_name.to_string(),
668            arn: arn.clone(),
669            table_id: Uuid::new_v4().to_string().replace('-', ""),
670            key_schema,
671            attribute_definitions,
672            provisioned_throughput,
673            items: Vec::new(),
674            gsi: Vec::new(),
675            lsi: Vec::new(),
676            tags: HashMap::new(),
677            created_at: Utc::now(),
678            status: "ACTIVE".to_string(),
679            item_count: 0,
680            size_bytes: 0,
681            billing_mode,
682            ttl_attribute: None,
683            ttl_enabled: false,
684            resource_policy: None,
685            pitr_enabled: false,
686            kinesis_destinations: Vec::new(),
687            contributor_insights_status: "DISABLED".to_string(),
688            contributor_insights_counters: HashMap::new(),
689            stream_enabled,
690            stream_view_type,
691            stream_arn,
692            stream_records: Arc::new(RwLock::new(Vec::new())),
693            sse_type: None,
694            sse_kms_key_arn: None,
695        };
696
697        state.tables.insert(table_name.to_string(), table);
698        Ok(arn)
699    }
700
701    fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
702        let mut state = self.dynamodb_state.write();
703        // physical_id is the ARN; find the table name
704        let table_name = state
705            .tables
706            .iter()
707            .find(|(_, t)| t.arn == physical_id)
708            .map(|(name, _)| name.clone());
709        if let Some(name) = table_name {
710            state.tables.remove(&name);
711        }
712        Ok(())
713    }
714
715    // --- CloudWatch Logs ---
716
717    fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
718        let props = &resource.properties;
719        let log_group_name = props
720            .get("LogGroupName")
721            .and_then(|v| v.as_str())
722            .unwrap_or(&resource.logical_id);
723
724        let retention_in_days = props
725            .get("RetentionInDays")
726            .and_then(|v| v.as_i64())
727            .map(|v| v as i32);
728
729        let mut state = self.logs_state.write();
730        let arn = format!(
731            "arn:aws:logs:{}:{}:log-group:{}:*",
732            state.region, state.account_id, log_group_name
733        );
734
735        let log_group = fakecloud_logs::state::LogGroup {
736            name: log_group_name.to_string(),
737            arn: arn.clone(),
738            creation_time: Utc::now().timestamp_millis(),
739            retention_in_days,
740            kms_key_id: None,
741            stored_bytes: 0,
742            log_streams: HashMap::new(),
743            tags: HashMap::new(),
744            subscription_filters: Vec::new(),
745            data_protection_policy: None,
746            index_policies: Vec::new(),
747            transformer: None,
748            deletion_protection: false,
749        };
750
751        state
752            .log_groups
753            .insert(log_group_name.to_string(), log_group);
754        Ok(arn)
755    }
756
757    fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
758        let mut state = self.logs_state.write();
759        // physical_id is the ARN; find the log group name
760        let name = state
761            .log_groups
762            .iter()
763            .find(|(_, g)| g.arn == physical_id)
764            .map(|(name, _)| name.clone());
765        if let Some(name) = name {
766            state.log_groups.remove(&name);
767        }
768        Ok(())
769    }
770
771    // --- Custom Resources ---
772
773    /// Invoke a Lambda function synchronously via the delivery bus.
774    fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
775        let delivery = self.delivery.clone();
776        let function_arn = function_arn.to_string();
777        let payload = payload.to_string();
778        std::thread::scope(|s| {
779            s.spawn(|| {
780                let rt = tokio::runtime::Builder::new_current_thread()
781                    .enable_all()
782                    .build()
783                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
784                rt.block_on(async {
785                    match delivery.invoke_lambda(&function_arn, &payload).await {
786                        Some(Ok(_)) => {
787                            tracing::info!(
788                                "Custom resource Lambda {} invoked successfully",
789                                function_arn
790                            );
791                            Ok(())
792                        }
793                        Some(Err(e)) => {
794                            tracing::warn!(
795                                "Custom resource Lambda {} invocation failed: {e}",
796                                function_arn
797                            );
798                            Err(format!("Lambda invocation failed: {e}"))
799                        }
800                        None => {
801                            tracing::warn!(
802                                "No Lambda delivery configured; skipping custom resource invocation for {}",
803                                function_arn
804                            );
805                            Ok(())
806                        }
807                    }
808                })
809            })
810            .join()
811            .map_err(|_| "Lambda invocation thread panicked".to_string())?
812        })
813    }
814
815    fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
816        let props = &resource.properties;
817        let service_token = props
818            .get("ServiceToken")
819            .and_then(|v| v.as_str())
820            .ok_or("Custom resource requires ServiceToken property")?;
821
822        let request_id = Uuid::new_v4().to_string();
823
824        // Build the CloudFormation custom resource event
825        let event = serde_json::json!({
826            "RequestType": "Create",
827            "ServiceToken": service_token,
828            "StackId": self.stack_id,
829            "RequestId": request_id,
830            "ResourceType": resource.resource_type,
831            "LogicalResourceId": resource.logical_id,
832            "ResourceProperties": props,
833        });
834
835        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
836        self.invoke_lambda_sync(service_token, &payload)?;
837
838        // Physical resource ID: use a generated ID (the Lambda could return one,
839        // but for simplicity we generate one here).
840        let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
841        Ok(physical_id)
842    }
843
844    fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
845        let service_token = match &resource.service_token {
846            Some(token) => token.clone(),
847            None => {
848                // No ServiceToken stored — nothing to invoke
849                return Ok(());
850            }
851        };
852
853        let request_id = Uuid::new_v4().to_string();
854
855        let event = serde_json::json!({
856            "RequestType": "Delete",
857            "ServiceToken": service_token,
858            "StackId": self.stack_id,
859            "RequestId": request_id,
860            "ResourceType": resource.resource_type,
861            "LogicalResourceId": resource.logical_id,
862            "PhysicalResourceId": resource.physical_id,
863        });
864
865        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
866
867        // Best-effort: don't fail stack deletion if Lambda invocation fails
868        if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
869            tracing::warn!(
870                "Custom resource delete Lambda invocation failed for {}: {e}",
871                resource.logical_id
872            );
873        }
874        Ok(())
875    }
876}
877
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::RwLock;

    /// Builds a `ResourceProvisioner` whose service states are all newly
    /// constructed for one fixed test account and region, together with a new
    /// `DeliveryBus` and a fixed stack ID.
    fn make_provisioner() -> ResourceProvisioner {
        const ACCOUNT_ID: &str = "123456789012";
        const REGION: &str = "us-east-1";
        const ENDPOINT: &str = "http://localhost:4566";

        let sqs_state = Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
            ACCOUNT_ID, REGION, ENDPOINT,
        )));
        let sns_state = Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
            ACCOUNT_ID, REGION, ENDPOINT,
        )));
        let ssm_state = Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
            ACCOUNT_ID, REGION,
        )));
        let iam_state = Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
            ACCOUNT_ID,
        )));
        let s3_state = Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
            ACCOUNT_ID, REGION,
        )));
        let eventbridge_state = Arc::new(RwLock::new(
            fakecloud_eventbridge::state::EventBridgeState::new(ACCOUNT_ID, REGION),
        ));
        let dynamodb_state = Arc::new(RwLock::new(
            fakecloud_dynamodb::state::DynamoDbState::new(ACCOUNT_ID, REGION),
        ));
        let logs_state = Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
            ACCOUNT_ID, REGION,
        )));

        ResourceProvisioner {
            sqs_state,
            sns_state,
            ssm_state,
            iam_state,
            s3_state,
            eventbridge_state,
            dynamodb_state,
            logs_state,
            delivery: Arc::new(DeliveryBus::new()),
            account_id: ACCOUNT_ID.to_string(),
            region: REGION.to_string(),
            stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
        }
    }

    /// Assembles a `ResourceDefinition` from a resource type, a logical ID, and
    /// a JSON property value.
    fn make_resource(
        resource_type: &str,
        logical_id: &str,
        props: serde_json::Value,
    ) -> ResourceDefinition {
        let resource_type = resource_type.to_string();
        let logical_id = logical_id.to_string();
        ResourceDefinition {
            logical_id,
            resource_type,
            properties: props,
        }
    }

    /// Subscribing to a topic ARN that was never created must fail with a
    /// "does not exist" error.
    #[test]
    fn sns_subscription_rejects_nonexistent_topic() {
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
        });
        let subscription = make_resource("AWS::SNS::Subscription", "MySub", properties);

        let outcome = provisioner.create_resource(&subscription);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("does not exist"));
    }

    /// Subscribing to a topic created through the same provisioner succeeds.
    #[test]
    fn sns_subscription_succeeds_when_topic_exists() {
        let provisioner = make_provisioner();

        // The topic has to exist before anything can subscribe to it.
        let topic = make_resource(
            "AWS::SNS::Topic",
            "MyTopic",
            serde_json::json!({ "TopicName": "my-topic" }),
        );
        let created_topic = provisioner.create_resource(&topic);
        assert!(created_topic.is_ok());
        let topic_arn = created_topic.unwrap().physical_id;

        // Subscribe using the ARN returned for the topic.
        let properties = serde_json::json!({
            "TopicArn": topic_arn,
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
        });
        let subscription = make_resource("AWS::SNS::Subscription", "MySub", properties);
        let outcome = provisioner.create_resource(&subscription);
        assert!(outcome.is_ok());
    }

    /// A rule without `EventBusName` gets an ARN of the form `rule/<name>`.
    #[test]
    fn eventbridge_rule_arn_default_bus_omits_bus_name() {
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "Name": "my-rule",
            "ScheduleExpression": "rate(1 hour)"
        });
        let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

        let created = provisioner.create_resource(&rule).unwrap();

        // On the default bus the ARN is rule/<name>, with no /default/ segment.
        assert_eq!(
            created.physical_id,
            "arn:aws:events:us-east-1:123456789012:rule/my-rule"
        );
        assert!(!created.physical_id.contains("rule/default/"));
    }

    /// A rule on a custom bus gets an ARN of the form `rule/<bus>/<name>`.
    #[test]
    fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
        let provisioner = make_provisioner();

        // Register the custom bus directly in the EventBridge state first.
        let custom_bus = fakecloud_eventbridge::state::EventBus {
            name: "custom-bus".to_string(),
            arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
            policy: None,
            creation_time: Utc::now(),
            last_modified_time: Utc::now(),
            description: None,
            kms_key_identifier: None,
            dead_letter_config: None,
            tags: HashMap::new(),
        };
        provisioner
            .eventbridge_state
            .write()
            .buses
            .insert("custom-bus".to_string(), custom_bus);

        let properties = serde_json::json!({
            "Name": "my-rule",
            "EventBusName": "custom-bus",
            "ScheduleExpression": "rate(1 hour)"
        });
        let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

        let created = provisioner.create_resource(&rule).unwrap();
        assert_eq!(
            created.physical_id,
            "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
        );
    }

    /// A rule that names a bus which was never registered must fail with a
    /// "does not exist" error.
    #[test]
    fn eventbridge_rule_rejects_nonexistent_bus() {
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "Name": "my-rule",
            "EventBusName": "nonexistent-bus",
            "ScheduleExpression": "rate(1 hour)"
        });
        let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

        let outcome = provisioner.create_resource(&rule);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("does not exist"));
    }

    /// A custom resource without a `ServiceToken` property is rejected, and the
    /// error mentions the missing property.
    #[test]
    fn custom_resource_requires_service_token() {
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "Foo": "bar"
        });
        let custom = make_resource("Custom::MyResource", "MyCustom", properties);

        let outcome = provisioner.create_resource(&custom);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(
            message.contains("ServiceToken"),
            "Should require ServiceToken property"
        );
    }

    /// With the provisioner from `make_provisioner`, creating a custom resource
    /// still succeeds and yields a physical ID prefixed with the logical ID.
    #[test]
    fn custom_resource_succeeds_without_lambda_delivery() {
        // No Lambda delivery is set up here; creation is still expected to
        // succeed because the invocation is skipped in that case.
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "Foo": "bar"
        });
        let custom = make_resource("Custom::MyResource", "MyCustom", properties);

        let outcome = provisioner.create_resource(&custom);
        assert!(outcome.is_ok());
        let stack_resource = outcome.unwrap();
        assert_eq!(stack_resource.logical_id, "MyCustom");
        assert_eq!(stack_resource.resource_type, "Custom::MyResource");
        assert!(stack_resource.physical_id.starts_with("MyCustom-"));
    }

    /// The `AWS::CloudFormation::CustomResource` type is accepted and its
    /// resource type is preserved on the resulting stack resource.
    #[test]
    fn cloudformation_custom_resource_type_succeeds() {
        let provisioner = make_provisioner();
        let properties = serde_json::json!({
            "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "Key": "value"
        });
        let custom = make_resource(
            "AWS::CloudFormation::CustomResource",
            "MyCustom2",
            properties,
        );

        let outcome = provisioner.create_resource(&custom);
        assert!(outcome.is_ok());
        let stack_resource = outcome.unwrap();
        assert_eq!(
            stack_resource.resource_type,
            "AWS::CloudFormation::CustomResource"
        );
    }
}