Skip to main content

fakecloud_cloudformation/
resource_provisioner.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9    AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
/// Holds references to all service states so CloudFormation can provision resources.
///
/// Each `*_state` field is the shared state handle of one emulated service. The
/// methods on this type take a write lock on the relevant handle whenever they
/// create or delete a resource belonging to that service.
pub struct ResourceProvisioner {
    /// SQS state; queues for `AWS::SQS::Queue` are inserted here.
    pub sqs_state: SharedSqsState,
    /// SNS state; holds topics and subscriptions created from templates.
    pub sns_state: SharedSnsState,
    /// SSM state; holds parameters for `AWS::SSM::Parameter`.
    pub ssm_state: SharedSsmState,
    /// IAM state; holds roles and policies.
    pub iam_state: SharedIamState,
    /// S3 state; holds buckets for `AWS::S3::Bucket`.
    pub s3_state: SharedS3State,
    /// EventBridge state; holds rules and is consulted for existing event buses.
    pub eventbridge_state: SharedEventBridgeState,
    /// DynamoDB state; holds tables for `AWS::DynamoDB::Table`.
    pub dynamodb_state: SharedDynamoDbState,
    /// CloudWatch Logs state; holds log groups for `AWS::Logs::LogGroup`.
    pub logs_state: SharedLogsState,
    /// Delivery bus used to invoke the Lambda named by a custom resource's `ServiceToken`.
    pub delivery: Arc<DeliveryBus>,
    /// Account ID of the caller.
    // NOTE(review): the provisioning methods build ARNs from the account ID stored in
    // each service state, not from this field — confirm the two are kept in sync.
    pub account_id: String,
    /// Region of the caller.
    // NOTE(review): as with `account_id`, ARNs use the region stored in each service state.
    pub region: String,
    /// Stack identifier sent as `StackId` in custom resource request events.
    pub stack_id: String,
}
37
38impl ResourceProvisioner {
39    /// Create a resource and return the StackResource with physical ID.
40    pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41        let result = match resource.resource_type.as_str() {
42            "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43            "AWS::SNS::Topic" => self.create_sns_topic(resource),
44            "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45            "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46            "AWS::IAM::Role" => self.create_iam_role(resource),
47            "AWS::IAM::Policy" => self.create_iam_policy(resource),
48            "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49            "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50            "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51            "AWS::Logs::LogGroup" => self.create_log_group(resource),
52            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53                self.create_custom_resource(resource)
54            }
55            other => Err(format!("Unsupported resource type: {other}")),
56        };
57
58        let is_custom = resource.resource_type.starts_with("Custom::")
59            || resource.resource_type == "AWS::CloudFormation::CustomResource";
60        let service_token = if is_custom {
61            resource
62                .properties
63                .get("ServiceToken")
64                .and_then(|v| v.as_str())
65                .map(|s| s.to_string())
66        } else {
67            None
68        };
69
70        result.map(|physical_id| StackResource {
71            logical_id: resource.logical_id.clone(),
72            physical_id,
73            resource_type: resource.resource_type.clone(),
74            status: "CREATE_COMPLETE".to_string(),
75            service_token,
76        })
77    }
78
79    /// Delete a previously created resource.
80    pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81        match resource.resource_type.as_str() {
82            "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83            "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84            "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85            "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86            "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87            "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88            "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89            "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90            "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91            "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92            t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93                self.delete_custom_resource(resource)
94            }
95            other => Err(format!("Unsupported resource type: {other}")),
96        }
97    }
98
99    // --- SQS ---
100
101    fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102        let props = &resource.properties;
103        let queue_name = props
104            .get("QueueName")
105            .and_then(|v| v.as_str())
106            .unwrap_or(&resource.logical_id);
107
108        let mut state = self.sqs_state.write();
109        let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110        let arn = format!(
111            "arn:aws:sqs:{}:{}:{}",
112            state.region, state.account_id, queue_name
113        );
114
115        let is_fifo = queue_name.ends_with(".fifo");
116        let mut attributes = HashMap::new();
117        if let Some(obj) = props.as_object() {
118            for (k, v) in obj {
119                if k != "QueueName" {
120                    if let Some(s) = v.as_str() {
121                        attributes.insert(k.clone(), s.to_string());
122                    } else if let Some(n) = v.as_i64() {
123                        attributes.insert(k.clone(), n.to_string());
124                    }
125                }
126            }
127        }
128
129        let queue = SqsQueue {
130            queue_name: queue_name.to_string(),
131            queue_url: queue_url.clone(),
132            arn,
133            created_at: Utc::now(),
134            messages: std::collections::VecDeque::new(),
135            inflight: Vec::new(),
136            attributes,
137            is_fifo,
138            dedup_cache: HashMap::new(),
139            redrive_policy: None,
140            tags: HashMap::new(),
141            next_sequence_number: 0,
142            permission_labels: Vec::new(),
143            receipt_handle_map: HashMap::new(),
144        };
145
146        state
147            .name_to_url
148            .insert(queue_name.to_string(), queue_url.clone());
149        state.queues.insert(queue_url.clone(), queue);
150
151        Ok(queue_url)
152    }
153
154    fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155        let mut state = self.sqs_state.write();
156        if let Some(queue) = state.queues.remove(physical_id) {
157            state.name_to_url.remove(&queue.queue_name);
158        }
159        Ok(())
160    }
161
162    // --- SNS ---
163
164    fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165        let props = &resource.properties;
166        let topic_name = props
167            .get("TopicName")
168            .and_then(|v| v.as_str())
169            .unwrap_or(&resource.logical_id);
170
171        let mut state = self.sns_state.write();
172        let topic_arn = format!(
173            "arn:aws:sns:{}:{}:{}",
174            state.region, state.account_id, topic_name
175        );
176
177        let topic = SnsTopic {
178            topic_arn: topic_arn.clone(),
179            name: topic_name.to_string(),
180            attributes: HashMap::new(),
181            tags: Vec::new(),
182            is_fifo: topic_name.ends_with(".fifo"),
183            created_at: Utc::now(),
184        };
185
186        state.topics.insert(topic_arn.clone(), topic);
187        Ok(topic_arn)
188    }
189
190    fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191        let mut state = self.sns_state.write();
192        state.topics.remove(physical_id);
193        // Also remove subscriptions for this topic
194        state
195            .subscriptions
196            .retain(|_, sub| sub.topic_arn != physical_id);
197        Ok(())
198    }
199
200    // --- SNS Subscription ---
201
202    fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203        let props = &resource.properties;
204        let topic_arn = props
205            .get("TopicArn")
206            .and_then(|v| v.as_str())
207            .ok_or("SNS Subscription requires TopicArn")?;
208        let protocol = props
209            .get("Protocol")
210            .and_then(|v| v.as_str())
211            .ok_or("SNS Subscription requires Protocol")?;
212        let endpoint = props
213            .get("Endpoint")
214            .and_then(|v| v.as_str())
215            .ok_or("SNS Subscription requires Endpoint")?;
216
217        let mut state = self.sns_state.write();
218
219        // Validate that the topic exists
220        if !state.topics.contains_key(topic_arn) {
221            return Err(format!("Topic ARN does not exist: {topic_arn}"));
222        }
223
224        let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226        let subscription = SnsSubscription {
227            subscription_arn: sub_arn.clone(),
228            topic_arn: topic_arn.to_string(),
229            protocol: protocol.to_string(),
230            endpoint: endpoint.to_string(),
231            owner: state.account_id.clone(),
232            attributes: HashMap::new(),
233            confirmed: true,
234            confirmation_token: None,
235        };
236
237        state.subscriptions.insert(sub_arn.clone(), subscription);
238        Ok(sub_arn)
239    }
240
241    fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242        let mut state = self.sns_state.write();
243        state.subscriptions.remove(physical_id);
244        Ok(())
245    }
246
247    // --- SSM ---
248
249    fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250        let props = &resource.properties;
251        let name = props
252            .get("Name")
253            .and_then(|v| v.as_str())
254            .ok_or("SSM Parameter requires Name")?;
255        let value = props
256            .get("Value")
257            .and_then(|v| v.as_str())
258            .ok_or("SSM Parameter requires Value")?;
259        let param_type = props
260            .get("Type")
261            .and_then(|v| v.as_str())
262            .unwrap_or("String");
263
264        let mut state = self.ssm_state.write();
265        let arn = format!(
266            "arn:aws:ssm:{}:{}:parameter{}",
267            state.region,
268            state.account_id,
269            if name.starts_with('/') {
270                name.to_string()
271            } else {
272                format!("/{name}")
273            }
274        );
275
276        let parameter = SsmParameter {
277            name: name.to_string(),
278            value: value.to_string(),
279            param_type: param_type.to_string(),
280            version: 1,
281            arn: arn.clone(),
282            last_modified: Utc::now(),
283            history: Vec::new(),
284            tags: HashMap::new(),
285            labels: HashMap::new(),
286            description: props
287                .get("Description")
288                .and_then(|v| v.as_str())
289                .map(|s| s.to_string()),
290            allowed_pattern: None,
291            key_id: None,
292            data_type: "text".to_string(),
293            tier: "Standard".to_string(),
294            policies: None,
295        };
296
297        state.parameters.insert(name.to_string(), parameter);
298        Ok(name.to_string())
299    }
300
301    fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302        let mut state = self.ssm_state.write();
303        state.parameters.remove(physical_id);
304        Ok(())
305    }
306
307    // --- IAM Role ---
308
309    fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310        let props = &resource.properties;
311        let role_name = props
312            .get("RoleName")
313            .and_then(|v| v.as_str())
314            .unwrap_or(&resource.logical_id);
315
316        let assume_role_policy = props
317            .get("AssumeRolePolicyDocument")
318            .map(|v| {
319                if v.is_string() {
320                    v.as_str().unwrap().to_string()
321                } else {
322                    serde_json::to_string(v).unwrap_or_default()
323                }
324            })
325            .unwrap_or_default();
326
327        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329        let mut state = self.iam_state.write();
330        let role_id = format!(
331            "FKIA{}",
332            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333        );
334        let arn = format!(
335            "arn:aws:iam::{}:role{}{}",
336            state.account_id,
337            if path == "/" { "/" } else { path },
338            role_name
339        );
340
341        let role = IamRole {
342            role_name: role_name.to_string(),
343            role_id,
344            arn: arn.clone(),
345            path: path.to_string(),
346            assume_role_policy_document: assume_role_policy,
347            created_at: Utc::now(),
348            description: props
349                .get("Description")
350                .and_then(|v| v.as_str())
351                .map(|s| s.to_string()),
352            max_session_duration: 3600,
353            tags: Vec::new(),
354            permissions_boundary: None,
355        };
356
357        state.roles.insert(role_name.to_string(), role);
358        Ok(arn)
359    }
360
361    fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362        let mut state = self.iam_state.write();
363        // physical_id is the ARN; find the role name
364        let role_name = state
365            .roles
366            .iter()
367            .find(|(_, r)| r.arn == physical_id)
368            .map(|(name, _)| name.clone());
369        if let Some(name) = role_name {
370            state.roles.remove(&name);
371            state.role_policies.remove(&name);
372            state.role_inline_policies.remove(&name);
373        }
374        Ok(())
375    }
376
377    // --- IAM Policy ---
378
379    fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380        let props = &resource.properties;
381        let policy_name = props
382            .get("PolicyName")
383            .and_then(|v| v.as_str())
384            .unwrap_or(&resource.logical_id);
385
386        let policy_document = props
387            .get("PolicyDocument")
388            .map(|v| {
389                if v.is_string() {
390                    v.as_str().unwrap().to_string()
391                } else {
392                    serde_json::to_string(v).unwrap_or_default()
393                }
394            })
395            .unwrap_or_default();
396
397        let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399        let mut state = self.iam_state.write();
400        let policy_id = format!(
401            "FSIA{}",
402            &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403        );
404        let arn = format!(
405            "arn:aws:iam::{}:policy{}{}",
406            state.account_id,
407            if path == "/" { "/" } else { path },
408            policy_name
409        );
410
411        let now = Utc::now();
412        let policy = IamPolicy {
413            policy_name: policy_name.to_string(),
414            policy_id,
415            arn: arn.clone(),
416            path: path.to_string(),
417            description: props
418                .get("Description")
419                .and_then(|v| v.as_str())
420                .unwrap_or("")
421                .to_string(),
422            created_at: now,
423            tags: Vec::new(),
424            default_version_id: "v1".to_string(),
425            versions: vec![PolicyVersion {
426                version_id: "v1".to_string(),
427                document: policy_document,
428                is_default: true,
429                created_at: now,
430            }],
431            next_version_num: 2,
432            attachment_count: 0,
433        };
434
435        state.policies.insert(arn.clone(), policy);
436        Ok(arn)
437    }
438
439    fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440        let mut state = self.iam_state.write();
441        state.policies.remove(physical_id);
442        Ok(())
443    }
444
445    // --- S3 ---
446
447    fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448        let props = &resource.properties;
449        let bucket_name = props
450            .get("BucketName")
451            .and_then(|v| v.as_str())
452            .unwrap_or(&resource.logical_id);
453
454        let mut state = self.s3_state.write();
455        let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456        state.buckets.insert(bucket_name.to_string(), bucket);
457        Ok(bucket_name.to_string())
458    }
459
460    fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461        let mut state = self.s3_state.write();
462        state.buckets.remove(physical_id);
463        Ok(())
464    }
465
466    // --- EventBridge ---
467
468    fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469        let props = &resource.properties;
470        let rule_name = props
471            .get("Name")
472            .and_then(|v| v.as_str())
473            .unwrap_or(&resource.logical_id);
474        let event_bus_name = props
475            .get("EventBusName")
476            .and_then(|v| v.as_str())
477            .unwrap_or("default");
478
479        let mut state = self.eventbridge_state.write();
480
481        // Validate that the event bus exists
482        if !state.buses.contains_key(event_bus_name) {
483            return Err(format!("Event bus does not exist: {event_bus_name}"));
484        }
485
486        let arn = if event_bus_name == "default" {
487            format!(
488                "arn:aws:events:{}:{}:rule/{}",
489                state.region, state.account_id, rule_name
490            )
491        } else {
492            format!(
493                "arn:aws:events:{}:{}:rule/{}/{}",
494                state.region, state.account_id, event_bus_name, rule_name
495            )
496        };
497
498        let rule = EventRule {
499            name: rule_name.to_string(),
500            arn: arn.clone(),
501            event_bus_name: event_bus_name.to_string(),
502            event_pattern: props.get("EventPattern").map(|v| {
503                if v.is_string() {
504                    v.as_str().unwrap().to_string()
505                } else {
506                    serde_json::to_string(v).unwrap_or_default()
507                }
508            }),
509            schedule_expression: props
510                .get("ScheduleExpression")
511                .and_then(|v| v.as_str())
512                .map(|s| s.to_string()),
513            state: props
514                .get("State")
515                .and_then(|v| v.as_str())
516                .unwrap_or("ENABLED")
517                .to_string(),
518            description: props
519                .get("Description")
520                .and_then(|v| v.as_str())
521                .map(|s| s.to_string()),
522            role_arn: props
523                .get("RoleArn")
524                .and_then(|v| v.as_str())
525                .map(|s| s.to_string()),
526            managed_by: None,
527            created_by: None,
528            targets: Vec::new(),
529            tags: HashMap::new(),
530            last_fired: None,
531        };
532
533        state
534            .rules
535            .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536        Ok(arn)
537    }
538
539    fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540        let mut state = self.eventbridge_state.write();
541        // physical_id is the ARN; find the rule key
542        let key = state
543            .rules
544            .iter()
545            .find(|(_, r)| r.arn == physical_id)
546            .map(|(k, _)| k.clone());
547        if let Some(k) = key {
548            state.rules.remove(&k);
549        }
550        Ok(())
551    }
552
553    // --- DynamoDB ---
554
555    fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556        let props = &resource.properties;
557        let table_name = props
558            .get("TableName")
559            .and_then(|v| v.as_str())
560            .unwrap_or(&resource.logical_id);
561
562        let mut key_schema = Vec::new();
563        if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564            for item in ks {
565                let attr_name = item
566                    .get("AttributeName")
567                    .and_then(|v| v.as_str())
568                    .unwrap_or("")
569                    .to_string();
570                let key_type = item
571                    .get("KeyType")
572                    .and_then(|v| v.as_str())
573                    .unwrap_or("HASH")
574                    .to_string();
575                key_schema.push(KeySchemaElement {
576                    attribute_name: attr_name,
577                    key_type,
578                });
579            }
580        }
581
582        let mut attribute_definitions = Vec::new();
583        if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584            for item in defs {
585                let attr_name = item
586                    .get("AttributeName")
587                    .and_then(|v| v.as_str())
588                    .unwrap_or("")
589                    .to_string();
590                let attr_type = item
591                    .get("AttributeType")
592                    .and_then(|v| v.as_str())
593                    .unwrap_or("S")
594                    .to_string();
595                attribute_definitions.push(AttributeDefinition {
596                    attribute_name: attr_name,
597                    attribute_type: attr_type,
598                });
599            }
600        }
601
602        let billing_mode = props
603            .get("BillingMode")
604            .and_then(|v| v.as_str())
605            .unwrap_or("PAY_PER_REQUEST")
606            .to_string();
607
608        let provisioned_throughput = if billing_mode == "PROVISIONED" {
609            if let Some(pt) = props.get("ProvisionedThroughput") {
610                ProvisionedThroughput {
611                    read_capacity_units: pt
612                        .get("ReadCapacityUnits")
613                        .and_then(|v| v.as_i64())
614                        .unwrap_or(5),
615                    write_capacity_units: pt
616                        .get("WriteCapacityUnits")
617                        .and_then(|v| v.as_i64())
618                        .unwrap_or(5),
619                }
620            } else {
621                ProvisionedThroughput {
622                    read_capacity_units: 5,
623                    write_capacity_units: 5,
624                }
625            }
626        } else {
627            ProvisionedThroughput {
628                read_capacity_units: 0,
629                write_capacity_units: 0,
630            }
631        };
632
633        // Parse StreamSpecification from CloudFormation properties
634        let (stream_enabled, stream_view_type) =
635            if let Some(stream_spec) = props.get("StreamSpecification") {
636                let view_type = stream_spec
637                    .get("StreamViewType")
638                    .and_then(|v| v.as_str())
639                    .map(|s| s.to_string());
640                let enabled = stream_spec
641                    .get("StreamEnabled")
642                    .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643                    // If StreamViewType is set, treat streams as enabled even if StreamEnabled is missing
644                    .unwrap_or(view_type.is_some());
645                (enabled, view_type)
646            } else {
647                (false, None)
648            };
649
650        let deletion_protection_enabled = props
651            .get("DeletionProtectionEnabled")
652            .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
653            .unwrap_or(false);
654
655        let mut state = self.dynamodb_state.write();
656        let arn = format!(
657            "arn:aws:dynamodb:{}:{}:table/{}",
658            state.region, state.account_id, table_name
659        );
660
661        let stream_arn = if stream_enabled {
662            Some(format!(
663                "{}/stream/{}",
664                arn,
665                Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
666            ))
667        } else {
668            None
669        };
670
671        let table = DynamoTable {
672            name: table_name.to_string(),
673            arn: arn.clone(),
674            table_id: Uuid::new_v4().to_string().replace('-', ""),
675            key_schema,
676            attribute_definitions,
677            provisioned_throughput,
678            items: Vec::new(),
679            gsi: Vec::new(),
680            lsi: Vec::new(),
681            tags: HashMap::new(),
682            created_at: Utc::now(),
683            status: "ACTIVE".to_string(),
684            item_count: 0,
685            size_bytes: 0,
686            billing_mode,
687            ttl_attribute: None,
688            ttl_enabled: false,
689            resource_policy: None,
690            pitr_enabled: false,
691            kinesis_destinations: Vec::new(),
692            contributor_insights_status: "DISABLED".to_string(),
693            contributor_insights_counters: HashMap::new(),
694            stream_enabled,
695            stream_view_type,
696            stream_arn,
697            stream_records: Arc::new(RwLock::new(Vec::new())),
698            sse_type: None,
699            sse_kms_key_arn: None,
700            deletion_protection_enabled,
701        };
702
703        state.tables.insert(table_name.to_string(), table);
704        Ok(arn)
705    }
706
707    fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
708        let mut state = self.dynamodb_state.write();
709        // physical_id is the ARN; find the table name
710        let table_name = state
711            .tables
712            .iter()
713            .find(|(_, t)| t.arn == physical_id)
714            .map(|(name, _)| name.clone());
715        if let Some(name) = table_name {
716            state.tables.remove(&name);
717        }
718        Ok(())
719    }
720
721    // --- CloudWatch Logs ---
722
723    fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
724        let props = &resource.properties;
725        let log_group_name = props
726            .get("LogGroupName")
727            .and_then(|v| v.as_str())
728            .unwrap_or(&resource.logical_id);
729
730        let retention_in_days = props
731            .get("RetentionInDays")
732            .and_then(|v| v.as_i64())
733            .map(|v| v as i32);
734
735        let mut state = self.logs_state.write();
736        let arn = format!(
737            "arn:aws:logs:{}:{}:log-group:{}:*",
738            state.region, state.account_id, log_group_name
739        );
740
741        let log_group = fakecloud_logs::state::LogGroup {
742            name: log_group_name.to_string(),
743            arn: arn.clone(),
744            creation_time: Utc::now().timestamp_millis(),
745            retention_in_days,
746            kms_key_id: None,
747            stored_bytes: 0,
748            log_streams: HashMap::new(),
749            tags: HashMap::new(),
750            subscription_filters: Vec::new(),
751            data_protection_policy: None,
752            index_policies: Vec::new(),
753            transformer: None,
754            deletion_protection: false,
755            log_group_class: Some("STANDARD".to_string()),
756        };
757
758        state
759            .log_groups
760            .insert(log_group_name.to_string(), log_group);
761        Ok(arn)
762    }
763
764    fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
765        let mut state = self.logs_state.write();
766        // physical_id is the ARN; find the log group name
767        let name = state
768            .log_groups
769            .iter()
770            .find(|(_, g)| g.arn == physical_id)
771            .map(|(name, _)| name.clone());
772        if let Some(name) = name {
773            state.log_groups.remove(&name);
774        }
775        Ok(())
776    }
777
778    // --- Custom Resources ---
779
780    /// Invoke a Lambda function synchronously via the delivery bus.
781    fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
782        let delivery = self.delivery.clone();
783        let function_arn = function_arn.to_string();
784        let payload = payload.to_string();
785        std::thread::scope(|s| {
786            s.spawn(|| {
787                let rt = tokio::runtime::Builder::new_current_thread()
788                    .enable_all()
789                    .build()
790                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
791                rt.block_on(async {
792                    match delivery.invoke_lambda(&function_arn, &payload).await {
793                        Some(Ok(_)) => {
794                            tracing::info!(
795                                "Custom resource Lambda {} invoked successfully",
796                                function_arn
797                            );
798                            Ok(())
799                        }
800                        Some(Err(e)) => {
801                            tracing::warn!(
802                                "Custom resource Lambda {} invocation failed: {e}",
803                                function_arn
804                            );
805                            Err(format!("Lambda invocation failed: {e}"))
806                        }
807                        None => {
808                            tracing::warn!(
809                                "No Lambda delivery configured; skipping custom resource invocation for {}",
810                                function_arn
811                            );
812                            Ok(())
813                        }
814                    }
815                })
816            })
817            .join()
818            .map_err(|_| "Lambda invocation thread panicked".to_string())?
819        })
820    }
821
822    fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
823        let props = &resource.properties;
824        let service_token = props
825            .get("ServiceToken")
826            .and_then(|v| v.as_str())
827            .ok_or("Custom resource requires ServiceToken property")?;
828
829        let request_id = Uuid::new_v4().to_string();
830
831        // Build the CloudFormation custom resource event
832        let event = serde_json::json!({
833            "RequestType": "Create",
834            "ServiceToken": service_token,
835            "StackId": self.stack_id,
836            "RequestId": request_id,
837            "ResourceType": resource.resource_type,
838            "LogicalResourceId": resource.logical_id,
839            "ResourceProperties": props,
840        });
841
842        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
843        self.invoke_lambda_sync(service_token, &payload)?;
844
845        // Physical resource ID: use a generated ID (the Lambda could return one,
846        // but for simplicity we generate one here).
847        let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
848        Ok(physical_id)
849    }
850
851    fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
852        let service_token = match &resource.service_token {
853            Some(token) => token.clone(),
854            None => {
855                // No ServiceToken stored — nothing to invoke
856                return Ok(());
857            }
858        };
859
860        let request_id = Uuid::new_v4().to_string();
861
862        let event = serde_json::json!({
863            "RequestType": "Delete",
864            "ServiceToken": service_token,
865            "StackId": self.stack_id,
866            "RequestId": request_id,
867            "ResourceType": resource.resource_type,
868            "LogicalResourceId": resource.logical_id,
869            "PhysicalResourceId": resource.physical_id,
870        });
871
872        let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
873
874        // Best-effort: don't fail stack deletion if Lambda invocation fails
875        if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
876            tracing::warn!(
877                "Custom resource delete Lambda invocation failed for {}: {e}",
878                resource.logical_id
879            );
880        }
881        Ok(())
882    }
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888    use parking_lot::RwLock;
889
890    fn make_provisioner() -> ResourceProvisioner {
891        ResourceProvisioner {
892            sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
893                "123456789012",
894                "us-east-1",
895                "http://localhost:4566",
896            ))),
897            sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
898                "123456789012",
899                "us-east-1",
900                "http://localhost:4566",
901            ))),
902            ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
903                "123456789012",
904                "us-east-1",
905            ))),
906            iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
907                "123456789012",
908            ))),
909            s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
910                "123456789012",
911                "us-east-1",
912            ))),
913            eventbridge_state: Arc::new(RwLock::new(
914                fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
915            )),
916            dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
917                "123456789012",
918                "us-east-1",
919            ))),
920            logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
921                "123456789012",
922                "us-east-1",
923            ))),
924            delivery: Arc::new(DeliveryBus::new()),
925            account_id: "123456789012".to_string(),
926            region: "us-east-1".to_string(),
927            stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
928        }
929    }
930
931    fn make_resource(
932        resource_type: &str,
933        logical_id: &str,
934        props: serde_json::Value,
935    ) -> ResourceDefinition {
936        ResourceDefinition {
937            logical_id: logical_id.to_string(),
938            resource_type: resource_type.to_string(),
939            properties: props,
940        }
941    }
942
943    #[test]
944    fn sns_subscription_rejects_nonexistent_topic() {
945        let prov = make_provisioner();
946        let resource = make_resource(
947            "AWS::SNS::Subscription",
948            "MySub",
949            serde_json::json!({
950                "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
951                "Protocol": "sqs",
952                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
953            }),
954        );
955        let result = prov.create_resource(&resource);
956        assert!(result.is_err());
957        assert!(result.unwrap_err().contains("does not exist"));
958    }
959
960    #[test]
961    fn sns_subscription_succeeds_when_topic_exists() {
962        let prov = make_provisioner();
963        // First create the topic
964        let topic = make_resource(
965            "AWS::SNS::Topic",
966            "MyTopic",
967            serde_json::json!({ "TopicName": "my-topic" }),
968        );
969        let topic_result = prov.create_resource(&topic);
970        assert!(topic_result.is_ok());
971        let topic_arn = topic_result.unwrap().physical_id;
972
973        // Now create subscription referencing that topic
974        let sub = make_resource(
975            "AWS::SNS::Subscription",
976            "MySub",
977            serde_json::json!({
978                "TopicArn": topic_arn,
979                "Protocol": "sqs",
980                "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
981            }),
982        );
983        let result = prov.create_resource(&sub);
984        assert!(result.is_ok());
985    }
986
987    #[test]
988    fn eventbridge_rule_arn_default_bus_omits_bus_name() {
989        let prov = make_provisioner();
990        let resource = make_resource(
991            "AWS::Events::Rule",
992            "MyRule",
993            serde_json::json!({
994                "Name": "my-rule",
995                "ScheduleExpression": "rate(1 hour)"
996            }),
997        );
998        let result = prov.create_resource(&resource).unwrap();
999        // For default bus, ARN should be rule/<name> without /default/
1000        assert_eq!(
1001            result.physical_id,
1002            "arn:aws:events:us-east-1:123456789012:rule/my-rule"
1003        );
1004        assert!(!result.physical_id.contains("rule/default/"));
1005    }
1006
1007    #[test]
1008    fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
1009        let prov = make_provisioner();
1010        // Create a custom bus first
1011        {
1012            let mut state = prov.eventbridge_state.write();
1013            state.buses.insert(
1014                "custom-bus".to_string(),
1015                fakecloud_eventbridge::state::EventBus {
1016                    name: "custom-bus".to_string(),
1017                    arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
1018                    policy: None,
1019                    creation_time: Utc::now(),
1020                    last_modified_time: Utc::now(),
1021                    description: None,
1022                    kms_key_identifier: None,
1023                    dead_letter_config: None,
1024                    tags: HashMap::new(),
1025                },
1026            );
1027        }
1028        let resource = make_resource(
1029            "AWS::Events::Rule",
1030            "MyRule",
1031            serde_json::json!({
1032                "Name": "my-rule",
1033                "EventBusName": "custom-bus",
1034                "ScheduleExpression": "rate(1 hour)"
1035            }),
1036        );
1037        let result = prov.create_resource(&resource).unwrap();
1038        assert_eq!(
1039            result.physical_id,
1040            "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1041        );
1042    }
1043
1044    #[test]
1045    fn eventbridge_rule_rejects_nonexistent_bus() {
1046        let prov = make_provisioner();
1047        let resource = make_resource(
1048            "AWS::Events::Rule",
1049            "MyRule",
1050            serde_json::json!({
1051                "Name": "my-rule",
1052                "EventBusName": "nonexistent-bus",
1053                "ScheduleExpression": "rate(1 hour)"
1054            }),
1055        );
1056        let result = prov.create_resource(&resource);
1057        assert!(result.is_err());
1058        assert!(result.unwrap_err().contains("does not exist"));
1059    }
1060
1061    #[test]
1062    fn custom_resource_requires_service_token() {
1063        let prov = make_provisioner();
1064        let resource = make_resource(
1065            "Custom::MyResource",
1066            "MyCustom",
1067            serde_json::json!({
1068                "Foo": "bar"
1069            }),
1070        );
1071        let result = prov.create_resource(&resource);
1072        assert!(result.is_err());
1073        assert!(
1074            result.unwrap_err().contains("ServiceToken"),
1075            "Should require ServiceToken property"
1076        );
1077    }
1078
1079    #[test]
1080    fn custom_resource_succeeds_without_lambda_delivery() {
1081        // When no Lambda delivery is configured, custom resource creation
1082        // should still succeed (the invocation is silently skipped).
1083        let prov = make_provisioner();
1084        let resource = make_resource(
1085            "Custom::MyResource",
1086            "MyCustom",
1087            serde_json::json!({
1088                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1089                "Foo": "bar"
1090            }),
1091        );
1092        let result = prov.create_resource(&resource);
1093        assert!(result.is_ok());
1094        let sr = result.unwrap();
1095        assert_eq!(sr.logical_id, "MyCustom");
1096        assert_eq!(sr.resource_type, "Custom::MyResource");
1097        assert!(sr.physical_id.starts_with("MyCustom-"));
1098    }
1099
1100    #[test]
1101    fn cloudformation_custom_resource_type_succeeds() {
1102        let prov = make_provisioner();
1103        let resource = make_resource(
1104            "AWS::CloudFormation::CustomResource",
1105            "MyCustom2",
1106            serde_json::json!({
1107                "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1108                "Key": "value"
1109            }),
1110        );
1111        let result = prov.create_resource(&resource);
1112        assert!(result.is_ok());
1113        let sr = result.unwrap();
1114        assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1115    }
1116}