1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9 AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
22pub struct ResourceProvisioner {
24 pub sqs_state: SharedSqsState,
25 pub sns_state: SharedSnsState,
26 pub ssm_state: SharedSsmState,
27 pub iam_state: SharedIamState,
28 pub s3_state: SharedS3State,
29 pub eventbridge_state: SharedEventBridgeState,
30 pub dynamodb_state: SharedDynamoDbState,
31 pub logs_state: SharedLogsState,
32 pub delivery: Arc<DeliveryBus>,
33 pub account_id: String,
34 pub region: String,
35 pub stack_id: String,
36}
37
38impl ResourceProvisioner {
39 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41 let result = match resource.resource_type.as_str() {
42 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43 "AWS::SNS::Topic" => self.create_sns_topic(resource),
44 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46 "AWS::IAM::Role" => self.create_iam_role(resource),
47 "AWS::IAM::Policy" => self.create_iam_policy(resource),
48 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51 "AWS::Logs::LogGroup" => self.create_log_group(resource),
52 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53 self.create_custom_resource(resource)
54 }
55 other => Err(format!("Unsupported resource type: {other}")),
56 };
57
58 let is_custom = resource.resource_type.starts_with("Custom::")
59 || resource.resource_type == "AWS::CloudFormation::CustomResource";
60 let service_token = if is_custom {
61 resource
62 .properties
63 .get("ServiceToken")
64 .and_then(|v| v.as_str())
65 .map(|s| s.to_string())
66 } else {
67 None
68 };
69
70 result.map(|physical_id| StackResource {
71 logical_id: resource.logical_id.clone(),
72 physical_id,
73 resource_type: resource.resource_type.clone(),
74 status: "CREATE_COMPLETE".to_string(),
75 service_token,
76 })
77 }
78
79 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81 match resource.resource_type.as_str() {
82 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93 self.delete_custom_resource(resource)
94 }
95 other => Err(format!("Unsupported resource type: {other}")),
96 }
97 }
98
99 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102 let props = &resource.properties;
103 let queue_name = props
104 .get("QueueName")
105 .and_then(|v| v.as_str())
106 .unwrap_or(&resource.logical_id);
107
108 let mut state = self.sqs_state.write();
109 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110 let arn = format!(
111 "arn:aws:sqs:{}:{}:{}",
112 state.region, state.account_id, queue_name
113 );
114
115 let is_fifo = queue_name.ends_with(".fifo");
116 let mut attributes = HashMap::new();
117 if let Some(obj) = props.as_object() {
118 for (k, v) in obj {
119 if k != "QueueName" {
120 if let Some(s) = v.as_str() {
121 attributes.insert(k.clone(), s.to_string());
122 } else if let Some(n) = v.as_i64() {
123 attributes.insert(k.clone(), n.to_string());
124 }
125 }
126 }
127 }
128
129 let queue = SqsQueue {
130 queue_name: queue_name.to_string(),
131 queue_url: queue_url.clone(),
132 arn,
133 created_at: Utc::now(),
134 messages: std::collections::VecDeque::new(),
135 inflight: Vec::new(),
136 attributes,
137 is_fifo,
138 dedup_cache: HashMap::new(),
139 redrive_policy: None,
140 tags: HashMap::new(),
141 next_sequence_number: 0,
142 permission_labels: Vec::new(),
143 receipt_handle_map: HashMap::new(),
144 };
145
146 state
147 .name_to_url
148 .insert(queue_name.to_string(), queue_url.clone());
149 state.queues.insert(queue_url.clone(), queue);
150
151 Ok(queue_url)
152 }
153
154 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155 let mut state = self.sqs_state.write();
156 if let Some(queue) = state.queues.remove(physical_id) {
157 state.name_to_url.remove(&queue.queue_name);
158 }
159 Ok(())
160 }
161
162 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165 let props = &resource.properties;
166 let topic_name = props
167 .get("TopicName")
168 .and_then(|v| v.as_str())
169 .unwrap_or(&resource.logical_id);
170
171 let mut state = self.sns_state.write();
172 let topic_arn = format!(
173 "arn:aws:sns:{}:{}:{}",
174 state.region, state.account_id, topic_name
175 );
176
177 let topic = SnsTopic {
178 topic_arn: topic_arn.clone(),
179 name: topic_name.to_string(),
180 attributes: HashMap::new(),
181 tags: Vec::new(),
182 is_fifo: topic_name.ends_with(".fifo"),
183 created_at: Utc::now(),
184 };
185
186 state.topics.insert(topic_arn.clone(), topic);
187 Ok(topic_arn)
188 }
189
190 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191 let mut state = self.sns_state.write();
192 state.topics.remove(physical_id);
193 state
195 .subscriptions
196 .retain(|_, sub| sub.topic_arn != physical_id);
197 Ok(())
198 }
199
200 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203 let props = &resource.properties;
204 let topic_arn = props
205 .get("TopicArn")
206 .and_then(|v| v.as_str())
207 .ok_or("SNS Subscription requires TopicArn")?;
208 let protocol = props
209 .get("Protocol")
210 .and_then(|v| v.as_str())
211 .ok_or("SNS Subscription requires Protocol")?;
212 let endpoint = props
213 .get("Endpoint")
214 .and_then(|v| v.as_str())
215 .ok_or("SNS Subscription requires Endpoint")?;
216
217 let mut state = self.sns_state.write();
218
219 if !state.topics.contains_key(topic_arn) {
221 return Err(format!("Topic ARN does not exist: {topic_arn}"));
222 }
223
224 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226 let subscription = SnsSubscription {
227 subscription_arn: sub_arn.clone(),
228 topic_arn: topic_arn.to_string(),
229 protocol: protocol.to_string(),
230 endpoint: endpoint.to_string(),
231 owner: state.account_id.clone(),
232 attributes: HashMap::new(),
233 confirmed: true,
234 confirmation_token: None,
235 };
236
237 state.subscriptions.insert(sub_arn.clone(), subscription);
238 Ok(sub_arn)
239 }
240
241 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242 let mut state = self.sns_state.write();
243 state.subscriptions.remove(physical_id);
244 Ok(())
245 }
246
247 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250 let props = &resource.properties;
251 let name = props
252 .get("Name")
253 .and_then(|v| v.as_str())
254 .ok_or("SSM Parameter requires Name")?;
255 let value = props
256 .get("Value")
257 .and_then(|v| v.as_str())
258 .ok_or("SSM Parameter requires Value")?;
259 let param_type = props
260 .get("Type")
261 .and_then(|v| v.as_str())
262 .unwrap_or("String");
263
264 let mut state = self.ssm_state.write();
265 let arn = format!(
266 "arn:aws:ssm:{}:{}:parameter{}",
267 state.region,
268 state.account_id,
269 if name.starts_with('/') {
270 name.to_string()
271 } else {
272 format!("/{name}")
273 }
274 );
275
276 let parameter = SsmParameter {
277 name: name.to_string(),
278 value: value.to_string(),
279 param_type: param_type.to_string(),
280 version: 1,
281 arn: arn.clone(),
282 last_modified: Utc::now(),
283 history: Vec::new(),
284 tags: HashMap::new(),
285 labels: HashMap::new(),
286 description: props
287 .get("Description")
288 .and_then(|v| v.as_str())
289 .map(|s| s.to_string()),
290 allowed_pattern: None,
291 key_id: None,
292 data_type: "text".to_string(),
293 tier: "Standard".to_string(),
294 policies: None,
295 };
296
297 state.parameters.insert(name.to_string(), parameter);
298 Ok(name.to_string())
299 }
300
301 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302 let mut state = self.ssm_state.write();
303 state.parameters.remove(physical_id);
304 Ok(())
305 }
306
307 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310 let props = &resource.properties;
311 let role_name = props
312 .get("RoleName")
313 .and_then(|v| v.as_str())
314 .unwrap_or(&resource.logical_id);
315
316 let assume_role_policy = props
317 .get("AssumeRolePolicyDocument")
318 .map(|v| {
319 if v.is_string() {
320 v.as_str().unwrap().to_string()
321 } else {
322 serde_json::to_string(v).unwrap_or_default()
323 }
324 })
325 .unwrap_or_default();
326
327 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329 let mut state = self.iam_state.write();
330 let role_id = format!(
331 "FKIA{}",
332 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333 );
334 let arn = format!(
335 "arn:aws:iam::{}:role{}{}",
336 state.account_id,
337 if path == "/" { "/" } else { path },
338 role_name
339 );
340
341 let role = IamRole {
342 role_name: role_name.to_string(),
343 role_id,
344 arn: arn.clone(),
345 path: path.to_string(),
346 assume_role_policy_document: assume_role_policy,
347 created_at: Utc::now(),
348 description: props
349 .get("Description")
350 .and_then(|v| v.as_str())
351 .map(|s| s.to_string()),
352 max_session_duration: 3600,
353 tags: Vec::new(),
354 permissions_boundary: None,
355 };
356
357 state.roles.insert(role_name.to_string(), role);
358 Ok(arn)
359 }
360
361 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362 let mut state = self.iam_state.write();
363 let role_name = state
365 .roles
366 .iter()
367 .find(|(_, r)| r.arn == physical_id)
368 .map(|(name, _)| name.clone());
369 if let Some(name) = role_name {
370 state.roles.remove(&name);
371 state.role_policies.remove(&name);
372 state.role_inline_policies.remove(&name);
373 }
374 Ok(())
375 }
376
377 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380 let props = &resource.properties;
381 let policy_name = props
382 .get("PolicyName")
383 .and_then(|v| v.as_str())
384 .unwrap_or(&resource.logical_id);
385
386 let policy_document = props
387 .get("PolicyDocument")
388 .map(|v| {
389 if v.is_string() {
390 v.as_str().unwrap().to_string()
391 } else {
392 serde_json::to_string(v).unwrap_or_default()
393 }
394 })
395 .unwrap_or_default();
396
397 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399 let mut state = self.iam_state.write();
400 let policy_id = format!(
401 "FSIA{}",
402 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403 );
404 let arn = format!(
405 "arn:aws:iam::{}:policy{}{}",
406 state.account_id,
407 if path == "/" { "/" } else { path },
408 policy_name
409 );
410
411 let now = Utc::now();
412 let policy = IamPolicy {
413 policy_name: policy_name.to_string(),
414 policy_id,
415 arn: arn.clone(),
416 path: path.to_string(),
417 description: props
418 .get("Description")
419 .and_then(|v| v.as_str())
420 .unwrap_or("")
421 .to_string(),
422 created_at: now,
423 tags: Vec::new(),
424 default_version_id: "v1".to_string(),
425 versions: vec![PolicyVersion {
426 version_id: "v1".to_string(),
427 document: policy_document,
428 is_default: true,
429 created_at: now,
430 }],
431 next_version_num: 2,
432 attachment_count: 0,
433 };
434
435 state.policies.insert(arn.clone(), policy);
436 Ok(arn)
437 }
438
439 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440 let mut state = self.iam_state.write();
441 state.policies.remove(physical_id);
442 Ok(())
443 }
444
445 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448 let props = &resource.properties;
449 let bucket_name = props
450 .get("BucketName")
451 .and_then(|v| v.as_str())
452 .unwrap_or(&resource.logical_id);
453
454 let mut state = self.s3_state.write();
455 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456 state.buckets.insert(bucket_name.to_string(), bucket);
457 Ok(bucket_name.to_string())
458 }
459
460 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461 let mut state = self.s3_state.write();
462 state.buckets.remove(physical_id);
463 Ok(())
464 }
465
466 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469 let props = &resource.properties;
470 let rule_name = props
471 .get("Name")
472 .and_then(|v| v.as_str())
473 .unwrap_or(&resource.logical_id);
474 let event_bus_name = props
475 .get("EventBusName")
476 .and_then(|v| v.as_str())
477 .unwrap_or("default");
478
479 let mut state = self.eventbridge_state.write();
480
481 if !state.buses.contains_key(event_bus_name) {
483 return Err(format!("Event bus does not exist: {event_bus_name}"));
484 }
485
486 let arn = if event_bus_name == "default" {
487 format!(
488 "arn:aws:events:{}:{}:rule/{}",
489 state.region, state.account_id, rule_name
490 )
491 } else {
492 format!(
493 "arn:aws:events:{}:{}:rule/{}/{}",
494 state.region, state.account_id, event_bus_name, rule_name
495 )
496 };
497
498 let rule = EventRule {
499 name: rule_name.to_string(),
500 arn: arn.clone(),
501 event_bus_name: event_bus_name.to_string(),
502 event_pattern: props.get("EventPattern").map(|v| {
503 if v.is_string() {
504 v.as_str().unwrap().to_string()
505 } else {
506 serde_json::to_string(v).unwrap_or_default()
507 }
508 }),
509 schedule_expression: props
510 .get("ScheduleExpression")
511 .and_then(|v| v.as_str())
512 .map(|s| s.to_string()),
513 state: props
514 .get("State")
515 .and_then(|v| v.as_str())
516 .unwrap_or("ENABLED")
517 .to_string(),
518 description: props
519 .get("Description")
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string()),
522 role_arn: props
523 .get("RoleArn")
524 .and_then(|v| v.as_str())
525 .map(|s| s.to_string()),
526 managed_by: None,
527 created_by: None,
528 targets: Vec::new(),
529 tags: HashMap::new(),
530 last_fired: None,
531 };
532
533 state
534 .rules
535 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536 Ok(arn)
537 }
538
539 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540 let mut state = self.eventbridge_state.write();
541 let key = state
543 .rules
544 .iter()
545 .find(|(_, r)| r.arn == physical_id)
546 .map(|(k, _)| k.clone());
547 if let Some(k) = key {
548 state.rules.remove(&k);
549 }
550 Ok(())
551 }
552
553 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556 let props = &resource.properties;
557 let table_name = props
558 .get("TableName")
559 .and_then(|v| v.as_str())
560 .unwrap_or(&resource.logical_id);
561
562 let mut key_schema = Vec::new();
563 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564 for item in ks {
565 let attr_name = item
566 .get("AttributeName")
567 .and_then(|v| v.as_str())
568 .unwrap_or("")
569 .to_string();
570 let key_type = item
571 .get("KeyType")
572 .and_then(|v| v.as_str())
573 .unwrap_or("HASH")
574 .to_string();
575 key_schema.push(KeySchemaElement {
576 attribute_name: attr_name,
577 key_type,
578 });
579 }
580 }
581
582 let mut attribute_definitions = Vec::new();
583 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584 for item in defs {
585 let attr_name = item
586 .get("AttributeName")
587 .and_then(|v| v.as_str())
588 .unwrap_or("")
589 .to_string();
590 let attr_type = item
591 .get("AttributeType")
592 .and_then(|v| v.as_str())
593 .unwrap_or("S")
594 .to_string();
595 attribute_definitions.push(AttributeDefinition {
596 attribute_name: attr_name,
597 attribute_type: attr_type,
598 });
599 }
600 }
601
602 let billing_mode = props
603 .get("BillingMode")
604 .and_then(|v| v.as_str())
605 .unwrap_or("PAY_PER_REQUEST")
606 .to_string();
607
608 let provisioned_throughput = if billing_mode == "PROVISIONED" {
609 if let Some(pt) = props.get("ProvisionedThroughput") {
610 ProvisionedThroughput {
611 read_capacity_units: pt
612 .get("ReadCapacityUnits")
613 .and_then(|v| v.as_i64())
614 .unwrap_or(5),
615 write_capacity_units: pt
616 .get("WriteCapacityUnits")
617 .and_then(|v| v.as_i64())
618 .unwrap_or(5),
619 }
620 } else {
621 ProvisionedThroughput {
622 read_capacity_units: 5,
623 write_capacity_units: 5,
624 }
625 }
626 } else {
627 ProvisionedThroughput {
628 read_capacity_units: 0,
629 write_capacity_units: 0,
630 }
631 };
632
633 let mut state = self.dynamodb_state.write();
634 let arn = format!(
635 "arn:aws:dynamodb:{}:{}:table/{}",
636 state.region, state.account_id, table_name
637 );
638
639 let table = DynamoTable {
640 name: table_name.to_string(),
641 arn: arn.clone(),
642 key_schema,
643 attribute_definitions,
644 provisioned_throughput,
645 items: Vec::new(),
646 gsi: Vec::new(),
647 lsi: Vec::new(),
648 tags: HashMap::new(),
649 created_at: Utc::now(),
650 status: "ACTIVE".to_string(),
651 item_count: 0,
652 size_bytes: 0,
653 billing_mode,
654 ttl_attribute: None,
655 ttl_enabled: false,
656 resource_policy: None,
657 pitr_enabled: false,
658 kinesis_destinations: Vec::new(),
659 contributor_insights_status: "DISABLED".to_string(),
660 contributor_insights_counters: HashMap::new(),
661 stream_enabled: false,
662 stream_view_type: None,
663 stream_arn: None,
664 stream_records: Arc::new(RwLock::new(Vec::new())),
665 sse_type: None,
666 sse_kms_key_arn: None,
667 };
668
669 state.tables.insert(table_name.to_string(), table);
670 Ok(arn)
671 }
672
673 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
674 let mut state = self.dynamodb_state.write();
675 let table_name = state
677 .tables
678 .iter()
679 .find(|(_, t)| t.arn == physical_id)
680 .map(|(name, _)| name.clone());
681 if let Some(name) = table_name {
682 state.tables.remove(&name);
683 }
684 Ok(())
685 }
686
687 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
690 let props = &resource.properties;
691 let log_group_name = props
692 .get("LogGroupName")
693 .and_then(|v| v.as_str())
694 .unwrap_or(&resource.logical_id);
695
696 let retention_in_days = props
697 .get("RetentionInDays")
698 .and_then(|v| v.as_i64())
699 .map(|v| v as i32);
700
701 let mut state = self.logs_state.write();
702 let arn = format!(
703 "arn:aws:logs:{}:{}:log-group:{}:*",
704 state.region, state.account_id, log_group_name
705 );
706
707 let log_group = fakecloud_logs::state::LogGroup {
708 name: log_group_name.to_string(),
709 arn: arn.clone(),
710 creation_time: Utc::now().timestamp_millis(),
711 retention_in_days,
712 kms_key_id: None,
713 stored_bytes: 0,
714 log_streams: HashMap::new(),
715 tags: HashMap::new(),
716 subscription_filters: Vec::new(),
717 data_protection_policy: None,
718 index_policies: Vec::new(),
719 transformer: None,
720 deletion_protection: false,
721 };
722
723 state
724 .log_groups
725 .insert(log_group_name.to_string(), log_group);
726 Ok(arn)
727 }
728
729 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
730 let mut state = self.logs_state.write();
731 let name = state
733 .log_groups
734 .iter()
735 .find(|(_, g)| g.arn == physical_id)
736 .map(|(name, _)| name.clone());
737 if let Some(name) = name {
738 state.log_groups.remove(&name);
739 }
740 Ok(())
741 }
742
743 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
747 let delivery = self.delivery.clone();
748 let function_arn = function_arn.to_string();
749 let payload = payload.to_string();
750 std::thread::scope(|s| {
751 s.spawn(|| {
752 let rt = tokio::runtime::Builder::new_current_thread()
753 .enable_all()
754 .build()
755 .map_err(|e| format!("Failed to create runtime: {e}"))?;
756 rt.block_on(async {
757 match delivery.invoke_lambda(&function_arn, &payload).await {
758 Some(Ok(_)) => {
759 tracing::info!(
760 "Custom resource Lambda {} invoked successfully",
761 function_arn
762 );
763 Ok(())
764 }
765 Some(Err(e)) => {
766 tracing::warn!(
767 "Custom resource Lambda {} invocation failed: {e}",
768 function_arn
769 );
770 Err(format!("Lambda invocation failed: {e}"))
771 }
772 None => {
773 tracing::warn!(
774 "No Lambda delivery configured; skipping custom resource invocation for {}",
775 function_arn
776 );
777 Ok(())
778 }
779 }
780 })
781 })
782 .join()
783 .map_err(|_| "Lambda invocation thread panicked".to_string())?
784 })
785 }
786
787 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
788 let props = &resource.properties;
789 let service_token = props
790 .get("ServiceToken")
791 .and_then(|v| v.as_str())
792 .ok_or("Custom resource requires ServiceToken property")?;
793
794 let request_id = Uuid::new_v4().to_string();
795
796 let event = serde_json::json!({
798 "RequestType": "Create",
799 "ServiceToken": service_token,
800 "StackId": self.stack_id,
801 "RequestId": request_id,
802 "ResourceType": resource.resource_type,
803 "LogicalResourceId": resource.logical_id,
804 "ResourceProperties": props,
805 });
806
807 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
808 self.invoke_lambda_sync(service_token, &payload)?;
809
810 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
813 Ok(physical_id)
814 }
815
816 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
817 let service_token = match &resource.service_token {
818 Some(token) => token.clone(),
819 None => {
820 return Ok(());
822 }
823 };
824
825 let request_id = Uuid::new_v4().to_string();
826
827 let event = serde_json::json!({
828 "RequestType": "Delete",
829 "ServiceToken": service_token,
830 "StackId": self.stack_id,
831 "RequestId": request_id,
832 "ResourceType": resource.resource_type,
833 "LogicalResourceId": resource.logical_id,
834 "PhysicalResourceId": resource.physical_id,
835 });
836
837 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
838
839 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
841 tracing::warn!(
842 "Custom resource delete Lambda invocation failed for {}: {e}",
843 resource.logical_id
844 );
845 }
846 Ok(())
847 }
848}
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use parking_lot::RwLock;
854
855 fn make_provisioner() -> ResourceProvisioner {
856 ResourceProvisioner {
857 sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
858 "123456789012",
859 "us-east-1",
860 "http://localhost:4566",
861 ))),
862 sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
863 "123456789012",
864 "us-east-1",
865 "http://localhost:4566",
866 ))),
867 ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
868 "123456789012",
869 "us-east-1",
870 ))),
871 iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
872 "123456789012",
873 ))),
874 s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
875 "123456789012",
876 "us-east-1",
877 ))),
878 eventbridge_state: Arc::new(RwLock::new(
879 fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
880 )),
881 dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
882 "123456789012",
883 "us-east-1",
884 ))),
885 logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
886 "123456789012",
887 "us-east-1",
888 ))),
889 delivery: Arc::new(DeliveryBus::new()),
890 account_id: "123456789012".to_string(),
891 region: "us-east-1".to_string(),
892 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
893 }
894 }
895
896 fn make_resource(
897 resource_type: &str,
898 logical_id: &str,
899 props: serde_json::Value,
900 ) -> ResourceDefinition {
901 ResourceDefinition {
902 logical_id: logical_id.to_string(),
903 resource_type: resource_type.to_string(),
904 properties: props,
905 }
906 }
907
908 #[test]
909 fn sns_subscription_rejects_nonexistent_topic() {
910 let prov = make_provisioner();
911 let resource = make_resource(
912 "AWS::SNS::Subscription",
913 "MySub",
914 serde_json::json!({
915 "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
916 "Protocol": "sqs",
917 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
918 }),
919 );
920 let result = prov.create_resource(&resource);
921 assert!(result.is_err());
922 assert!(result.unwrap_err().contains("does not exist"));
923 }
924
925 #[test]
926 fn sns_subscription_succeeds_when_topic_exists() {
927 let prov = make_provisioner();
928 let topic = make_resource(
930 "AWS::SNS::Topic",
931 "MyTopic",
932 serde_json::json!({ "TopicName": "my-topic" }),
933 );
934 let topic_result = prov.create_resource(&topic);
935 assert!(topic_result.is_ok());
936 let topic_arn = topic_result.unwrap().physical_id;
937
938 let sub = make_resource(
940 "AWS::SNS::Subscription",
941 "MySub",
942 serde_json::json!({
943 "TopicArn": topic_arn,
944 "Protocol": "sqs",
945 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
946 }),
947 );
948 let result = prov.create_resource(&sub);
949 assert!(result.is_ok());
950 }
951
952 #[test]
953 fn eventbridge_rule_arn_default_bus_omits_bus_name() {
954 let prov = make_provisioner();
955 let resource = make_resource(
956 "AWS::Events::Rule",
957 "MyRule",
958 serde_json::json!({
959 "Name": "my-rule",
960 "ScheduleExpression": "rate(1 hour)"
961 }),
962 );
963 let result = prov.create_resource(&resource).unwrap();
964 assert_eq!(
966 result.physical_id,
967 "arn:aws:events:us-east-1:123456789012:rule/my-rule"
968 );
969 assert!(!result.physical_id.contains("rule/default/"));
970 }
971
972 #[test]
973 fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
974 let prov = make_provisioner();
975 {
977 let mut state = prov.eventbridge_state.write();
978 state.buses.insert(
979 "custom-bus".to_string(),
980 fakecloud_eventbridge::state::EventBus {
981 name: "custom-bus".to_string(),
982 arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
983 policy: None,
984 creation_time: Utc::now(),
985 last_modified_time: Utc::now(),
986 description: None,
987 kms_key_identifier: None,
988 dead_letter_config: None,
989 tags: HashMap::new(),
990 },
991 );
992 }
993 let resource = make_resource(
994 "AWS::Events::Rule",
995 "MyRule",
996 serde_json::json!({
997 "Name": "my-rule",
998 "EventBusName": "custom-bus",
999 "ScheduleExpression": "rate(1 hour)"
1000 }),
1001 );
1002 let result = prov.create_resource(&resource).unwrap();
1003 assert_eq!(
1004 result.physical_id,
1005 "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1006 );
1007 }
1008
1009 #[test]
1010 fn eventbridge_rule_rejects_nonexistent_bus() {
1011 let prov = make_provisioner();
1012 let resource = make_resource(
1013 "AWS::Events::Rule",
1014 "MyRule",
1015 serde_json::json!({
1016 "Name": "my-rule",
1017 "EventBusName": "nonexistent-bus",
1018 "ScheduleExpression": "rate(1 hour)"
1019 }),
1020 );
1021 let result = prov.create_resource(&resource);
1022 assert!(result.is_err());
1023 assert!(result.unwrap_err().contains("does not exist"));
1024 }
1025
1026 #[test]
1027 fn custom_resource_requires_service_token() {
1028 let prov = make_provisioner();
1029 let resource = make_resource(
1030 "Custom::MyResource",
1031 "MyCustom",
1032 serde_json::json!({
1033 "Foo": "bar"
1034 }),
1035 );
1036 let result = prov.create_resource(&resource);
1037 assert!(result.is_err());
1038 assert!(
1039 result.unwrap_err().contains("ServiceToken"),
1040 "Should require ServiceToken property"
1041 );
1042 }
1043
1044 #[test]
1045 fn custom_resource_succeeds_without_lambda_delivery() {
1046 let prov = make_provisioner();
1049 let resource = make_resource(
1050 "Custom::MyResource",
1051 "MyCustom",
1052 serde_json::json!({
1053 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1054 "Foo": "bar"
1055 }),
1056 );
1057 let result = prov.create_resource(&resource);
1058 assert!(result.is_ok());
1059 let sr = result.unwrap();
1060 assert_eq!(sr.logical_id, "MyCustom");
1061 assert_eq!(sr.resource_type, "Custom::MyResource");
1062 assert!(sr.physical_id.starts_with("MyCustom-"));
1063 }
1064
1065 #[test]
1066 fn cloudformation_custom_resource_type_succeeds() {
1067 let prov = make_provisioner();
1068 let resource = make_resource(
1069 "AWS::CloudFormation::CustomResource",
1070 "MyCustom2",
1071 serde_json::json!({
1072 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1073 "Key": "value"
1074 }),
1075 );
1076 let result = prov.create_resource(&resource);
1077 assert!(result.is_ok());
1078 let sr = result.unwrap();
1079 assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1080 }
1081}