1use chrono::Utc;
2use std::collections::HashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6use fakecloud_core::delivery::DeliveryBus;
7use fakecloud_dynamodb::state::{
8 AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
9};
10use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
11use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
12use fakecloud_logs::state::SharedLogsState;
13use fakecloud_s3::state::{S3Bucket, SharedS3State};
14use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
15use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
16use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
17
18use crate::state::StackResource;
19use crate::template::ResourceDefinition;
20
21pub struct ResourceProvisioner {
23 pub sqs_state: SharedSqsState,
24 pub sns_state: SharedSnsState,
25 pub ssm_state: SharedSsmState,
26 pub iam_state: SharedIamState,
27 pub s3_state: SharedS3State,
28 pub eventbridge_state: SharedEventBridgeState,
29 pub dynamodb_state: SharedDynamoDbState,
30 pub logs_state: SharedLogsState,
31 pub delivery: Arc<DeliveryBus>,
32 pub account_id: String,
33 pub region: String,
34 pub stack_id: String,
35}
36
37impl ResourceProvisioner {
38 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
40 let result = match resource.resource_type.as_str() {
41 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
42 "AWS::SNS::Topic" => self.create_sns_topic(resource),
43 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
44 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
45 "AWS::IAM::Role" => self.create_iam_role(resource),
46 "AWS::IAM::Policy" => self.create_iam_policy(resource),
47 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
48 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
49 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
50 "AWS::Logs::LogGroup" => self.create_log_group(resource),
51 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
52 self.create_custom_resource(resource)
53 }
54 other => Err(format!("Unsupported resource type: {other}")),
55 };
56
57 let is_custom = resource.resource_type.starts_with("Custom::")
58 || resource.resource_type == "AWS::CloudFormation::CustomResource";
59 let service_token = if is_custom {
60 resource
61 .properties
62 .get("ServiceToken")
63 .and_then(|v| v.as_str())
64 .map(|s| s.to_string())
65 } else {
66 None
67 };
68
69 result.map(|physical_id| StackResource {
70 logical_id: resource.logical_id.clone(),
71 physical_id,
72 resource_type: resource.resource_type.clone(),
73 status: "CREATE_COMPLETE".to_string(),
74 service_token,
75 })
76 }
77
78 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
80 match resource.resource_type.as_str() {
81 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
82 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
83 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
84 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
85 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
86 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
87 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
88 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
89 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
90 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
91 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
92 self.delete_custom_resource(resource)
93 }
94 other => Err(format!("Unsupported resource type: {other}")),
95 }
96 }
97
98 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
101 let props = &resource.properties;
102 let queue_name = props
103 .get("QueueName")
104 .and_then(|v| v.as_str())
105 .unwrap_or(&resource.logical_id);
106
107 let mut state = self.sqs_state.write();
108 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
109 let arn = format!(
110 "arn:aws:sqs:{}:{}:{}",
111 state.region, state.account_id, queue_name
112 );
113
114 let is_fifo = queue_name.ends_with(".fifo");
115 let mut attributes = HashMap::new();
116 if let Some(obj) = props.as_object() {
117 for (k, v) in obj {
118 if k != "QueueName" {
119 if let Some(s) = v.as_str() {
120 attributes.insert(k.clone(), s.to_string());
121 } else if let Some(n) = v.as_i64() {
122 attributes.insert(k.clone(), n.to_string());
123 }
124 }
125 }
126 }
127
128 let queue = SqsQueue {
129 queue_name: queue_name.to_string(),
130 queue_url: queue_url.clone(),
131 arn,
132 created_at: Utc::now(),
133 messages: std::collections::VecDeque::new(),
134 inflight: Vec::new(),
135 attributes,
136 is_fifo,
137 dedup_cache: HashMap::new(),
138 redrive_policy: None,
139 tags: HashMap::new(),
140 next_sequence_number: 0,
141 permission_labels: Vec::new(),
142 receipt_handle_map: HashMap::new(),
143 };
144
145 state
146 .name_to_url
147 .insert(queue_name.to_string(), queue_url.clone());
148 state.queues.insert(queue_url.clone(), queue);
149
150 Ok(queue_url)
151 }
152
153 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
154 let mut state = self.sqs_state.write();
155 if let Some(queue) = state.queues.remove(physical_id) {
156 state.name_to_url.remove(&queue.queue_name);
157 }
158 Ok(())
159 }
160
161 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
164 let props = &resource.properties;
165 let topic_name = props
166 .get("TopicName")
167 .and_then(|v| v.as_str())
168 .unwrap_or(&resource.logical_id);
169
170 let mut state = self.sns_state.write();
171 let topic_arn = format!(
172 "arn:aws:sns:{}:{}:{}",
173 state.region, state.account_id, topic_name
174 );
175
176 let topic = SnsTopic {
177 topic_arn: topic_arn.clone(),
178 name: topic_name.to_string(),
179 attributes: HashMap::new(),
180 tags: Vec::new(),
181 is_fifo: topic_name.ends_with(".fifo"),
182 created_at: Utc::now(),
183 };
184
185 state.topics.insert(topic_arn.clone(), topic);
186 Ok(topic_arn)
187 }
188
189 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
190 let mut state = self.sns_state.write();
191 state.topics.remove(physical_id);
192 state
194 .subscriptions
195 .retain(|_, sub| sub.topic_arn != physical_id);
196 Ok(())
197 }
198
199 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
202 let props = &resource.properties;
203 let topic_arn = props
204 .get("TopicArn")
205 .and_then(|v| v.as_str())
206 .ok_or("SNS Subscription requires TopicArn")?;
207 let protocol = props
208 .get("Protocol")
209 .and_then(|v| v.as_str())
210 .ok_or("SNS Subscription requires Protocol")?;
211 let endpoint = props
212 .get("Endpoint")
213 .and_then(|v| v.as_str())
214 .ok_or("SNS Subscription requires Endpoint")?;
215
216 let mut state = self.sns_state.write();
217
218 if !state.topics.contains_key(topic_arn) {
220 return Err(format!("Topic ARN does not exist: {topic_arn}"));
221 }
222
223 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
224
225 let subscription = SnsSubscription {
226 subscription_arn: sub_arn.clone(),
227 topic_arn: topic_arn.to_string(),
228 protocol: protocol.to_string(),
229 endpoint: endpoint.to_string(),
230 owner: state.account_id.clone(),
231 attributes: HashMap::new(),
232 confirmed: true,
233 confirmation_token: None,
234 };
235
236 state.subscriptions.insert(sub_arn.clone(), subscription);
237 Ok(sub_arn)
238 }
239
240 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
241 let mut state = self.sns_state.write();
242 state.subscriptions.remove(physical_id);
243 Ok(())
244 }
245
246 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
249 let props = &resource.properties;
250 let name = props
251 .get("Name")
252 .and_then(|v| v.as_str())
253 .ok_or("SSM Parameter requires Name")?;
254 let value = props
255 .get("Value")
256 .and_then(|v| v.as_str())
257 .ok_or("SSM Parameter requires Value")?;
258 let param_type = props
259 .get("Type")
260 .and_then(|v| v.as_str())
261 .unwrap_or("String");
262
263 let mut state = self.ssm_state.write();
264 let arn = format!(
265 "arn:aws:ssm:{}:{}:parameter{}",
266 state.region,
267 state.account_id,
268 if name.starts_with('/') {
269 name.to_string()
270 } else {
271 format!("/{name}")
272 }
273 );
274
275 let parameter = SsmParameter {
276 name: name.to_string(),
277 value: value.to_string(),
278 param_type: param_type.to_string(),
279 version: 1,
280 arn: arn.clone(),
281 last_modified: Utc::now(),
282 history: Vec::new(),
283 tags: HashMap::new(),
284 labels: HashMap::new(),
285 description: props
286 .get("Description")
287 .and_then(|v| v.as_str())
288 .map(|s| s.to_string()),
289 allowed_pattern: None,
290 key_id: None,
291 data_type: "text".to_string(),
292 tier: "Standard".to_string(),
293 policies: None,
294 };
295
296 state.parameters.insert(name.to_string(), parameter);
297 Ok(name.to_string())
298 }
299
300 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
301 let mut state = self.ssm_state.write();
302 state.parameters.remove(physical_id);
303 Ok(())
304 }
305
306 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
309 let props = &resource.properties;
310 let role_name = props
311 .get("RoleName")
312 .and_then(|v| v.as_str())
313 .unwrap_or(&resource.logical_id);
314
315 let assume_role_policy = props
316 .get("AssumeRolePolicyDocument")
317 .map(|v| {
318 if v.is_string() {
319 v.as_str().unwrap().to_string()
320 } else {
321 serde_json::to_string(v).unwrap_or_default()
322 }
323 })
324 .unwrap_or_default();
325
326 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
327
328 let mut state = self.iam_state.write();
329 let role_id = format!(
330 "FKIA{}",
331 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
332 );
333 let arn = format!(
334 "arn:aws:iam::{}:role{}{}",
335 state.account_id,
336 if path == "/" { "/" } else { path },
337 role_name
338 );
339
340 let role = IamRole {
341 role_name: role_name.to_string(),
342 role_id,
343 arn: arn.clone(),
344 path: path.to_string(),
345 assume_role_policy_document: assume_role_policy,
346 created_at: Utc::now(),
347 description: props
348 .get("Description")
349 .and_then(|v| v.as_str())
350 .map(|s| s.to_string()),
351 max_session_duration: 3600,
352 tags: Vec::new(),
353 permissions_boundary: None,
354 };
355
356 state.roles.insert(role_name.to_string(), role);
357 Ok(arn)
358 }
359
360 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
361 let mut state = self.iam_state.write();
362 let role_name = state
364 .roles
365 .iter()
366 .find(|(_, r)| r.arn == physical_id)
367 .map(|(name, _)| name.clone());
368 if let Some(name) = role_name {
369 state.roles.remove(&name);
370 state.role_policies.remove(&name);
371 state.role_inline_policies.remove(&name);
372 }
373 Ok(())
374 }
375
376 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
379 let props = &resource.properties;
380 let policy_name = props
381 .get("PolicyName")
382 .and_then(|v| v.as_str())
383 .unwrap_or(&resource.logical_id);
384
385 let policy_document = props
386 .get("PolicyDocument")
387 .map(|v| {
388 if v.is_string() {
389 v.as_str().unwrap().to_string()
390 } else {
391 serde_json::to_string(v).unwrap_or_default()
392 }
393 })
394 .unwrap_or_default();
395
396 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
397
398 let mut state = self.iam_state.write();
399 let policy_id = format!(
400 "FSIA{}",
401 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
402 );
403 let arn = format!(
404 "arn:aws:iam::{}:policy{}{}",
405 state.account_id,
406 if path == "/" { "/" } else { path },
407 policy_name
408 );
409
410 let now = Utc::now();
411 let policy = IamPolicy {
412 policy_name: policy_name.to_string(),
413 policy_id,
414 arn: arn.clone(),
415 path: path.to_string(),
416 description: props
417 .get("Description")
418 .and_then(|v| v.as_str())
419 .unwrap_or("")
420 .to_string(),
421 created_at: now,
422 tags: Vec::new(),
423 default_version_id: "v1".to_string(),
424 versions: vec![PolicyVersion {
425 version_id: "v1".to_string(),
426 document: policy_document,
427 is_default: true,
428 created_at: now,
429 }],
430 next_version_num: 2,
431 attachment_count: 0,
432 };
433
434 state.policies.insert(arn.clone(), policy);
435 Ok(arn)
436 }
437
438 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
439 let mut state = self.iam_state.write();
440 state.policies.remove(physical_id);
441 Ok(())
442 }
443
444 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
447 let props = &resource.properties;
448 let bucket_name = props
449 .get("BucketName")
450 .and_then(|v| v.as_str())
451 .unwrap_or(&resource.logical_id);
452
453 let mut state = self.s3_state.write();
454 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
455 state.buckets.insert(bucket_name.to_string(), bucket);
456 Ok(bucket_name.to_string())
457 }
458
459 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
460 let mut state = self.s3_state.write();
461 state.buckets.remove(physical_id);
462 Ok(())
463 }
464
465 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
468 let props = &resource.properties;
469 let rule_name = props
470 .get("Name")
471 .and_then(|v| v.as_str())
472 .unwrap_or(&resource.logical_id);
473 let event_bus_name = props
474 .get("EventBusName")
475 .and_then(|v| v.as_str())
476 .unwrap_or("default");
477
478 let mut state = self.eventbridge_state.write();
479
480 if !state.buses.contains_key(event_bus_name) {
482 return Err(format!("Event bus does not exist: {event_bus_name}"));
483 }
484
485 let arn = if event_bus_name == "default" {
486 format!(
487 "arn:aws:events:{}:{}:rule/{}",
488 state.region, state.account_id, rule_name
489 )
490 } else {
491 format!(
492 "arn:aws:events:{}:{}:rule/{}/{}",
493 state.region, state.account_id, event_bus_name, rule_name
494 )
495 };
496
497 let rule = EventRule {
498 name: rule_name.to_string(),
499 arn: arn.clone(),
500 event_bus_name: event_bus_name.to_string(),
501 event_pattern: props.get("EventPattern").map(|v| {
502 if v.is_string() {
503 v.as_str().unwrap().to_string()
504 } else {
505 serde_json::to_string(v).unwrap_or_default()
506 }
507 }),
508 schedule_expression: props
509 .get("ScheduleExpression")
510 .and_then(|v| v.as_str())
511 .map(|s| s.to_string()),
512 state: props
513 .get("State")
514 .and_then(|v| v.as_str())
515 .unwrap_or("ENABLED")
516 .to_string(),
517 description: props
518 .get("Description")
519 .and_then(|v| v.as_str())
520 .map(|s| s.to_string()),
521 role_arn: props
522 .get("RoleArn")
523 .and_then(|v| v.as_str())
524 .map(|s| s.to_string()),
525 managed_by: None,
526 created_by: None,
527 targets: Vec::new(),
528 tags: HashMap::new(),
529 last_fired: None,
530 };
531
532 state
533 .rules
534 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
535 Ok(arn)
536 }
537
538 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
539 let mut state = self.eventbridge_state.write();
540 let key = state
542 .rules
543 .iter()
544 .find(|(_, r)| r.arn == physical_id)
545 .map(|(k, _)| k.clone());
546 if let Some(k) = key {
547 state.rules.remove(&k);
548 }
549 Ok(())
550 }
551
552 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
555 let props = &resource.properties;
556 let table_name = props
557 .get("TableName")
558 .and_then(|v| v.as_str())
559 .unwrap_or(&resource.logical_id);
560
561 let mut key_schema = Vec::new();
562 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
563 for item in ks {
564 let attr_name = item
565 .get("AttributeName")
566 .and_then(|v| v.as_str())
567 .unwrap_or("")
568 .to_string();
569 let key_type = item
570 .get("KeyType")
571 .and_then(|v| v.as_str())
572 .unwrap_or("HASH")
573 .to_string();
574 key_schema.push(KeySchemaElement {
575 attribute_name: attr_name,
576 key_type,
577 });
578 }
579 }
580
581 let mut attribute_definitions = Vec::new();
582 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
583 for item in defs {
584 let attr_name = item
585 .get("AttributeName")
586 .and_then(|v| v.as_str())
587 .unwrap_or("")
588 .to_string();
589 let attr_type = item
590 .get("AttributeType")
591 .and_then(|v| v.as_str())
592 .unwrap_or("S")
593 .to_string();
594 attribute_definitions.push(AttributeDefinition {
595 attribute_name: attr_name,
596 attribute_type: attr_type,
597 });
598 }
599 }
600
601 let billing_mode = props
602 .get("BillingMode")
603 .and_then(|v| v.as_str())
604 .unwrap_or("PAY_PER_REQUEST")
605 .to_string();
606
607 let provisioned_throughput = if billing_mode == "PROVISIONED" {
608 if let Some(pt) = props.get("ProvisionedThroughput") {
609 ProvisionedThroughput {
610 read_capacity_units: pt
611 .get("ReadCapacityUnits")
612 .and_then(|v| v.as_i64())
613 .unwrap_or(5),
614 write_capacity_units: pt
615 .get("WriteCapacityUnits")
616 .and_then(|v| v.as_i64())
617 .unwrap_or(5),
618 }
619 } else {
620 ProvisionedThroughput {
621 read_capacity_units: 5,
622 write_capacity_units: 5,
623 }
624 }
625 } else {
626 ProvisionedThroughput {
627 read_capacity_units: 0,
628 write_capacity_units: 0,
629 }
630 };
631
632 let mut state = self.dynamodb_state.write();
633 let arn = format!(
634 "arn:aws:dynamodb:{}:{}:table/{}",
635 state.region, state.account_id, table_name
636 );
637
638 let table = DynamoTable {
639 name: table_name.to_string(),
640 arn: arn.clone(),
641 key_schema,
642 attribute_definitions,
643 provisioned_throughput,
644 items: Vec::new(),
645 gsi: Vec::new(),
646 lsi: Vec::new(),
647 tags: HashMap::new(),
648 created_at: Utc::now(),
649 status: "ACTIVE".to_string(),
650 item_count: 0,
651 size_bytes: 0,
652 billing_mode,
653 ttl_attribute: None,
654 ttl_enabled: false,
655 resource_policy: None,
656 pitr_enabled: false,
657 kinesis_destinations: Vec::new(),
658 contributor_insights_status: "DISABLED".to_string(),
659 contributor_insights_counters: HashMap::new(),
660 };
661
662 state.tables.insert(table_name.to_string(), table);
663 Ok(arn)
664 }
665
666 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
667 let mut state = self.dynamodb_state.write();
668 let table_name = state
670 .tables
671 .iter()
672 .find(|(_, t)| t.arn == physical_id)
673 .map(|(name, _)| name.clone());
674 if let Some(name) = table_name {
675 state.tables.remove(&name);
676 }
677 Ok(())
678 }
679
680 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
683 let props = &resource.properties;
684 let log_group_name = props
685 .get("LogGroupName")
686 .and_then(|v| v.as_str())
687 .unwrap_or(&resource.logical_id);
688
689 let retention_in_days = props
690 .get("RetentionInDays")
691 .and_then(|v| v.as_i64())
692 .map(|v| v as i32);
693
694 let mut state = self.logs_state.write();
695 let arn = format!(
696 "arn:aws:logs:{}:{}:log-group:{}:*",
697 state.region, state.account_id, log_group_name
698 );
699
700 let log_group = fakecloud_logs::state::LogGroup {
701 name: log_group_name.to_string(),
702 arn: arn.clone(),
703 creation_time: Utc::now().timestamp_millis(),
704 retention_in_days,
705 kms_key_id: None,
706 stored_bytes: 0,
707 log_streams: HashMap::new(),
708 tags: HashMap::new(),
709 subscription_filters: Vec::new(),
710 data_protection_policy: None,
711 index_policies: Vec::new(),
712 transformer: None,
713 deletion_protection: false,
714 };
715
716 state
717 .log_groups
718 .insert(log_group_name.to_string(), log_group);
719 Ok(arn)
720 }
721
722 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
723 let mut state = self.logs_state.write();
724 let name = state
726 .log_groups
727 .iter()
728 .find(|(_, g)| g.arn == physical_id)
729 .map(|(name, _)| name.clone());
730 if let Some(name) = name {
731 state.log_groups.remove(&name);
732 }
733 Ok(())
734 }
735
736 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
740 let delivery = self.delivery.clone();
741 let function_arn = function_arn.to_string();
742 let payload = payload.to_string();
743 std::thread::scope(|s| {
744 s.spawn(|| {
745 let rt = tokio::runtime::Builder::new_current_thread()
746 .enable_all()
747 .build()
748 .map_err(|e| format!("Failed to create runtime: {e}"))?;
749 rt.block_on(async {
750 match delivery.invoke_lambda(&function_arn, &payload).await {
751 Some(Ok(_)) => {
752 tracing::info!(
753 "Custom resource Lambda {} invoked successfully",
754 function_arn
755 );
756 Ok(())
757 }
758 Some(Err(e)) => {
759 tracing::warn!(
760 "Custom resource Lambda {} invocation failed: {e}",
761 function_arn
762 );
763 Err(format!("Lambda invocation failed: {e}"))
764 }
765 None => {
766 tracing::warn!(
767 "No Lambda delivery configured; skipping custom resource invocation for {}",
768 function_arn
769 );
770 Ok(())
771 }
772 }
773 })
774 })
775 .join()
776 .map_err(|_| "Lambda invocation thread panicked".to_string())?
777 })
778 }
779
780 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
781 let props = &resource.properties;
782 let service_token = props
783 .get("ServiceToken")
784 .and_then(|v| v.as_str())
785 .ok_or("Custom resource requires ServiceToken property")?;
786
787 let request_id = Uuid::new_v4().to_string();
788
789 let event = serde_json::json!({
791 "RequestType": "Create",
792 "ServiceToken": service_token,
793 "StackId": self.stack_id,
794 "RequestId": request_id,
795 "ResourceType": resource.resource_type,
796 "LogicalResourceId": resource.logical_id,
797 "ResourceProperties": props,
798 });
799
800 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
801 self.invoke_lambda_sync(service_token, &payload)?;
802
803 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
806 Ok(physical_id)
807 }
808
809 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
810 let service_token = match &resource.service_token {
811 Some(token) => token.clone(),
812 None => {
813 return Ok(());
815 }
816 };
817
818 let request_id = Uuid::new_v4().to_string();
819
820 let event = serde_json::json!({
821 "RequestType": "Delete",
822 "ServiceToken": service_token,
823 "StackId": self.stack_id,
824 "RequestId": request_id,
825 "ResourceType": resource.resource_type,
826 "LogicalResourceId": resource.logical_id,
827 "PhysicalResourceId": resource.physical_id,
828 });
829
830 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
831
832 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
834 tracing::warn!(
835 "Custom resource delete Lambda invocation failed for {}: {e}",
836 resource.logical_id
837 );
838 }
839 Ok(())
840 }
841}
842
843#[cfg(test)]
844mod tests {
845 use super::*;
846 use parking_lot::RwLock;
847
848 fn make_provisioner() -> ResourceProvisioner {
849 ResourceProvisioner {
850 sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
851 "123456789012",
852 "us-east-1",
853 "http://localhost:4566",
854 ))),
855 sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
856 "123456789012",
857 "us-east-1",
858 "http://localhost:4566",
859 ))),
860 ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
861 "123456789012",
862 "us-east-1",
863 ))),
864 iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
865 "123456789012",
866 ))),
867 s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
868 "123456789012",
869 "us-east-1",
870 ))),
871 eventbridge_state: Arc::new(RwLock::new(
872 fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
873 )),
874 dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
875 "123456789012",
876 "us-east-1",
877 ))),
878 logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
879 "123456789012",
880 "us-east-1",
881 ))),
882 delivery: Arc::new(DeliveryBus::new()),
883 account_id: "123456789012".to_string(),
884 region: "us-east-1".to_string(),
885 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
886 }
887 }
888
889 fn make_resource(
890 resource_type: &str,
891 logical_id: &str,
892 props: serde_json::Value,
893 ) -> ResourceDefinition {
894 ResourceDefinition {
895 logical_id: logical_id.to_string(),
896 resource_type: resource_type.to_string(),
897 properties: props,
898 }
899 }
900
901 #[test]
902 fn sns_subscription_rejects_nonexistent_topic() {
903 let prov = make_provisioner();
904 let resource = make_resource(
905 "AWS::SNS::Subscription",
906 "MySub",
907 serde_json::json!({
908 "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
909 "Protocol": "sqs",
910 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
911 }),
912 );
913 let result = prov.create_resource(&resource);
914 assert!(result.is_err());
915 assert!(result.unwrap_err().contains("does not exist"));
916 }
917
918 #[test]
919 fn sns_subscription_succeeds_when_topic_exists() {
920 let prov = make_provisioner();
921 let topic = make_resource(
923 "AWS::SNS::Topic",
924 "MyTopic",
925 serde_json::json!({ "TopicName": "my-topic" }),
926 );
927 let topic_result = prov.create_resource(&topic);
928 assert!(topic_result.is_ok());
929 let topic_arn = topic_result.unwrap().physical_id;
930
931 let sub = make_resource(
933 "AWS::SNS::Subscription",
934 "MySub",
935 serde_json::json!({
936 "TopicArn": topic_arn,
937 "Protocol": "sqs",
938 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
939 }),
940 );
941 let result = prov.create_resource(&sub);
942 assert!(result.is_ok());
943 }
944
945 #[test]
946 fn eventbridge_rule_arn_default_bus_omits_bus_name() {
947 let prov = make_provisioner();
948 let resource = make_resource(
949 "AWS::Events::Rule",
950 "MyRule",
951 serde_json::json!({
952 "Name": "my-rule",
953 "ScheduleExpression": "rate(1 hour)"
954 }),
955 );
956 let result = prov.create_resource(&resource).unwrap();
957 assert_eq!(
959 result.physical_id,
960 "arn:aws:events:us-east-1:123456789012:rule/my-rule"
961 );
962 assert!(!result.physical_id.contains("rule/default/"));
963 }
964
965 #[test]
966 fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
967 let prov = make_provisioner();
968 {
970 let mut state = prov.eventbridge_state.write();
971 state.buses.insert(
972 "custom-bus".to_string(),
973 fakecloud_eventbridge::state::EventBus {
974 name: "custom-bus".to_string(),
975 arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
976 policy: None,
977 creation_time: Utc::now(),
978 last_modified_time: Utc::now(),
979 description: None,
980 kms_key_identifier: None,
981 dead_letter_config: None,
982 tags: HashMap::new(),
983 },
984 );
985 }
986 let resource = make_resource(
987 "AWS::Events::Rule",
988 "MyRule",
989 serde_json::json!({
990 "Name": "my-rule",
991 "EventBusName": "custom-bus",
992 "ScheduleExpression": "rate(1 hour)"
993 }),
994 );
995 let result = prov.create_resource(&resource).unwrap();
996 assert_eq!(
997 result.physical_id,
998 "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
999 );
1000 }
1001
1002 #[test]
1003 fn eventbridge_rule_rejects_nonexistent_bus() {
1004 let prov = make_provisioner();
1005 let resource = make_resource(
1006 "AWS::Events::Rule",
1007 "MyRule",
1008 serde_json::json!({
1009 "Name": "my-rule",
1010 "EventBusName": "nonexistent-bus",
1011 "ScheduleExpression": "rate(1 hour)"
1012 }),
1013 );
1014 let result = prov.create_resource(&resource);
1015 assert!(result.is_err());
1016 assert!(result.unwrap_err().contains("does not exist"));
1017 }
1018
1019 #[test]
1020 fn custom_resource_requires_service_token() {
1021 let prov = make_provisioner();
1022 let resource = make_resource(
1023 "Custom::MyResource",
1024 "MyCustom",
1025 serde_json::json!({
1026 "Foo": "bar"
1027 }),
1028 );
1029 let result = prov.create_resource(&resource);
1030 assert!(result.is_err());
1031 assert!(
1032 result.unwrap_err().contains("ServiceToken"),
1033 "Should require ServiceToken property"
1034 );
1035 }
1036
1037 #[test]
1038 fn custom_resource_succeeds_without_lambda_delivery() {
1039 let prov = make_provisioner();
1042 let resource = make_resource(
1043 "Custom::MyResource",
1044 "MyCustom",
1045 serde_json::json!({
1046 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1047 "Foo": "bar"
1048 }),
1049 );
1050 let result = prov.create_resource(&resource);
1051 assert!(result.is_ok());
1052 let sr = result.unwrap();
1053 assert_eq!(sr.logical_id, "MyCustom");
1054 assert_eq!(sr.resource_type, "Custom::MyResource");
1055 assert!(sr.physical_id.starts_with("MyCustom-"));
1056 }
1057
1058 #[test]
1059 fn cloudformation_custom_resource_type_succeeds() {
1060 let prov = make_provisioner();
1061 let resource = make_resource(
1062 "AWS::CloudFormation::CustomResource",
1063 "MyCustom2",
1064 serde_json::json!({
1065 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1066 "Key": "value"
1067 }),
1068 );
1069 let result = prov.create_resource(&resource);
1070 assert!(result.is_ok());
1071 let sr = result.unwrap();
1072 assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1073 }
1074}