Skip to main content

fakecloud_cloudformation/
resource_provisioner.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9    AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
/// Holds references to all service states so CloudFormation can provision resources.
///
/// Each `*_state` field is a shared handle onto one emulated service; the
/// provisioner takes a write lock on the relevant handle whenever it creates
/// or deletes a resource of that service.
pub struct ResourceProvisioner {
    /// SQS state, mutated for `AWS::SQS::Queue` resources.
    pub sqs_state: SharedSqsState,
    /// SNS state, mutated for `AWS::SNS::Topic` and `AWS::SNS::Subscription` resources.
    pub sns_state: SharedSnsState,
    /// SSM state, mutated for `AWS::SSM::Parameter` resources.
    pub ssm_state: SharedSsmState,
    /// IAM state, mutated for `AWS::IAM::Role` and `AWS::IAM::Policy` resources.
    pub iam_state: SharedIamState,
    /// S3 state, mutated for `AWS::S3::Bucket` resources.
    pub s3_state: SharedS3State,
    /// EventBridge state, mutated for `AWS::Events::Rule` resources.
    pub eventbridge_state: SharedEventBridgeState,
    /// DynamoDB state, mutated for `AWS::DynamoDB::Table` resources.
    pub dynamodb_state: SharedDynamoDbState,
    /// CloudWatch Logs state, mutated for `AWS::Logs::LogGroup` resources.
    pub logs_state: SharedLogsState,
    /// Delivery bus used to invoke the Lambda behind a custom resource.
    pub delivery: Arc<DeliveryBus>,
    /// Account ID associated with this provisioner.
    /// NOTE(review): the provisioning methods read the account ID from each
    /// service state rather than from this field — confirm where it is consumed.
    pub account_id: String,
    /// Region associated with this provisioner.
    /// NOTE(review): like `account_id`, not read by the provisioning methods — confirm usage.
    pub region: String,
    /// Stack ID sent as `StackId` in custom resource request events.
    pub stack_id: String,
}
37
38impl ResourceProvisioner {
39    /// Create a resource and return the StackResource with physical ID.
40    pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41        let result = match resource.resource_type.as_str() {
42            "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43            "AWS::SNS::Topic" => self.create_sns_topic(resource),
44            "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45            "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46            "AWS::IAM::Role" => self.create_iam_role(resource),
47            "AWS::IAM::Policy" => self.create_iam_policy(resource),
48            "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49            "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50            "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51            "AWS::Logs::LogGroup" => self.create_log_group(resource),
52            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53                self.create_custom_resource(resource)
54            }
55            other => Err(format!("Unsupported resource type: {other}")),
56        };
57
58        let is_custom = resource.resource_type.starts_with("Custom::")
59            || resource.resource_type == "AWS::CloudFormation::CustomResource";
60        let service_token = if is_custom {
61            resource
62                .properties
63                .get("ServiceToken")
64                .and_then(|v| v.as_str())
65                .map(|s| s.to_string())
66        } else {
67            None
68        };
69
70        result.map(|physical_id| StackResource {
71            logical_id: resource.logical_id.clone(),
72            physical_id,
73            resource_type: resource.resource_type.clone(),
74            status: "CREATE_COMPLETE".to_string(),
75            service_token,
76        })
77    }
78
79    /// Delete a previously created resource.
80    pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81        match resource.resource_type.as_str() {
82            "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83            "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84            "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85            "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86            "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87            "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88            "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89            "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90            "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91            "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93                self.delete_custom_resource(resource)
94            }
95            other => Err(format!("Unsupported resource type: {other}")),
96        }
97    }
98
99    // --- SQS ---
100
101    fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102        let props = &resource.properties;
103        let queue_name = props
104            .get("QueueName")
105            .and_then(|v| v.as_str())
106            .unwrap_or(&resource.logical_id);
107
108        let mut state = self.sqs_state.write();
109        let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110        let arn = format!(
111            "arn:aws:sqs:{}:{}:{}",
112            state.region, state.account_id, queue_name
113        );
114
115        let is_fifo = queue_name.ends_with(".fifo");
116        let mut attributes = HashMap::new();
117        if let Some(obj) = props.as_object() {
118            for (k, v) in obj {
119                if k != "QueueName" {
120                    if let Some(s) = v.as_str() {
121                        attributes.insert(k.clone(), s.to_string());
122                    } else if let Some(n) = v.as_i64() {
123                        attributes.insert(k.clone(), n.to_string());
124                    }
125                }
126            }
127        }
128
129        let queue = SqsQueue {
130            queue_name: queue_name.to_string(),
131            queue_url: queue_url.clone(),
132            arn,
133            created_at: Utc::now(),
134            messages: std::collections::VecDeque::new(),
135            inflight: Vec::new(),
136            attributes,
137            is_fifo,
138            dedup_cache: HashMap::new(),
139            redrive_policy: None,
140            tags: HashMap::new(),
141            next_sequence_number: 0,
142            permission_labels: Vec::new(),
143            receipt_handle_map: HashMap::new(),
144        };
145
146        state
147            .name_to_url
148            .insert(queue_name.to_string(), queue_url.clone());
149        state.queues.insert(queue_url.clone(), queue);
150
151        Ok(queue_url)
152    }
153
154    fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155        let mut state = self.sqs_state.write();
156        if let Some(queue) = state.queues.remove(physical_id) {
157            state.name_to_url.remove(&queue.queue_name);
158        }
159        Ok(())
160    }
161
162    // --- SNS ---
163
164    fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165        let props = &resource.properties;
166        let topic_name = props
167            .get("TopicName")
168            .and_then(|v| v.as_str())
169            .unwrap_or(&resource.logical_id);
170
171        let mut state = self.sns_state.write();
172        let topic_arn = format!(
173            "arn:aws:sns:{}:{}:{}",
174            state.region, state.account_id, topic_name
175        );
176
177        let topic = SnsTopic {
178            topic_arn: topic_arn.clone(),
179            name: topic_name.to_string(),
180            attributes: HashMap::new(),
181            tags: Vec::new(),
182            is_fifo: topic_name.ends_with(".fifo"),
183            created_at: Utc::now(),
184        };
185
186        state.topics.insert(topic_arn.clone(), topic);
187        Ok(topic_arn)
188    }
189
190    fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191        let mut state = self.sns_state.write();
192        state.topics.remove(physical_id);
193        // Also remove subscriptions for this topic
194        state
195            .subscriptions
196            .retain(|_, sub| sub.topic_arn != physical_id);
197        Ok(())
198    }
199
200    // --- SNS Subscription ---
201
202    fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203        let props = &resource.properties;
204        let topic_arn = props
205            .get("TopicArn")
206            .and_then(|v| v.as_str())
207            .ok_or("SNS Subscription requires TopicArn")?;
208        let protocol = props
209            .get("Protocol")
210            .and_then(|v| v.as_str())
211            .ok_or("SNS Subscription requires Protocol")?;
212        let endpoint = props
213            .get("Endpoint")
214            .and_then(|v| v.as_str())
215            .ok_or("SNS Subscription requires Endpoint")?;
216
217        let mut state = self.sns_state.write();
218
219        // Validate that the topic exists
220        if !state.topics.contains_key(topic_arn) {
221            return Err(format!("Topic ARN does not exist: {topic_arn}"));
222        }
223
224        let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226        let subscription = SnsSubscription {
227            subscription_arn: sub_arn.clone(),
228            topic_arn: topic_arn.to_string(),
229            protocol: protocol.to_string(),
230            endpoint: endpoint.to_string(),
231            owner: state.account_id.clone(),
232            attributes: HashMap::new(),
233            confirmed: true,
234            confirmation_token: None,
235        };
236
237        state.subscriptions.insert(sub_arn.clone(), subscription);
238        Ok(sub_arn)
239    }
240
241    fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242        let mut state = self.sns_state.write();
243        state.subscriptions.remove(physical_id);
244        Ok(())
245    }
246
247    // --- SSM ---
248
249    fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250        let props = &resource.properties;
251        let name = props
252            .get("Name")
253            .and_then(|v| v.as_str())
254            .ok_or("SSM Parameter requires Name")?;
255        let value = props
256            .get("Value")
257            .and_then(|v| v.as_str())
258            .ok_or("SSM Parameter requires Value")?;
259        let param_type = props
260            .get("Type")
261            .and_then(|v| v.as_str())
262            .unwrap_or("String");
263
264        let mut state = self.ssm_state.write();
265        let arn = format!(
266            "arn:aws:ssm:{}:{}:parameter{}",
267            state.region,
268            state.account_id,
269            if name.starts_with('/') {
270                name.to_string()
271            } else {
272                format!("/{name}")
273            }
274        );
275
276        let parameter = SsmParameter {
277            name: name.to_string(),
278            value: value.to_string(),
279            param_type: param_type.to_string(),
280            version: 1,
281            arn: arn.clone(),
282            last_modified: Utc::now(),
283            history: Vec::new(),
284            tags: HashMap::new(),
285            labels: HashMap::new(),
286            description: props
287                .get("Description")
288                .and_then(|v| v.as_str())
289                .map(|s| s.to_string()),
290            allowed_pattern: None,
291            key_id: None,
292            data_type: "text".to_string(),
293            tier: "Standard".to_string(),
294            policies: None,
295        };
296
297        state.parameters.insert(name.to_string(), parameter);
298        Ok(name.to_string())
299    }
300
301    fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302        let mut state = self.ssm_state.write();
303        state.parameters.remove(physical_id);
304        Ok(())
305    }
306
307    // --- IAM Role ---
308
309    fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310        let props = &resource.properties;
311        let role_name = props
312            .get("RoleName")
313            .and_then(|v| v.as_str())
314            .unwrap_or(&resource.logical_id);
315
316        let assume_role_policy = props
317            .get("AssumeRolePolicyDocument")
318            .map(|v| {
319                if v.is_string() {
320                    v.as_str().unwrap().to_string()
321                } else {
322                    serde_json::to_string(v).unwrap_or_default()
323                }
324            })
325            .unwrap_or_default();
326
327        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329        let mut state = self.iam_state.write();
330        let role_id = format!(
331            "FKIA{}",
332            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333        );
334        let arn = format!(
335            "arn:aws:iam::{}:role{}{}",
336            state.account_id,
337            if path == "/" { "/" } else { path },
338            role_name
339        );
340
341        let role = IamRole {
342            role_name: role_name.to_string(),
343            role_id,
344            arn: arn.clone(),
345            path: path.to_string(),
346            assume_role_policy_document: assume_role_policy,
347            created_at: Utc::now(),
348            description: props
349                .get("Description")
350                .and_then(|v| v.as_str())
351                .map(|s| s.to_string()),
352            max_session_duration: 3600,
353            tags: Vec::new(),
354            permissions_boundary: None,
355        };
356
357        state.roles.insert(role_name.to_string(), role);
358        Ok(arn)
359    }
360
361    fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362        let mut state = self.iam_state.write();
363        // physical_id is the ARN; find the role name
364        let role_name = state
365            .roles
366            .iter()
367            .find(|(_, r)| r.arn == physical_id)
368            .map(|(name, _)| name.clone());
369        if let Some(name) = role_name {
370            state.roles.remove(&name);
371            state.role_policies.remove(&name);
372            state.role_inline_policies.remove(&name);
373        }
374        Ok(())
375    }
376
377    // --- IAM Policy ---
378
379    fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380        let props = &resource.properties;
381        let policy_name = props
382            .get("PolicyName")
383            .and_then(|v| v.as_str())
384            .unwrap_or(&resource.logical_id);
385
386        let policy_document = props
387            .get("PolicyDocument")
388            .map(|v| {
389                if v.is_string() {
390                    v.as_str().unwrap().to_string()
391                } else {
392                    serde_json::to_string(v).unwrap_or_default()
393                }
394            })
395            .unwrap_or_default();
396
397        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399        let mut state = self.iam_state.write();
400        let policy_id = format!(
401            "FSIA{}",
402            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403        );
404        let arn = format!(
405            "arn:aws:iam::{}:policy{}{}",
406            state.account_id,
407            if path == "/" { "/" } else { path },
408            policy_name
409        );
410
411        let now = Utc::now();
412        let policy = IamPolicy {
413            policy_name: policy_name.to_string(),
414            policy_id,
415            arn: arn.clone(),
416            path: path.to_string(),
417            description: props
418                .get("Description")
419                .and_then(|v| v.as_str())
420                .unwrap_or("")
421                .to_string(),
422            created_at: now,
423            tags: Vec::new(),
424            default_version_id: "v1".to_string(),
425            versions: vec![PolicyVersion {
426                version_id: "v1".to_string(),
427                document: policy_document,
428                is_default: true,
429                created_at: now,
430            }],
431            next_version_num: 2,
432            attachment_count: 0,
433        };
434
435        state.policies.insert(arn.clone(), policy);
436        Ok(arn)
437    }
438
439    fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440        let mut state = self.iam_state.write();
441        state.policies.remove(physical_id);
442        Ok(())
443    }
444
445    // --- S3 ---
446
447    fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448        let props = &resource.properties;
449        let bucket_name = props
450            .get("BucketName")
451            .and_then(|v| v.as_str())
452            .unwrap_or(&resource.logical_id);
453
454        let mut state = self.s3_state.write();
455        let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456        state.buckets.insert(bucket_name.to_string(), bucket);
457        Ok(bucket_name.to_string())
458    }
459
460    fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461        let mut state = self.s3_state.write();
462        state.buckets.remove(physical_id);
463        Ok(())
464    }
465
466    // --- EventBridge ---
467
468    fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469        let props = &resource.properties;
470        let rule_name = props
471            .get("Name")
472            .and_then(|v| v.as_str())
473            .unwrap_or(&resource.logical_id);
474        let event_bus_name = props
475            .get("EventBusName")
476            .and_then(|v| v.as_str())
477            .unwrap_or("default");
478
479        let mut state = self.eventbridge_state.write();
480
481        // Validate that the event bus exists
482        if !state.buses.contains_key(event_bus_name) {
483            return Err(format!("Event bus does not exist: {event_bus_name}"));
484        }
485
486        let arn = if event_bus_name == "default" {
487            format!(
488                "arn:aws:events:{}:{}:rule/{}",
489                state.region, state.account_id, rule_name
490            )
491        } else {
492            format!(
493                "arn:aws:events:{}:{}:rule/{}/{}",
494                state.region, state.account_id, event_bus_name, rule_name
495            )
496        };
497
498        let rule = EventRule {
499            name: rule_name.to_string(),
500            arn: arn.clone(),
501            event_bus_name: event_bus_name.to_string(),
502            event_pattern: props.get("EventPattern").map(|v| {
503                if v.is_string() {
504                    v.as_str().unwrap().to_string()
505                } else {
506                    serde_json::to_string(v).unwrap_or_default()
507                }
508            }),
509            schedule_expression: props
510                .get("ScheduleExpression")
511                .and_then(|v| v.as_str())
512                .map(|s| s.to_string()),
513            state: props
514                .get("State")
515                .and_then(|v| v.as_str())
516                .unwrap_or("ENABLED")
517                .to_string(),
518            description: props
519                .get("Description")
520                .and_then(|v| v.as_str())
521                .map(|s| s.to_string()),
522            role_arn: props
523                .get("RoleArn")
524                .and_then(|v| v.as_str())
525                .map(|s| s.to_string()),
526            managed_by: None,
527            created_by: None,
528            targets: Vec::new(),
529            tags: HashMap::new(),
530            last_fired: None,
531        };
532
533        state
534            .rules
535            .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536        Ok(arn)
537    }
538
539    fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540        let mut state = self.eventbridge_state.write();
541        // physical_id is the ARN; find the rule key
542        let key = state
543            .rules
544            .iter()
545            .find(|(_, r)| r.arn == physical_id)
546            .map(|(k, _)| k.clone());
547        if let Some(k) = key {
548            state.rules.remove(&k);
549        }
550        Ok(())
551    }
552
553    // --- DynamoDB ---
554
555    fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556        let props = &resource.properties;
557        let table_name = props
558            .get("TableName")
559            .and_then(|v| v.as_str())
560            .unwrap_or(&resource.logical_id);
561
562        let mut key_schema = Vec::new();
563        if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564            for item in ks {
565                let attr_name = item
566                    .get("AttributeName")
567                    .and_then(|v| v.as_str())
568                    .unwrap_or("")
569                    .to_string();
570                let key_type = item
571                    .get("KeyType")
572                    .and_then(|v| v.as_str())
573                    .unwrap_or("HASH")
574                    .to_string();
575                key_schema.push(KeySchemaElement {
576                    attribute_name: attr_name,
577                    key_type,
578                });
579            }
580        }
581
582        let mut attribute_definitions = Vec::new();
583        if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584            for item in defs {
585                let attr_name = item
586                    .get("AttributeName")
587                    .and_then(|v| v.as_str())
588                    .unwrap_or("")
589                    .to_string();
590                let attr_type = item
591                    .get("AttributeType")
592                    .and_then(|v| v.as_str())
593                    .unwrap_or("S")
594                    .to_string();
595                attribute_definitions.push(AttributeDefinition {
596                    attribute_name: attr_name,
597                    attribute_type: attr_type,
598                });
599            }
600        }
601
602        let billing_mode = props
603            .get("BillingMode")
604            .and_then(|v| v.as_str())
605            .unwrap_or("PAY_PER_REQUEST")
606            .to_string();
607
608        let provisioned_throughput = if billing_mode == "PROVISIONED" {
609            if let Some(pt) = props.get("ProvisionedThroughput") {
610                ProvisionedThroughput {
611                    read_capacity_units: pt
612                        .get("ReadCapacityUnits")
613                        .and_then(|v| v.as_i64())
614                        .unwrap_or(5),
615                    write_capacity_units: pt
616                        .get("WriteCapacityUnits")
617                        .and_then(|v| v.as_i64())
618                        .unwrap_or(5),
619                }
620            } else {
621                ProvisionedThroughput {
622                    read_capacity_units: 5,
623                    write_capacity_units: 5,
624                }
625            }
626        } else {
627            ProvisionedThroughput {
628                read_capacity_units: 0,
629                write_capacity_units: 0,
630            }
631        };
632
633        // Parse StreamSpecification from CloudFormation properties
634        let (stream_enabled, stream_view_type) =
635            if let Some(stream_spec) = props.get("StreamSpecification") {
636                let view_type = stream_spec
637                    .get("StreamViewType")
638                    .and_then(|v| v.as_str())
639                    .map(|s| s.to_string());
640                let enabled = stream_spec
641                    .get("StreamEnabled")
642                    .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643                    // If StreamViewType is set, treat streams as enabled even if StreamEnabled is missing
644                    .unwrap_or(view_type.is_some());
645                (enabled, view_type)
646            } else {
647                (false, None)
648            };
649
650        let mut state = self.dynamodb_state.write();
651        let arn = format!(
652            "arn:aws:dynamodb:{}:{}:table/{}",
653            state.region, state.account_id, table_name
654        );
655
656        let stream_arn = if stream_enabled {
657            Some(format!(
658                "{}/stream/{}",
659                arn,
660                Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
661            ))
662        } else {
663            None
664        };
665
666        let table = DynamoTable {
667            name: table_name.to_string(),
668            arn: arn.clone(),
669            key_schema,
670            attribute_definitions,
671            provisioned_throughput,
672            items: Vec::new(),
673            gsi: Vec::new(),
674            lsi: Vec::new(),
675            tags: HashMap::new(),
676            created_at: Utc::now(),
677            status: "ACTIVE".to_string(),
678            item_count: 0,
679            size_bytes: 0,
680            billing_mode,
681            ttl_attribute: None,
682            ttl_enabled: false,
683            resource_policy: None,
684            pitr_enabled: false,
685            kinesis_destinations: Vec::new(),
686            contributor_insights_status: "DISABLED".to_string(),
687            contributor_insights_counters: HashMap::new(),
688            stream_enabled,
689            stream_view_type,
690            stream_arn,
691            stream_records: Arc::new(RwLock::new(Vec::new())),
692            sse_type: None,
693            sse_kms_key_arn: None,
694        };
695
696        state.tables.insert(table_name.to_string(), table);
697        Ok(arn)
698    }
699
700    fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
701        let mut state = self.dynamodb_state.write();
702        // physical_id is the ARN; find the table name
703        let table_name = state
704            .tables
705            .iter()
706            .find(|(_, t)| t.arn == physical_id)
707            .map(|(name, _)| name.clone());
708        if let Some(name) = table_name {
709            state.tables.remove(&name);
710        }
711        Ok(())
712    }
713
714    // --- CloudWatch Logs ---
715
716    fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
717        let props = &resource.properties;
718        let log_group_name = props
719            .get("LogGroupName")
720            .and_then(|v| v.as_str())
721            .unwrap_or(&resource.logical_id);
722
723        let retention_in_days = props
724            .get("RetentionInDays")
725            .and_then(|v| v.as_i64())
726            .map(|v| v as i32);
727
728        let mut state = self.logs_state.write();
729        let arn = format!(
730            "arn:aws:logs:{}:{}:log-group:{}:*",
731            state.region, state.account_id, log_group_name
732        );
733
734        let log_group = fakecloud_logs::state::LogGroup {
735            name: log_group_name.to_string(),
736            arn: arn.clone(),
737            creation_time: Utc::now().timestamp_millis(),
738            retention_in_days,
739            kms_key_id: None,
740            stored_bytes: 0,
741            log_streams: HashMap::new(),
742            tags: HashMap::new(),
743            subscription_filters: Vec::new(),
744            data_protection_policy: None,
745            index_policies: Vec::new(),
746            transformer: None,
747            deletion_protection: false,
748        };
749
750        state
751            .log_groups
752            .insert(log_group_name.to_string(), log_group);
753        Ok(arn)
754    }
755
756    fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
757        let mut state = self.logs_state.write();
758        // physical_id is the ARN; find the log group name
759        let name = state
760            .log_groups
761            .iter()
762            .find(|(_, g)| g.arn == physical_id)
763            .map(|(name, _)| name.clone());
764        if let Some(name) = name {
765            state.log_groups.remove(&name);
766        }
767        Ok(())
768    }
769
770    // --- Custom Resources ---
771
    /// Invoke a Lambda function synchronously via the delivery bus.
    ///
    /// Blocks the calling thread until the invocation finishes. The call runs
    /// on a scoped helper thread that builds its own current-thread Tokio
    /// runtime and drives `DeliveryBus::invoke_lambda` to completion.
    ///
    /// Returns `Ok(())` when the invocation succeeds, and also when the bus
    /// yields `None` (logged as "No Lambda delivery configured"), in which
    /// case the invocation is skipped.
    ///
    /// # Errors
    ///
    /// Returns a message when the runtime cannot be built, when the
    /// invocation reports an error, or when the helper thread panics.
    fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
        let delivery = self.delivery.clone();
        let function_arn = function_arn.to_string();
        let payload = payload.to_string();
        // NOTE(review): presumably a separate thread is used so that `block_on`
        // is never called from inside an already-running runtime — confirm.
        std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
                rt.block_on(async {
                    match delivery.invoke_lambda(&function_arn, &payload).await {
                        Some(Ok(_)) => {
                            tracing::info!(
                                "Custom resource Lambda {} invoked successfully",
                                function_arn
                            );
                            Ok(())
                        }
                        Some(Err(e)) => {
                            tracing::warn!(
                                "Custom resource Lambda {} invocation failed: {e}",
                                function_arn
                            );
                            Err(format!("Lambda invocation failed: {e}"))
                        }
                        // No invocation result at all: treated as success after a warning.
                        None => {
                            tracing::warn!(
                                "No Lambda delivery configured; skipping custom resource invocation for {}",
                                function_arn
                            );
                            Ok(())
                        }
                    }
                })
            })
            // Wait for the helper thread; a panic there becomes an error string.
            .join()
            .map_err(|_| "Lambda invocation thread panicked".to_string())?
        })
    }
