1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9 AttributeDefinition, DynamoTable, KeySchemaElement, ProvisionedThroughput, SharedDynamoDbState,
10};
11use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
12use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
13use fakecloud_logs::state::SharedLogsState;
14use fakecloud_s3::state::{S3Bucket, SharedS3State};
15use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
16use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
17use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
18
19use crate::state::StackResource;
20use crate::template::ResourceDefinition;
21
22pub struct ResourceProvisioner {
24 pub sqs_state: SharedSqsState,
25 pub sns_state: SharedSnsState,
26 pub ssm_state: SharedSsmState,
27 pub iam_state: SharedIamState,
28 pub s3_state: SharedS3State,
29 pub eventbridge_state: SharedEventBridgeState,
30 pub dynamodb_state: SharedDynamoDbState,
31 pub logs_state: SharedLogsState,
32 pub delivery: Arc<DeliveryBus>,
33 pub account_id: String,
34 pub region: String,
35 pub stack_id: String,
36}
37
38impl ResourceProvisioner {
39 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
41 let result = match resource.resource_type.as_str() {
42 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
43 "AWS::SNS::Topic" => self.create_sns_topic(resource),
44 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
45 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
46 "AWS::IAM::Role" => self.create_iam_role(resource),
47 "AWS::IAM::Policy" => self.create_iam_policy(resource),
48 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
49 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
50 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
51 "AWS::Logs::LogGroup" => self.create_log_group(resource),
52 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
53 self.create_custom_resource(resource)
54 }
55 other => Err(format!("Unsupported resource type: {other}")),
56 };
57
58 let is_custom = resource.resource_type.starts_with("Custom::")
59 || resource.resource_type == "AWS::CloudFormation::CustomResource";
60 let service_token = if is_custom {
61 resource
62 .properties
63 .get("ServiceToken")
64 .and_then(|v| v.as_str())
65 .map(|s| s.to_string())
66 } else {
67 None
68 };
69
70 result.map(|physical_id| StackResource {
71 logical_id: resource.logical_id.clone(),
72 physical_id,
73 resource_type: resource.resource_type.clone(),
74 status: "CREATE_COMPLETE".to_string(),
75 service_token,
76 })
77 }
78
79 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
81 match resource.resource_type.as_str() {
82 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
83 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
84 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
85 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
86 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
87 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
88 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
89 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
90 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
91 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
92 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
93 self.delete_custom_resource(resource)
94 }
95 other => Err(format!("Unsupported resource type: {other}")),
96 }
97 }
98
99 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
102 let props = &resource.properties;
103 let queue_name = props
104 .get("QueueName")
105 .and_then(|v| v.as_str())
106 .unwrap_or(&resource.logical_id);
107
108 let mut state = self.sqs_state.write();
109 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
110 let arn = format!(
111 "arn:aws:sqs:{}:{}:{}",
112 state.region, state.account_id, queue_name
113 );
114
115 let is_fifo = queue_name.ends_with(".fifo");
116 let mut attributes = HashMap::new();
117 if let Some(obj) = props.as_object() {
118 for (k, v) in obj {
119 if k != "QueueName" {
120 if let Some(s) = v.as_str() {
121 attributes.insert(k.clone(), s.to_string());
122 } else if let Some(n) = v.as_i64() {
123 attributes.insert(k.clone(), n.to_string());
124 }
125 }
126 }
127 }
128
129 let queue = SqsQueue {
130 queue_name: queue_name.to_string(),
131 queue_url: queue_url.clone(),
132 arn,
133 created_at: Utc::now(),
134 messages: std::collections::VecDeque::new(),
135 inflight: Vec::new(),
136 attributes,
137 is_fifo,
138 dedup_cache: HashMap::new(),
139 redrive_policy: None,
140 tags: HashMap::new(),
141 next_sequence_number: 0,
142 permission_labels: Vec::new(),
143 receipt_handle_map: HashMap::new(),
144 };
145
146 state
147 .name_to_url
148 .insert(queue_name.to_string(), queue_url.clone());
149 state.queues.insert(queue_url.clone(), queue);
150
151 Ok(queue_url)
152 }
153
154 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
155 let mut state = self.sqs_state.write();
156 if let Some(queue) = state.queues.remove(physical_id) {
157 state.name_to_url.remove(&queue.queue_name);
158 }
159 Ok(())
160 }
161
162 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
165 let props = &resource.properties;
166 let topic_name = props
167 .get("TopicName")
168 .and_then(|v| v.as_str())
169 .unwrap_or(&resource.logical_id);
170
171 let mut state = self.sns_state.write();
172 let topic_arn = format!(
173 "arn:aws:sns:{}:{}:{}",
174 state.region, state.account_id, topic_name
175 );
176
177 let topic = SnsTopic {
178 topic_arn: topic_arn.clone(),
179 name: topic_name.to_string(),
180 attributes: HashMap::new(),
181 tags: Vec::new(),
182 is_fifo: topic_name.ends_with(".fifo"),
183 created_at: Utc::now(),
184 };
185
186 state.topics.insert(topic_arn.clone(), topic);
187 Ok(topic_arn)
188 }
189
190 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
191 let mut state = self.sns_state.write();
192 state.topics.remove(physical_id);
193 state
195 .subscriptions
196 .retain(|_, sub| sub.topic_arn != physical_id);
197 Ok(())
198 }
199
200 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
203 let props = &resource.properties;
204 let topic_arn = props
205 .get("TopicArn")
206 .and_then(|v| v.as_str())
207 .ok_or("SNS Subscription requires TopicArn")?;
208 let protocol = props
209 .get("Protocol")
210 .and_then(|v| v.as_str())
211 .ok_or("SNS Subscription requires Protocol")?;
212 let endpoint = props
213 .get("Endpoint")
214 .and_then(|v| v.as_str())
215 .ok_or("SNS Subscription requires Endpoint")?;
216
217 let mut state = self.sns_state.write();
218
219 if !state.topics.contains_key(topic_arn) {
221 return Err(format!("Topic ARN does not exist: {topic_arn}"));
222 }
223
224 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
225
226 let subscription = SnsSubscription {
227 subscription_arn: sub_arn.clone(),
228 topic_arn: topic_arn.to_string(),
229 protocol: protocol.to_string(),
230 endpoint: endpoint.to_string(),
231 owner: state.account_id.clone(),
232 attributes: HashMap::new(),
233 confirmed: true,
234 confirmation_token: None,
235 };
236
237 state.subscriptions.insert(sub_arn.clone(), subscription);
238 Ok(sub_arn)
239 }
240
241 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
242 let mut state = self.sns_state.write();
243 state.subscriptions.remove(physical_id);
244 Ok(())
245 }
246
247 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
250 let props = &resource.properties;
251 let name = props
252 .get("Name")
253 .and_then(|v| v.as_str())
254 .ok_or("SSM Parameter requires Name")?;
255 let value = props
256 .get("Value")
257 .and_then(|v| v.as_str())
258 .ok_or("SSM Parameter requires Value")?;
259 let param_type = props
260 .get("Type")
261 .and_then(|v| v.as_str())
262 .unwrap_or("String");
263
264 let mut state = self.ssm_state.write();
265 let arn = format!(
266 "arn:aws:ssm:{}:{}:parameter{}",
267 state.region,
268 state.account_id,
269 if name.starts_with('/') {
270 name.to_string()
271 } else {
272 format!("/{name}")
273 }
274 );
275
276 let parameter = SsmParameter {
277 name: name.to_string(),
278 value: value.to_string(),
279 param_type: param_type.to_string(),
280 version: 1,
281 arn: arn.clone(),
282 last_modified: Utc::now(),
283 history: Vec::new(),
284 tags: HashMap::new(),
285 labels: HashMap::new(),
286 description: props
287 .get("Description")
288 .and_then(|v| v.as_str())
289 .map(|s| s.to_string()),
290 allowed_pattern: None,
291 key_id: None,
292 data_type: "text".to_string(),
293 tier: "Standard".to_string(),
294 policies: None,
295 };
296
297 state.parameters.insert(name.to_string(), parameter);
298 Ok(name.to_string())
299 }
300
301 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
302 let mut state = self.ssm_state.write();
303 state.parameters.remove(physical_id);
304 Ok(())
305 }
306
307 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
310 let props = &resource.properties;
311 let role_name = props
312 .get("RoleName")
313 .and_then(|v| v.as_str())
314 .unwrap_or(&resource.logical_id);
315
316 let assume_role_policy = props
317 .get("AssumeRolePolicyDocument")
318 .map(|v| {
319 if v.is_string() {
320 v.as_str().unwrap().to_string()
321 } else {
322 serde_json::to_string(v).unwrap_or_default()
323 }
324 })
325 .unwrap_or_default();
326
327 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
328
329 let mut state = self.iam_state.write();
330 let role_id = format!(
331 "FKIA{}",
332 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
333 );
334 let arn = format!(
335 "arn:aws:iam::{}:role{}{}",
336 state.account_id,
337 if path == "/" { "/" } else { path },
338 role_name
339 );
340
341 let role = IamRole {
342 role_name: role_name.to_string(),
343 role_id,
344 arn: arn.clone(),
345 path: path.to_string(),
346 assume_role_policy_document: assume_role_policy,
347 created_at: Utc::now(),
348 description: props
349 .get("Description")
350 .and_then(|v| v.as_str())
351 .map(|s| s.to_string()),
352 max_session_duration: 3600,
353 tags: Vec::new(),
354 permissions_boundary: None,
355 };
356
357 state.roles.insert(role_name.to_string(), role);
358 Ok(arn)
359 }
360
361 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
362 let mut state = self.iam_state.write();
363 let role_name = state
365 .roles
366 .iter()
367 .find(|(_, r)| r.arn == physical_id)
368 .map(|(name, _)| name.clone());
369 if let Some(name) = role_name {
370 state.roles.remove(&name);
371 state.role_policies.remove(&name);
372 state.role_inline_policies.remove(&name);
373 }
374 Ok(())
375 }
376
377 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
380 let props = &resource.properties;
381 let policy_name = props
382 .get("PolicyName")
383 .and_then(|v| v.as_str())
384 .unwrap_or(&resource.logical_id);
385
386 let policy_document = props
387 .get("PolicyDocument")
388 .map(|v| {
389 if v.is_string() {
390 v.as_str().unwrap().to_string()
391 } else {
392 serde_json::to_string(v).unwrap_or_default()
393 }
394 })
395 .unwrap_or_default();
396
397 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
398
399 let mut state = self.iam_state.write();
400 let policy_id = format!(
401 "FSIA{}",
402 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
403 );
404 let arn = format!(
405 "arn:aws:iam::{}:policy{}{}",
406 state.account_id,
407 if path == "/" { "/" } else { path },
408 policy_name
409 );
410
411 let now = Utc::now();
412 let policy = IamPolicy {
413 policy_name: policy_name.to_string(),
414 policy_id,
415 arn: arn.clone(),
416 path: path.to_string(),
417 description: props
418 .get("Description")
419 .and_then(|v| v.as_str())
420 .unwrap_or("")
421 .to_string(),
422 created_at: now,
423 tags: Vec::new(),
424 default_version_id: "v1".to_string(),
425 versions: vec![PolicyVersion {
426 version_id: "v1".to_string(),
427 document: policy_document,
428 is_default: true,
429 created_at: now,
430 }],
431 next_version_num: 2,
432 attachment_count: 0,
433 };
434
435 state.policies.insert(arn.clone(), policy);
436 Ok(arn)
437 }
438
439 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
440 let mut state = self.iam_state.write();
441 state.policies.remove(physical_id);
442 Ok(())
443 }
444
445 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
448 let props = &resource.properties;
449 let bucket_name = props
450 .get("BucketName")
451 .and_then(|v| v.as_str())
452 .unwrap_or(&resource.logical_id);
453
454 let mut state = self.s3_state.write();
455 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
456 state.buckets.insert(bucket_name.to_string(), bucket);
457 Ok(bucket_name.to_string())
458 }
459
460 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
461 let mut state = self.s3_state.write();
462 state.buckets.remove(physical_id);
463 Ok(())
464 }
465
466 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
469 let props = &resource.properties;
470 let rule_name = props
471 .get("Name")
472 .and_then(|v| v.as_str())
473 .unwrap_or(&resource.logical_id);
474 let event_bus_name = props
475 .get("EventBusName")
476 .and_then(|v| v.as_str())
477 .unwrap_or("default");
478
479 let mut state = self.eventbridge_state.write();
480
481 if !state.buses.contains_key(event_bus_name) {
483 return Err(format!("Event bus does not exist: {event_bus_name}"));
484 }
485
486 let arn = if event_bus_name == "default" {
487 format!(
488 "arn:aws:events:{}:{}:rule/{}",
489 state.region, state.account_id, rule_name
490 )
491 } else {
492 format!(
493 "arn:aws:events:{}:{}:rule/{}/{}",
494 state.region, state.account_id, event_bus_name, rule_name
495 )
496 };
497
498 let rule = EventRule {
499 name: rule_name.to_string(),
500 arn: arn.clone(),
501 event_bus_name: event_bus_name.to_string(),
502 event_pattern: props.get("EventPattern").map(|v| {
503 if v.is_string() {
504 v.as_str().unwrap().to_string()
505 } else {
506 serde_json::to_string(v).unwrap_or_default()
507 }
508 }),
509 schedule_expression: props
510 .get("ScheduleExpression")
511 .and_then(|v| v.as_str())
512 .map(|s| s.to_string()),
513 state: props
514 .get("State")
515 .and_then(|v| v.as_str())
516 .unwrap_or("ENABLED")
517 .to_string(),
518 description: props
519 .get("Description")
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string()),
522 role_arn: props
523 .get("RoleArn")
524 .and_then(|v| v.as_str())
525 .map(|s| s.to_string()),
526 managed_by: None,
527 created_by: None,
528 targets: Vec::new(),
529 tags: HashMap::new(),
530 last_fired: None,
531 };
532
533 state
534 .rules
535 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
536 Ok(arn)
537 }
538
539 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
540 let mut state = self.eventbridge_state.write();
541 let key = state
543 .rules
544 .iter()
545 .find(|(_, r)| r.arn == physical_id)
546 .map(|(k, _)| k.clone());
547 if let Some(k) = key {
548 state.rules.remove(&k);
549 }
550 Ok(())
551 }
552
553 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
556 let props = &resource.properties;
557 let table_name = props
558 .get("TableName")
559 .and_then(|v| v.as_str())
560 .unwrap_or(&resource.logical_id);
561
562 let mut key_schema = Vec::new();
563 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
564 for item in ks {
565 let attr_name = item
566 .get("AttributeName")
567 .and_then(|v| v.as_str())
568 .unwrap_or("")
569 .to_string();
570 let key_type = item
571 .get("KeyType")
572 .and_then(|v| v.as_str())
573 .unwrap_or("HASH")
574 .to_string();
575 key_schema.push(KeySchemaElement {
576 attribute_name: attr_name,
577 key_type,
578 });
579 }
580 }
581
582 let mut attribute_definitions = Vec::new();
583 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
584 for item in defs {
585 let attr_name = item
586 .get("AttributeName")
587 .and_then(|v| v.as_str())
588 .unwrap_or("")
589 .to_string();
590 let attr_type = item
591 .get("AttributeType")
592 .and_then(|v| v.as_str())
593 .unwrap_or("S")
594 .to_string();
595 attribute_definitions.push(AttributeDefinition {
596 attribute_name: attr_name,
597 attribute_type: attr_type,
598 });
599 }
600 }
601
602 let billing_mode = props
603 .get("BillingMode")
604 .and_then(|v| v.as_str())
605 .unwrap_or("PAY_PER_REQUEST")
606 .to_string();
607
608 let provisioned_throughput = if billing_mode == "PROVISIONED" {
609 if let Some(pt) = props.get("ProvisionedThroughput") {
610 ProvisionedThroughput {
611 read_capacity_units: pt
612 .get("ReadCapacityUnits")
613 .and_then(|v| v.as_i64())
614 .unwrap_or(5),
615 write_capacity_units: pt
616 .get("WriteCapacityUnits")
617 .and_then(|v| v.as_i64())
618 .unwrap_or(5),
619 }
620 } else {
621 ProvisionedThroughput {
622 read_capacity_units: 5,
623 write_capacity_units: 5,
624 }
625 }
626 } else {
627 ProvisionedThroughput {
628 read_capacity_units: 0,
629 write_capacity_units: 0,
630 }
631 };
632
633 let (stream_enabled, stream_view_type) =
635 if let Some(stream_spec) = props.get("StreamSpecification") {
636 let view_type = stream_spec
637 .get("StreamViewType")
638 .and_then(|v| v.as_str())
639 .map(|s| s.to_string());
640 let enabled = stream_spec
641 .get("StreamEnabled")
642 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
643 .unwrap_or(view_type.is_some());
645 (enabled, view_type)
646 } else {
647 (false, None)
648 };
649
650 let deletion_protection_enabled = props
651 .get("DeletionProtectionEnabled")
652 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
653 .unwrap_or(false);
654
655 let mut state = self.dynamodb_state.write();
656 let arn = format!(
657 "arn:aws:dynamodb:{}:{}:table/{}",
658 state.region, state.account_id, table_name
659 );
660
661 let stream_arn = if stream_enabled {
662 Some(format!(
663 "{}/stream/{}",
664 arn,
665 Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
666 ))
667 } else {
668 None
669 };
670
671 let table = DynamoTable {
672 name: table_name.to_string(),
673 arn: arn.clone(),
674 table_id: Uuid::new_v4().to_string().replace('-', ""),
675 key_schema,
676 attribute_definitions,
677 provisioned_throughput,
678 items: Vec::new(),
679 gsi: Vec::new(),
680 lsi: Vec::new(),
681 tags: HashMap::new(),
682 created_at: Utc::now(),
683 status: "ACTIVE".to_string(),
684 item_count: 0,
685 size_bytes: 0,
686 billing_mode,
687 ttl_attribute: None,
688 ttl_enabled: false,
689 resource_policy: None,
690 pitr_enabled: false,
691 kinesis_destinations: Vec::new(),
692 contributor_insights_status: "DISABLED".to_string(),
693 contributor_insights_counters: HashMap::new(),
694 stream_enabled,
695 stream_view_type,
696 stream_arn,
697 stream_records: Arc::new(RwLock::new(Vec::new())),
698 sse_type: None,
699 sse_kms_key_arn: None,
700 deletion_protection_enabled,
701 };
702
703 state.tables.insert(table_name.to_string(), table);
704 Ok(arn)
705 }
706
707 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
708 let mut state = self.dynamodb_state.write();
709 let table_name = state
711 .tables
712 .iter()
713 .find(|(_, t)| t.arn == physical_id)
714 .map(|(name, _)| name.clone());
715 if let Some(name) = table_name {
716 state.tables.remove(&name);
717 }
718 Ok(())
719 }
720
721 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
724 let props = &resource.properties;
725 let log_group_name = props
726 .get("LogGroupName")
727 .and_then(|v| v.as_str())
728 .unwrap_or(&resource.logical_id);
729
730 let retention_in_days = props
731 .get("RetentionInDays")
732 .and_then(|v| v.as_i64())
733 .map(|v| v as i32);
734
735 let mut state = self.logs_state.write();
736 let arn = format!(
737 "arn:aws:logs:{}:{}:log-group:{}:*",
738 state.region, state.account_id, log_group_name
739 );
740
741 let log_group = fakecloud_logs::state::LogGroup {
742 name: log_group_name.to_string(),
743 arn: arn.clone(),
744 creation_time: Utc::now().timestamp_millis(),
745 retention_in_days,
746 kms_key_id: None,
747 stored_bytes: 0,
748 log_streams: HashMap::new(),
749 tags: HashMap::new(),
750 subscription_filters: Vec::new(),
751 data_protection_policy: None,
752 index_policies: Vec::new(),
753 transformer: None,
754 deletion_protection: false,
755 log_group_class: Some("STANDARD".to_string()),
756 };
757
758 state
759 .log_groups
760 .insert(log_group_name.to_string(), log_group);
761 Ok(arn)
762 }
763
764 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
765 let mut state = self.logs_state.write();
766 let name = state
768 .log_groups
769 .iter()
770 .find(|(_, g)| g.arn == physical_id)
771 .map(|(name, _)| name.clone());
772 if let Some(name) = name {
773 state.log_groups.remove(&name);
774 }
775 Ok(())
776 }
777
778 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
782 let delivery = self.delivery.clone();
783 let function_arn = function_arn.to_string();
784 let payload = payload.to_string();
785 std::thread::scope(|s| {
786 s.spawn(|| {
787 let rt = tokio::runtime::Builder::new_current_thread()
788 .enable_all()
789 .build()
790 .map_err(|e| format!("Failed to create runtime: {e}"))?;
791 rt.block_on(async {
792 match delivery.invoke_lambda(&function_arn, &payload).await {
793 Some(Ok(_)) => {
794 tracing::info!(
795 "Custom resource Lambda {} invoked successfully",
796 function_arn
797 );
798 Ok(())
799 }
800 Some(Err(e)) => {
801 tracing::warn!(
802 "Custom resource Lambda {} invocation failed: {e}",
803 function_arn
804 );
805 Err(format!("Lambda invocation failed: {e}"))
806 }
807 None => {
808 tracing::warn!(
809 "No Lambda delivery configured; skipping custom resource invocation for {}",
810 function_arn
811 );
812 Ok(())
813 }
814 }
815 })
816 })
817 .join()
818 .map_err(|_| "Lambda invocation thread panicked".to_string())?
819 })
820 }
821
822 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
823 let props = &resource.properties;
824 let service_token = props
825 .get("ServiceToken")
826 .and_then(|v| v.as_str())
827 .ok_or("Custom resource requires ServiceToken property")?;
828
829 let request_id = Uuid::new_v4().to_string();
830
831 let event = serde_json::json!({
833 "RequestType": "Create",
834 "ServiceToken": service_token,
835 "StackId": self.stack_id,
836 "RequestId": request_id,
837 "ResourceType": resource.resource_type,
838 "LogicalResourceId": resource.logical_id,
839 "ResourceProperties": props,
840 });
841
842 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
843 self.invoke_lambda_sync(service_token, &payload)?;
844
845 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
848 Ok(physical_id)
849 }
850
851 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
852 let service_token = match &resource.service_token {
853 Some(token) => token.clone(),
854 None => {
855 return Ok(());
857 }
858 };
859
860 let request_id = Uuid::new_v4().to_string();
861
862 let event = serde_json::json!({
863 "RequestType": "Delete",
864 "ServiceToken": service_token,
865 "StackId": self.stack_id,
866 "RequestId": request_id,
867 "ResourceType": resource.resource_type,
868 "LogicalResourceId": resource.logical_id,
869 "PhysicalResourceId": resource.physical_id,
870 });
871
872 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
873
874 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
876 tracing::warn!(
877 "Custom resource delete Lambda invocation failed for {}: {e}",
878 resource.logical_id
879 );
880 }
881 Ok(())
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888 use parking_lot::RwLock;
889
890 fn make_provisioner() -> ResourceProvisioner {
891 ResourceProvisioner {
892 sqs_state: Arc::new(RwLock::new(fakecloud_sqs::state::SqsState::new(
893 "123456789012",
894 "us-east-1",
895 "http://localhost:4566",
896 ))),
897 sns_state: Arc::new(RwLock::new(fakecloud_sns::state::SnsState::new(
898 "123456789012",
899 "us-east-1",
900 "http://localhost:4566",
901 ))),
902 ssm_state: Arc::new(RwLock::new(fakecloud_ssm::state::SsmState::new(
903 "123456789012",
904 "us-east-1",
905 ))),
906 iam_state: Arc::new(RwLock::new(fakecloud_iam::state::IamState::new(
907 "123456789012",
908 ))),
909 s3_state: Arc::new(RwLock::new(fakecloud_s3::state::S3State::new(
910 "123456789012",
911 "us-east-1",
912 ))),
913 eventbridge_state: Arc::new(RwLock::new(
914 fakecloud_eventbridge::state::EventBridgeState::new("123456789012", "us-east-1"),
915 )),
916 dynamodb_state: Arc::new(RwLock::new(fakecloud_dynamodb::state::DynamoDbState::new(
917 "123456789012",
918 "us-east-1",
919 ))),
920 logs_state: Arc::new(RwLock::new(fakecloud_logs::state::LogsState::new(
921 "123456789012",
922 "us-east-1",
923 ))),
924 delivery: Arc::new(DeliveryBus::new()),
925 account_id: "123456789012".to_string(),
926 region: "us-east-1".to_string(),
927 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
928 }
929 }
930
931 fn make_resource(
932 resource_type: &str,
933 logical_id: &str,
934 props: serde_json::Value,
935 ) -> ResourceDefinition {
936 ResourceDefinition {
937 logical_id: logical_id.to_string(),
938 resource_type: resource_type.to_string(),
939 properties: props,
940 }
941 }
942
/// Subscribing to a topic ARN that was never created must be rejected.
#[test]
fn sns_subscription_rejects_nonexistent_topic() {
    let props = serde_json::json!({
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
        "Protocol": "sqs",
        "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
    });
    let subscription = make_resource("AWS::SNS::Subscription", "MySub", props);

    let error = make_provisioner()
        .create_resource(&subscription)
        .expect_err("subscription to a missing topic should fail");
    assert!(error.contains("does not exist"));
}
959
/// A subscription can be created once its topic has been provisioned.
#[test]
fn sns_subscription_succeeds_when_topic_exists() {
    let prov = make_provisioner();

    // Provision the topic first and reuse the ARN it reports.
    let topic = make_resource(
        "AWS::SNS::Topic",
        "MyTopic",
        serde_json::json!({ "TopicName": "my-topic" }),
    );
    let topic_arn = prov
        .create_resource(&topic)
        .expect("topic creation should succeed")
        .physical_id;

    let subscription = make_resource(
        "AWS::SNS::Subscription",
        "MySub",
        serde_json::json!({
            "TopicArn": topic_arn,
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
        }),
    );
    assert!(prov.create_resource(&subscription).is_ok());
}
986
/// A rule on the default bus gets an ARN without a bus-name segment.
#[test]
fn eventbridge_rule_arn_default_bus_omits_bus_name() {
    let props = serde_json::json!({
        "Name": "my-rule",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", props);

    let created = make_provisioner().create_resource(&rule).unwrap();
    let arn = created.physical_id.as_str();

    assert_eq!(arn, "arn:aws:events:us-east-1:123456789012:rule/my-rule");
    assert!(!arn.contains("rule/default/"));
}
1006
/// A rule on a custom bus gets an ARN that includes the bus name.
#[test]
fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
    let prov = make_provisioner();

    // Register the custom bus; the write guard is released at the end of the
    // statement, before the rule is created.
    prov.eventbridge_state.write().buses.insert(
        "custom-bus".to_string(),
        fakecloud_eventbridge::state::EventBus {
            name: "custom-bus".to_string(),
            arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
            policy: None,
            creation_time: Utc::now(),
            last_modified_time: Utc::now(),
            description: None,
            kms_key_identifier: None,
            dead_letter_config: None,
            tags: HashMap::new(),
        },
    );

    let props = serde_json::json!({
        "Name": "my-rule",
        "EventBusName": "custom-bus",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", props);

    let created = prov.create_resource(&rule).unwrap();
    assert_eq!(
        created.physical_id,
        "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
    );
}
1043
/// Creating a rule on an event bus that does not exist must be rejected.
#[test]
fn eventbridge_rule_rejects_nonexistent_bus() {
    let props = serde_json::json!({
        "Name": "my-rule",
        "EventBusName": "nonexistent-bus",
        "ScheduleExpression": "rate(1 hour)"
    });
    let rule = make_resource("AWS::Events::Rule", "MyRule", props);

    let error = make_provisioner()
        .create_resource(&rule)
        .expect_err("rule on a missing bus should fail");
    assert!(error.contains("does not exist"));
}
1060
/// A custom resource without a `ServiceToken` property must be rejected.
#[test]
fn custom_resource_requires_service_token() {
    let props = serde_json::json!({
        "Foo": "bar"
    });
    let custom = make_resource("Custom::MyResource", "MyCustom", props);

    let outcome = make_provisioner().create_resource(&custom);
    match outcome {
        Ok(_) => panic!("creation without ServiceToken should fail"),
        Err(message) => assert!(
            message.contains("ServiceToken"),
            "Should require ServiceToken property"
        ),
    }
}
1078
/// A custom resource is still created when the delivery bus cannot invoke
/// Lambda functions.
/// NOTE(review): assumes `DeliveryBus::new()` has no Lambda delivery
/// configured, as the test name suggests — confirm.
#[test]
fn custom_resource_succeeds_without_lambda_delivery() {
    let props = serde_json::json!({
        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
        "Foo": "bar"
    });
    let custom = make_resource("Custom::MyResource", "MyCustom", props);

    let record = make_provisioner()
        .create_resource(&custom)
        .expect("custom resource creation should succeed");

    assert_eq!(record.logical_id, "MyCustom");
    assert_eq!(record.resource_type, "Custom::MyResource");
    assert!(record.physical_id.starts_with("MyCustom-"));
}
1099
/// `AWS::CloudFormation::CustomResource` is accepted as a custom resource
/// type and keeps its type name in the stack record.
#[test]
fn cloudformation_custom_resource_type_succeeds() {
    let props = serde_json::json!({
        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
        "Key": "value"
    });
    let custom = make_resource("AWS::CloudFormation::CustomResource", "MyCustom2", props);

    let record = make_provisioner()
        .create_resource(&custom)
        .expect("custom resource creation should succeed");
    assert_eq!(record.resource_type, "AWS::CloudFormation::CustomResource");
}
1116}