1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9 AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
22pub struct ResourceProvisioner {
24 pub sqs_state: SharedSqsState,
25 pub sns_state: SharedSnsState,
26 pub ssm_state: SharedSsmState,
27 pub iam_state: SharedIamState,
28 pub s3_state: SharedS3State,
29 pub eventbridge_state: SharedEventBridgeState,
30 pub dynamodb_state: SharedDynamoDbState,
31 pub logs_state: SharedLogsState,
32 pub delivery: Arc<DeliveryBus>,
33 pub account_id: String,
34 pub region: String,
35 pub stack_id: String,
36}
37
38impl ResourceProvisioner {
39 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41 let result = match resource.resource_type.as_str() {
42 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43 "AWS::SNS::Topic" => self.create_sns_topic(resource),
44 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46 "AWS::IAM::Role" => self.create_iam_role(resource),
47 "AWS::IAM::Policy" => self.create_iam_policy(resource),
48 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51 "AWS::Logs::LogGroup" => self.create_log_group(resource),
52 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53 self.create_custom_resource(resource)
54 }
55 other => Err(format!("Unsupported resource type: {other}")),
56 };
57
58 let is_custom = resource.resource_type.starts_with("Custom::")
59 || resource.resource_type == "AWS::CloudFormation::CustomResource";
60 let service_token = if is_custom {
61 resource
62 .properties
63 .get("ServiceToken")
64 .and_then(|v| v.as_str())
65 .map(|s| s.to_string())
66 } else {
67 None
68 };
69
70 result.map(|physical_id| StackResource {
71 logical_id: resource.logical_id.clone(),
72 physical_id,
73 resource_type: resource.resource_type.clone(),
74 status: "CREATE_COMPLETE".to_string(),
75 service_token,
76 })
77 }
78
79 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81 match resource.resource_type.as_str() {
82 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93 self.delete_custom_resource(resource)
94 }
95 other => Err(format!("Unsupported resource type: {other}")),
96 }
97 }
98
99 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102 let props = &resource.properties;
103 let queue_name = props
104 .get("QueueName")
105 .and_then(|v| v.as_str())
106 .unwrap_or(&resource.logical_id);
107
108 let mut state = self.sqs_state.write();
109 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110 let arn = format!(
111 "arn:aws:sqs:{}:{}:{}",
112 state.region, state.account_id, queue_name
113 );
114
115 let is_fifo = queue_name.ends_with(".fifo");
116 let mut attributes = HashMap::new();
117 if let Some(obj) = props.as_object() {
118 for (k, v) in obj {
119 if k != "QueueName" {
120 if let Some(s) = v.as_str() {
121 attributes.insert(k.clone(), s.to_string());
122 } else if let Some(n) = v.as_i64() {
123 attributes.insert(k.clone(), n.to_string());
124 }
125 }
126 }
127 }
128
129 let queue = SqsQueue {
130 queue_name: queue_name.to_string(),
131 queue_url: queue_url.clone(),
132 arn,
133 created_at: Utc::now(),
134 messages: std::collections::VecDeque::new(),
135 inflight: Vec::new(),
136 attributes,
137 is_fifo,
138 dedup_cache: HashMap::new(),
139 redrive_policy: None,
140 tags: HashMap::new(),
141 next_sequence_number: 0,
142 permission_labels: Vec::new(),
143 receipt_handle_map: HashMap::new(),
144 };
145
146 state
147 .name_to_url
148 .insert(queue_name.to_string(), queue_url.clone());
149 state.queues.insert(queue_url.clone(), queue);
150
151 Ok(queue_url)
152 }
153
154 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155 let mut state = self.sqs_state.write();
156 if let Some(queue) = state.queues.remove(physical_id) {
157 state.name_to_url.remove(&queue.queue_name);
158 }
159 Ok(())
160 }
161
162 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165 let props = &resource.properties;
166 let topic_name = props
167 .get("TopicName")
168 .and_then(|v| v.as_str())
169 .unwrap_or(&resource.logical_id);
170
171 let mut state = self.sns_state.write();
172 let topic_arn = format!(
173 "arn:aws:sns:{}:{}:{}",
174 state.region, state.account_id, topic_name
175 );
176
177 let topic = SnsTopic {
178 topic_arn: topic_arn.clone(),
179 name: topic_name.to_string(),
180 attributes: HashMap::new(),
181 tags: Vec::new(),
182 is_fifo: topic_name.ends_with(".fifo"),
183 created_at: Utc::now(),
184 };
185
186 state.topics.insert(topic_arn.clone(), topic);
187 Ok(topic_arn)
188 }
189
190 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191 let mut state = self.sns_state.write();
192 state.topics.remove(physical_id);
193 state
195 .subscriptions
196 .retain(|_, sub| sub.topic_arn != physical_id);
197 Ok(())
198 }
199
200 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203 let props = &resource.properties;
204 let topic_arn = props
205 .get("TopicArn")
206 .and_then(|v| v.as_str())
207 .ok_or("SNS Subscription requires TopicArn")?;
208 let protocol = props
209 .get("Protocol")
210 .and_then(|v| v.as_str())
211 .ok_or("SNS Subscription requires Protocol")?;
212 let endpoint = props
213 .get("Endpoint")
214 .and_then(|v| v.as_str())
215 .ok_or("SNS Subscription requires Endpoint")?;
216
217 let mut state = self.sns_state.write();
218
219 if !state.topics.contains_key(topic_arn) {
221 return Err(format!("Topic ARN does not exist: {topic_arn}"));
222 }
223
224 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226 let subscription = SnsSubscription {
227 subscription_arn: sub_arn.clone(),
228 topic_arn: topic_arn.to_string(),
229 protocol: protocol.to_string(),
230 endpoint: endpoint.to_string(),
231 owner: state.account_id.clone(),
232 attributes: HashMap::new(),
233 confirmed: true,
234 confirmation_token: None,
235 };
236
237 state.subscriptions.insert(sub_arn.clone(), subscription);
238 Ok(sub_arn)
239 }
240
241 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242 let mut state = self.sns_state.write();
243 state.subscriptions.remove(physical_id);
244 Ok(())
245 }
246
247 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250 let props = &resource.properties;
251 let name = props
252 .get("Name")
253 .and_then(|v| v.as_str())
254 .ok_or("SSM Parameter requires Name")?;
255 let value = props
256 .get("Value")
257 .and_then(|v| v.as_str())
258 .ok_or("SSM Parameter requires Value")?;
259 let param_type = props
260 .get("Type")
261 .and_then(|v| v.as_str())
262 .unwrap_or("String");
263
264 let mut state = self.ssm_state.write();
265 let arn = format!(
266 "arn:aws:ssm:{}:{}:parameter{}",
267 state.region,
268 state.account_id,
269 if name.starts_with('/') {
270 name.to_string()
271 } else {
272 format!("/{name}")
273 }
274 );
275
276 let parameter = SsmParameter {
277 name: name.to_string(),
278 value: value.to_string(),
279 param_type: param_type.to_string(),
280 version: 1,
281 arn: arn.clone(),
282 last_modified: Utc::now(),
283 history: Vec::new(),
284 tags: HashMap::new(),
285 labels: HashMap::new(),
286 description: props
287 .get("Description")
288 .and_then(|v| v.as_str())
289 .map(|s| s.to_string()),
290 allowed_pattern: None,
291 key_id: None,
292 data_type: "text".to_string(),
293 tier: "Standard".to_string(),
294 policies: None,
295 };
296
297 state.parameters.insert(name.to_string(), parameter);
298 Ok(name.to_string())
299 }
300
301 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302 let mut state = self.ssm_state.write();
303 state.parameters.remove(physical_id);
304 Ok(())
305 }
306
307 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310 let props = &resource.properties;
311 let role_name = props
312 .get("RoleName")
313 .and_then(|v| v.as_str())
314 .unwrap_or(&resource.logical_id);
315
316 let assume_role_policy = props
317 .get("AssumeRolePolicyDocument")
318 .map(|v| {
319 if v.is_string() {
320 v.as_str().unwrap().to_string()
321 } else {
322 serde_json::to_string(v).unwrap_or_default()
323 }
324 })
325 .unwrap_or_default();
326
327 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329 let mut state = self.iam_state.write();
330 let role_id = format!(
331 "FKIA{}",
332 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333 );
334 let arn = format!(
335 "arn:aws:iam::{}:role{}{}",
336 state.account_id,
337 if path == "/" { "/" } else { path },
338 role_name
339 );
340
341 let role = IamRole {
342 role_name: role_name.to_string(),
343 role_id,
344 arn: arn.clone(),
345 path: path.to_string(),
346 assume_role_policy_document: assume_role_policy,
347 created_at: Utc::now(),
348 description: props
349 .get("Description")
350 .and_then(|v| v.as_str())
351 .map(|s| s.to_string()),
352 max_session_duration: 3600,
353 tags: Vec::new(),
354 permissions_boundary: None,
355 };
356
357 state.roles.insert(role_name.to_string(), role);
358 Ok(arn)
359 }
360
361 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362 let mut state = self.iam_state.write();
363 let role_name = state
365 .roles
366 .iter()
367 .find(|(_, r)| r.arn == physical_id)
368 .map(|(name, _)| name.clone());
369 if let Some(name) = role_name {
370 state.roles.remove(&name);
371 state.role_policies.remove(&name);
372 state.role_inline_policies.remove(&name);
373 }
374 Ok(())
375 }
376
377 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380 let props = &resource.properties;
381 let policy_name = props
382 .get("PolicyName")
383 .and_then(|v| v.as_str())
384 .unwrap_or(&resource.logical_id);
385
386 let policy_document = props
387 .get("PolicyDocument")
388 .map(|v| {
389 if v.is_string() {
390 v.as_str().unwrap().to_string()
391 } else {
392 serde_json::to_string(v).unwrap_or_default()
393 }
394 })
395 .unwrap_or_default();
396
397 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399 let mut state = self.iam_state.write();
400 let policy_id = format!(
401 "FSIA{}",
402 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403 );
404 let arn = format!(
405 "arn:aws:iam::{}:policy{}{}",
406 state.account_id,
407 if path == "/" { "/" } else { path },
408 policy_name
409 );
410
411 let now = Utc::now();
412 let policy = IamPolicy {
413 policy_name: policy_name.to_string(),
414 policy_id,
415 arn: arn.clone(),
416 path: path.to_string(),
417 description: props
418 .get("Description")
419 .and_then(|v| v.as_str())
420 .unwrap_or("")
421 .to_string(),
422 created_at: now,
423 tags: Vec::new(),
424 default_version_id: "v1".to_string(),
425 versions: vec![PolicyVersion {
426 version_id: "v1".to_string(),
427 document: policy_document,
428 is_default: true,
429 created_at: now,
430 }],
431 next_version_num: 2,
432 attachment_count: 0,
433 };
434
435 state.policies.insert(arn.clone(), policy);
436 Ok(arn)
437 }
438
439 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440 let mut state = self.iam_state.write();
441 state.policies.remove(physical_id);
442 Ok(())
443 }
444
445 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448 let props = &resource.properties;
449 let bucket_name = props
450 .get("BucketName")
451 .and_then(|v| v.as_str())
452 .unwrap_or(&resource.logical_id);
453
454 let mut state = self.s3_state.write();
455 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456 state.buckets.insert(bucket_name.to_string(), bucket);
457 Ok(bucket_name.to_string())
458 }
459
460 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461 let mut state = self.s3_state.write();
462 state.buckets.remove(physical_id);
463 Ok(())
464 }
465
466 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469 let props = &resource.properties;
470 let rule_name = props
471 .get("Name")
472 .and_then(|v| v.as_str())
473 .unwrap_or(&resource.logical_id);
474 let event_bus_name = props
475 .get("EventBusName")
476 .and_then(|v| v.as_str())
477 .unwrap_or("default");
478
479 let mut state = self.eventbridge_state.write();
480
481 if !state.buses.contains_key(event_bus_name) {
483 return Err(format!("Event bus does not exist: {event_bus_name}"));
484 }
485
486 let arn = if event_bus_name == "default" {
487 format!(
488 "arn:aws:events:{}:{}:rule/{}",
489 state.region, state.account_id, rule_name
490 )
491 } else {
492 format!(
493 "arn:aws:events:{}:{}:rule/{}/{}",
494 state.region, state.account_id, event_bus_name, rule_name
495 )
496 };
497
498 let rule = EventRule {
499 name: rule_name.to_string(),
500 arn: arn.clone(),
501 event_bus_name: event_bus_name.to_string(),
502 event_pattern: props.get("EventPattern").map(|v| {
503 if v.is_string() {
504 v.as_str().unwrap().to_string()
505 } else {
506 serde_json::to_string(v).unwrap_or_default()
507 }
508 }),
509 schedule_expression: props
510 .get("ScheduleExpression")
511 .and_then(|v| v.as_str())
512 .map(|s| s.to_string()),
513 state: props
514 .get("State")
515 .and_then(|v| v.as_str())
516 .unwrap_or("ENABLED")
517 .to_string(),
518 description: props
519 .get("Description")
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string()),
522 role_arn: props
523 .get("RoleArn")
524 .and_then(|v| v.as_str())
525 .map(|s| s.to_string()),
526 managed_by: None,
527 created_by: None,
528 targets: Vec::new(),
529 tags: HashMap::new(),
530 last_fired: None,
531 };
532
533 state
534 .rules
535 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536 Ok(arn)
537 }
538
539 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540 let mut state = self.eventbridge_state.write();
541 let key = state
543 .rules
544 .iter()
545 .find(|(_, r)| r.arn == physical_id)
546 .map(|(k, _)| k.clone());
547 if let Some(k) = key {
548 state.rules.remove(&k);
549 }
550 Ok(())
551 }
552
553 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556 let props = &resource.properties;
557 let table_name = props
558 .get("TableName")
559 .and_then(|v| v.as_str())
560 .unwrap_or(&resource.logical_id);
561
562 let mut key_schema = Vec::new();
563 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564 for item in ks {
565 let attr_name = item
566 .get("AttributeName")
567 .and_then(|v| v.as_str())
568 .unwrap_or("")
569 .to_string();
570 let key_type = item
571 .get("KeyType")
572 .and_then(|v| v.as_str())
573 .unwrap_or("HASH")
574 .to_string();
575 key_schema.push(KeySchemaElement {
576 attribute_name: attr_name,
577 key_type,
578 });
579 }
580 }
581
582 let mut attribute_definitions = Vec::new();
583 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584 for item in defs {
585 let attr_name = item
586 .get("AttributeName")
587 .and_then(|v| v.as_str())
588 .unwrap_or("")
589 .to_string();
590 let attr_type = item
591 .get("AttributeType")
592 .and_then(|v| v.as_str())
593 .unwrap_or("S")
594 .to_string();
595 attribute_definitions.push(AttributeDefinition {
596 attribute_name: attr_name,
597 attribute_type: attr_type,
598 });
599 }
600 }
601
602 let billing_mode = props
603 .get("BillingMode")
604 .and_then(|v| v.as_str())
605 .unwrap_or("PAY_PER_REQUEST")
606 .to_string();
607
608 let provisioned_throughput = if billing_mode == "PROVISIONED" {
609 if let Some(pt) = props.get("ProvisionedThroughput") {
610 ProvisionedThroughput {
611 read_capacity_units: pt
612 .get("ReadCapacityUnits")
613 .and_then(|v| v.as_i64())
614 .unwrap_or(5),
615 write_capacity_units: pt
616 .get("WriteCapacityUnits")
617 .and_then(|v| v.as_i64())
618 .unwrap_or(5),
619 }
620 } else {
621 ProvisionedThroughput {
622 read_capacity_units: 5,
623 write_capacity_units: 5,
624 }
625 }
626 } else {
627 ProvisionedThroughput {
628 read_capacity_units: 0,
629 write_capacity_units: 0,
630 }
631 };
632
633 let (stream_enabled, stream_view_type) =
635 if let Some(stream_spec) = props.get("StreamSpecification") {
636 let view_type = stream_spec
637 .get("StreamViewType")
638 .and_then(|v| v.as_str())
639 .map(|s| s.to_string());
640 let enabled = stream_spec
641 .get("StreamEnabled")
642 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643 .unwrap_or(view_type.is_some());
645 (enabled, view_type)
646 } else {
647 (false, None)
648 };
649
650 let mut state = self.dynamodb_state.write();
651 let arn = format!(
652 "arn:aws:dynamodb:{}:{}:table/{}",
653 state.region, state.account_id, table_name
654 );
655
656 let stream_arn = if stream_enabled {
657 Some(format!(
658 "{}/stream/{}",
659 arn,
660 Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
661 ))
662 } else {
663 None
664 };
665
666 let table = DynamoTable {
667 name: table_name.to_string(),
668 arn: arn.clone(),
669 table_id: Uuid::new_v4().to_string().replace('-', ""),
670 key_schema,
671 attribute_definitions,
672 provisioned_throughput,
673 items: Vec::new(),
674 gsi: Vec::new(),
675 lsi: Vec::new(),
676 tags: HashMap::new(),
677 created_at: Utc::now(),
678 status: "ACTIVE".to_string(),
679 item_count: 0,
680 size_bytes: 0,
681 billing_mode,
682 ttl_attribute: None,
683 ttl_enabled: false,
684 resource_policy: None,
685 pitr_enabled: false,
686 kinesis_destinations: Vec::new(),
687 contributor_insights_status: "DISABLED".to_string(),
688 contributor_insights_counters: HashMap::new(),
689 stream_enabled,
690 stream_view_type,
691 stream_arn,
692 stream_records: Arc::new(RwLock::new(Vec::new())),
693 sse_type: None,
694 sse_kms_key_arn: None,
695 };
696
697 state.tables.insert(table_name.to_string(), table);
698 Ok(arn)
699 }
700
701 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
702 let mut state = self.dynamodb_state.write();
703 let table_name = state
705 .tables
706 .iter()
707 .find(|(_, t)| t.arn == physical_id)
708 .map(|(name, _)| name.clone());
709 if let Some(name) = table_name {
710 state.tables.remove(&name);
711 }
712 Ok(())
713 }
714
715 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
718 let props = &resource.properties;
719 let log_group_name = props
720 .get("LogGroupName")
721 .and_then(|v| v.as_str())
722 .unwrap_or(&resource.logical_id);
723
724 let retention_in_days = props
725 .get("RetentionInDays")
726 .and_then(|v| v.as_i64())
727 .map(|v| v as i32);
728
729 let mut state = self.logs_state.write();
730 let arn = format!(
731 "arn:aws:logs:{}:{}:log-group:{}:*",
732 state.region, state.account_id, log_group_name
733 );
734
735 let log_group = fakecloud_logs::state::LogGroup {
736 name: log_group_name.to_string(),
737 arn: arn.clone(),
738 creation_time: Utc::now().timestamp_millis(),
739 retention_in_days,
740 kms_key_id: None,
741 stored_bytes: 0,
742 log_streams: HashMap::new(),
743 tags: HashMap::new(),
744 subscription_filters: Vec::new(),
745 data_protection_policy: None,
746 index_policies: Vec::new(),
747 transformer: None,
748 deletion_protection: false,
749 };
750
751 state
752 .log_groups
753 .insert(log_group_name.to_string(), log_group);
754 Ok(arn)
755 }
756
757 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
758 let mut state = self.logs_state.write();
759 let name = state
761 .log_groups
762 .iter()
763 .find(|(_, g)| g.arn == physical_id)
764 .map(|(name, _)| name.clone());
765 if let Some(name) = name {
766 state.log_groups.remove(&name);
767 }
768 Ok(())
769 }
770
771 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
775 let delivery = self.delivery.clone();
776 let function_arn = function_arn.to_string();
777 let payload = payload.to_string();
778 std::thread::scope(|s| {
779 s.spawn(|| {
780 let rt = tokio::runtime::Builder::new_current_thread()
781 .enable_all()
782 .build()
783 .map_err(|e| format!("Failed to create runtime: {e}"))?;
784 rt.block_on(async {
785 match delivery.invoke_lambda(&function_arn, &payload).await {
786 Some(Ok(_)) => {
787 tracing::info!(
788 "Custom resource Lambda {} invoked successfully",
789 function_arn
790 );
791 Ok(())
792 }
793 Some(Err(e)) => {
794 tracing::warn!(
795 "Custom resource Lambda {} invocation failed: {e}",
796 function_arn
797 );
798 Err(format!("Lambda invocation failed: {e}"))
799 }
800 None => {
801 tracing::warn!(
802 "No Lambda delivery configured; skipping custom resource invocation for {}",
803 function_arn
804 );
805 Ok(())
806 }
807 }
808 })
809 })
810 .join()
811 .map_err(|_| "Lambda invocation thread panicked".to_string())?
812 })
813 }
814
815 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
816 let props = &resource.properties;
817 let service_token = props
818 .get("ServiceToken")
819 .and_then(|v| v.as_str())
820 .ok_or("Custom resource requires ServiceToken property")?;
821
822 let request_id = Uuid::new_v4().to_string();
823
824 let event = serde_json::json!({
826 "RequestType": "Create",
827 "ServiceToken": service_token,
828 "StackId": self.stack_id,
829 "RequestId": request_id,
830 "ResourceType": resource.resource_type,
831 "LogicalResourceId": resource.logical_id,
832 "ResourceProperties": props,
833 });
834
835 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
836 self.invoke_lambda_sync(service_token, &payload)?;
837
838 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
841 Ok(physical_id)
842 }
843
844 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
845 let service_token = match &resource.service_token {
846 Some(token) => token.clone(),
847 None => {
848 return Ok(());
850 }
851 };
852
853 let request_id = Uuid::new_v4().to_string();
854
855 let event = serde_json::json!({
856 "RequestType": "Delete",
857 "ServiceToken": service_token,
858 "StackId": self.stack_id,
859 "RequestId": request_id,
860 "ResourceType": resource.resource_type,
861 "LogicalResourceId": resource.logical_id,
862 "PhysicalResourceId": resource.physical_id,
863 });
864
865 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
866
867 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
869 tracing::warn!(
870 "Custom resource delete Lambda invocation failed for {}: {e}",
871 resource.logical_id
872 );
873 }
874 Ok(())
875 }
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use parking_lot::RwLock;
882
883 fn make_provisioner() -> ResourceProvisioner {
884 ResourceProvisioner {
885 sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
886 "123456789012",
887 "us-east-1",
888 "http://localhost:4566",
889 ))),
890 sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
891 "123456789012",
892 "us-east-1",
893 "http://localhost:4566",
894 ))),
895 ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
896 "123456789012",
897 "us-east-1",
898 ))),
899 iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
900 "123456789012",
901 ))),
902 s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
903 "123456789012",
904 "us-east-1",
905 ))),
906 eventbridge_state: Arc::new(RwLock::new(
907 fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
908 )),
909 dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
910 "123456789012",
911 "us-east-1",
912 ))),
913 logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
914 "123456789012",
915 "us-east-1",
916 ))),
917 delivery: Arc::new(DeliveryBus::new()),
918 account_id: "123456789012".to_string(),
919 region: "us-east-1".to_string(),
920 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
921 }
922 }
923
924 fn make_resource(
925 resource_type: &str,
926 logical_id: &str,
927 props: serde_json::Value,
928 ) -> ResourceDefinition {
929 ResourceDefinition {
930 logical_id: logical_id.to_string(),
931 resource_type: resource_type.to_string(),
932 properties: props,
933 }
934 }
935
/// Subscribing to a topic ARN that was never created must fail.
#[test]
fn sns_subscription_rejects_nonexistent_topic() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
        "Protocol": "sqs",
        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
    });
    let subscription = make_resource("AWS::SNS::Subscription", "MySub", properties);

    let outcome = provisioner.create_resource(&subscription);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("does not exist"));
}
952
/// Subscribing to a topic created through the provisioner must succeed.
#[test]
fn sns_subscription_succeeds_when_topic_exists() {
    let provisioner = make_provisioner();

    // Create the topic first so the subscription has something to attach to.
    let topic_outcome = provisioner.create_resource(&make_resource(
        "AWS::SNS::Topic",
        "MyTopic",
        serde_json::json!({ "TopicName": "my-topic" }),
    ));
    assert!(topic_outcome.is_ok());
    let topic_arn = topic_outcome.unwrap().physical_id;

    let subscription = make_resource(
        "AWS::SNS::Subscription",
        "MySub",
        serde_json::json!({
            "TopicArn": topic_arn,
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
        }),
    );
    assert!(provisioner.create_resource(&subscription).is_ok());
}
979
/// A rule on the default bus gets an ARN without a bus segment.
#[test]
fn eventbridge_rule_arn_default_bus_omits_bus_name() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "Name": "my-rule",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

    let created = provisioner.create_resource(&rule).unwrap();
    let arn = created.physical_id;

    assert_eq!(
        arn,
        "arn:aws:events:us-east-1:123456789012:rule/my-rule"
    );
    assert!(!arn.contains("rule/default/"));
}
999
/// A rule on a non-default bus gets an ARN of the form `rule/<bus>/<name>`.
#[test]
fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
    let provisioner = make_provisioner();

    // Register the custom bus directly in the shared state. The write guard
    // is a temporary that is released at the end of the statement.
    let custom_bus = fakecloud_eventbridge::state::EventBus {
        name: "custom-bus".to_string(),
        arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
        policy: None,
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        description: None,
        kms_key_identifier: None,
        dead_letter_config: None,
        tags: HashMap::new(),
    };
    provisioner
        .eventbridge_state
        .write()
        .buses
        .insert("custom-bus".to_string(), custom_bus);

    let properties = serde_json::json!({
        "Name": "my-rule",
        "EventBusName": "custom-bus",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

    let created = provisioner.create_resource(&rule).unwrap();
    assert_eq!(
        created.physical_id,
        "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
    );
}
1036
/// Creating a rule on an event bus that does not exist must fail.
#[test]
fn eventbridge_rule_rejects_nonexistent_bus() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "Name": "my-rule",
        "EventBusName": "nonexistent-bus",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", properties);

    let outcome = provisioner.create_resource(&rule);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("does not exist"));
}
1053
/// A custom resource without a `ServiceToken` property must be rejected.
#[test]
fn custom_resource_requires_service_token() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "Foo": "bar"
    });
    let custom = make_resource("Custom::MyResource", "MyCustom", properties);

    let outcome = provisioner.create_resource(&custom);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(
        message.contains("ServiceToken"),
        "Should require ServiceToken property"
    );
}
1071
/// With a freshly created delivery bus, a custom resource is still created
/// and its physical id starts with the logical id followed by a dash.
#[test]
fn custom_resource_succeeds_without_lambda_delivery() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
        "Foo": "bar"
    });
    let custom = make_resource("Custom::MyResource", "MyCustom", properties);

    let outcome = provisioner.create_resource(&custom);
    assert!(outcome.is_ok());

    let created = outcome.unwrap();
    assert_eq!(created.logical_id, "MyCustom");
    assert_eq!(created.resource_type, "Custom::MyResource");
    assert!(created.physical_id.starts_with("MyCustom-"));
}
1092
/// `AWS::CloudFormation::CustomResource` is handled as a custom resource and
/// keeps its resource type in the result.
#[test]
fn cloudformation_custom_resource_type_succeeds() {
    let provisioner = make_provisioner();
    let properties = serde_json::json!({
        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
        "Key": "value"
    });
    let custom = make_resource(
        "AWS::CloudFormation::CustomResource",
        "MyCustom2",
        properties,
    );

    let outcome = provisioner.create_resource(&custom);
    assert!(outcome.is_ok());

    let created = outcome.unwrap();
    assert_eq!(created.resource_type, "AWS::CloudFormation::CustomResource");
}
1109}