1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9 AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
22pub struct ResourceProvisioner {
24 pub sqs_state: SharedSqsState,
25 pub sns_state: SharedSnsState,
26 pub ssm_state: SharedSsmState,
27 pub iam_state: SharedIamState,
28 pub s3_state: SharedS3State,
29 pub eventbridge_state: SharedEventBridgeState,
30 pub dynamodb_state: SharedDynamoDbState,
31 pub logs_state: SharedLogsState,
32 pub delivery: Arc<DeliveryBus>,
33 pub account_id: String,
34 pub region: String,
35 pub stack_id: String,
36}
37
38impl ResourceProvisioner {
39 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41 let result = match resource.resource_type.as_str() {
42 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43 "AWS::SNS::Topic" => self.create_sns_topic(resource),
44 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46 "AWS::IAM::Role" => self.create_iam_role(resource),
47 "AWS::IAM::Policy" => self.create_iam_policy(resource),
48 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51 "AWS::Logs::LogGroup" => self.create_log_group(resource),
52 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53 self.create_custom_resource(resource)
54 }
55 other => Err(format!("Unsupported resource type: {other}")),
56 };
57
58 let is_custom = resource.resource_type.starts_with("Custom::")
59 || resource.resource_type == "AWS::CloudFormation::CustomResource";
60 let service_token = if is_custom {
61 resource
62 .properties
63 .get("ServiceToken")
64 .and_then(|v| v.as_str())
65 .map(|s| s.to_string())
66 } else {
67 None
68 };
69
70 result.map(|physical_id| StackResource {
71 logical_id: resource.logical_id.clone(),
72 physical_id,
73 resource_type: resource.resource_type.clone(),
74 status: "CREATE_COMPLETE".to_string(),
75 service_token,
76 })
77 }
78
79 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81 match resource.resource_type.as_str() {
82 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93 self.delete_custom_resource(resource)
94 }
95 other => Err(format!("Unsupported resource type: {other}")),
96 }
97 }
98
99 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102 let props = &resource.properties;
103 let queue_name = props
104 .get("QueueName")
105 .and_then(|v| v.as_str())
106 .unwrap_or(&resource.logical_id);
107
108 let mut state = self.sqs_state.write();
109 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110 let arn = format!(
111 "arn:aws:sqs:{}:{}:{}",
112 state.region, state.account_id, queue_name
113 );
114
115 let is_fifo = queue_name.ends_with(".fifo");
116 let mut attributes = HashMap::new();
117 if let Some(obj) = props.as_object() {
118 for (k, v) in obj {
119 if k != "QueueName" {
120 if let Some(s) = v.as_str() {
121 attributes.insert(k.clone(), s.to_string());
122 } else if let Some(n) = v.as_i64() {
123 attributes.insert(k.clone(), n.to_string());
124 }
125 }
126 }
127 }
128
129 let queue = SqsQueue {
130 queue_name: queue_name.to_string(),
131 queue_url: queue_url.clone(),
132 arn,
133 created_at: Utc::now(),
134 messages: std::collections::VecDeque::new(),
135 inflight: Vec::new(),
136 attributes,
137 is_fifo,
138 dedup_cache: HashMap::new(),
139 redrive_policy: None,
140 tags: HashMap::new(),
141 next_sequence_number: 0,
142 permission_labels: Vec::new(),
143 receipt_handle_map: HashMap::new(),
144 };
145
146 state
147 .name_to_url
148 .insert(queue_name.to_string(), queue_url.clone());
149 state.queues.insert(queue_url.clone(), queue);
150
151 Ok(queue_url)
152 }
153
154 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155 let mut state = self.sqs_state.write();
156 if let Some(queue) = state.queues.remove(physical_id) {
157 state.name_to_url.remove(&queue.queue_name);
158 }
159 Ok(())
160 }
161
162 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165 let props = &resource.properties;
166 let topic_name = props
167 .get("TopicName")
168 .and_then(|v| v.as_str())
169 .unwrap_or(&resource.logical_id);
170
171 let mut state = self.sns_state.write();
172 let topic_arn = format!(
173 "arn:aws:sns:{}:{}:{}",
174 state.region, state.account_id, topic_name
175 );
176
177 let topic = SnsTopic {
178 topic_arn: topic_arn.clone(),
179 name: topic_name.to_string(),
180 attributes: HashMap::new(),
181 tags: Vec::new(),
182 is_fifo: topic_name.ends_with(".fifo"),
183 created_at: Utc::now(),
184 };
185
186 state.topics.insert(topic_arn.clone(), topic);
187 Ok(topic_arn)
188 }
189
190 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191 let mut state = self.sns_state.write();
192 state.topics.remove(physical_id);
193 state
195 .subscriptions
196 .retain(|_, sub| sub.topic_arn != physical_id);
197 Ok(())
198 }
199
200 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203 let props = &resource.properties;
204 let topic_arn = props
205 .get("TopicArn")
206 .and_then(|v| v.as_str())
207 .ok_or("SNS Subscription requires TopicArn")?;
208 let protocol = props
209 .get("Protocol")
210 .and_then(|v| v.as_str())
211 .ok_or("SNS Subscription requires Protocol")?;
212 let endpoint = props
213 .get("Endpoint")
214 .and_then(|v| v.as_str())
215 .ok_or("SNS Subscription requires Endpoint")?;
216
217 let mut state = self.sns_state.write();
218
219 if !state.topics.contains_key(topic_arn) {
221 return Err(format!("Topic ARN does not exist: {topic_arn}"));
222 }
223
224 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226 let subscription = SnsSubscription {
227 subscription_arn: sub_arn.clone(),
228 topic_arn: topic_arn.to_string(),
229 protocol: protocol.to_string(),
230 endpoint: endpoint.to_string(),
231 owner: state.account_id.clone(),
232 attributes: HashMap::new(),
233 confirmed: true,
234 confirmation_token: None,
235 };
236
237 state.subscriptions.insert(sub_arn.clone(), subscription);
238 Ok(sub_arn)
239 }
240
241 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242 let mut state = self.sns_state.write();
243 state.subscriptions.remove(physical_id);
244 Ok(())
245 }
246
247 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250 let props = &resource.properties;
251 let name = props
252 .get("Name")
253 .and_then(|v| v.as_str())
254 .ok_or("SSM Parameter requires Name")?;
255 let value = props
256 .get("Value")
257 .and_then(|v| v.as_str())
258 .ok_or("SSM Parameter requires Value")?;
259 let param_type = props
260 .get("Type")
261 .and_then(|v| v.as_str())
262 .unwrap_or("String");
263
264 let mut state = self.ssm_state.write();
265 let arn = format!(
266 "arn:aws:ssm:{}:{}:parameter{}",
267 state.region,
268 state.account_id,
269 if name.starts_with('/') {
270 name.to_string()
271 } else {
272 format!("/{name}")
273 }
274 );
275
276 let parameter = SsmParameter {
277 name: name.to_string(),
278 value: value.to_string(),
279 param_type: param_type.to_string(),
280 version: 1,
281 arn: arn.clone(),
282 last_modified: Utc::now(),
283 history: Vec::new(),
284 tags: HashMap::new(),
285 labels: HashMap::new(),
286 description: props
287 .get("Description")
288 .and_then(|v| v.as_str())
289 .map(|s| s.to_string()),
290 allowed_pattern: None,
291 key_id: None,
292 data_type: "text".to_string(),
293 tier: "Standard".to_string(),
294 policies: None,
295 };
296
297 state.parameters.insert(name.to_string(), parameter);
298 Ok(name.to_string())
299 }
300
301 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302 let mut state = self.ssm_state.write();
303 state.parameters.remove(physical_id);
304 Ok(())
305 }
306
307 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310 let props = &resource.properties;
311 let role_name = props
312 .get("RoleName")
313 .and_then(|v| v.as_str())
314 .unwrap_or(&resource.logical_id);
315
316 let assume_role_policy = props
317 .get("AssumeRolePolicyDocument")
318 .map(|v| {
319 if v.is_string() {
320 v.as_str().unwrap().to_string()
321 } else {
322 serde_json::to_string(v).unwrap_or_default()
323 }
324 })
325 .unwrap_or_default();
326
327 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329 let mut state = self.iam_state.write();
330 let role_id = format!(
331 "FKIA{}",
332 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333 );
334 let arn = format!(
335 "arn:aws:iam::{}:role{}{}",
336 state.account_id,
337 if path == "/" { "/" } else { path },
338 role_name
339 );
340
341 let role = IamRole {
342 role_name: role_name.to_string(),
343 role_id,
344 arn: arn.clone(),
345 path: path.to_string(),
346 assume_role_policy_document: assume_role_policy,
347 created_at: Utc::now(),
348 description: props
349 .get("Description")
350 .and_then(|v| v.as_str())
351 .map(|s| s.to_string()),
352 max_session_duration: 3600,
353 tags: Vec::new(),
354 permissions_boundary: None,
355 };
356
357 state.roles.insert(role_name.to_string(), role);
358 Ok(arn)
359 }
360
361 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362 let mut state = self.iam_state.write();
363 let role_name = state
365 .roles
366 .iter()
367 .find(|(_, r)| r.arn == physical_id)
368 .map(|(name, _)| name.clone());
369 if let Some(name) = role_name {
370 state.roles.remove(&name);
371 state.role_policies.remove(&name);
372 state.role_inline_policies.remove(&name);
373 }
374 Ok(())
375 }
376
377 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380 let props = &resource.properties;
381 let policy_name = props
382 .get("PolicyName")
383 .and_then(|v| v.as_str())
384 .unwrap_or(&resource.logical_id);
385
386 let policy_document = props
387 .get("PolicyDocument")
388 .map(|v| {
389 if v.is_string() {
390 v.as_str().unwrap().to_string()
391 } else {
392 serde_json::to_string(v).unwrap_or_default()
393 }
394 })
395 .unwrap_or_default();
396
397 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399 let mut state = self.iam_state.write();
400 let policy_id = format!(
401 "FSIA{}",
402 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403 );
404 let arn = format!(
405 "arn:aws:iam::{}:policy{}{}",
406 state.account_id,
407 if path == "/" { "/" } else { path },
408 policy_name
409 );
410
411 let now = Utc::now();
412 let policy = IamPolicy {
413 policy_name: policy_name.to_string(),
414 policy_id,
415 arn: arn.clone(),
416 path: path.to_string(),
417 description: props
418 .get("Description")
419 .and_then(|v| v.as_str())
420 .unwrap_or("")
421 .to_string(),
422 created_at: now,
423 tags: Vec::new(),
424 default_version_id: "v1".to_string(),
425 versions: vec![PolicyVersion {
426 version_id: "v1".to_string(),
427 document: policy_document,
428 is_default: true,
429 created_at: now,
430 }],
431 next_version_num: 2,
432 attachment_count: 0,
433 };
434
435 state.policies.insert(arn.clone(), policy);
436 Ok(arn)
437 }
438
439 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440 let mut state = self.iam_state.write();
441 state.policies.remove(physical_id);
442 Ok(())
443 }
444
445 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448 let props = &resource.properties;
449 let bucket_name = props
450 .get("BucketName")
451 .and_then(|v| v.as_str())
452 .unwrap_or(&resource.logical_id);
453
454 let mut state = self.s3_state.write();
455 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456 state.buckets.insert(bucket_name.to_string(), bucket);
457 Ok(bucket_name.to_string())
458 }
459
460 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461 let mut state = self.s3_state.write();
462 state.buckets.remove(physical_id);
463 Ok(())
464 }
465
466 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469 let props = &resource.properties;
470 let rule_name = props
471 .get("Name")
472 .and_then(|v| v.as_str())
473 .unwrap_or(&resource.logical_id);
474 let event_bus_name = props
475 .get("EventBusName")
476 .and_then(|v| v.as_str())
477 .unwrap_or("default");
478
479 let mut state = self.eventbridge_state.write();
480
481 if !state.buses.contains_key(event_bus_name) {
483 return Err(format!("Event bus does not exist: {event_bus_name}"));
484 }
485
486 let arn = if event_bus_name == "default" {
487 format!(
488 "arn:aws:events:{}:{}:rule/{}",
489 state.region, state.account_id, rule_name
490 )
491 } else {
492 format!(
493 "arn:aws:events:{}:{}:rule/{}/{}",
494 state.region, state.account_id, event_bus_name, rule_name
495 )
496 };
497
498 let rule = EventRule {
499 name: rule_name.to_string(),
500 arn: arn.clone(),
501 event_bus_name: event_bus_name.to_string(),
502 event_pattern: props.get("EventPattern").map(|v| {
503 if v.is_string() {
504 v.as_str().unwrap().to_string()
505 } else {
506 serde_json::to_string(v).unwrap_or_default()
507 }
508 }),
509 schedule_expression: props
510 .get("ScheduleExpression")
511 .and_then(|v| v.as_str())
512 .map(|s| s.to_string()),
513 state: props
514 .get("State")
515 .and_then(|v| v.as_str())
516 .unwrap_or("ENABLED")
517 .to_string(),
518 description: props
519 .get("Description")
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string()),
522 role_arn: props
523 .get("RoleArn")
524 .and_then(|v| v.as_str())
525 .map(|s| s.to_string()),
526 managed_by: None,
527 created_by: None,
528 targets: Vec::new(),
529 tags: HashMap::new(),
530 last_fired: None,
531 };
532
533 state
534 .rules
535 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536 Ok(arn)
537 }
538
539 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540 let mut state = self.eventbridge_state.write();
541 let key = state
543 .rules
544 .iter()
545 .find(|(_, r)| r.arn == physical_id)
546 .map(|(k, _)| k.clone());
547 if let Some(k) = key {
548 state.rules.remove(&k);
549 }
550 Ok(())
551 }
552
553 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556 let props = &resource.properties;
557 let table_name = props
558 .get("TableName")
559 .and_then(|v| v.as_str())
560 .unwrap_or(&resource.logical_id);
561
562 let mut key_schema = Vec::new();
563 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564 for item in ks {
565 let attr_name = item
566 .get("AttributeName")
567 .and_then(|v| v.as_str())
568 .unwrap_or("")
569 .to_string();
570 let key_type = item
571 .get("KeyType")
572 .and_then(|v| v.as_str())
573 .unwrap_or("HASH")
574 .to_string();
575 key_schema.push(KeySchemaElement {
576 attribute_name: attr_name,
577 key_type,
578 });
579 }
580 }
581
582 let mut attribute_definitions = Vec::new();
583 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584 for item in defs {
585 let attr_name = item
586 .get("AttributeName")
587 .and_then(|v| v.as_str())
588 .unwrap_or("")
589 .to_string();
590 let attr_type = item
591 .get("AttributeType")
592 .and_then(|v| v.as_str())
593 .unwrap_or("S")
594 .to_string();
595 attribute_definitions.push(AttributeDefinition {
596 attribute_name: attr_name,
597 attribute_type: attr_type,
598 });
599 }
600 }
601
602 let billing_mode = props
603 .get("BillingMode")
604 .and_then(|v| v.as_str())
605 .unwrap_or("PAY_PER_REQUEST")
606 .to_string();
607
608 let provisioned_throughput = if billing_mode == "PROVISIONED" {
609 if let Some(pt) = props.get("ProvisionedThroughput") {
610 ProvisionedThroughput {
611 read_capacity_units: pt
612 .get("ReadCapacityUnits")
613 .and_then(|v| v.as_i64())
614 .unwrap_or(5),
615 write_capacity_units: pt
616 .get("WriteCapacityUnits")
617 .and_then(|v| v.as_i64())
618 .unwrap_or(5),
619 }
620 } else {
621 ProvisionedThroughput {
622 read_capacity_units: 5,
623 write_capacity_units: 5,
624 }
625 }
626 } else {
627 ProvisionedThroughput {
628 read_capacity_units: 0,
629 write_capacity_units: 0,
630 }
631 };
632
633 let (stream_enabled, stream_view_type) =
635 if let Some(stream_spec) = props.get("StreamSpecification") {
636 let view_type = stream_spec
637 .get("StreamViewType")
638 .and_then(|v| v.as_str())
639 .map(|s| s.to_string());
640 let enabled = stream_spec
641 .get("StreamEnabled")
642 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643 .unwrap_or(view_type.is_some());
645 (enabled, view_type)
646 } else {
647 (false, None)
648 };
649
650 let mut state = self.dynamodb_state.write();
651 let arn = format!(
652 "arn:aws:dynamodb:{}:{}:table/{}",
653 state.region, state.account_id, table_name
654 );
655
656 let stream_arn = if stream_enabled {
657 Some(format!(
658 "{}/stream/{}",
659 arn,
660 Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
661 ))
662 } else {
663 None
664 };
665
666 let table = DynamoTable {
667 name: table_name.to_string(),
668 arn: arn.clone(),
669 key_schema,
670 attribute_definitions,
671 provisioned_throughput,
672 items: Vec::new(),
673 gsi: Vec::new(),
674 lsi: Vec::new(),
675 tags: HashMap::new(),
676 created_at: Utc::now(),
677 status: "ACTIVE".to_string(),
678 item_count: 0,
679 size_bytes: 0,
680 billing_mode,
681 ttl_attribute: None,
682 ttl_enabled: false,
683 resource_policy: None,
684 pitr_enabled: false,
685 kinesis_destinations: Vec::new(),
686 contributor_insights_status: "DISABLED".to_string(),
687 contributor_insights_counters: HashMap::new(),
688 stream_enabled,
689 stream_view_type,
690 stream_arn,
691 stream_records: Arc::new(RwLock::new(Vec::new())),
692 sse_type: None,
693 sse_kms_key_arn: None,
694 };
695
696 state.tables.insert(table_name.to_string(), table);
697 Ok(arn)
698 }
699
700 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
701 let mut state = self.dynamodb_state.write();
702 let table_name = state
704 .tables
705 .iter()
706 .find(|(_, t)| t.arn == physical_id)
707 .map(|(name, _)| name.clone());
708 if let Some(name) = table_name {
709 state.tables.remove(&name);
710 }
711 Ok(())
712 }
713
714 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
717 let props = &resource.properties;
718 let log_group_name = props
719 .get("LogGroupName")
720 .and_then(|v| v.as_str())
721 .unwrap_or(&resource.logical_id);
722
723 let retention_in_days = props
724 .get("RetentionInDays")
725 .and_then(|v| v.as_i64())
726 .map(|v| v as i32);
727
728 let mut state = self.logs_state.write();
729 let arn = format!(
730 "arn:aws:logs:{}:{}:log-group:{}:*",
731 state.region, state.account_id, log_group_name
732 );
733
734 let log_group = fakecloud_logs::state::LogGroup {
735 name: log_group_name.to_string(),
736 arn: arn.clone(),
737 creation_time: Utc::now().timestamp_millis(),
738 retention_in_days,
739 kms_key_id: None,
740 stored_bytes: 0,
741 log_streams: HashMap::new(),
742 tags: HashMap::new(),
743 subscription_filters: Vec::new(),
744 data_protection_policy: None,
745 index_policies: Vec::new(),
746 transformer: None,
747 deletion_protection: false,
748 };
749
750 state
751 .log_groups
752 .insert(log_group_name.to_string(), log_group);
753 Ok(arn)
754 }
755
756 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
757 let mut state = self.logs_state.write();
758 let name = state
760 .log_groups
761 .iter()
762 .find(|(_, g)| g.arn == physical_id)
763 .map(|(name, _)| name.clone());
764 if let Some(name) = name {
765 state.log_groups.remove(&name);
766 }
767 Ok(())
768 }
769
770 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
774 let delivery = self.delivery.clone();
775 let function_arn = function_arn.to_string();
776 let payload = payload.to_string();
777 std::thread::scope(|s| {
778 s.spawn(|| {
779 let rt = tokio::runtime::Builder::new_current_thread()
780 .enable_all()
781 .build()
782 .map_err(|e| format!("Failed to create runtime: {e}"))?;
783 rt.block_on(async {
784 match delivery.invoke_lambda(&function_arn, &payload).await {
785 Some(Ok(_)) => {
786 tracing::info!(
787 "Custom resource Lambda {} invoked successfully",
788 function_arn
789 );
790 Ok(())
791 }
792 Some(Err(e)) => {
793 tracing::warn!(
794 "Custom resource Lambda {} invocation failed: {e}",
795 function_arn
796 );
797 Err(format!("Lambda invocation failed: {e}"))
798 }
799 None => {
800 tracing::warn!(
801 "No Lambda delivery configured; skipping custom resource invocation for {}",
802 function_arn
803 );
804 Ok(())
805 }
806 }
807 })
808 })
809 .join()
810 .map_err(|_| "Lambda invocation thread panicked".to_string())?
811 })
812 }
813
814 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
815 let props = &resource.properties;
816 let service_token = props
817 .get("ServiceToken")
818 .and_then(|v| v.as_str())
819 .ok_or("Custom resource requires ServiceToken property")?;
820
821 let request_id = Uuid::new_v4().to_string();
822
823 let event = serde_json::json!({
825 "RequestType": "Create",
826 "ServiceToken": service_token,
827 "StackId": self.stack_id,
828 "RequestId": request_id,
829 "ResourceType": resource.resource_type,
830 "LogicalResourceId": resource.logical_id,
831 "ResourceProperties": props,
832 });
833
834 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
835 self.invoke_lambda_sync(service_token, &payload)?;
836
837 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
840 Ok(physical_id)
841 }
842
843 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
844 let service_token = match &resource.service_token {
845 Some(token) => token.clone(),
846 None => {
847 return Ok(());
849 }
850 };
851
852 let request_id = Uuid::new_v4().to_string();
853
854 let event = serde_json::json!({
855 "RequestType": "Delete",
856 "ServiceToken": service_token,
857 "StackId": self.stack_id,
858 "RequestId": request_id,
859 "ResourceType": resource.resource_type,
860 "LogicalResourceId": resource.logical_id,
861 "PhysicalResourceId": resource.physical_id,
862 });
863
864 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
865
866 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
868 tracing::warn!(
869 "Custom resource delete Lambda invocation failed for {}: {e}",
870 resource.logical_id
871 );
872 }
873 Ok(())
874 }
875}
876
877#[cfg(test)]
878mod tests {
879 use super::*;
880 use parking_lot::RwLock;
881
882 fn make_provisioner() -> ResourceProvisioner {
883 ResourceProvisioner {
884 sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
885 "123456789012",
886 "us-east-1",
887 "http://localhost:4566",
888 ))),
889 sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
890 "123456789012",
891 "us-east-1",
892 "http://localhost:4566",
893 ))),
894 ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
895 "123456789012",
896 "us-east-1",
897 ))),
898 iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
899 "123456789012",
900 ))),
901 s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
902 "123456789012",
903 "us-east-1",
904 ))),
905 eventbridge_state: Arc::new(RwLock::new(
906 fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
907 )),
908 dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
909 "123456789012",
910 "us-east-1",
911 ))),
912 logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
913 "123456789012",
914 "us-east-1",
915 ))),
916 delivery: Arc::new(DeliveryBus::new()),
917 account_id: "123456789012".to_string(),
918 region: "us-east-1".to_string(),
919 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
920 }
921 }
922
923 fn make_resource(
924 resource_type: &str,
925 logical_id: &str,
926 props: serde_json::Value,
927 ) -> ResourceDefinition {
928 ResourceDefinition {
929 logical_id: logical_id.to_string(),
930 resource_type: resource_type.to_string(),
931 properties: props,
932 }
933 }
934
935 #[test]
936 fn sns_subscription_rejects_nonexistent_topic() {
937 let prov = make_provisioner();
938 let resource = make_resource(
939 "AWS::SNS::Subscription",
940 "MySub",
941 serde_json::json!({
942 "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
943 "Protocol": "sqs",
944 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
945 }),
946 );
947 let result = prov.create_resource(&resource);
948 assert!(result.is_err());
949 assert!(result.unwrap_err().contains("does not exist"));
950 }
951
952 #[test]
953 fn sns_subscription_succeeds_when_topic_exists() {
954 let prov = make_provisioner();
955 let topic = make_resource(
957 "AWS::SNS::Topic",
958 "MyTopic",
959 serde_json::json!({ "TopicName": "my-topic" }),
960 );
961 let topic_result = prov.create_resource(&topic);
962 assert!(topic_result.is_ok());
963 let topic_arn = topic_result.unwrap().physical_id;
964
965 let sub = make_resource(
967 "AWS::SNS::Subscription",
968 "MySub",
969 serde_json::json!({
970 "TopicArn": topic_arn,
971 "Protocol": "sqs",
972 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
973 }),
974 );
975 let result = prov.create_resource(&sub);
976 assert!(result.is_ok());
977 }
978
979 #[test]
980 fn eventbridge_rule_arn_default_bus_omits_bus_name() {
981 let prov = make_provisioner();
982 let resource = make_resource(
983 "AWS::Events::Rule",
984 "MyRule",
985 serde_json::json!({
986 "Name": "my-rule",
987 "ScheduleExpression": "rate(1 hour)"
988 }),
989 );
990 let result = prov.create_resource(&resource).unwrap();
991 assert_eq!(
993 result.physical_id,
994 "arn:aws:events:us-east-1:123456789012:rule/my-rule"
995 );
996 assert!(!result.physical_id.contains("rule/default/"));
997 }
998
999 #[test]
1000 fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
1001 let prov = make_provisioner();
1002 {
1004 let mut state = prov.eventbridge_state.write();
1005 state.buses.insert(
1006 "custom-bus".to_string(),
1007 fakecloud_eventbridge::state::EventBus {
1008 name: "custom-bus".to_string(),
1009 arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
1010 policy: None,
1011 creation_time: Utc::now(),
1012 last_modified_time: Utc::now(),
1013 description: None,
1014 kms_key_identifier: None,
1015 dead_letter_config: None,
1016 tags: HashMap::new(),
1017 },
1018 );
1019 }
1020 let resource = make_resource(
1021 "AWS::Events::Rule",
1022 "MyRule",
1023 serde_json::json!({
1024 "Name": "my-rule",
1025 "EventBusName": "custom-bus",
1026 "ScheduleExpression": "rate(1 hour)"
1027 }),
1028 );
1029 let result = prov.create_resource(&resource).unwrap();
1030 assert_eq!(
1031 result.physical_id,
1032 "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1033 );
1034 }
1035
1036 #[test]
1037 fn eventbridge_rule_rejects_nonexistent_bus() {
1038 let prov = make_provisioner();
1039 let resource = make_resource(
1040 "AWS::Events::Rule",
1041 "MyRule",
1042 serde_json::json!({
1043 "Name": "my-rule",
1044 "EventBusName": "nonexistent-bus",
1045 "ScheduleExpression": "rate(1 hour)"
1046 }),
1047 );
1048 let result = prov.create_resource(&resource);
1049 assert!(result.is_err());
1050 assert!(result.unwrap_err().contains("does not exist"));
1051 }
1052
1053 #[test]
1054 fn custom_resource_requires_service_token() {
1055 let prov = make_provisioner();
1056 let resource = make_resource(
1057 "Custom::MyResource",
1058 "MyCustom",
1059 serde_json::json!({
1060 "Foo": "bar"
1061 }),
1062 );
1063 let result = prov.create_resource(&resource);
1064 assert!(result.is_err());
1065 assert!(
1066 result.unwrap_err().contains("ServiceToken"),
1067 "Should require ServiceToken property"
1068 );
1069 }
1070
1071 #[test]
1072 fn custom_resource_succeeds_without_lambda_delivery() {
1073 let prov = make_provisioner();
1076 let resource = make_resource(
1077 "Custom::MyResource",
1078 "MyCustom",
1079 serde_json::json!({
1080 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1081 "Foo": "bar"
1082 }),
1083 );
1084 let result = prov.create_resource(&resource);
1085 assert!(result.is_ok());
1086 let sr = result.unwrap();
1087 assert_eq!(sr.logical_id, "MyCustom");
1088 assert_eq!(sr.resource_type, "Custom::MyResource");
1089 assert!(sr.physical_id.starts_with("MyCustom-"));
1090 }
1091
1092 #[test]
1093 fn cloudformation_custom_resource_type_succeeds() {
1094 let prov = make_provisioner();
1095 let resource = make_resource(
1096 "AWS::CloudFormation::CustomResource",
1097 "MyCustom2",
1098 serde_json::json!({
1099 "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
1100 "Key": "value"
1101 }),
1102 );
1103 let result = prov.create_resource(&resource);
1104 assert!(result.is_ok());
1105 let sr = result.unwrap();
1106 assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
1107 }
1108}