1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use uuid::Uuid;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_dynamodb::state::{
9 AttributeDefinition, DynamoTable, KeySchemaElement, OnDemandThroughput, ProvisionedThroughput,
10 SharedDynamoDbState,
11};
12use fakecloud_eventbridge::state::{EventRule, SharedEventBridgeState};
13use fakecloud_iam::state::{IamPolicy, IamRole, PolicyVersion, SharedIamState};
14use fakecloud_logs::state::SharedLogsState;
15use fakecloud_s3::state::{S3Bucket, SharedS3State};
16use fakecloud_sns::state::{SharedSnsState, SnsSubscription, SnsTopic};
17use fakecloud_sqs::state::{SharedSqsState, SqsQueue};
18use fakecloud_ssm::state::{SharedSsmState, SsmParameter};
19
20use crate::state::StackResource;
21use crate::template::ResourceDefinition;
22
23pub struct ResourceProvisioner {
25 pub sqs_state: SharedSqsState,
26 pub sns_state: SharedSnsState,
27 pub ssm_state: SharedSsmState,
28 pub iam_state: SharedIamState,
29 pub s3_state: SharedS3State,
30 pub eventbridge_state: SharedEventBridgeState,
31 pub dynamodb_state: SharedDynamoDbState,
32 pub logs_state: SharedLogsState,
33 pub delivery: Arc<DeliveryBus>,
34 pub account_id: String,
35 pub region: String,
36 pub stack_id: String,
37}
38
39impl ResourceProvisioner {
40 pub fn create_resource(&self, resource: &ResourceDefinition) -> Result<StackResource, String> {
42 let result = match resource.resource_type.as_str() {
43 "AWS::SQS::Queue" => self.create_sqs_queue(resource),
44 "AWS::SNS::Topic" => self.create_sns_topic(resource),
45 "AWS::SNS::Subscription" => self.create_sns_subscription(resource),
46 "AWS::SSM::Parameter" => self.create_ssm_parameter(resource),
47 "AWS::IAM::Role" => self.create_iam_role(resource),
48 "AWS::IAM::Policy" => self.create_iam_policy(resource),
49 "AWS::S3::Bucket" => self.create_s3_bucket(resource),
50 "AWS::Events::Rule" => self.create_eventbridge_rule(resource),
51 "AWS::DynamoDB::Table" => self.create_dynamodb_table(resource),
52 "AWS::Logs::LogGroup" => self.create_log_group(resource),
53 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
54 self.create_custom_resource(resource)
55 }
56 other => Err(format!("Unsupported resource type: {other}")),
57 };
58
59 let is_custom = resource.resource_type.starts_with("Custom::")
60 || resource.resource_type == "AWS::CloudFormation::CustomResource";
61 let service_token = if is_custom {
62 resource
63 .properties
64 .get("ServiceToken")
65 .and_then(|v| v.as_str())
66 .map(|s| s.to_string())
67 } else {
68 None
69 };
70
71 result.map(|physical_id| StackResource {
72 logical_id: resource.logical_id.clone(),
73 physical_id,
74 resource_type: resource.resource_type.clone(),
75 status: "CREATE_COMPLETE".to_string(),
76 service_token,
77 })
78 }
79
80 pub fn delete_resource(&self, resource: &StackResource) -> Result<(), String> {
82 match resource.resource_type.as_str() {
83 "AWS::SQS::Queue" => self.delete_sqs_queue(&resource.physical_id),
84 "AWS::SNS::Topic" => self.delete_sns_topic(&resource.physical_id),
85 "AWS::SNS::Subscription" => self.delete_sns_subscription(&resource.physical_id),
86 "AWS::SSM::Parameter" => self.delete_ssm_parameter(&resource.physical_id),
87 "AWS::IAM::Role" => self.delete_iam_role(&resource.physical_id),
88 "AWS::IAM::Policy" => self.delete_iam_policy(&resource.physical_id),
89 "AWS::S3::Bucket" => self.delete_s3_bucket(&resource.physical_id),
90 "AWS::Events::Rule" => self.delete_eventbridge_rule(&resource.physical_id),
91 "AWS::DynamoDB::Table" => self.delete_dynamodb_table(&resource.physical_id),
92 "AWS::Logs::LogGroup" => self.delete_log_group(&resource.physical_id),
93 t if t.starts_with("Custom::") || t == "AWS::CloudFormation::CustomResource" => {
94 self.delete_custom_resource(resource)
95 }
96 other => Err(format!("Unsupported resource type: {other}")),
97 }
98 }
99
100 fn create_sqs_queue(&self, resource: &ResourceDefinition) -> Result<String, String> {
103 let props = &resource.properties;
104 let queue_name = props
105 .get("QueueName")
106 .and_then(|v| v.as_str())
107 .unwrap_or(&resource.logical_id);
108
109 let mut __sqs_mas = self.sqs_state.write();
110 let state = __sqs_mas.get_or_create(&self.account_id);
111 let queue_url = format!("{}/{}/{}", state.endpoint, state.account_id, queue_name);
112 let arn = format!(
113 "arn:aws:sqs:{}:{}:{}",
114 state.region, state.account_id, queue_name
115 );
116
117 let is_fifo = queue_name.ends_with(".fifo");
118 let mut attributes = HashMap::new();
119 if let Some(obj) = props.as_object() {
120 for (k, v) in obj {
121 if k != "QueueName" {
122 if let Some(s) = v.as_str() {
123 attributes.insert(k.clone(), s.to_string());
124 } else if let Some(n) = v.as_i64() {
125 attributes.insert(k.clone(), n.to_string());
126 }
127 }
128 }
129 }
130
131 let queue = SqsQueue {
132 queue_name: queue_name.to_string(),
133 queue_url: queue_url.clone(),
134 arn,
135 created_at: Utc::now(),
136 messages: std::collections::VecDeque::new(),
137 inflight: Vec::new(),
138 attributes,
139 is_fifo,
140 dedup_cache: HashMap::new(),
141 redrive_policy: None,
142 tags: HashMap::new(),
143 next_sequence_number: 0,
144 permission_labels: Vec::new(),
145 receipt_handle_map: HashMap::new(),
146 };
147
148 state
149 .name_to_url
150 .insert(queue_name.to_string(), queue_url.clone());
151 state.queues.insert(queue_url.clone(), queue);
152
153 Ok(queue_url)
154 }
155
156 fn delete_sqs_queue(&self, physical_id: &str) -> Result<(), String> {
157 let mut __sqs_mas = self.sqs_state.write();
158 let state = __sqs_mas.get_or_create(&self.account_id);
159 if let Some(queue) = state.queues.remove(physical_id) {
160 state.name_to_url.remove(&queue.queue_name);
161 }
162 Ok(())
163 }
164
165 fn create_sns_topic(&self, resource: &ResourceDefinition) -> Result<String, String> {
168 let props = &resource.properties;
169 let topic_name = props
170 .get("TopicName")
171 .and_then(|v| v.as_str())
172 .unwrap_or(&resource.logical_id);
173
174 let mut __sns_mas = self.sns_state.write();
175 let state = __sns_mas.get_or_create(&self.account_id);
176 let topic_arn = format!(
177 "arn:aws:sns:{}:{}:{}",
178 state.region, state.account_id, topic_name
179 );
180
181 let topic = SnsTopic {
182 topic_arn: topic_arn.clone(),
183 name: topic_name.to_string(),
184 attributes: HashMap::new(),
185 tags: Vec::new(),
186 is_fifo: topic_name.ends_with(".fifo"),
187 created_at: Utc::now(),
188 };
189
190 state.topics.insert(topic_arn.clone(), topic);
191 Ok(topic_arn)
192 }
193
194 fn delete_sns_topic(&self, physical_id: &str) -> Result<(), String> {
195 let mut __sns_mas = self.sns_state.write();
196 let state = __sns_mas.get_or_create(&self.account_id);
197 state.topics.remove(physical_id);
198 state
200 .subscriptions
201 .retain(|_, sub| sub.topic_arn != physical_id);
202 Ok(())
203 }
204
205 fn create_sns_subscription(&self, resource: &ResourceDefinition) -> Result<String, String> {
208 let props = &resource.properties;
209 let topic_arn = props
210 .get("TopicArn")
211 .and_then(|v| v.as_str())
212 .ok_or("SNS Subscription requires TopicArn")?;
213 let protocol = props
214 .get("Protocol")
215 .and_then(|v| v.as_str())
216 .ok_or("SNS Subscription requires Protocol")?;
217 let endpoint = props
218 .get("Endpoint")
219 .and_then(|v| v.as_str())
220 .ok_or("SNS Subscription requires Endpoint")?;
221
222 let mut __sns_mas = self.sns_state.write();
223 let state = __sns_mas.get_or_create(&self.account_id);
224
225 if !state.topics.contains_key(topic_arn) {
227 return Err(format!("Topic ARN does not exist: {topic_arn}"));
228 }
229
230 let sub_arn = format!("{}:{}", topic_arn, Uuid::new_v4());
231
232 let subscription = SnsSubscription {
233 subscription_arn: sub_arn.clone(),
234 topic_arn: topic_arn.to_string(),
235 protocol: protocol.to_string(),
236 endpoint: endpoint.to_string(),
237 owner: state.account_id.clone(),
238 attributes: HashMap::new(),
239 confirmed: true,
240 confirmation_token: None,
241 };
242
243 state.subscriptions.insert(sub_arn.clone(), subscription);
244 Ok(sub_arn)
245 }
246
247 fn delete_sns_subscription(&self, physical_id: &str) -> Result<(), String> {
248 let mut __sns_mas = self.sns_state.write();
249 let state = __sns_mas.get_or_create(&self.account_id);
250 state.subscriptions.remove(physical_id);
251 Ok(())
252 }
253
254 fn create_ssm_parameter(&self, resource: &ResourceDefinition) -> Result<String, String> {
257 let props = &resource.properties;
258 let name = props
259 .get("Name")
260 .and_then(|v| v.as_str())
261 .ok_or("SSM Parameter requires Name")?;
262 let value = props
263 .get("Value")
264 .and_then(|v| v.as_str())
265 .ok_or("SSM Parameter requires Value")?;
266 let param_type = props
267 .get("Type")
268 .and_then(|v| v.as_str())
269 .unwrap_or("String");
270
271 let mut accounts = self.ssm_state.write();
272 let state = accounts.get_or_create(&self.account_id);
273 let arn = format!(
274 "arn:aws:ssm:{}:{}:parameter{}",
275 self.region,
276 self.account_id,
277 if name.starts_with('/') {
278 name.to_string()
279 } else {
280 format!("/{name}")
281 }
282 );
283
284 let parameter = SsmParameter {
285 name: name.to_string(),
286 value: value.to_string(),
287 param_type: param_type.to_string(),
288 version: 1,
289 arn: arn.clone(),
290 last_modified: Utc::now(),
291 history: Vec::new(),
292 tags: HashMap::new(),
293 labels: HashMap::new(),
294 description: props
295 .get("Description")
296 .and_then(|v| v.as_str())
297 .map(|s| s.to_string()),
298 allowed_pattern: None,
299 key_id: None,
300 data_type: "text".to_string(),
301 tier: "Standard".to_string(),
302 policies: None,
303 };
304
305 state.parameters.insert(name.to_string(), parameter);
306 Ok(name.to_string())
307 }
308
309 fn delete_ssm_parameter(&self, physical_id: &str) -> Result<(), String> {
310 let mut accounts = self.ssm_state.write();
311 let state = accounts.get_or_create(&self.account_id);
312 state.parameters.remove(physical_id);
313 Ok(())
314 }
315
316 fn create_iam_role(&self, resource: &ResourceDefinition) -> Result<String, String> {
319 let props = &resource.properties;
320 let role_name = props
321 .get("RoleName")
322 .and_then(|v| v.as_str())
323 .unwrap_or(&resource.logical_id);
324
325 let assume_role_policy = props
326 .get("AssumeRolePolicyDocument")
327 .map(|v| {
328 if v.is_string() {
329 v.as_str().unwrap().to_string()
330 } else {
331 serde_json::to_string(v).unwrap_or_default()
332 }
333 })
334 .unwrap_or_default();
335
336 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
337
338 let mut accounts = self.iam_state.write();
339 let state = accounts.get_or_create(&self.account_id);
340 let role_id = format!(
341 "FKIA{}",
342 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
343 );
344 let arn = format!(
345 "arn:aws:iam::{}:role{}{}",
346 state.account_id,
347 if path == "/" { "/" } else { path },
348 role_name
349 );
350
351 let role = IamRole {
352 role_name: role_name.to_string(),
353 role_id,
354 arn: arn.clone(),
355 path: path.to_string(),
356 assume_role_policy_document: assume_role_policy,
357 created_at: Utc::now(),
358 description: props
359 .get("Description")
360 .and_then(|v| v.as_str())
361 .map(|s| s.to_string()),
362 max_session_duration: 3600,
363 tags: Vec::new(),
364 permissions_boundary: None,
365 };
366
367 state.roles.insert(role_name.to_string(), role);
368 Ok(arn)
369 }
370
371 fn delete_iam_role(&self, physical_id: &str) -> Result<(), String> {
372 let mut accounts = self.iam_state.write();
373 let state = accounts.get_or_create(&self.account_id);
374 let role_name = state
376 .roles
377 .iter()
378 .find(|(_, r)| r.arn == physical_id)
379 .map(|(name, _)| name.clone());
380 if let Some(name) = role_name {
381 state.roles.remove(&name);
382 state.role_policies.remove(&name);
383 state.role_inline_policies.remove(&name);
384 }
385 Ok(())
386 }
387
388 fn create_iam_policy(&self, resource: &ResourceDefinition) -> Result<String, String> {
391 let props = &resource.properties;
392 let policy_name = props
393 .get("PolicyName")
394 .and_then(|v| v.as_str())
395 .unwrap_or(&resource.logical_id);
396
397 let policy_document = props
398 .get("PolicyDocument")
399 .map(|v| {
400 if v.is_string() {
401 v.as_str().unwrap().to_string()
402 } else {
403 serde_json::to_string(v).unwrap_or_default()
404 }
405 })
406 .unwrap_or_default();
407
408 let path = props.get("Path").and_then(|v| v.as_str()).unwrap_or("/");
409
410 let mut accounts = self.iam_state.write();
411 let state = accounts.get_or_create(&self.account_id);
412 let policy_id = format!(
413 "FSIA{}",
414 &Uuid::new_v4().to_string().replace('-', "").to_uppercase()[..16]
415 );
416 let arn = format!(
417 "arn:aws:iam::{}:policy{}{}",
418 state.account_id,
419 if path == "/" { "/" } else { path },
420 policy_name
421 );
422
423 let now = Utc::now();
424 let policy = IamPolicy {
425 policy_name: policy_name.to_string(),
426 policy_id,
427 arn: arn.clone(),
428 path: path.to_string(),
429 description: props
430 .get("Description")
431 .and_then(|v| v.as_str())
432 .unwrap_or("")
433 .to_string(),
434 created_at: now,
435 tags: Vec::new(),
436 default_version_id: "v1".to_string(),
437 versions: vec![PolicyVersion {
438 version_id: "v1".to_string(),
439 document: policy_document,
440 is_default: true,
441 created_at: now,
442 }],
443 next_version_num: 2,
444 attachment_count: 0,
445 };
446
447 state.policies.insert(arn.clone(), policy);
448 Ok(arn)
449 }
450
451 fn delete_iam_policy(&self, physical_id: &str) -> Result<(), String> {
452 let mut accounts = self.iam_state.write();
453 let state = accounts.get_or_create(&self.account_id);
454 state.policies.remove(physical_id);
455 Ok(())
456 }
457
458 fn create_s3_bucket(&self, resource: &ResourceDefinition) -> Result<String, String> {
461 let props = &resource.properties;
462 let bucket_name = props
463 .get("BucketName")
464 .and_then(|v| v.as_str())
465 .unwrap_or(&resource.logical_id);
466
467 let mut __s3_mas = self.s3_state.write();
468 let state = __s3_mas.get_or_create(&self.account_id);
469 let bucket = S3Bucket::new(bucket_name, &state.region, &state.account_id);
470 state.buckets.insert(bucket_name.to_string(), bucket);
471 Ok(bucket_name.to_string())
472 }
473
474 fn delete_s3_bucket(&self, physical_id: &str) -> Result<(), String> {
475 let mut __s3_mas = self.s3_state.write();
476 let state = __s3_mas.get_or_create(&self.account_id);
477 state.buckets.remove(physical_id);
478 Ok(())
479 }
480
481 fn create_eventbridge_rule(&self, resource: &ResourceDefinition) -> Result<String, String> {
484 let props = &resource.properties;
485 let rule_name = props
486 .get("Name")
487 .and_then(|v| v.as_str())
488 .unwrap_or(&resource.logical_id);
489 let event_bus_name = props
490 .get("EventBusName")
491 .and_then(|v| v.as_str())
492 .unwrap_or("default");
493
494 let mut eb_accounts = self.eventbridge_state.write();
495 let state = eb_accounts.get_or_create(&self.account_id);
496
497 if !state.buses.contains_key(event_bus_name) {
499 return Err(format!("Event bus does not exist: {event_bus_name}"));
500 }
501
502 let arn = if event_bus_name == "default" {
503 format!(
504 "arn:aws:events:{}:{}:rule/{}",
505 state.region, state.account_id, rule_name
506 )
507 } else {
508 format!(
509 "arn:aws:events:{}:{}:rule/{}/{}",
510 state.region, state.account_id, event_bus_name, rule_name
511 )
512 };
513
514 let rule = EventRule {
515 name: rule_name.to_string(),
516 arn: arn.clone(),
517 event_bus_name: event_bus_name.to_string(),
518 event_pattern: props.get("EventPattern").map(|v| {
519 if v.is_string() {
520 v.as_str().unwrap().to_string()
521 } else {
522 serde_json::to_string(v).unwrap_or_default()
523 }
524 }),
525 schedule_expression: props
526 .get("ScheduleExpression")
527 .and_then(|v| v.as_str())
528 .map(|s| s.to_string()),
529 state: props
530 .get("State")
531 .and_then(|v| v.as_str())
532 .unwrap_or("ENABLED")
533 .to_string(),
534 description: props
535 .get("Description")
536 .and_then(|v| v.as_str())
537 .map(|s| s.to_string()),
538 role_arn: props
539 .get("RoleArn")
540 .and_then(|v| v.as_str())
541 .map(|s| s.to_string()),
542 managed_by: None,
543 created_by: None,
544 targets: Vec::new(),
545 tags: HashMap::new(),
546 last_fired: None,
547 };
548
549 state
550 .rules
551 .insert((event_bus_name.to_string(), rule_name.to_string()), rule);
552 Ok(arn)
553 }
554
555 fn delete_eventbridge_rule(&self, physical_id: &str) -> Result<(), String> {
556 let mut eb_accounts = self.eventbridge_state.write();
557 let state = eb_accounts.default_mut();
558 let key = state
560 .rules
561 .iter()
562 .find(|(_, r)| r.arn == physical_id)
563 .map(|(k, _)| k.clone());
564 if let Some(k) = key {
565 state.rules.remove(&k);
566 }
567 Ok(())
568 }
569
570 fn create_dynamodb_table(&self, resource: &ResourceDefinition) -> Result<String, String> {
573 let props = &resource.properties;
574 let table_name = props
575 .get("TableName")
576 .and_then(|v| v.as_str())
577 .unwrap_or(&resource.logical_id);
578
579 let mut key_schema = Vec::new();
580 if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
581 for item in ks {
582 let attr_name = item
583 .get("AttributeName")
584 .and_then(|v| v.as_str())
585 .unwrap_or("")
586 .to_string();
587 let key_type = item
588 .get("KeyType")
589 .and_then(|v| v.as_str())
590 .unwrap_or("HASH")
591 .to_string();
592 key_schema.push(KeySchemaElement {
593 attribute_name: attr_name,
594 key_type,
595 });
596 }
597 }
598
599 let mut attribute_definitions = Vec::new();
600 if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
601 for item in defs {
602 let attr_name = item
603 .get("AttributeName")
604 .and_then(|v| v.as_str())
605 .unwrap_or("")
606 .to_string();
607 let attr_type = item
608 .get("AttributeType")
609 .and_then(|v| v.as_str())
610 .unwrap_or("S")
611 .to_string();
612 attribute_definitions.push(AttributeDefinition {
613 attribute_name: attr_name,
614 attribute_type: attr_type,
615 });
616 }
617 }
618
619 let billing_mode = props
620 .get("BillingMode")
621 .and_then(|v| v.as_str())
622 .unwrap_or("PAY_PER_REQUEST")
623 .to_string();
624
625 let provisioned_throughput = if billing_mode == "PROVISIONED" {
626 if let Some(pt) = props.get("ProvisionedThroughput") {
627 ProvisionedThroughput {
628 read_capacity_units: pt
629 .get("ReadCapacityUnits")
630 .and_then(|v| v.as_i64())
631 .unwrap_or(5),
632 write_capacity_units: pt
633 .get("WriteCapacityUnits")
634 .and_then(|v| v.as_i64())
635 .unwrap_or(5),
636 }
637 } else {
638 ProvisionedThroughput {
639 read_capacity_units: 5,
640 write_capacity_units: 5,
641 }
642 }
643 } else {
644 ProvisionedThroughput {
645 read_capacity_units: 0,
646 write_capacity_units: 0,
647 }
648 };
649
650 let (stream_enabled, stream_view_type) =
652 if let Some(stream_spec) = props.get("StreamSpecification") {
653 let view_type = stream_spec
654 .get("StreamViewType")
655 .and_then(|v| v.as_str())
656 .map(|s| s.to_string());
657 let enabled = stream_spec
658 .get("StreamEnabled")
659 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
660 .unwrap_or(view_type.is_some());
662 (enabled, view_type)
663 } else {
664 (false, None)
665 };
666
667 let deletion_protection_enabled = props
668 .get("DeletionProtectionEnabled")
669 .and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
670 .unwrap_or(false);
671
672 let on_demand_throughput = props
673 .get("OnDemandThroughput")
674 .map(|odt| OnDemandThroughput {
675 max_read_request_units: odt
676 .get("MaxReadRequestUnits")
677 .and_then(|v| v.as_i64())
678 .unwrap_or(-1),
679 max_write_request_units: odt
680 .get("MaxWriteRequestUnits")
681 .and_then(|v| v.as_i64())
682 .unwrap_or(-1),
683 });
684
685 let mut __ddb_mas = self.dynamodb_state.write();
686 let state = __ddb_mas.get_or_create(&self.account_id);
687 let arn = format!(
688 "arn:aws:dynamodb:{}:{}:table/{}",
689 state.region, state.account_id, table_name
690 );
691
692 let stream_arn = if stream_enabled {
693 Some(format!(
694 "{}/stream/{}",
695 arn,
696 Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
697 ))
698 } else {
699 None
700 };
701
702 let table = DynamoTable {
703 name: table_name.to_string(),
704 arn: arn.clone(),
705 table_id: Uuid::new_v4().to_string().replace('-', ""),
706 key_schema,
707 attribute_definitions,
708 provisioned_throughput,
709 items: Vec::new(),
710 gsi: Vec::new(),
711 lsi: Vec::new(),
712 tags: HashMap::new(),
713 created_at: Utc::now(),
714 status: "ACTIVE".to_string(),
715 item_count: 0,
716 size_bytes: 0,
717 billing_mode,
718 ttl_attribute: None,
719 ttl_enabled: false,
720 resource_policy: None,
721 pitr_enabled: false,
722 kinesis_destinations: Vec::new(),
723 contributor_insights_status: "DISABLED".to_string(),
724 contributor_insights_counters: HashMap::new(),
725 stream_enabled,
726 stream_view_type,
727 stream_arn,
728 stream_records: Arc::new(RwLock::new(Vec::new())),
729 sse_type: None,
730 sse_kms_key_arn: None,
731 deletion_protection_enabled,
732 on_demand_throughput,
733 };
734
735 state.tables.insert(table_name.to_string(), table);
736 Ok(arn)
737 }
738
739 fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
740 let mut __ddb_mas = self.dynamodb_state.write();
741 let state = __ddb_mas.get_or_create(&self.account_id);
742 let table_name = state
744 .tables
745 .iter()
746 .find(|(_, t)| t.arn == physical_id)
747 .map(|(name, _)| name.clone());
748 if let Some(name) = table_name {
749 state.tables.remove(&name);
750 }
751 Ok(())
752 }
753
754 fn create_log_group(&self, resource: &ResourceDefinition) -> Result<String, String> {
757 let props = &resource.properties;
758 let log_group_name = props
759 .get("LogGroupName")
760 .and_then(|v| v.as_str())
761 .unwrap_or(&resource.logical_id);
762
763 let retention_in_days = props
764 .get("RetentionInDays")
765 .and_then(|v| v.as_i64())
766 .map(|v| v as i32);
767
768 let mut logs_accounts = self.logs_state.write();
769 let state = logs_accounts.get_or_create(&self.account_id);
770 let arn = format!(
771 "arn:aws:logs:{}:{}:log-group:{}:*",
772 state.region, state.account_id, log_group_name
773 );
774
775 let log_group = fakecloud_logs::state::LogGroup {
776 name: log_group_name.to_string(),
777 arn: arn.clone(),
778 creation_time: Utc::now().timestamp_millis(),
779 retention_in_days,
780 kms_key_id: None,
781 stored_bytes: 0,
782 log_streams: HashMap::new(),
783 tags: HashMap::new(),
784 subscription_filters: Vec::new(),
785 data_protection_policy: None,
786 index_policies: Vec::new(),
787 transformer: None,
788 deletion_protection: false,
789 log_group_class: Some("STANDARD".to_string()),
790 };
791
792 state
793 .log_groups
794 .insert(log_group_name.to_string(), log_group);
795 Ok(arn)
796 }
797
798 fn delete_log_group(&self, physical_id: &str) -> Result<(), String> {
799 let mut logs_accounts = self.logs_state.write();
800 let state = logs_accounts.default_mut();
801 let name = state
803 .log_groups
804 .iter()
805 .find(|(_, g)| g.arn == physical_id)
806 .map(|(name, _)| name.clone());
807 if let Some(name) = name {
808 state.log_groups.remove(&name);
809 }
810 Ok(())
811 }
812
813 fn invoke_lambda_sync(&self, function_arn: &str, payload: &str) -> Result<(), String> {
817 let delivery = self.delivery.clone();
818 let function_arn = function_arn.to_string();
819 let payload = payload.to_string();
820 std::thread::scope(|s| {
821 s.spawn(|| {
822 let rt = tokio::runtime::Builder::new_current_thread()
823 .enable_all()
824 .build()
825 .map_err(|e| format!("Failed to create runtime: {e}"))?;
826 rt.block_on(async {
827 match delivery.invoke_lambda(&function_arn, &payload).await {
828 Some(Ok(_)) => {
829 tracing::info!(
830 "Custom resource Lambda {} invoked successfully",
831 function_arn
832 );
833 Ok(())
834 }
835 Some(Err(e)) => {
836 tracing::warn!(
837 "Custom resource Lambda {} invocation failed: {e}",
838 function_arn
839 );
840 Err(format!("Lambda invocation failed: {e}"))
841 }
842 None => {
843 tracing::warn!(
844 "No Lambda delivery configured; skipping custom resource invocation for {}",
845 function_arn
846 );
847 Ok(())
848 }
849 }
850 })
851 })
852 .join()
853 .map_err(|_| "Lambda invocation thread panicked".to_string())?
854 })
855 }
856
857 fn create_custom_resource(&self, resource: &ResourceDefinition) -> Result<String, String> {
858 let props = &resource.properties;
859 let service_token = props
860 .get("ServiceToken")
861 .and_then(|v| v.as_str())
862 .ok_or("Custom resource requires ServiceToken property")?;
863
864 let request_id = Uuid::new_v4().to_string();
865
866 let event = serde_json::json!({
868 "RequestType": "Create",
869 "ServiceToken": service_token,
870 "StackId": self.stack_id,
871 "RequestId": request_id,
872 "ResourceType": resource.resource_type,
873 "LogicalResourceId": resource.logical_id,
874 "ResourceProperties": props,
875 });
876
877 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
878 self.invoke_lambda_sync(service_token, &payload)?;
879
880 let physical_id = format!("{}-{}", resource.logical_id, &request_id[..8]);
883 Ok(physical_id)
884 }
885
886 fn delete_custom_resource(&self, resource: &StackResource) -> Result<(), String> {
887 let service_token = match &resource.service_token {
888 Some(token) => token.clone(),
889 None => {
890 return Ok(());
892 }
893 };
894
895 let request_id = Uuid::new_v4().to_string();
896
897 let event = serde_json::json!({
898 "RequestType": "Delete",
899 "ServiceToken": service_token,
900 "StackId": self.stack_id,
901 "RequestId": request_id,
902 "ResourceType": resource.resource_type,
903 "LogicalResourceId": resource.logical_id,
904 "PhysicalResourceId": resource.physical_id,
905 });
906
907 let payload = serde_json::to_string(&event).map_err(|e| e.to_string())?;
908
909 if let Err(e) = self.invoke_lambda_sync(&service_token, &payload) {
911 tracing::warn!(
912 "Custom resource delete Lambda invocation failed for {}: {e}",
913 resource.logical_id
914 );
915 }
916 Ok(())
917 }
918}
919
920#[cfg(test)]
921mod tests {
922 use super::*;
923 use parking_lot::RwLock;
924
925 fn make_provisioner() -> ResourceProvisioner {
926 ResourceProvisioner {
927 sqs_state: Arc::new(RwLock::new(
928 fakecloud_core::multi_account::MultiAccountState::new(
929 "123456789012",
930 "us-east-1",
931 "http://localhost:4566",
932 ),
933 )),
934 sns_state: Arc::new(RwLock::new(
935 fakecloud_core::multi_account::MultiAccountState::new(
936 "123456789012",
937 "us-east-1",
938 "http://localhost:4566",
939 ),
940 )),
941 ssm_state: Arc::new(RwLock::new(
942 fakecloud_core::multi_account::MultiAccountState::new(
943 "123456789012",
944 "us-east-1",
945 "http://localhost:4566",
946 ),
947 )),
948 iam_state: Arc::new(RwLock::new(
949 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "http://localhost:4566"),
950 )),
951 s3_state: Arc::new(RwLock::new(fakecloud_core::multi_account::MultiAccountState::new(
952 "123456789012",
953 "us-east-1", "",
954 ))),
955 eventbridge_state: Arc::new(RwLock::new(
956 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
957 )),
958 dynamodb_state: Arc::new(RwLock::new(fakecloud_core::multi_account::MultiAccountState::new(
959 "123456789012",
960 "us-east-1", "",
961 ))),
962 logs_state: Arc::new(RwLock::new(
963 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
964 )),
965 delivery: Arc::new(DeliveryBus::new()),
966 account_id: "123456789012".to_string(),
967 region: "us-east-1".to_string(),
968 stack_id: "arn:aws:cloudformation:us-east-1:123456789012:stack/test/00000000-0000-0000-0000-000000000000".to_string(),
969 }
970 }
971
972 fn make_resource(
973 resource_type: &str,
974 logical_id: &str,
975 props: serde_json::Value,
976 ) -> ResourceDefinition {
977 ResourceDefinition {
978 logical_id: logical_id.to_string(),
979 resource_type: resource_type.to_string(),
980 properties: props,
981 }
982 }
983
984 #[test]
985 fn sns_subscription_rejects_nonexistent_topic() {
986 let prov = make_provisioner();
987 let resource = make_resource(
988 "AWS::SNS::Subscription",
989 "MySub",
990 serde_json::json!({
991 "TopicArn": "arn:aws:sns:us-east-1:123456789012:NonExistent",
992 "Protocol": "sqs",
993 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
994 }),
995 );
996 let result = prov.create_resource(&resource);
997 assert!(result.is_err());
998 assert!(result.unwrap_err().contains("does not exist"));
999 }
1000
1001 #[test]
1002 fn sns_subscription_succeeds_when_topic_exists() {
1003 let prov = make_provisioner();
1004 let topic = make_resource(
1006 "AWS::SNS::Topic",
1007 "MyTopic",
1008 serde_json::json!({ "TopicName": "my-topic" }),
1009 );
1010 let topic_result = prov.create_resource(&topic);
1011 assert!(topic_result.is_ok());
1012 let topic_arn = topic_result.unwrap().physical_id;
1013
1014 let sub = make_resource(
1016 "AWS::SNS::Subscription",
1017 "MySub",
1018 serde_json::json!({
1019 "TopicArn": topic_arn,
1020 "Protocol": "sqs",
1021 "Endpoint": "arn:aws:sqs:us-east-1:123456789012:my-queue"
1022 }),
1023 );
1024 let result = prov.create_resource(&sub);
1025 assert!(result.is_ok());
1026 }
1027
1028 #[test]
1029 fn eventbridge_rule_arn_default_bus_omits_bus_name() {
1030 let prov = make_provisioner();
1031 let resource = make_resource(
1032 "AWS::Events::Rule",
1033 "MyRule",
1034 serde_json::json!({
1035 "Name": "my-rule",
1036 "ScheduleExpression": "rate(1 hour)"
1037 }),
1038 );
1039 let result = prov.create_resource(&resource).unwrap();
1040 assert_eq!(
1042 result.physical_id,
1043 "arn:aws:events:us-east-1:123456789012:rule/my-rule"
1044 );
1045 assert!(!result.physical_id.contains("rule/default/"));
1046 }
1047
1048 #[test]
1049 fn eventbridge_rule_arn_custom_bus_includes_bus_name() {
1050 let prov = make_provisioner();
1051 {
1053 let mut eb_accounts = prov.eventbridge_state.write();
1054 let state = eb_accounts.default_mut();
1055 state.buses.insert(
1056 "custom-bus".to_string(),
1057 fakecloud_eventbridge::state::EventBus {
1058 name: "custom-bus".to_string(),
1059 arn: "arn:aws:events:us-east-1:123456789012:event-bus/custom-bus".to_string(),
1060 policy: None,
1061 creation_time: Utc::now(),
1062 last_modified_time: Utc::now(),
1063 description: None,
1064 kms_key_identifier: None,
1065 dead_letter_config: None,
1066 tags: HashMap::new(),
1067 },
1068 );
1069 }
1070 let resource = make_resource(
1071 "AWS::Events::Rule",
1072 "MyRule",
1073 serde_json::json!({
1074 "Name": "my-rule",
1075 "EventBusName": "custom-bus",
1076 "ScheduleExpression": "rate(1 hour)"
1077 }),
1078 );
1079 let result = prov.create_resource(&resource).unwrap();
1080 assert_eq!(
1081 result.physical_id,
1082 "arn:aws:events:us-east-1:123456789012:rule/custom-bus/my-rule"
1083 );
1084 }
1085
1086 #[test]
1087 fn eventbridge_rule_rejects_nonexistent_bus() {
1088 let prov = make_provisioner();
1089 let resource = make_resource(
1090 "AWS::Events::Rule",
1091 "MyRule",
1092 serde_json::json!({
1093 "Name": "my-rule",
1094 "EventBusName": "nonexistent-bus",
1095 "ScheduleExpression": "rate(1 hour)"
1096 }),
1097 );
1098 let result = prov.create_resource(&resource);
1099 assert!(result.is_err());
1100 assert!(result.unwrap_err().contains("does not exist"));
1101 }
1102
1103 #[test]
1104 fn custom_resource_requires_service_token() {
1105 let prov = make_provisioner();
1106 let resource = make_resource(
1107 "Custom::MyResource",
1108 "MyCustom",
1109 serde_json::json!({
1110 "Foo": "bar"
1111 }),
1112 );
1113 let result = prov.create_resource(&resource);
1114 assert!(result.is_err());
1115 assert!(
1116 result.unwrap_err().contains("ServiceToken"),
1117 "Should require ServiceToken property"
1118 );
1119 }
1120
/// A `Custom::*` resource that carries a `ServiceToken` is created successfully
/// by the test provisioner.
///
/// NOTE(review): the test name implies `make_provisioner()` wires no Lambda
/// delivery; that helper is defined elsewhere, so confirm there.
///
/// Checks that the resulting stack resource keeps the logical id and resource
/// type from the definition and that its physical id starts with the logical
/// id followed by a dash.
#[test]
fn custom_resource_succeeds_without_lambda_delivery() {
    let prov = make_provisioner();
    let resource = make_resource(
        "Custom::MyResource",
        "MyCustom",
        serde_json::json!({
            "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "Foo": "bar"
        }),
    );
    // `expect` instead of `assert!(is_ok())` + `unwrap()`: a single check, and
    // the provisioner's error text is shown in the panic if creation fails.
    let sr = prov
        .create_resource(&resource)
        .expect("custom resource with a ServiceToken should be created");
    assert_eq!(sr.logical_id, "MyCustom");
    assert_eq!(sr.resource_type, "Custom::MyResource");
    assert!(sr.physical_id.starts_with("MyCustom-"));
}
1141
/// The long-form type name `AWS::CloudFormation::CustomResource` is accepted
/// when a `ServiceToken` is supplied, and the resulting stack resource reports
/// that same type.
#[test]
fn cloudformation_custom_resource_type_succeeds() {
    let prov = make_provisioner();
    let resource = make_resource(
        "AWS::CloudFormation::CustomResource",
        "MyCustom2",
        serde_json::json!({
            "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "Key": "value"
        }),
    );
    // `expect` instead of `assert!(is_ok())` + `unwrap()`: a single check, and
    // the provisioner's error text is shown in the panic if creation fails.
    let sr = prov
        .create_resource(&resource)
        .expect("AWS::CloudFormation::CustomResource with a ServiceToken should be created");
    assert_eq!(sr.resource_type, "AWS::CloudFormation::CustomResource");
}
1158
/// Round-trips an SQS queue: create it with an explicit `QueueName`, check the
/// physical id mentions that name and the recorded type, then delete it.
#[test]
fn sqs_queue_create_and_delete() {
    let provisioner = make_provisioner();
    let queue_props = serde_json::json!({"QueueName": "my-q"});
    let queue_def = make_resource("AWS::SQS::Queue", "MyQ", queue_props);

    let created = provisioner.create_resource(&queue_def).unwrap();
    assert!(created.physical_id.contains("my-q"));
    assert_eq!(created.resource_type, "AWS::SQS::Queue");

    provisioner.delete_resource(&created).unwrap();
}
1174
/// Creating a FIFO queue whose name carries the `.fifo` suffix yields a
/// physical id that still contains `.fifo`.
#[test]
fn sqs_queue_fifo_with_suffix() {
    let provisioner = make_provisioner();
    let fifo_props = serde_json::json!({"QueueName": "my-fifo.fifo", "FifoQueue": true});
    let fifo_def = make_resource("AWS::SQS::Queue", "FifoQ", fifo_props);

    let created = provisioner.create_resource(&fifo_def).unwrap();
    assert!(created.physical_id.contains(".fifo"));
}
1186
/// Round-trips an SNS topic: create it with an explicit `TopicName`, check the
/// physical id mentions that name, then delete it.
#[test]
fn sns_topic_create_and_delete() {
    let provisioner = make_provisioner();
    let topic_props = serde_json::json!({"TopicName": "t1"});
    let topic_def = make_resource("AWS::SNS::Topic", "MyTopic", topic_props);

    let created = provisioner.create_resource(&topic_def).unwrap();
    assert!(created.physical_id.contains("t1"));

    provisioner.delete_resource(&created).unwrap();
}
1199
/// Round-trips an SSM parameter: the physical id must equal the parameter
/// `Name` exactly, and the created resource must be deletable.
#[test]
fn ssm_parameter_create_and_delete() {
    let provisioner = make_provisioner();
    let param_props = serde_json::json!({
        "Name": "/my/param",
        "Type": "String",
        "Value": "v1"
    });
    let param_def = make_resource("AWS::SSM::Parameter", "MyParam", param_props);

    let created = provisioner.create_resource(&param_def).unwrap();
    assert_eq!(created.physical_id, "/my/param");

    provisioner.delete_resource(&created).unwrap();
}
1216
/// Round-trips an IAM role with an empty trust policy: the physical id must
/// mention the `RoleName`, and the created resource must be deletable.
#[test]
fn iam_role_create_and_delete() {
    let provisioner = make_provisioner();
    let role_props = serde_json::json!({
        "RoleName": "my-role",
        "AssumeRolePolicyDocument": {"Version": "2012-10-17", "Statement": []}
    });
    let role_def = make_resource("AWS::IAM::Role", "MyRole", role_props);

    let created = provisioner.create_resource(&role_def).unwrap();
    assert!(created.physical_id.contains("my-role"));

    provisioner.delete_resource(&created).unwrap();
}
1232
/// Round-trips an IAM policy with an empty statement list: the physical id must
/// mention the `PolicyName`, and the created resource must be deletable.
#[test]
fn iam_policy_create_and_delete() {
    let provisioner = make_provisioner();
    let policy_props = serde_json::json!({
        "PolicyName": "my-policy",
        "PolicyDocument": {"Version": "2012-10-17", "Statement": []}
    });
    let policy_def = make_resource("AWS::IAM::Policy", "MyPolicy", policy_props);

    let created = provisioner.create_resource(&policy_def).unwrap();
    assert!(created.physical_id.contains("my-policy"));

    provisioner.delete_resource(&created).unwrap();
}
1248
/// Round-trips an S3 bucket: the physical id must equal the `BucketName`
/// exactly, and the created resource must be deletable.
#[test]
fn s3_bucket_create_and_delete() {
    let provisioner = make_provisioner();
    let bucket_props = serde_json::json!({"BucketName": "my-bucket"});
    let bucket_def = make_resource("AWS::S3::Bucket", "MyBucket", bucket_props);

    let created = provisioner.create_resource(&bucket_def).unwrap();
    assert_eq!(created.physical_id, "my-bucket");

    provisioner.delete_resource(&created).unwrap();
}
1261
/// Round-trips a pay-per-request DynamoDB table with a single string hash key:
/// the physical id must mention the `TableName`, and the table must be
/// deletable.
#[test]
fn dynamodb_table_create_and_delete() {
    let provisioner = make_provisioner();
    let table_props = serde_json::json!({
        "TableName": "my-table",
        "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
        "BillingMode": "PAY_PER_REQUEST"
    });
    let table_def = make_resource("AWS::DynamoDB::Table", "MyTable", table_props);

    let created = provisioner.create_resource(&table_def).unwrap();
    assert!(created.physical_id.contains("my-table"));

    provisioner.delete_resource(&created).unwrap();
}
1279
/// Round-trips a CloudWatch Logs log group: the physical id must mention the
/// `LogGroupName`, and the created resource must be deletable.
#[test]
fn log_group_create_and_delete() {
    let provisioner = make_provisioner();
    let group_props = serde_json::json!({"LogGroupName": "/app/logs"});
    let group_def = make_resource("AWS::Logs::LogGroup", "MyLogs", group_props);

    let created = provisioner.create_resource(&group_def).unwrap();
    assert!(created.physical_id.contains("/app/logs"));

    provisioner.delete_resource(&created).unwrap();
}
1292
/// A resource type the provisioner has no handler for must produce an error
/// rather than a stack resource.
#[test]
fn unsupported_resource_type_fails() {
    let provisioner = make_provisioner();
    let empty_props = serde_json::json!({});
    let unknown = make_resource("AWS::NonExistent::Thing", "X", empty_props);

    let outcome = provisioner.create_resource(&unknown);
    assert!(outcome.is_err());
}
1299
/// An IAM role that declares an inline policy under `Policies` is created
/// successfully, and its physical id mentions the `RoleName`.
#[test]
fn iam_role_with_inline_policies() {
    let provisioner = make_provisioner();
    let role_props = serde_json::json!({
        "RoleName": "role-inline",
        "AssumeRolePolicyDocument": {"Version": "2012-10-17", "Statement": []},
        "Policies": [
            {
                "PolicyName": "inline-1",
                "PolicyDocument": {"Version": "2012-10-17", "Statement": []}
            }
        ]
    });
    let role_def = make_resource("AWS::IAM::Role", "MyRole", role_props);

    let created = provisioner.create_resource(&role_def).unwrap();
    assert!(created.physical_id.contains("role-inline"));
}
1320
/// An SQS queue declared with no properties (so no `QueueName`) is still
/// created and receives a non-empty physical id.
#[test]
fn sqs_queue_auto_name() {
    let provisioner = make_provisioner();
    let no_props = serde_json::json!({});
    let queue_def = make_resource("AWS::SQS::Queue", "AutoQ", no_props);

    let created = provisioner.create_resource(&queue_def).unwrap();
    assert!(!created.physical_id.is_empty());
}
1329
/// An SNS topic declared with no properties (so no `TopicName`) is still
/// created and receives a non-empty physical id.
#[test]
fn sns_topic_auto_name() {
    let provisioner = make_provisioner();
    let no_props = serde_json::json!({});
    let topic_def = make_resource("AWS::SNS::Topic", "AutoT", no_props);

    let created = provisioner.create_resource(&topic_def).unwrap();
    assert!(!created.physical_id.is_empty());
}
1337
/// Creating a resource of the made-up type `AWS::FooBar::Thing` must return an
/// error.
#[test]
fn unsupported_resource_type_errors() {
    let provisioner = make_provisioner();
    let empty_props = serde_json::json!({});
    let unknown = make_resource("AWS::FooBar::Thing", "X", empty_props);

    let outcome = provisioner.create_resource(&unknown);
    assert!(outcome.is_err());
}
1346
/// Creates a dead-letter queue and then a source queue whose `RedrivePolicy`
/// points at a `dlq1` ARN; both creations must succeed.
#[test]
fn sqs_queue_with_redrive_policy() {
    let prov = make_provisioner();

    // Provision the dead-letter queue first; the ARN in the redrive policy
    // below is meant to refer to it.
    let dlq = make_resource(
        "AWS::SQS::Queue",
        "DLQ",
        serde_json::json!({"QueueName": "dlq1"}),
    );
    let dlq_resource = prov.create_resource(&dlq).unwrap();
    // Check the DLQ was created under its requested name instead of
    // discarding the result unchecked.
    assert!(dlq_resource.physical_id.contains("dlq1"));

    let src = make_resource(
        "AWS::SQS::Queue",
        "Src",
        serde_json::json!({
            "QueueName": "src1",
            "RedrivePolicy": {
                "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:dlq1",
                "maxReceiveCount": 3
            }
        }),
    );
    let sr = prov.create_resource(&src).unwrap();
    assert!(!sr.physical_id.is_empty());
}
1374
/// An SNS topic that sets both `TopicName` and `DisplayName` is created, and
/// its physical id mentions the topic name.
#[test]
fn sns_topic_with_display_name() {
    let provisioner = make_provisioner();
    let topic_props = serde_json::json!({"TopicName": "named-topic", "DisplayName": "Named"});
    let topic_def = make_resource("AWS::SNS::Topic", "WithName", topic_props);

    let created = provisioner.create_resource(&topic_def).unwrap();
    assert!(created.physical_id.contains("named-topic"));
}
1386
/// An SSM parameter with an explicit `Name` is created, and its physical id
/// mentions that name.
#[test]
fn ssm_parameter_with_explicit_name() {
    let provisioner = make_provisioner();
    let param_props = serde_json::json!({"Name": "/my/param", "Value": "v", "Type": "String"});
    let param_def = make_resource("AWS::SSM::Parameter", "Param", param_props);

    let created = provisioner.create_resource(&param_def).unwrap();
    assert!(created.physical_id.contains("/my/param"));
}
1398
/// An SSM parameter declared without a `Name` property must be rejected.
#[test]
fn ssm_parameter_missing_name_errors() {
    let provisioner = make_provisioner();
    // Only "Value" and "Type" are given; "Name" is intentionally absent.
    let nameless_props = serde_json::json!({"Value": "v", "Type": "String"});
    let param_def = make_resource("AWS::SSM::Parameter", "AutoP", nameless_props);

    let outcome = provisioner.create_resource(&param_def);
    assert!(outcome.is_err());
}
1409
/// An `AWS::IAM::Policy` with an empty `Users` list is created and receives a
/// non-empty physical id.
///
/// Note that, despite the test name, an explicit `PolicyName` is supplied.
#[test]
fn iam_managed_policy_auto_name() {
    let provisioner = make_provisioner();
    let policy_props = serde_json::json!({
        "PolicyName": "inline-pol",
        "PolicyDocument": {"Version": "2012-10-17", "Statement": []},
        "Users": []
    });
    let policy_def = make_resource("AWS::IAM::Policy", "AutoPol", policy_props);

    let created = provisioner.create_resource(&policy_def).unwrap();
    assert!(!created.physical_id.is_empty());
}
1425
/// Deleting an SQS queue that was just created through the provisioner
/// returns `Ok`.
#[test]
fn delete_resource_works_for_queue() {
    let provisioner = make_provisioner();
    let queue_props = serde_json::json!({"QueueName": "todel"});
    let queue_def = make_resource("AWS::SQS::Queue", "ToDel", queue_props);
    let created = provisioner.create_resource(&queue_def).unwrap();

    let deletion = provisioner.delete_resource(&created);
    assert!(deletion.is_ok());
}
1437
/// Deleting an SNS topic that was just created through the provisioner
/// returns `Ok`.
#[test]
fn delete_resource_works_for_topic() {
    let provisioner = make_provisioner();
    let topic_props = serde_json::json!({"TopicName": "delt"});
    let topic_def = make_resource("AWS::SNS::Topic", "DelT", topic_props);
    let created = provisioner.create_resource(&topic_def).unwrap();

    let deletion = provisioner.delete_resource(&created);
    assert!(deletion.is_ok());
}
1449
/// A FIFO queue named with the `.fifo` suffix gets a physical id that ends
/// with `.fifo`.
#[test]
fn sqs_queue_with_fifo_suffix() {
    let provisioner = make_provisioner();
    let fifo_props = serde_json::json!({"QueueName": "fq.fifo", "FifoQueue": true});
    let fifo_def = make_resource("AWS::SQS::Queue", "Fifo", fifo_props);

    let created = provisioner.create_resource(&fifo_def).unwrap();
    assert!(created.physical_id.ends_with(".fifo"));
}
1461}