813
814    fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
815        let props = &resource.properties;
816        let service_token = props
817            .get("ServiceToken")
818            .and_then(|v| v.as_str())
819            .ok_or("Custom resource requires ServiceToken property")?;
820
821        let request_id = Uuid::new_v4().to_string();
822
823        // Build the CloudFormation custom resource event
824        let event = serde_json::json!({
825            "RequestType": "Create",
826            "ServiceToken": service_token,
827            "StackId": self.stack_id,
828            "RequestId": request_id,
829            "ResourceType": resource.resource_type,
830            "LogicalResourceId": resource.logical_id,
831            "ResourceProperties": props,
832        });
833
834        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
835        self.invoke_lambda_sync(service_token, &payload)?;
836
837        // Physical resource ID: use a generated ID (the Lambda could return one,
838        // but for simplicity we generate one here).
839        let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
840        Ok(physical_id)
841    }
842
843    fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
844        let service_token = match &resource.service_token {
845            Some(token) => token.clone(),
846            None => {
847                // No ServiceToken stored — nothing to invoke
848                return Ok(());
849            }
850        };
851
852        let request_id = Uuid::new_v4().to_string();
853
854        let event = serde_json::json!({
855            "RequestType": "Delete",
856            "ServiceToken": service_token,
857            "StackId": self.stack_id,
858            "RequestId": request_id,
859            "ResourceType": resource.resource_type,
860            "LogicalResourceId": resource.logical_id,
861            "PhysicalResourceId": resource.physical_id,
862        });
863
864        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
865
866        // Best-effort: don't fail stack deletion if Lambda invocation fails
867        if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
868            tracing::warn!(
869                "Custom resource delete Lambda invocation failed for {}: {e}",
870                resource.logical_id
871            );
872        }
873        Ok(())
874    }
875}
876
877#[cfg(test)]
878mod tests {
879    use super::*;
880    use parking_lot::RwLock;
881
882    fn make_provisioner() -> ResourceProvisioner {
883        ResourceProvisioner {
884            sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
885                "123456789012",
886                "us-east-1",
887                "http://localhost:4566",
888            ))),
889            sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
890                "123456789012",
891                "us-east-1",
892                "http://localhost:4566",
893            ))),
894            ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
895                "123456789012",
896                "us-east-1",
897            ))),
898            iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
899                "123456789012",
900            ))),
901            s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
902                "123456789012",
903                "us-east-1",
904            ))),
905            eventbridge_state: Arc::new(RwLock::new(
906                fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
907            )),
908            dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
909                "123456789012",
910                "us-east-1",
911            ))),
912            logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
913                "123456789012",
914                "us-east-1",
915            ))),
916            delivery: Arc::new(DeliveryBus::new()),
917            account_id: "123456789012".to_string(),
918            region: "us-east-1".to_string(),
919            stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
920        }
921    }
922
923    fn make_resource(
924        resource_type: &str,
925        logical_id: &str,
926        props: serde_json::Value,
927    ) -> ResourceDefinition {
928        ResourceDefinition {
929            logical_id: logical_id.to_string(),
930            resource_type: resource_type.to_string(),
931            properties: props,
932        }
933    }
934
935    #[test]
936    fn sns_subscription_rejects_nonexistent_topic() {
937        let prov = make_provisioner();
938        let resource = make_resource(
939            "AWS::SNS::Subscription",
940            "MySub",
941            serde_json::json!({
942                "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
943                "Protocol": "sqs",
944                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
945            }),
946        );
947        let result = prov.create_resource(&resource);
948        assert!(result.is_err());
949        assert!(result.unwrap_err().contains("does not exist"));
950    }
951
952    #[test]
953    fn sns_subscription_succeeds_when_topic_exists() {
954        let prov = make_provisioner();
955        // First create the topic
956        let topic = make_resource(
957            "AWS::SNS::Topic",
958            "MyTopic",
959            serde_json::json!({ "TopicName": "my-topic" }),
960        );
961        let topic_result = prov.create_resource(&topic);
962        assert!(topic_result.is_ok());
963        let topic_arn = topic_result.unwrap().physical_id;
964
965        // Now create subscription referencing that topic
966        let sub = make_resource(
967            "AWS::SNS::Subscription",
968            "MySub",
969            serde_json::json!({
970                "TopicArn": topic_arn,
971                "Protocol": "sqs",
972                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
973            }),
974        );
975        let result = prov.create_resource(&sub);
976        assert!(result.is_ok());
977    }
978
979    #[test]
980    fn eventbridge_rule_arn_default_bus_omits_bus_name() {
981        let prov = make_provisioner();
982        let resource = make_resource(
983            "AWS::Events::Rule",
984            "MyRule",
985            serde_json::json!({
986                "Name": "my-rule",
987                "ScheduleExpression": "rate(1 hour)"
988            }),
989        );
990        let result = prov.create_resource(&resource).unwrap();
991        // For default bus, ARN should be rule/<name> without /default/
992        assert_eq!(
993            result.physical_id,
994            "arn:aws:events:us-east-1:123456789012:rule/my-rule"
995        );
996        assert!(!result.physical_id.contains("rule/default/"));
997    }
998
999    #[test]
1000    fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
1001        let prov = make_provisioner();
1002        // Create a custom bus first
1003        {
1004            let mut state = prov.eventbridge_state.write();
1005            state.buses.insert(
1006                "custom-bus".to_string(),
1007                fakecloud_eventbridge::state::EventBus {
1008                    name: "custom-bus".to_string(),
1009                    arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
1010                    policy: None,
1011                    creation_time: Utc::now(),
1012                    last_modified_time: Utc::now(),
1013                    description: None,
1014                    kms_key_identifier: None,
1015                    dead_letter_config: None,
1016                    tags: HashMap::new(),
1017                },
1018            );
1019        }
1020        let resource = make_resource(
1021            "AWS::Events::Rule",
1022            "MyRule",
1023            serde_json::json!({
1024                "Name": "my-rule",
1025                "EventBusName": "custom-bus",
1026                "ScheduleExpression": "rate(1 hour)"
1027            }),
1028        );
1029        let result = prov.create_resource(&resource).unwrap();
1030        assert_eq!(
1031            result.physical_id,
1032            "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1033        );
1034    }
1035
1036    #[test]
1037    fn eventbridge_rule_rejects_nonexistent_bus() {
1038        let prov = make_provisioner();
1039        let resource = make_resource(
1040            "AWS::Events::Rule",
1041            "MyRule",
1042            serde_json::json!({
1043                "Name": "my-rule",
1044                "EventBusName": "nonexistent-bus",
1045                "ScheduleExpression": "rate(1 hour)"
1046            }),
1047        );
1048        let result = prov.create_resource(&resource);
1049        assert!(result.is_err());
1050        assert!(result.unwrap_err().contains("does not exist"));
1051    }
1052
1053    #[test]
1054    fn custom_resource_requires_service_token() {
1055        let prov = make_provisioner();
1056        let resource = make_resource(
1057            "Custom::MyResource",
1058            "MyCustom",
1059            serde_json::json!({
1060                "Foo": "bar"
1061            }),
1062        );
1063        let result = prov.create_resource(&resource);
1064        assert!(result.is_err());
1065        assert!(
1066            result.unwrap_err().contains("ServiceToken"),
1067            "Should require ServiceToken property"
1068        );
1069    }
1070
1071    #[test]
1072    fn custom_resource_succeeds_without_lambda_delivery() {
1073        // When no Lambda delivery is configured, custom resource creation
1074        // should still succeed (the invocation is silently skipped).
1075        let prov = make_provisioner();
1076        let resource = make_resource(
1077            "Custom::MyResource",
1078            "MyCustom",
1079            serde_json::json!({
1080                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1081                "Foo": "bar"
1082            }),
1083        );
1084        let result = prov.create_resource(&resource);
1085        assert!(result.is_ok());
1086        let sr = result.unwrap();
1087        assert_eq!(sr.logical_id, "MyCustom");
1088        assert_eq!(sr.resource_type, "Custom::MyResource");
1089        assert!(sr.physical_id.starts_with("MyCustom-"));
1090    }
1091
1092    #[test]
1093    fn cloudformation_custom_resource_type_succeeds() {
1094        let prov = make_provisioner();
1095        let resource = make_resource(
1096            "AWS::CloudFormation::CustomResource",
1097            "MyCustom2",
1098            serde_json::json!({
1099                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1100                "Key": "value"
1101            }),
1102        );
1103        let result = prov.create_resource(&resource);
1104        assert!(result.is_ok());
1105        let sr = result.unwrap();
1106        assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1107    }
1108}