1#![allow(clippy::too_many_lines)]
15#![allow(clippy::too_many_arguments)]
16
17use std::sync::Arc;
18
19use chrono::Utc;
20use rustack_cloudfront_model::{
21 CachePolicy, CachePolicyConfig, CloudFrontError, CloudFrontFunction,
22 CloudFrontOriginAccessIdentity, CloudFrontOriginAccessIdentityConfig, Distribution,
23 DistributionConfig, FieldLevelEncryption, FieldLevelEncryptionConfig,
24 FieldLevelEncryptionProfile, FieldLevelEncryptionProfileConfig, FunctionConfig,
25 FunctionMetadata, Invalidation, InvalidationBatch, KeyGroup, KeyGroupConfig, KeyValueStore,
26 MonitoringSubscription, OriginAccessControl, OriginAccessControlConfig, OriginRequestPolicy,
27 OriginRequestPolicyConfig, PublicKey, PublicKeyConfig, RealtimeLogConfig, ResourceStatus,
28 ResponseHeadersPolicy, ResponseHeadersPolicyConfig, Tag, TagSet,
29};
30use tracing::info;
31
32use crate::{
33 arn::{
34 cache_policy_arn, distribution_arn, function_arn, key_group_arn, kvs_arn, oai_arn,
35 origin_access_control_arn, origin_request_policy_arn, public_key_arn, realtime_log_arn,
36 response_headers_policy_arn,
37 },
38 config::CloudFrontConfig,
39 id_gen::{
40 deterministic_id_with_prefix, distribution_domain_name, new_distribution_id, new_etag,
41 new_id_with_prefix, new_invalidation_id, new_s3_canonical_user_id,
42 },
43 managed::{
44 managed_cache_policies, managed_origin_request_policies, managed_response_headers_policies,
45 },
46 store::CloudFrontStore,
47};
48
49#[derive(Debug)]
51pub struct RustackCloudFront {
52 store: Arc<CloudFrontStore>,
53 config: Arc<CloudFrontConfig>,
54}
55
56impl RustackCloudFront {
57 #[must_use]
59 pub fn new(config: CloudFrontConfig) -> Self {
60 let store = CloudFrontStore::new();
61 for p in managed_cache_policies() {
62 store.cache_policies.insert(p.id.clone(), p);
63 }
64 for p in managed_origin_request_policies() {
65 store.origin_request_policies.insert(p.id.clone(), p);
66 }
67 for p in managed_response_headers_policies() {
68 store.response_headers_policies.insert(p.id.clone(), p);
69 }
70 Self {
71 store,
72 config: Arc::new(config),
73 }
74 }
75
76 #[must_use]
78 pub fn store(&self) -> &Arc<CloudFrontStore> {
79 &self.store
80 }
81
82 #[must_use]
84 pub fn config(&self) -> &CloudFrontConfig {
85 &self.config
86 }
87
88 pub fn create_distribution(
94 self: &Arc<Self>,
95 config: DistributionConfig,
96 tags: TagSet,
97 ) -> Result<Distribution, CloudFrontError> {
98 validate_distribution_config(&config)?;
99
100 let id = if self.config.deterministic_ids {
101 deterministic_id_with_prefix('E', &config.caller_reference)
102 } else {
103 new_distribution_id()
104 };
105 let arn = distribution_arn(&self.config.account_id, &id);
106 let domain_name = distribution_domain_name(&id, &self.config.domain_suffix);
107 let etag = new_etag();
108 let dist = Distribution {
109 id: id.clone(),
110 arn: arn.clone(),
111 status: ResourceStatus::InProgress,
112 last_modified_time: Utc::now(),
113 domain_name,
114 in_progress_invalidation_batches: 0,
115 active_trusted_signers_enabled: false,
116 active_trusted_key_groups_enabled: false,
117 config,
118 tags: tags.clone(),
119 etag,
120 alias_icp_recordal: Vec::new(),
121 };
122
123 if !tags.is_empty() {
124 self.store.tags.insert(arn.clone(), tags);
125 }
126 self.store.distributions.insert(id.clone(), dist.clone());
127 self.spawn_distribution_deployment(&id);
128 info!(distribution_id = %id, "created distribution");
129 Ok(dist)
130 }
131
132 pub fn get_distribution(&self, id: &str) -> Result<Distribution, CloudFrontError> {
134 self.store
135 .distributions
136 .get(id)
137 .map(|r| r.value().clone())
138 .ok_or_else(|| CloudFrontError::no_such_distribution(id))
139 }
140
141 pub fn update_distribution(
143 self: &Arc<Self>,
144 id: &str,
145 if_match: Option<&str>,
146 new_config: DistributionConfig,
147 ) -> Result<Distribution, CloudFrontError> {
148 validate_distribution_config(&new_config)?;
149 let mut entry = self
150 .store
151 .distributions
152 .get_mut(id)
153 .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
154 check_if_match(if_match, &entry.etag)?;
155 entry.config = new_config;
156 entry.etag = new_etag();
157 entry.status = ResourceStatus::InProgress;
158 entry.last_modified_time = Utc::now();
159 let clone = entry.value().clone();
160 drop(entry);
161 self.spawn_distribution_deployment(id);
162 Ok(clone)
163 }
164
165 pub fn delete_distribution(
167 &self,
168 id: &str,
169 if_match: Option<&str>,
170 ) -> Result<(), CloudFrontError> {
171 let dist = self
172 .store
173 .distributions
174 .get(id)
175 .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
176 check_if_match(if_match, &dist.etag)?;
177 if dist.config.enabled {
178 return Err(CloudFrontError::DistributionNotDisabled(format!(
179 "Distribution {id} is enabled; disable it before deleting."
180 )));
181 }
182 drop(dist);
183 self.store.distributions.remove(id);
184 self.store
185 .tags
186 .remove(&distribution_arn(&self.config.account_id, id));
187 Ok(())
188 }
189
190 pub fn list_distributions(&self) -> Vec<Distribution> {
192 let mut v: Vec<_> = self
193 .store
194 .distributions
195 .iter()
196 .map(|e| e.value().clone())
197 .collect();
198 v.sort_by(|a, b| a.id.cmp(&b.id));
199 v
200 }
201
202 pub fn copy_distribution(
204 self: &Arc<Self>,
205 primary_id: &str,
206 caller_reference: &str,
207 staging: bool,
208 ) -> Result<Distribution, CloudFrontError> {
209 let primary = self
210 .store
211 .distributions
212 .get(primary_id)
213 .ok_or_else(|| CloudFrontError::no_such_distribution(primary_id))?;
214 let mut new_cfg = primary.config.clone();
215 new_cfg.caller_reference = caller_reference.to_owned();
216 new_cfg.staging = staging;
217 drop(primary);
218 self.create_distribution(new_cfg, Vec::new())
219 }
220
221 pub fn create_invalidation(
227 self: &Arc<Self>,
228 distribution_id: &str,
229 batch: InvalidationBatch,
230 ) -> Result<Invalidation, CloudFrontError> {
231 if !self.store.distributions.contains_key(distribution_id) {
232 return Err(CloudFrontError::no_such_distribution(distribution_id));
233 }
234 if batch.paths.is_empty() {
235 return Err(CloudFrontError::InvalidArgument(
236 "Invalidation batch must contain at least one path".to_owned(),
237 ));
238 }
239 let id = if self.config.deterministic_ids {
240 deterministic_id_with_prefix('I', &batch.caller_reference)
241 } else {
242 new_invalidation_id()
243 };
244 let inv = Invalidation {
245 id: id.clone(),
246 status: ResourceStatus::InProgress,
247 create_time: Utc::now(),
248 distribution_id: distribution_id.to_owned(),
249 batch,
250 };
251 self.store
252 .invalidations
253 .insert((distribution_id.to_owned(), id.clone()), inv.clone());
254 if let Some(mut d) = self.store.distributions.get_mut(distribution_id) {
255 d.in_progress_invalidation_batches += 1;
256 }
257 self.spawn_invalidation_completion(distribution_id, &id);
258 Ok(inv)
259 }
260
261 pub fn get_invalidation(
263 &self,
264 distribution_id: &str,
265 invalidation_id: &str,
266 ) -> Result<Invalidation, CloudFrontError> {
267 self.store
268 .invalidations
269 .get(&(distribution_id.to_owned(), invalidation_id.to_owned()))
270 .map(|r| r.value().clone())
271 .ok_or_else(|| CloudFrontError::no_such_invalidation(invalidation_id))
272 }
273
274 pub fn list_invalidations(&self, distribution_id: &str) -> Vec<Invalidation> {
276 let mut v: Vec<_> = self
277 .store
278 .invalidations
279 .iter()
280 .filter(|e| e.key().0 == distribution_id)
281 .map(|e| e.value().clone())
282 .collect();
283 v.sort_by(|a, b| b.create_time.cmp(&a.create_time));
284 v
285 }
286
287 pub fn create_oac(
293 &self,
294 cfg: OriginAccessControlConfig,
295 ) -> Result<OriginAccessControl, CloudFrontError> {
296 if cfg.name.is_empty() {
297 return Err(CloudFrontError::InvalidArgument(
298 "OriginAccessControl Name is required".to_owned(),
299 ));
300 }
301 let id = new_id_with_prefix('E');
302 let oac = OriginAccessControl {
303 id: id.clone(),
304 config: cfg,
305 etag: new_etag(),
306 };
307 self.store.origin_access_controls.insert(id, oac.clone());
308 Ok(oac)
309 }
310
311 pub fn get_oac(&self, id: &str) -> Result<OriginAccessControl, CloudFrontError> {
313 self.store
314 .origin_access_controls
315 .get(id)
316 .map(|r| r.value().clone())
317 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))
318 }
319
320 pub fn update_oac(
322 &self,
323 id: &str,
324 if_match: Option<&str>,
325 cfg: OriginAccessControlConfig,
326 ) -> Result<OriginAccessControl, CloudFrontError> {
327 let mut entry = self
328 .store
329 .origin_access_controls
330 .get_mut(id)
331 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
332 check_if_match(if_match, &entry.etag)?;
333 entry.config = cfg;
334 entry.etag = new_etag();
335 Ok(entry.value().clone())
336 }
337
338 pub fn delete_oac(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
340 let entry = self
341 .store
342 .origin_access_controls
343 .get(id)
344 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
345 check_if_match(if_match, &entry.etag)?;
346 drop(entry);
347 self.store.origin_access_controls.remove(id);
348 self.store
349 .tags
350 .remove(&origin_access_control_arn(&self.config.account_id, id));
351 Ok(())
352 }
353
354 pub fn list_oacs(&self) -> Vec<OriginAccessControl> {
356 let mut v: Vec<_> = self
357 .store
358 .origin_access_controls
359 .iter()
360 .map(|e| e.value().clone())
361 .collect();
362 v.sort_by(|a, b| a.id.cmp(&b.id));
363 v
364 }
365
366 pub fn create_oai(
372 &self,
373 cfg: CloudFrontOriginAccessIdentityConfig,
374 ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
375 let id = new_id_with_prefix('E');
376 let oai = CloudFrontOriginAccessIdentity {
377 id: id.clone(),
378 s3_canonical_user_id: new_s3_canonical_user_id(),
379 config: cfg,
380 etag: new_etag(),
381 };
382 self.store.origin_access_identities.insert(id, oai.clone());
383 Ok(oai)
384 }
385
386 pub fn get_oai(&self, id: &str) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
388 self.store
389 .origin_access_identities
390 .get(id)
391 .map(|r| r.value().clone())
392 .ok_or_else(|| CloudFrontError::no_such_oai(id))
393 }
394
395 pub fn update_oai(
397 &self,
398 id: &str,
399 if_match: Option<&str>,
400 cfg: CloudFrontOriginAccessIdentityConfig,
401 ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
402 let mut entry = self
403 .store
404 .origin_access_identities
405 .get_mut(id)
406 .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
407 check_if_match(if_match, &entry.etag)?;
408 entry.config = cfg;
409 entry.etag = new_etag();
410 Ok(entry.value().clone())
411 }
412
413 pub fn delete_oai(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
415 let entry = self
416 .store
417 .origin_access_identities
418 .get(id)
419 .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
420 check_if_match(if_match, &entry.etag)?;
421 drop(entry);
422 self.store.origin_access_identities.remove(id);
423 self.store
424 .tags
425 .remove(&oai_arn(&self.config.account_id, id));
426 Ok(())
427 }
428
429 pub fn list_oais(&self) -> Vec<CloudFrontOriginAccessIdentity> {
431 let mut v: Vec<_> = self
432 .store
433 .origin_access_identities
434 .iter()
435 .map(|e| e.value().clone())
436 .collect();
437 v.sort_by(|a, b| a.id.cmp(&b.id));
438 v
439 }
440
441 pub fn create_cache_policy(
447 &self,
448 cfg: CachePolicyConfig,
449 ) -> Result<CachePolicy, CloudFrontError> {
450 if cfg.name.is_empty() {
451 return Err(CloudFrontError::InvalidArgument(
452 "CachePolicyConfig.Name is required".to_owned(),
453 ));
454 }
455 let id = uuid::Uuid::new_v4().to_string();
456 let p = CachePolicy {
457 id: id.clone(),
458 last_modified_time: Utc::now(),
459 config: cfg,
460 etag: new_etag(),
461 managed: false,
462 };
463 self.store.cache_policies.insert(id, p.clone());
464 Ok(p)
465 }
466
467 pub fn get_cache_policy(&self, id: &str) -> Result<CachePolicy, CloudFrontError> {
469 self.store
470 .cache_policies
471 .get(id)
472 .map(|r| r.value().clone())
473 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))
474 }
475
476 pub fn update_cache_policy(
478 &self,
479 id: &str,
480 if_match: Option<&str>,
481 cfg: CachePolicyConfig,
482 ) -> Result<CachePolicy, CloudFrontError> {
483 let mut entry = self
484 .store
485 .cache_policies
486 .get_mut(id)
487 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
488 if entry.managed {
489 return Err(CloudFrontError::AccessDenied(
490 "AWS-managed cache policies cannot be modified".to_owned(),
491 ));
492 }
493 check_if_match(if_match, &entry.etag)?;
494 entry.config = cfg;
495 entry.etag = new_etag();
496 entry.last_modified_time = Utc::now();
497 Ok(entry.value().clone())
498 }
499
500 pub fn delete_cache_policy(
502 &self,
503 id: &str,
504 if_match: Option<&str>,
505 ) -> Result<(), CloudFrontError> {
506 let entry = self
507 .store
508 .cache_policies
509 .get(id)
510 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
511 if entry.managed {
512 return Err(CloudFrontError::AccessDenied(
513 "AWS-managed cache policies cannot be deleted".to_owned(),
514 ));
515 }
516 check_if_match(if_match, &entry.etag)?;
517 drop(entry);
518 self.store.cache_policies.remove(id);
519 self.store
520 .tags
521 .remove(&cache_policy_arn(&self.config.account_id, id));
522 Ok(())
523 }
524
525 pub fn list_cache_policies(&self) -> Vec<CachePolicy> {
527 let mut v: Vec<_> = self
528 .store
529 .cache_policies
530 .iter()
531 .map(|e| e.value().clone())
532 .collect();
533 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
534 v
535 }
536
537 pub fn create_origin_request_policy(
543 &self,
544 cfg: OriginRequestPolicyConfig,
545 ) -> Result<OriginRequestPolicy, CloudFrontError> {
546 if cfg.name.is_empty() {
547 return Err(CloudFrontError::InvalidArgument(
548 "OriginRequestPolicyConfig.Name is required".to_owned(),
549 ));
550 }
551 let id = uuid::Uuid::new_v4().to_string();
552 let p = OriginRequestPolicy {
553 id: id.clone(),
554 last_modified_time: Utc::now(),
555 config: cfg,
556 etag: new_etag(),
557 managed: false,
558 };
559 self.store.origin_request_policies.insert(id, p.clone());
560 Ok(p)
561 }
562
563 pub fn get_origin_request_policy(
565 &self,
566 id: &str,
567 ) -> Result<OriginRequestPolicy, CloudFrontError> {
568 self.store
569 .origin_request_policies
570 .get(id)
571 .map(|r| r.value().clone())
572 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))
573 }
574
575 pub fn update_origin_request_policy(
577 &self,
578 id: &str,
579 if_match: Option<&str>,
580 cfg: OriginRequestPolicyConfig,
581 ) -> Result<OriginRequestPolicy, CloudFrontError> {
582 let mut entry = self
583 .store
584 .origin_request_policies
585 .get_mut(id)
586 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
587 if entry.managed {
588 return Err(CloudFrontError::AccessDenied(
589 "AWS-managed origin request policies cannot be modified".to_owned(),
590 ));
591 }
592 check_if_match(if_match, &entry.etag)?;
593 entry.config = cfg;
594 entry.etag = new_etag();
595 entry.last_modified_time = Utc::now();
596 Ok(entry.value().clone())
597 }
598
599 pub fn delete_origin_request_policy(
601 &self,
602 id: &str,
603 if_match: Option<&str>,
604 ) -> Result<(), CloudFrontError> {
605 let entry = self
606 .store
607 .origin_request_policies
608 .get(id)
609 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
610 if entry.managed {
611 return Err(CloudFrontError::AccessDenied(
612 "AWS-managed origin request policies cannot be deleted".to_owned(),
613 ));
614 }
615 check_if_match(if_match, &entry.etag)?;
616 drop(entry);
617 self.store.origin_request_policies.remove(id);
618 self.store
619 .tags
620 .remove(&origin_request_policy_arn(&self.config.account_id, id));
621 Ok(())
622 }
623
624 pub fn list_origin_request_policies(&self) -> Vec<OriginRequestPolicy> {
626 let mut v: Vec<_> = self
627 .store
628 .origin_request_policies
629 .iter()
630 .map(|e| e.value().clone())
631 .collect();
632 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
633 v
634 }
635
636 pub fn create_response_headers_policy(
642 &self,
643 cfg: ResponseHeadersPolicyConfig,
644 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
645 if cfg.name.is_empty() {
646 return Err(CloudFrontError::InvalidArgument(
647 "ResponseHeadersPolicyConfig.Name is required".to_owned(),
648 ));
649 }
650 let id = uuid::Uuid::new_v4().to_string();
651 let p = ResponseHeadersPolicy {
652 id: id.clone(),
653 last_modified_time: Utc::now(),
654 config: cfg,
655 etag: new_etag(),
656 managed: false,
657 };
658 self.store.response_headers_policies.insert(id, p.clone());
659 Ok(p)
660 }
661
662 pub fn get_response_headers_policy(
664 &self,
665 id: &str,
666 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
667 self.store
668 .response_headers_policies
669 .get(id)
670 .map(|r| r.value().clone())
671 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))
672 }
673
674 pub fn update_response_headers_policy(
676 &self,
677 id: &str,
678 if_match: Option<&str>,
679 cfg: ResponseHeadersPolicyConfig,
680 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
681 let mut entry = self
682 .store
683 .response_headers_policies
684 .get_mut(id)
685 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
686 if entry.managed {
687 return Err(CloudFrontError::AccessDenied(
688 "AWS-managed response headers policies cannot be modified".to_owned(),
689 ));
690 }
691 check_if_match(if_match, &entry.etag)?;
692 entry.config = cfg;
693 entry.etag = new_etag();
694 entry.last_modified_time = Utc::now();
695 Ok(entry.value().clone())
696 }
697
698 pub fn delete_response_headers_policy(
700 &self,
701 id: &str,
702 if_match: Option<&str>,
703 ) -> Result<(), CloudFrontError> {
704 let entry = self
705 .store
706 .response_headers_policies
707 .get(id)
708 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
709 if entry.managed {
710 return Err(CloudFrontError::AccessDenied(
711 "AWS-managed response headers policies cannot be deleted".to_owned(),
712 ));
713 }
714 check_if_match(if_match, &entry.etag)?;
715 drop(entry);
716 self.store.response_headers_policies.remove(id);
717 self.store
718 .tags
719 .remove(&response_headers_policy_arn(&self.config.account_id, id));
720 Ok(())
721 }
722
723 pub fn list_response_headers_policies(&self) -> Vec<ResponseHeadersPolicy> {
725 let mut v: Vec<_> = self
726 .store
727 .response_headers_policies
728 .iter()
729 .map(|e| e.value().clone())
730 .collect();
731 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
732 v
733 }
734
735 pub fn create_key_group(&self, cfg: KeyGroupConfig) -> Result<KeyGroup, CloudFrontError> {
741 if cfg.name.is_empty() {
742 return Err(CloudFrontError::InvalidArgument(
743 "KeyGroupConfig.Name is required".to_owned(),
744 ));
745 }
746 let id = new_id_with_prefix('K');
747 let kg = KeyGroup {
748 id: id.clone(),
749 last_modified_time: Utc::now(),
750 config: cfg,
751 etag: new_etag(),
752 };
753 self.store.key_groups.insert(id, kg.clone());
754 Ok(kg)
755 }
756
757 pub fn get_key_group(&self, id: &str) -> Result<KeyGroup, CloudFrontError> {
759 self.store
760 .key_groups
761 .get(id)
762 .map(|r| r.value().clone())
763 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))
764 }
765
766 pub fn update_key_group(
768 &self,
769 id: &str,
770 if_match: Option<&str>,
771 cfg: KeyGroupConfig,
772 ) -> Result<KeyGroup, CloudFrontError> {
773 let mut entry = self
774 .store
775 .key_groups
776 .get_mut(id)
777 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
778 check_if_match(if_match, &entry.etag)?;
779 entry.config = cfg;
780 entry.etag = new_etag();
781 entry.last_modified_time = Utc::now();
782 Ok(entry.value().clone())
783 }
784
785 pub fn delete_key_group(
787 &self,
788 id: &str,
789 if_match: Option<&str>,
790 ) -> Result<(), CloudFrontError> {
791 let entry = self
792 .store
793 .key_groups
794 .get(id)
795 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
796 check_if_match(if_match, &entry.etag)?;
797 drop(entry);
798 self.store.key_groups.remove(id);
799 self.store
800 .tags
801 .remove(&key_group_arn(&self.config.account_id, id));
802 Ok(())
803 }
804
805 pub fn list_key_groups(&self) -> Vec<KeyGroup> {
807 let mut v: Vec<_> = self
808 .store
809 .key_groups
810 .iter()
811 .map(|e| e.value().clone())
812 .collect();
813 v.sort_by(|a, b| a.id.cmp(&b.id));
814 v
815 }
816
817 pub fn create_public_key(&self, cfg: PublicKeyConfig) -> Result<PublicKey, CloudFrontError> {
819 if cfg.name.is_empty() || cfg.encoded_key.is_empty() {
820 return Err(CloudFrontError::InvalidArgument(
821 "PublicKeyConfig Name and EncodedKey are required".to_owned(),
822 ));
823 }
824 let id = new_id_with_prefix('K');
825 let pk = PublicKey {
826 id: id.clone(),
827 created_time: Utc::now(),
828 config: cfg,
829 etag: new_etag(),
830 };
831 self.store.public_keys.insert(id, pk.clone());
832 Ok(pk)
833 }
834
835 pub fn get_public_key(&self, id: &str) -> Result<PublicKey, CloudFrontError> {
837 self.store
838 .public_keys
839 .get(id)
840 .map(|r| r.value().clone())
841 .ok_or_else(|| CloudFrontError::no_such_public_key(id))
842 }
843
844 pub fn update_public_key(
846 &self,
847 id: &str,
848 if_match: Option<&str>,
849 cfg: PublicKeyConfig,
850 ) -> Result<PublicKey, CloudFrontError> {
851 let mut entry = self
852 .store
853 .public_keys
854 .get_mut(id)
855 .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
856 check_if_match(if_match, &entry.etag)?;
857 entry.config = cfg;
858 entry.etag = new_etag();
859 Ok(entry.value().clone())
860 }
861
862 pub fn delete_public_key(
864 &self,
865 id: &str,
866 if_match: Option<&str>,
867 ) -> Result<(), CloudFrontError> {
868 let entry = self
869 .store
870 .public_keys
871 .get(id)
872 .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
873 check_if_match(if_match, &entry.etag)?;
874 drop(entry);
875 self.store.public_keys.remove(id);
876 self.store
877 .tags
878 .remove(&public_key_arn(&self.config.account_id, id));
879 Ok(())
880 }
881
882 pub fn list_public_keys(&self) -> Vec<PublicKey> {
884 let mut v: Vec<_> = self
885 .store
886 .public_keys
887 .iter()
888 .map(|e| e.value().clone())
889 .collect();
890 v.sort_by(|a, b| a.id.cmp(&b.id));
891 v
892 }
893
894 pub fn create_function(
900 &self,
901 name: String,
902 cfg: FunctionConfig,
903 code: Vec<u8>,
904 ) -> Result<CloudFrontFunction, CloudFrontError> {
905 if name.is_empty() {
906 return Err(CloudFrontError::InvalidArgument(
907 "Function Name is required".to_owned(),
908 ));
909 }
910 if self.store.functions.contains_key(&name) {
911 return Err(CloudFrontError::AlreadyExists {
912 code: "FunctionAlreadyExists",
913 message: format!("Function {name} already exists"),
914 });
915 }
916 let arn = function_arn(&self.config.account_id, &name);
917 let now = Utc::now();
918 let f = CloudFrontFunction {
919 name: name.clone(),
920 arn: arn.clone(),
921 last_modified_time: now,
922 stage: "DEVELOPMENT".to_owned(),
923 metadata: FunctionMetadata {
924 function_arn: arn,
925 stage: "DEVELOPMENT".to_owned(),
926 created_time: now,
927 last_modified_time: now,
928 },
929 config: cfg,
930 code,
931 etag: new_etag(),
932 status: "UNPUBLISHED".to_owned(),
933 };
934 self.store.functions.insert(name, f.clone());
935 Ok(f)
936 }
937
938 pub fn get_function(&self, name: &str) -> Result<CloudFrontFunction, CloudFrontError> {
940 self.store
941 .functions
942 .get(name)
943 .map(|r| r.value().clone())
944 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))
945 }
946
947 pub fn update_function(
949 &self,
950 name: &str,
951 if_match: Option<&str>,
952 cfg: FunctionConfig,
953 code: Vec<u8>,
954 ) -> Result<CloudFrontFunction, CloudFrontError> {
955 let mut entry = self
956 .store
957 .functions
958 .get_mut(name)
959 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
960 check_if_match(if_match, &entry.etag)?;
961 entry.config = cfg;
962 entry.code = code;
963 entry.etag = new_etag();
964 entry.last_modified_time = Utc::now();
965 Ok(entry.value().clone())
966 }
967
968 pub fn delete_function(
970 &self,
971 name: &str,
972 if_match: Option<&str>,
973 ) -> Result<(), CloudFrontError> {
974 let entry = self
975 .store
976 .functions
977 .get(name)
978 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
979 check_if_match(if_match, &entry.etag)?;
980 drop(entry);
981 self.store.functions.remove(name);
982 Ok(())
983 }
984
985 pub fn publish_function(
987 &self,
988 name: &str,
989 if_match: Option<&str>,
990 ) -> Result<CloudFrontFunction, CloudFrontError> {
991 let mut entry = self
992 .store
993 .functions
994 .get_mut(name)
995 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
996 check_if_match(if_match, &entry.etag)?;
997 entry.stage = "LIVE".to_owned();
998 entry.metadata.stage = "LIVE".to_owned();
999 entry.status = "PUBLISHED".to_owned();
1000 entry.etag = new_etag();
1001 Ok(entry.value().clone())
1002 }
1003
1004 pub fn test_function(
1006 &self,
1007 name: &str,
1008 event_object: &[u8],
1009 ) -> Result<(Vec<u8>, String), CloudFrontError> {
1010 let _ = self.get_function(name)?;
1011 let result = br#"{"status":"success","testStatus":"OK"}"#.to_vec();
1012 let compute_util = format!("compute_utilization_percent={}", event_object.len());
1013 Ok((result, compute_util))
1014 }
1015
1016 pub fn list_functions(&self) -> Vec<CloudFrontFunction> {
1018 let mut v: Vec<_> = self
1019 .store
1020 .functions
1021 .iter()
1022 .map(|e| e.value().clone())
1023 .collect();
1024 v.sort_by(|a, b| a.name.cmp(&b.name));
1025 v
1026 }
1027
1028 pub fn create_fle_config(
1034 &self,
1035 cfg: FieldLevelEncryptionConfig,
1036 ) -> Result<FieldLevelEncryption, CloudFrontError> {
1037 let id = new_id_with_prefix('F');
1038 let f = FieldLevelEncryption {
1039 id: id.clone(),
1040 last_modified_time: Utc::now(),
1041 config: cfg,
1042 etag: new_etag(),
1043 };
1044 self.store.fle_configs.insert(id, f.clone());
1045 Ok(f)
1046 }
1047
1048 pub fn get_fle_config(&self, id: &str) -> Result<FieldLevelEncryption, CloudFrontError> {
1050 self.store
1051 .fle_configs
1052 .get(id)
1053 .map(|r| r.value().clone())
1054 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))
1055 }
1056
1057 pub fn update_fle_config(
1059 &self,
1060 id: &str,
1061 if_match: Option<&str>,
1062 cfg: FieldLevelEncryptionConfig,
1063 ) -> Result<FieldLevelEncryption, CloudFrontError> {
1064 let mut entry = self
1065 .store
1066 .fle_configs
1067 .get_mut(id)
1068 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1069 check_if_match(if_match, &entry.etag)?;
1070 entry.config = cfg;
1071 entry.etag = new_etag();
1072 entry.last_modified_time = Utc::now();
1073 Ok(entry.value().clone())
1074 }
1075
1076 pub fn delete_fle_config(
1078 &self,
1079 id: &str,
1080 if_match: Option<&str>,
1081 ) -> Result<(), CloudFrontError> {
1082 let entry = self
1083 .store
1084 .fle_configs
1085 .get(id)
1086 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1087 check_if_match(if_match, &entry.etag)?;
1088 drop(entry);
1089 self.store.fle_configs.remove(id);
1090 Ok(())
1091 }
1092
1093 pub fn list_fle_configs(&self) -> Vec<FieldLevelEncryption> {
1095 self.store
1096 .fle_configs
1097 .iter()
1098 .map(|e| e.value().clone())
1099 .collect()
1100 }
1101
1102 pub fn create_fle_profile(
1104 &self,
1105 cfg: FieldLevelEncryptionProfileConfig,
1106 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1107 let id = new_id_with_prefix('P');
1108 let p = FieldLevelEncryptionProfile {
1109 id: id.clone(),
1110 last_modified_time: Utc::now(),
1111 config: cfg,
1112 etag: new_etag(),
1113 };
1114 self.store.fle_profiles.insert(id, p.clone());
1115 Ok(p)
1116 }
1117
1118 pub fn get_fle_profile(
1120 &self,
1121 id: &str,
1122 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1123 self.store
1124 .fle_profiles
1125 .get(id)
1126 .map(|r| r.value().clone())
1127 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id))
1128 }
1129
1130 pub fn update_fle_profile(
1132 &self,
1133 id: &str,
1134 if_match: Option<&str>,
1135 cfg: FieldLevelEncryptionProfileConfig,
1136 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1137 let mut entry =
1138 self.store.fle_profiles.get_mut(id).ok_or_else(|| {
1139 CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1140 })?;
1141 check_if_match(if_match, &entry.etag)?;
1142 entry.config = cfg;
1143 entry.etag = new_etag();
1144 entry.last_modified_time = Utc::now();
1145 Ok(entry.value().clone())
1146 }
1147
1148 pub fn delete_fle_profile(
1150 &self,
1151 id: &str,
1152 if_match: Option<&str>,
1153 ) -> Result<(), CloudFrontError> {
1154 let entry =
1155 self.store.fle_profiles.get(id).ok_or_else(|| {
1156 CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1157 })?;
1158 check_if_match(if_match, &entry.etag)?;
1159 drop(entry);
1160 self.store.fle_profiles.remove(id);
1161 Ok(())
1162 }
1163
1164 pub fn list_fle_profiles(&self) -> Vec<FieldLevelEncryptionProfile> {
1166 self.store
1167 .fle_profiles
1168 .iter()
1169 .map(|e| e.value().clone())
1170 .collect()
1171 }
1172
1173 pub fn create_monitoring_subscription(
1179 &self,
1180 distribution_id: &str,
1181 enabled: bool,
1182 ) -> Result<MonitoringSubscription, CloudFrontError> {
1183 if !self.store.distributions.contains_key(distribution_id) {
1184 return Err(CloudFrontError::no_such_distribution(distribution_id));
1185 }
1186 let sub = MonitoringSubscription {
1187 distribution_id: distribution_id.to_owned(),
1188 realtime_metrics_subscription_status: if enabled {
1189 "Enabled".to_owned()
1190 } else {
1191 "Disabled".to_owned()
1192 },
1193 };
1194 self.store
1195 .monitoring_subscriptions
1196 .insert(distribution_id.to_owned(), sub.clone());
1197 Ok(sub)
1198 }
1199
1200 pub fn get_monitoring_subscription(
1202 &self,
1203 distribution_id: &str,
1204 ) -> Result<MonitoringSubscription, CloudFrontError> {
1205 self.store
1206 .monitoring_subscriptions
1207 .get(distribution_id)
1208 .map(|r| r.value().clone())
1209 .ok_or_else(|| {
1210 CloudFrontError::no_such_resource("MonitoringSubscription", distribution_id)
1211 })
1212 }
1213
1214 pub fn delete_monitoring_subscription(
1216 &self,
1217 distribution_id: &str,
1218 ) -> Result<(), CloudFrontError> {
1219 if self
1220 .store
1221 .monitoring_subscriptions
1222 .remove(distribution_id)
1223 .is_none()
1224 {
1225 return Err(CloudFrontError::no_such_resource(
1226 "MonitoringSubscription",
1227 distribution_id,
1228 ));
1229 }
1230 Ok(())
1231 }
1232
1233 pub fn create_kvs(
1239 &self,
1240 name: String,
1241 comment: String,
1242 ) -> Result<KeyValueStore, CloudFrontError> {
1243 let id = uuid::Uuid::new_v4().to_string();
1244 let arn = kvs_arn(&self.config.account_id, &id);
1245 let kvs = KeyValueStore {
1246 id: id.clone(),
1247 name,
1248 arn,
1249 comment,
1250 status: "PROVISIONING".to_owned(),
1251 last_modified_time: Utc::now(),
1252 etag: new_etag(),
1253 };
1254 self.store.key_value_stores.insert(id, kvs.clone());
1255 Ok(kvs)
1256 }
1257
1258 pub fn get_kvs(&self, id: &str) -> Result<KeyValueStore, CloudFrontError> {
1260 self.store
1261 .key_value_stores
1262 .get(id)
1263 .map(|r| r.value().clone())
1264 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))
1265 }
1266
1267 pub fn update_kvs(
1269 &self,
1270 id: &str,
1271 if_match: Option<&str>,
1272 comment: String,
1273 ) -> Result<KeyValueStore, CloudFrontError> {
1274 let mut entry = self
1275 .store
1276 .key_value_stores
1277 .get_mut(id)
1278 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1279 check_if_match(if_match, &entry.etag)?;
1280 entry.comment = comment;
1281 entry.etag = new_etag();
1282 entry.last_modified_time = Utc::now();
1283 Ok(entry.value().clone())
1284 }
1285
1286 pub fn delete_kvs(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
1288 let entry = self
1289 .store
1290 .key_value_stores
1291 .get(id)
1292 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1293 check_if_match(if_match, &entry.etag)?;
1294 drop(entry);
1295 self.store.key_value_stores.remove(id);
1296 Ok(())
1297 }
1298
1299 pub fn list_kvs(&self) -> Vec<KeyValueStore> {
1301 self.store
1302 .key_value_stores
1303 .iter()
1304 .map(|e| e.value().clone())
1305 .collect()
1306 }
1307
1308 pub fn create_realtime_log_config(
1314 &self,
1315 cfg: RealtimeLogConfig,
1316 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1317 if cfg.name.is_empty() {
1318 return Err(CloudFrontError::InvalidArgument(
1319 "RealtimeLogConfig Name is required".to_owned(),
1320 ));
1321 }
1322 let mut final_cfg = cfg;
1323 if final_cfg.arn.is_empty() {
1324 final_cfg.arn = realtime_log_arn(&self.config.account_id, &final_cfg.name);
1325 }
1326 self.store
1327 .realtime_log_configs
1328 .insert(final_cfg.name.clone(), final_cfg.clone());
1329 Ok(final_cfg)
1330 }
1331
1332 pub fn get_realtime_log_config(
1334 &self,
1335 name: &str,
1336 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1337 self.store
1338 .realtime_log_configs
1339 .get(name)
1340 .map(|r| r.value().clone())
1341 .ok_or_else(|| CloudFrontError::no_such_resource("RealtimeLogConfig", name))
1342 }
1343
1344 pub fn update_realtime_log_config(
1346 &self,
1347 cfg: RealtimeLogConfig,
1348 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1349 if !self.store.realtime_log_configs.contains_key(&cfg.name) {
1350 return Err(CloudFrontError::no_such_resource(
1351 "RealtimeLogConfig",
1352 &cfg.name,
1353 ));
1354 }
1355 self.store
1356 .realtime_log_configs
1357 .insert(cfg.name.clone(), cfg.clone());
1358 Ok(cfg)
1359 }
1360
1361 pub fn delete_realtime_log_config(&self, name: &str) -> Result<(), CloudFrontError> {
1363 if self.store.realtime_log_configs.remove(name).is_none() {
1364 return Err(CloudFrontError::no_such_resource("RealtimeLogConfig", name));
1365 }
1366 Ok(())
1367 }
1368
1369 pub fn list_realtime_log_configs(&self) -> Vec<RealtimeLogConfig> {
1371 self.store
1372 .realtime_log_configs
1373 .iter()
1374 .map(|e| e.value().clone())
1375 .collect()
1376 }
1377
1378 pub fn tag_resource(&self, arn: &str, new_tags: &[Tag]) -> Result<(), CloudFrontError> {
1384 let mut entry = self.store.tags.entry(arn.to_owned()).or_default();
1385 merge_tags(entry.value_mut(), new_tags);
1386 Ok(())
1387 }
1388
1389 pub fn untag_resource(&self, arn: &str, keys: &[String]) -> Result<(), CloudFrontError> {
1391 if let Some(mut entry) = self.store.tags.get_mut(arn) {
1392 entry.retain(|t| !keys.iter().any(|k| k == &t.key));
1393 }
1394 Ok(())
1395 }
1396
1397 pub fn list_tags_for_resource(&self, arn: &str) -> Result<TagSet, CloudFrontError> {
1399 Ok(self
1400 .store
1401 .tags
1402 .get(arn)
1403 .map(|r| r.value().clone())
1404 .unwrap_or_default())
1405 }
1406
1407 fn spawn_distribution_deployment(self: &Arc<Self>, id: &str) {
1412 let delay = self.config.distribution_propagation;
1413 let id = id.to_owned();
1414 let provider = Arc::clone(self);
1415 tokio::spawn(async move {
1416 if !delay.is_zero() {
1417 tokio::time::sleep(delay).await;
1418 }
1419 if let Some(mut d) = provider.store.distributions.get_mut(&id) {
1420 d.status = ResourceStatus::Deployed;
1421 }
1422 });
1423 }
1424
1425 fn spawn_invalidation_completion(
1426 self: &Arc<Self>,
1427 distribution_id: &str,
1428 invalidation_id: &str,
1429 ) {
1430 let delay = self.config.invalidation_propagation;
1431 let dist_id = distribution_id.to_owned();
1432 let inv_id = invalidation_id.to_owned();
1433 let provider = Arc::clone(self);
1434 tokio::spawn(async move {
1435 if !delay.is_zero() {
1436 tokio::time::sleep(delay).await;
1437 }
1438 if let Some(mut inv) = provider
1439 .store
1440 .invalidations
1441 .get_mut(&(dist_id.clone(), inv_id))
1442 {
1443 inv.status = ResourceStatus::Completed;
1444 }
1445 if let Some(mut d) = provider.store.distributions.get_mut(&dist_id) {
1446 d.in_progress_invalidation_batches =
1447 (d.in_progress_invalidation_batches - 1).max(0);
1448 }
1449 });
1450 }
1451}
1452
1453fn validate_distribution_config(cfg: &DistributionConfig) -> Result<(), CloudFrontError> {
1458 if cfg.caller_reference.is_empty() {
1459 return Err(CloudFrontError::MissingArgument(
1460 "CallerReference is required".to_owned(),
1461 ));
1462 }
1463 if cfg.origins.is_empty() {
1464 return Err(CloudFrontError::MissingArgument(
1465 "At least one Origin is required".to_owned(),
1466 ));
1467 }
1468 if cfg.default_cache_behavior.target_origin_id.is_empty() {
1469 return Err(CloudFrontError::MissingArgument(
1470 "DefaultCacheBehavior.TargetOriginId is required".to_owned(),
1471 ));
1472 }
1473 let origin_ids: std::collections::HashSet<&str> =
1474 cfg.origins.iter().map(|o| o.id.as_str()).collect();
1475 if !origin_ids.contains(cfg.default_cache_behavior.target_origin_id.as_str())
1476 && !cfg
1477 .origin_groups
1478 .iter()
1479 .any(|g| g.id == cfg.default_cache_behavior.target_origin_id)
1480 {
1481 return Err(CloudFrontError::MalformedInput(format!(
1482 "DefaultCacheBehavior.TargetOriginId {} does not match any Origin.Id",
1483 cfg.default_cache_behavior.target_origin_id
1484 )));
1485 }
1486 for cb in &cfg.cache_behaviors {
1487 if !origin_ids.contains(cb.target_origin_id.as_str())
1488 && !cfg
1489 .origin_groups
1490 .iter()
1491 .any(|g| g.id == cb.target_origin_id)
1492 {
1493 return Err(CloudFrontError::MalformedInput(format!(
1494 "CacheBehavior.TargetOriginId {} does not match any Origin.Id",
1495 cb.target_origin_id
1496 )));
1497 }
1498 }
1499 Ok(())
1500}
1501
1502fn check_if_match(supplied: Option<&str>, current: &str) -> Result<(), CloudFrontError> {
1503 match supplied {
1504 None | Some("") => Err(CloudFrontError::InvalidIfMatchVersion(
1505 "The If-Match version is missing".to_owned(),
1506 )),
1507 Some(v) if v == current => Ok(()),
1508 Some(_) => Err(CloudFrontError::PreconditionFailed(
1509 "The If-Match version is not valid for the resource".to_owned(),
1510 )),
1511 }
1512}
1513
1514fn merge_tags(existing: &mut TagSet, new_tags: &[Tag]) {
1515 for t in new_tags {
1516 if let Some(existing_tag) = existing.iter_mut().find(|e| e.key == t.key) {
1517 existing_tag.value = t.value.clone();
1518 } else {
1519 existing.push(t.clone());
1520 }
1521 }
1522}