1#![allow(clippy::too_many_lines)]
15#![allow(clippy::too_many_arguments)]
16
17use std::{cmp::Reverse, sync::Arc};
18
19use chrono::Utc;
20use rustack_cloudfront_model::{
21 CachePolicy, CachePolicyConfig, CloudFrontError, CloudFrontFunction,
22 CloudFrontOriginAccessIdentity, CloudFrontOriginAccessIdentityConfig, Distribution,
23 DistributionConfig, FieldLevelEncryption, FieldLevelEncryptionConfig,
24 FieldLevelEncryptionProfile, FieldLevelEncryptionProfileConfig, FunctionConfig,
25 FunctionMetadata, Invalidation, InvalidationBatch, KeyGroup, KeyGroupConfig, KeyValueStore,
26 MonitoringSubscription, OriginAccessControl, OriginAccessControlConfig, OriginRequestPolicy,
27 OriginRequestPolicyConfig, PublicKey, PublicKeyConfig, RealtimeLogConfig, ResourceStatus,
28 ResponseHeadersPolicy, ResponseHeadersPolicyConfig, Tag, TagSet,
29};
30use tracing::info;
31
32use crate::{
33 arn::{
34 cache_policy_arn, distribution_arn, function_arn, key_group_arn, kvs_arn, oai_arn,
35 origin_access_control_arn, origin_request_policy_arn, public_key_arn, realtime_log_arn,
36 response_headers_policy_arn,
37 },
38 config::CloudFrontConfig,
39 id_gen::{
40 deterministic_id_with_prefix, distribution_domain_name, new_distribution_id, new_etag,
41 new_id_with_prefix, new_invalidation_id, new_s3_canonical_user_id,
42 },
43 managed::{
44 managed_cache_policies, managed_origin_request_policies, managed_response_headers_policies,
45 },
46 store::{CloudFrontStore, CloudFrontStoreSnapshot},
47};
48
49#[derive(Debug)]
51pub struct RustackCloudFront {
52 store: Arc<CloudFrontStore>,
53 config: Arc<CloudFrontConfig>,
54}
55
56impl RustackCloudFront {
57 #[must_use]
59 pub fn new(config: CloudFrontConfig) -> Self {
60 let store = CloudFrontStore::new();
61 for p in managed_cache_policies() {
62 store.cache_policies.insert(p.id.clone(), p);
63 }
64 for p in managed_origin_request_policies() {
65 store.origin_request_policies.insert(p.id.clone(), p);
66 }
67 for p in managed_response_headers_policies() {
68 store.response_headers_policies.insert(p.id.clone(), p);
69 }
70 Self {
71 store,
72 config: Arc::new(config),
73 }
74 }
75
76 #[must_use]
78 pub fn store(&self) -> &Arc<CloudFrontStore> {
79 &self.store
80 }
81
82 #[must_use]
84 pub fn config(&self) -> &CloudFrontConfig {
85 &self.config
86 }
87
88 #[must_use]
90 pub fn export_snapshot(&self) -> CloudFrontStoreSnapshot {
91 self.store.export_snapshot()
92 }
93
94 pub fn import_snapshot(&self, snapshot: CloudFrontStoreSnapshot) {
96 self.store.import_snapshot(snapshot);
97 }
98
99 pub fn create_distribution(
105 self: &Arc<Self>,
106 config: DistributionConfig,
107 tags: TagSet,
108 ) -> Result<Distribution, CloudFrontError> {
109 validate_distribution_config(&config)?;
110
111 let id = if self.config.deterministic_ids {
112 deterministic_id_with_prefix('E', &config.caller_reference)
113 } else {
114 new_distribution_id()
115 };
116 let arn = distribution_arn(&self.config.account_id, &id);
117 let domain_name = distribution_domain_name(&id, &self.config.domain_suffix);
118 let etag = new_etag();
119 let dist = Distribution {
120 id: id.clone(),
121 arn: arn.clone(),
122 status: ResourceStatus::InProgress,
123 last_modified_time: Utc::now(),
124 domain_name,
125 in_progress_invalidation_batches: 0,
126 active_trusted_signers_enabled: false,
127 active_trusted_key_groups_enabled: false,
128 config,
129 tags: tags.clone(),
130 etag,
131 alias_icp_recordal: Vec::new(),
132 };
133
134 if !tags.is_empty() {
135 self.store.tags.insert(arn.clone(), tags);
136 }
137 self.store.distributions.insert(id.clone(), dist.clone());
138 self.spawn_distribution_deployment(&id);
139 info!(distribution_id = %id, "created distribution");
140 Ok(dist)
141 }
142
143 pub fn get_distribution(&self, id: &str) -> Result<Distribution, CloudFrontError> {
145 self.store
146 .distributions
147 .get(id)
148 .map(|r| r.value().clone())
149 .ok_or_else(|| CloudFrontError::no_such_distribution(id))
150 }
151
152 pub fn update_distribution(
154 self: &Arc<Self>,
155 id: &str,
156 if_match: Option<&str>,
157 new_config: DistributionConfig,
158 ) -> Result<Distribution, CloudFrontError> {
159 validate_distribution_config(&new_config)?;
160 let mut entry = self
161 .store
162 .distributions
163 .get_mut(id)
164 .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
165 check_if_match(if_match, &entry.etag)?;
166 entry.config = new_config;
167 entry.etag = new_etag();
168 entry.status = ResourceStatus::InProgress;
169 entry.last_modified_time = Utc::now();
170 let clone = entry.value().clone();
171 drop(entry);
172 self.spawn_distribution_deployment(id);
173 Ok(clone)
174 }
175
176 pub fn delete_distribution(
178 &self,
179 id: &str,
180 if_match: Option<&str>,
181 ) -> Result<(), CloudFrontError> {
182 let dist = self
183 .store
184 .distributions
185 .get(id)
186 .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
187 check_if_match(if_match, &dist.etag)?;
188 if dist.config.enabled {
189 return Err(CloudFrontError::DistributionNotDisabled(format!(
190 "Distribution {id} is enabled; disable it before deleting."
191 )));
192 }
193 drop(dist);
194 self.store.distributions.remove(id);
195 self.store
196 .tags
197 .remove(&distribution_arn(&self.config.account_id, id));
198 Ok(())
199 }
200
201 pub fn list_distributions(&self) -> Vec<Distribution> {
203 let mut v: Vec<_> = self
204 .store
205 .distributions
206 .iter()
207 .map(|e| e.value().clone())
208 .collect();
209 v.sort_by(|a, b| a.id.cmp(&b.id));
210 v
211 }
212
213 pub fn copy_distribution(
215 self: &Arc<Self>,
216 primary_id: &str,
217 caller_reference: &str,
218 staging: bool,
219 ) -> Result<Distribution, CloudFrontError> {
220 let primary = self
221 .store
222 .distributions
223 .get(primary_id)
224 .ok_or_else(|| CloudFrontError::no_such_distribution(primary_id))?;
225 let mut new_cfg = primary.config.clone();
226 new_cfg.caller_reference = caller_reference.to_owned();
227 new_cfg.staging = staging;
228 drop(primary);
229 self.create_distribution(new_cfg, Vec::new())
230 }
231
232 pub fn create_invalidation(
238 self: &Arc<Self>,
239 distribution_id: &str,
240 batch: InvalidationBatch,
241 ) -> Result<Invalidation, CloudFrontError> {
242 if !self.store.distributions.contains_key(distribution_id) {
243 return Err(CloudFrontError::no_such_distribution(distribution_id));
244 }
245 if batch.paths.is_empty() {
246 return Err(CloudFrontError::InvalidArgument(
247 "Invalidation batch must contain at least one path".to_owned(),
248 ));
249 }
250 let id = if self.config.deterministic_ids {
251 deterministic_id_with_prefix('I', &batch.caller_reference)
252 } else {
253 new_invalidation_id()
254 };
255 let inv = Invalidation {
256 id: id.clone(),
257 status: ResourceStatus::InProgress,
258 create_time: Utc::now(),
259 distribution_id: distribution_id.to_owned(),
260 batch,
261 };
262 self.store
263 .invalidations
264 .insert((distribution_id.to_owned(), id.clone()), inv.clone());
265 if let Some(mut d) = self.store.distributions.get_mut(distribution_id) {
266 d.in_progress_invalidation_batches += 1;
267 }
268 self.spawn_invalidation_completion(distribution_id, &id);
269 Ok(inv)
270 }
271
272 pub fn get_invalidation(
274 &self,
275 distribution_id: &str,
276 invalidation_id: &str,
277 ) -> Result<Invalidation, CloudFrontError> {
278 self.store
279 .invalidations
280 .get(&(distribution_id.to_owned(), invalidation_id.to_owned()))
281 .map(|r| r.value().clone())
282 .ok_or_else(|| CloudFrontError::no_such_invalidation(invalidation_id))
283 }
284
285 pub fn list_invalidations(&self, distribution_id: &str) -> Vec<Invalidation> {
287 let mut v: Vec<_> = self
288 .store
289 .invalidations
290 .iter()
291 .filter(|e| e.key().0 == distribution_id)
292 .map(|e| e.value().clone())
293 .collect();
294 v.sort_by_key(|invalidation| Reverse(invalidation.create_time));
295 v
296 }
297
298 pub fn create_oac(
304 &self,
305 cfg: OriginAccessControlConfig,
306 ) -> Result<OriginAccessControl, CloudFrontError> {
307 if cfg.name.is_empty() {
308 return Err(CloudFrontError::InvalidArgument(
309 "OriginAccessControl Name is required".to_owned(),
310 ));
311 }
312 let id = new_id_with_prefix('E');
313 let oac = OriginAccessControl {
314 id: id.clone(),
315 config: cfg,
316 etag: new_etag(),
317 };
318 self.store.origin_access_controls.insert(id, oac.clone());
319 Ok(oac)
320 }
321
322 pub fn get_oac(&self, id: &str) -> Result<OriginAccessControl, CloudFrontError> {
324 self.store
325 .origin_access_controls
326 .get(id)
327 .map(|r| r.value().clone())
328 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))
329 }
330
331 pub fn update_oac(
333 &self,
334 id: &str,
335 if_match: Option<&str>,
336 cfg: OriginAccessControlConfig,
337 ) -> Result<OriginAccessControl, CloudFrontError> {
338 let mut entry = self
339 .store
340 .origin_access_controls
341 .get_mut(id)
342 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
343 check_if_match(if_match, &entry.etag)?;
344 entry.config = cfg;
345 entry.etag = new_etag();
346 Ok(entry.value().clone())
347 }
348
349 pub fn delete_oac(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
351 let entry = self
352 .store
353 .origin_access_controls
354 .get(id)
355 .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
356 check_if_match(if_match, &entry.etag)?;
357 drop(entry);
358 self.store.origin_access_controls.remove(id);
359 self.store
360 .tags
361 .remove(&origin_access_control_arn(&self.config.account_id, id));
362 Ok(())
363 }
364
365 pub fn list_oacs(&self) -> Vec<OriginAccessControl> {
367 let mut v: Vec<_> = self
368 .store
369 .origin_access_controls
370 .iter()
371 .map(|e| e.value().clone())
372 .collect();
373 v.sort_by(|a, b| a.id.cmp(&b.id));
374 v
375 }
376
377 pub fn create_oai(
383 &self,
384 cfg: CloudFrontOriginAccessIdentityConfig,
385 ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
386 let id = new_id_with_prefix('E');
387 let oai = CloudFrontOriginAccessIdentity {
388 id: id.clone(),
389 s3_canonical_user_id: new_s3_canonical_user_id(),
390 config: cfg,
391 etag: new_etag(),
392 };
393 self.store.origin_access_identities.insert(id, oai.clone());
394 Ok(oai)
395 }
396
397 pub fn get_oai(&self, id: &str) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
399 self.store
400 .origin_access_identities
401 .get(id)
402 .map(|r| r.value().clone())
403 .ok_or_else(|| CloudFrontError::no_such_oai(id))
404 }
405
406 pub fn update_oai(
408 &self,
409 id: &str,
410 if_match: Option<&str>,
411 cfg: CloudFrontOriginAccessIdentityConfig,
412 ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
413 let mut entry = self
414 .store
415 .origin_access_identities
416 .get_mut(id)
417 .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
418 check_if_match(if_match, &entry.etag)?;
419 entry.config = cfg;
420 entry.etag = new_etag();
421 Ok(entry.value().clone())
422 }
423
424 pub fn delete_oai(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
426 let entry = self
427 .store
428 .origin_access_identities
429 .get(id)
430 .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
431 check_if_match(if_match, &entry.etag)?;
432 drop(entry);
433 self.store.origin_access_identities.remove(id);
434 self.store
435 .tags
436 .remove(&oai_arn(&self.config.account_id, id));
437 Ok(())
438 }
439
440 pub fn list_oais(&self) -> Vec<CloudFrontOriginAccessIdentity> {
442 let mut v: Vec<_> = self
443 .store
444 .origin_access_identities
445 .iter()
446 .map(|e| e.value().clone())
447 .collect();
448 v.sort_by(|a, b| a.id.cmp(&b.id));
449 v
450 }
451
452 pub fn create_cache_policy(
458 &self,
459 cfg: CachePolicyConfig,
460 ) -> Result<CachePolicy, CloudFrontError> {
461 if cfg.name.is_empty() {
462 return Err(CloudFrontError::InvalidArgument(
463 "CachePolicyConfig.Name is required".to_owned(),
464 ));
465 }
466 let id = uuid::Uuid::new_v4().to_string();
467 let p = CachePolicy {
468 id: id.clone(),
469 last_modified_time: Utc::now(),
470 config: cfg,
471 etag: new_etag(),
472 managed: false,
473 };
474 self.store.cache_policies.insert(id, p.clone());
475 Ok(p)
476 }
477
478 pub fn get_cache_policy(&self, id: &str) -> Result<CachePolicy, CloudFrontError> {
480 self.store
481 .cache_policies
482 .get(id)
483 .map(|r| r.value().clone())
484 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))
485 }
486
487 pub fn update_cache_policy(
489 &self,
490 id: &str,
491 if_match: Option<&str>,
492 cfg: CachePolicyConfig,
493 ) -> Result<CachePolicy, CloudFrontError> {
494 let mut entry = self
495 .store
496 .cache_policies
497 .get_mut(id)
498 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
499 if entry.managed {
500 return Err(CloudFrontError::AccessDenied(
501 "AWS-managed cache policies cannot be modified".to_owned(),
502 ));
503 }
504 check_if_match(if_match, &entry.etag)?;
505 entry.config = cfg;
506 entry.etag = new_etag();
507 entry.last_modified_time = Utc::now();
508 Ok(entry.value().clone())
509 }
510
511 pub fn delete_cache_policy(
513 &self,
514 id: &str,
515 if_match: Option<&str>,
516 ) -> Result<(), CloudFrontError> {
517 let entry = self
518 .store
519 .cache_policies
520 .get(id)
521 .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
522 if entry.managed {
523 return Err(CloudFrontError::AccessDenied(
524 "AWS-managed cache policies cannot be deleted".to_owned(),
525 ));
526 }
527 check_if_match(if_match, &entry.etag)?;
528 drop(entry);
529 self.store.cache_policies.remove(id);
530 self.store
531 .tags
532 .remove(&cache_policy_arn(&self.config.account_id, id));
533 Ok(())
534 }
535
536 pub fn list_cache_policies(&self) -> Vec<CachePolicy> {
538 let mut v: Vec<_> = self
539 .store
540 .cache_policies
541 .iter()
542 .map(|e| e.value().clone())
543 .collect();
544 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
545 v
546 }
547
548 pub fn create_origin_request_policy(
554 &self,
555 cfg: OriginRequestPolicyConfig,
556 ) -> Result<OriginRequestPolicy, CloudFrontError> {
557 if cfg.name.is_empty() {
558 return Err(CloudFrontError::InvalidArgument(
559 "OriginRequestPolicyConfig.Name is required".to_owned(),
560 ));
561 }
562 let id = uuid::Uuid::new_v4().to_string();
563 let p = OriginRequestPolicy {
564 id: id.clone(),
565 last_modified_time: Utc::now(),
566 config: cfg,
567 etag: new_etag(),
568 managed: false,
569 };
570 self.store.origin_request_policies.insert(id, p.clone());
571 Ok(p)
572 }
573
574 pub fn get_origin_request_policy(
576 &self,
577 id: &str,
578 ) -> Result<OriginRequestPolicy, CloudFrontError> {
579 self.store
580 .origin_request_policies
581 .get(id)
582 .map(|r| r.value().clone())
583 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))
584 }
585
586 pub fn update_origin_request_policy(
588 &self,
589 id: &str,
590 if_match: Option<&str>,
591 cfg: OriginRequestPolicyConfig,
592 ) -> Result<OriginRequestPolicy, CloudFrontError> {
593 let mut entry = self
594 .store
595 .origin_request_policies
596 .get_mut(id)
597 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
598 if entry.managed {
599 return Err(CloudFrontError::AccessDenied(
600 "AWS-managed origin request policies cannot be modified".to_owned(),
601 ));
602 }
603 check_if_match(if_match, &entry.etag)?;
604 entry.config = cfg;
605 entry.etag = new_etag();
606 entry.last_modified_time = Utc::now();
607 Ok(entry.value().clone())
608 }
609
610 pub fn delete_origin_request_policy(
612 &self,
613 id: &str,
614 if_match: Option<&str>,
615 ) -> Result<(), CloudFrontError> {
616 let entry = self
617 .store
618 .origin_request_policies
619 .get(id)
620 .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
621 if entry.managed {
622 return Err(CloudFrontError::AccessDenied(
623 "AWS-managed origin request policies cannot be deleted".to_owned(),
624 ));
625 }
626 check_if_match(if_match, &entry.etag)?;
627 drop(entry);
628 self.store.origin_request_policies.remove(id);
629 self.store
630 .tags
631 .remove(&origin_request_policy_arn(&self.config.account_id, id));
632 Ok(())
633 }
634
635 pub fn list_origin_request_policies(&self) -> Vec<OriginRequestPolicy> {
637 let mut v: Vec<_> = self
638 .store
639 .origin_request_policies
640 .iter()
641 .map(|e| e.value().clone())
642 .collect();
643 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
644 v
645 }
646
647 pub fn create_response_headers_policy(
653 &self,
654 cfg: ResponseHeadersPolicyConfig,
655 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
656 if cfg.name.is_empty() {
657 return Err(CloudFrontError::InvalidArgument(
658 "ResponseHeadersPolicyConfig.Name is required".to_owned(),
659 ));
660 }
661 let id = uuid::Uuid::new_v4().to_string();
662 let p = ResponseHeadersPolicy {
663 id: id.clone(),
664 last_modified_time: Utc::now(),
665 config: cfg,
666 etag: new_etag(),
667 managed: false,
668 };
669 self.store.response_headers_policies.insert(id, p.clone());
670 Ok(p)
671 }
672
673 pub fn get_response_headers_policy(
675 &self,
676 id: &str,
677 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
678 self.store
679 .response_headers_policies
680 .get(id)
681 .map(|r| r.value().clone())
682 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))
683 }
684
685 pub fn update_response_headers_policy(
687 &self,
688 id: &str,
689 if_match: Option<&str>,
690 cfg: ResponseHeadersPolicyConfig,
691 ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
692 let mut entry = self
693 .store
694 .response_headers_policies
695 .get_mut(id)
696 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
697 if entry.managed {
698 return Err(CloudFrontError::AccessDenied(
699 "AWS-managed response headers policies cannot be modified".to_owned(),
700 ));
701 }
702 check_if_match(if_match, &entry.etag)?;
703 entry.config = cfg;
704 entry.etag = new_etag();
705 entry.last_modified_time = Utc::now();
706 Ok(entry.value().clone())
707 }
708
709 pub fn delete_response_headers_policy(
711 &self,
712 id: &str,
713 if_match: Option<&str>,
714 ) -> Result<(), CloudFrontError> {
715 let entry = self
716 .store
717 .response_headers_policies
718 .get(id)
719 .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
720 if entry.managed {
721 return Err(CloudFrontError::AccessDenied(
722 "AWS-managed response headers policies cannot be deleted".to_owned(),
723 ));
724 }
725 check_if_match(if_match, &entry.etag)?;
726 drop(entry);
727 self.store.response_headers_policies.remove(id);
728 self.store
729 .tags
730 .remove(&response_headers_policy_arn(&self.config.account_id, id));
731 Ok(())
732 }
733
734 pub fn list_response_headers_policies(&self) -> Vec<ResponseHeadersPolicy> {
736 let mut v: Vec<_> = self
737 .store
738 .response_headers_policies
739 .iter()
740 .map(|e| e.value().clone())
741 .collect();
742 v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
743 v
744 }
745
746 pub fn create_key_group(&self, cfg: KeyGroupConfig) -> Result<KeyGroup, CloudFrontError> {
752 if cfg.name.is_empty() {
753 return Err(CloudFrontError::InvalidArgument(
754 "KeyGroupConfig.Name is required".to_owned(),
755 ));
756 }
757 let id = new_id_with_prefix('K');
758 let kg = KeyGroup {
759 id: id.clone(),
760 last_modified_time: Utc::now(),
761 config: cfg,
762 etag: new_etag(),
763 };
764 self.store.key_groups.insert(id, kg.clone());
765 Ok(kg)
766 }
767
768 pub fn get_key_group(&self, id: &str) -> Result<KeyGroup, CloudFrontError> {
770 self.store
771 .key_groups
772 .get(id)
773 .map(|r| r.value().clone())
774 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))
775 }
776
777 pub fn update_key_group(
779 &self,
780 id: &str,
781 if_match: Option<&str>,
782 cfg: KeyGroupConfig,
783 ) -> Result<KeyGroup, CloudFrontError> {
784 let mut entry = self
785 .store
786 .key_groups
787 .get_mut(id)
788 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
789 check_if_match(if_match, &entry.etag)?;
790 entry.config = cfg;
791 entry.etag = new_etag();
792 entry.last_modified_time = Utc::now();
793 Ok(entry.value().clone())
794 }
795
796 pub fn delete_key_group(
798 &self,
799 id: &str,
800 if_match: Option<&str>,
801 ) -> Result<(), CloudFrontError> {
802 let entry = self
803 .store
804 .key_groups
805 .get(id)
806 .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
807 check_if_match(if_match, &entry.etag)?;
808 drop(entry);
809 self.store.key_groups.remove(id);
810 self.store
811 .tags
812 .remove(&key_group_arn(&self.config.account_id, id));
813 Ok(())
814 }
815
816 pub fn list_key_groups(&self) -> Vec<KeyGroup> {
818 let mut v: Vec<_> = self
819 .store
820 .key_groups
821 .iter()
822 .map(|e| e.value().clone())
823 .collect();
824 v.sort_by(|a, b| a.id.cmp(&b.id));
825 v
826 }
827
828 pub fn create_public_key(&self, cfg: PublicKeyConfig) -> Result<PublicKey, CloudFrontError> {
830 if cfg.name.is_empty() || cfg.encoded_key.is_empty() {
831 return Err(CloudFrontError::InvalidArgument(
832 "PublicKeyConfig Name and EncodedKey are required".to_owned(),
833 ));
834 }
835 let id = new_id_with_prefix('K');
836 let pk = PublicKey {
837 id: id.clone(),
838 created_time: Utc::now(),
839 config: cfg,
840 etag: new_etag(),
841 };
842 self.store.public_keys.insert(id, pk.clone());
843 Ok(pk)
844 }
845
846 pub fn get_public_key(&self, id: &str) -> Result<PublicKey, CloudFrontError> {
848 self.store
849 .public_keys
850 .get(id)
851 .map(|r| r.value().clone())
852 .ok_or_else(|| CloudFrontError::no_such_public_key(id))
853 }
854
855 pub fn update_public_key(
857 &self,
858 id: &str,
859 if_match: Option<&str>,
860 cfg: PublicKeyConfig,
861 ) -> Result<PublicKey, CloudFrontError> {
862 let mut entry = self
863 .store
864 .public_keys
865 .get_mut(id)
866 .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
867 check_if_match(if_match, &entry.etag)?;
868 entry.config = cfg;
869 entry.etag = new_etag();
870 Ok(entry.value().clone())
871 }
872
873 pub fn delete_public_key(
875 &self,
876 id: &str,
877 if_match: Option<&str>,
878 ) -> Result<(), CloudFrontError> {
879 let entry = self
880 .store
881 .public_keys
882 .get(id)
883 .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
884 check_if_match(if_match, &entry.etag)?;
885 drop(entry);
886 self.store.public_keys.remove(id);
887 self.store
888 .tags
889 .remove(&public_key_arn(&self.config.account_id, id));
890 Ok(())
891 }
892
893 pub fn list_public_keys(&self) -> Vec<PublicKey> {
895 let mut v: Vec<_> = self
896 .store
897 .public_keys
898 .iter()
899 .map(|e| e.value().clone())
900 .collect();
901 v.sort_by(|a, b| a.id.cmp(&b.id));
902 v
903 }
904
905 pub fn create_function(
911 &self,
912 name: String,
913 cfg: FunctionConfig,
914 code: Vec<u8>,
915 ) -> Result<CloudFrontFunction, CloudFrontError> {
916 if name.is_empty() {
917 return Err(CloudFrontError::InvalidArgument(
918 "Function Name is required".to_owned(),
919 ));
920 }
921 if self.store.functions.contains_key(&name) {
922 return Err(CloudFrontError::AlreadyExists {
923 code: "FunctionAlreadyExists",
924 message: format!("Function {name} already exists"),
925 });
926 }
927 let arn = function_arn(&self.config.account_id, &name);
928 let now = Utc::now();
929 let f = CloudFrontFunction {
930 name: name.clone(),
931 arn: arn.clone(),
932 last_modified_time: now,
933 stage: "DEVELOPMENT".to_owned(),
934 metadata: FunctionMetadata {
935 function_arn: arn,
936 stage: "DEVELOPMENT".to_owned(),
937 created_time: now,
938 last_modified_time: now,
939 },
940 config: cfg,
941 code,
942 etag: new_etag(),
943 status: "UNPUBLISHED".to_owned(),
944 };
945 self.store.functions.insert(name, f.clone());
946 Ok(f)
947 }
948
949 pub fn get_function(&self, name: &str) -> Result<CloudFrontFunction, CloudFrontError> {
951 self.store
952 .functions
953 .get(name)
954 .map(|r| r.value().clone())
955 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))
956 }
957
958 pub fn update_function(
960 &self,
961 name: &str,
962 if_match: Option<&str>,
963 cfg: FunctionConfig,
964 code: Vec<u8>,
965 ) -> Result<CloudFrontFunction, CloudFrontError> {
966 let mut entry = self
967 .store
968 .functions
969 .get_mut(name)
970 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
971 check_if_match(if_match, &entry.etag)?;
972 entry.config = cfg;
973 entry.code = code;
974 entry.etag = new_etag();
975 entry.last_modified_time = Utc::now();
976 Ok(entry.value().clone())
977 }
978
979 pub fn delete_function(
981 &self,
982 name: &str,
983 if_match: Option<&str>,
984 ) -> Result<(), CloudFrontError> {
985 let entry = self
986 .store
987 .functions
988 .get(name)
989 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
990 check_if_match(if_match, &entry.etag)?;
991 drop(entry);
992 self.store.functions.remove(name);
993 Ok(())
994 }
995
996 pub fn publish_function(
998 &self,
999 name: &str,
1000 if_match: Option<&str>,
1001 ) -> Result<CloudFrontFunction, CloudFrontError> {
1002 let mut entry = self
1003 .store
1004 .functions
1005 .get_mut(name)
1006 .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
1007 check_if_match(if_match, &entry.etag)?;
1008 entry.stage = "LIVE".to_owned();
1009 entry.metadata.stage = "LIVE".to_owned();
1010 entry.status = "PUBLISHED".to_owned();
1011 entry.etag = new_etag();
1012 Ok(entry.value().clone())
1013 }
1014
1015 pub fn test_function(
1017 &self,
1018 name: &str,
1019 event_object: &[u8],
1020 ) -> Result<(Vec<u8>, String), CloudFrontError> {
1021 let _ = self.get_function(name)?;
1022 let result = br#"{"status":"success","testStatus":"OK"}"#.to_vec();
1023 let compute_util = format!("compute_utilization_percent={}", event_object.len());
1024 Ok((result, compute_util))
1025 }
1026
1027 pub fn list_functions(&self) -> Vec<CloudFrontFunction> {
1029 let mut v: Vec<_> = self
1030 .store
1031 .functions
1032 .iter()
1033 .map(|e| e.value().clone())
1034 .collect();
1035 v.sort_by(|a, b| a.name.cmp(&b.name));
1036 v
1037 }
1038
1039 pub fn create_fle_config(
1045 &self,
1046 cfg: FieldLevelEncryptionConfig,
1047 ) -> Result<FieldLevelEncryption, CloudFrontError> {
1048 let id = new_id_with_prefix('F');
1049 let f = FieldLevelEncryption {
1050 id: id.clone(),
1051 last_modified_time: Utc::now(),
1052 config: cfg,
1053 etag: new_etag(),
1054 };
1055 self.store.fle_configs.insert(id, f.clone());
1056 Ok(f)
1057 }
1058
1059 pub fn get_fle_config(&self, id: &str) -> Result<FieldLevelEncryption, CloudFrontError> {
1061 self.store
1062 .fle_configs
1063 .get(id)
1064 .map(|r| r.value().clone())
1065 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))
1066 }
1067
1068 pub fn update_fle_config(
1070 &self,
1071 id: &str,
1072 if_match: Option<&str>,
1073 cfg: FieldLevelEncryptionConfig,
1074 ) -> Result<FieldLevelEncryption, CloudFrontError> {
1075 let mut entry = self
1076 .store
1077 .fle_configs
1078 .get_mut(id)
1079 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1080 check_if_match(if_match, &entry.etag)?;
1081 entry.config = cfg;
1082 entry.etag = new_etag();
1083 entry.last_modified_time = Utc::now();
1084 Ok(entry.value().clone())
1085 }
1086
1087 pub fn delete_fle_config(
1089 &self,
1090 id: &str,
1091 if_match: Option<&str>,
1092 ) -> Result<(), CloudFrontError> {
1093 let entry = self
1094 .store
1095 .fle_configs
1096 .get(id)
1097 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1098 check_if_match(if_match, &entry.etag)?;
1099 drop(entry);
1100 self.store.fle_configs.remove(id);
1101 Ok(())
1102 }
1103
1104 pub fn list_fle_configs(&self) -> Vec<FieldLevelEncryption> {
1106 self.store
1107 .fle_configs
1108 .iter()
1109 .map(|e| e.value().clone())
1110 .collect()
1111 }
1112
1113 pub fn create_fle_profile(
1115 &self,
1116 cfg: FieldLevelEncryptionProfileConfig,
1117 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1118 let id = new_id_with_prefix('P');
1119 let p = FieldLevelEncryptionProfile {
1120 id: id.clone(),
1121 last_modified_time: Utc::now(),
1122 config: cfg,
1123 etag: new_etag(),
1124 };
1125 self.store.fle_profiles.insert(id, p.clone());
1126 Ok(p)
1127 }
1128
1129 pub fn get_fle_profile(
1131 &self,
1132 id: &str,
1133 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1134 self.store
1135 .fle_profiles
1136 .get(id)
1137 .map(|r| r.value().clone())
1138 .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id))
1139 }
1140
1141 pub fn update_fle_profile(
1143 &self,
1144 id: &str,
1145 if_match: Option<&str>,
1146 cfg: FieldLevelEncryptionProfileConfig,
1147 ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1148 let mut entry =
1149 self.store.fle_profiles.get_mut(id).ok_or_else(|| {
1150 CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1151 })?;
1152 check_if_match(if_match, &entry.etag)?;
1153 entry.config = cfg;
1154 entry.etag = new_etag();
1155 entry.last_modified_time = Utc::now();
1156 Ok(entry.value().clone())
1157 }
1158
1159 pub fn delete_fle_profile(
1161 &self,
1162 id: &str,
1163 if_match: Option<&str>,
1164 ) -> Result<(), CloudFrontError> {
1165 let entry =
1166 self.store.fle_profiles.get(id).ok_or_else(|| {
1167 CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1168 })?;
1169 check_if_match(if_match, &entry.etag)?;
1170 drop(entry);
1171 self.store.fle_profiles.remove(id);
1172 Ok(())
1173 }
1174
1175 pub fn list_fle_profiles(&self) -> Vec<FieldLevelEncryptionProfile> {
1177 self.store
1178 .fle_profiles
1179 .iter()
1180 .map(|e| e.value().clone())
1181 .collect()
1182 }
1183
1184 pub fn create_monitoring_subscription(
1190 &self,
1191 distribution_id: &str,
1192 enabled: bool,
1193 ) -> Result<MonitoringSubscription, CloudFrontError> {
1194 if !self.store.distributions.contains_key(distribution_id) {
1195 return Err(CloudFrontError::no_such_distribution(distribution_id));
1196 }
1197 let sub = MonitoringSubscription {
1198 distribution_id: distribution_id.to_owned(),
1199 realtime_metrics_subscription_status: if enabled {
1200 "Enabled".to_owned()
1201 } else {
1202 "Disabled".to_owned()
1203 },
1204 };
1205 self.store
1206 .monitoring_subscriptions
1207 .insert(distribution_id.to_owned(), sub.clone());
1208 Ok(sub)
1209 }
1210
1211 pub fn get_monitoring_subscription(
1213 &self,
1214 distribution_id: &str,
1215 ) -> Result<MonitoringSubscription, CloudFrontError> {
1216 self.store
1217 .monitoring_subscriptions
1218 .get(distribution_id)
1219 .map(|r| r.value().clone())
1220 .ok_or_else(|| {
1221 CloudFrontError::no_such_resource("MonitoringSubscription", distribution_id)
1222 })
1223 }
1224
1225 pub fn delete_monitoring_subscription(
1227 &self,
1228 distribution_id: &str,
1229 ) -> Result<(), CloudFrontError> {
1230 if self
1231 .store
1232 .monitoring_subscriptions
1233 .remove(distribution_id)
1234 .is_none()
1235 {
1236 return Err(CloudFrontError::no_such_resource(
1237 "MonitoringSubscription",
1238 distribution_id,
1239 ));
1240 }
1241 Ok(())
1242 }
1243
1244 pub fn create_kvs(
1250 &self,
1251 name: String,
1252 comment: String,
1253 ) -> Result<KeyValueStore, CloudFrontError> {
1254 let id = uuid::Uuid::new_v4().to_string();
1255 let arn = kvs_arn(&self.config.account_id, &id);
1256 let kvs = KeyValueStore {
1257 id: id.clone(),
1258 name,
1259 arn,
1260 comment,
1261 status: "PROVISIONING".to_owned(),
1262 last_modified_time: Utc::now(),
1263 etag: new_etag(),
1264 };
1265 self.store.key_value_stores.insert(id, kvs.clone());
1266 Ok(kvs)
1267 }
1268
1269 pub fn get_kvs(&self, id: &str) -> Result<KeyValueStore, CloudFrontError> {
1271 self.store
1272 .key_value_stores
1273 .get(id)
1274 .map(|r| r.value().clone())
1275 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))
1276 }
1277
1278 pub fn update_kvs(
1280 &self,
1281 id: &str,
1282 if_match: Option<&str>,
1283 comment: String,
1284 ) -> Result<KeyValueStore, CloudFrontError> {
1285 let mut entry = self
1286 .store
1287 .key_value_stores
1288 .get_mut(id)
1289 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1290 check_if_match(if_match, &entry.etag)?;
1291 entry.comment = comment;
1292 entry.etag = new_etag();
1293 entry.last_modified_time = Utc::now();
1294 Ok(entry.value().clone())
1295 }
1296
1297 pub fn delete_kvs(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
1299 let entry = self
1300 .store
1301 .key_value_stores
1302 .get(id)
1303 .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1304 check_if_match(if_match, &entry.etag)?;
1305 drop(entry);
1306 self.store.key_value_stores.remove(id);
1307 Ok(())
1308 }
1309
1310 pub fn list_kvs(&self) -> Vec<KeyValueStore> {
1312 self.store
1313 .key_value_stores
1314 .iter()
1315 .map(|e| e.value().clone())
1316 .collect()
1317 }
1318
1319 pub fn create_realtime_log_config(
1325 &self,
1326 cfg: RealtimeLogConfig,
1327 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1328 if cfg.name.is_empty() {
1329 return Err(CloudFrontError::InvalidArgument(
1330 "RealtimeLogConfig Name is required".to_owned(),
1331 ));
1332 }
1333 let mut final_cfg = cfg;
1334 if final_cfg.arn.is_empty() {
1335 final_cfg.arn = realtime_log_arn(&self.config.account_id, &final_cfg.name);
1336 }
1337 self.store
1338 .realtime_log_configs
1339 .insert(final_cfg.name.clone(), final_cfg.clone());
1340 Ok(final_cfg)
1341 }
1342
1343 pub fn get_realtime_log_config(
1345 &self,
1346 name: &str,
1347 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1348 self.store
1349 .realtime_log_configs
1350 .get(name)
1351 .map(|r| r.value().clone())
1352 .ok_or_else(|| CloudFrontError::no_such_resource("RealtimeLogConfig", name))
1353 }
1354
1355 pub fn update_realtime_log_config(
1357 &self,
1358 cfg: RealtimeLogConfig,
1359 ) -> Result<RealtimeLogConfig, CloudFrontError> {
1360 if !self.store.realtime_log_configs.contains_key(&cfg.name) {
1361 return Err(CloudFrontError::no_such_resource(
1362 "RealtimeLogConfig",
1363 &cfg.name,
1364 ));
1365 }
1366 self.store
1367 .realtime_log_configs
1368 .insert(cfg.name.clone(), cfg.clone());
1369 Ok(cfg)
1370 }
1371
1372 pub fn delete_realtime_log_config(&self, name: &str) -> Result<(), CloudFrontError> {
1374 if self.store.realtime_log_configs.remove(name).is_none() {
1375 return Err(CloudFrontError::no_such_resource("RealtimeLogConfig", name));
1376 }
1377 Ok(())
1378 }
1379
1380 pub fn list_realtime_log_configs(&self) -> Vec<RealtimeLogConfig> {
1382 self.store
1383 .realtime_log_configs
1384 .iter()
1385 .map(|e| e.value().clone())
1386 .collect()
1387 }
1388
1389 pub fn tag_resource(&self, arn: &str, new_tags: &[Tag]) -> Result<(), CloudFrontError> {
1395 let mut entry = self.store.tags.entry(arn.to_owned()).or_default();
1396 merge_tags(entry.value_mut(), new_tags);
1397 Ok(())
1398 }
1399
1400 pub fn untag_resource(&self, arn: &str, keys: &[String]) -> Result<(), CloudFrontError> {
1402 if let Some(mut entry) = self.store.tags.get_mut(arn) {
1403 entry.retain(|t| !keys.iter().any(|k| k == &t.key));
1404 }
1405 Ok(())
1406 }
1407
1408 pub fn list_tags_for_resource(&self, arn: &str) -> Result<TagSet, CloudFrontError> {
1410 Ok(self
1411 .store
1412 .tags
1413 .get(arn)
1414 .map(|r| r.value().clone())
1415 .unwrap_or_default())
1416 }
1417
1418 fn spawn_distribution_deployment(self: &Arc<Self>, id: &str) {
1423 let delay = self.config.distribution_propagation;
1424 let id = id.to_owned();
1425 let provider = Arc::clone(self);
1426 tokio::spawn(async move {
1427 if !delay.is_zero() {
1428 tokio::time::sleep(delay).await;
1429 }
1430 if let Some(mut d) = provider.store.distributions.get_mut(&id) {
1431 d.status = ResourceStatus::Deployed;
1432 }
1433 });
1434 }
1435
1436 fn spawn_invalidation_completion(
1437 self: &Arc<Self>,
1438 distribution_id: &str,
1439 invalidation_id: &str,
1440 ) {
1441 let delay = self.config.invalidation_propagation;
1442 let dist_id = distribution_id.to_owned();
1443 let inv_id = invalidation_id.to_owned();
1444 let provider = Arc::clone(self);
1445 tokio::spawn(async move {
1446 if !delay.is_zero() {
1447 tokio::time::sleep(delay).await;
1448 }
1449 if let Some(mut inv) = provider
1450 .store
1451 .invalidations
1452 .get_mut(&(dist_id.clone(), inv_id))
1453 {
1454 inv.status = ResourceStatus::Completed;
1455 }
1456 if let Some(mut d) = provider.store.distributions.get_mut(&dist_id) {
1457 d.in_progress_invalidation_batches =
1458 (d.in_progress_invalidation_batches - 1).max(0);
1459 }
1460 });
1461 }
1462}
1463
1464fn validate_distribution_config(cfg: &DistributionConfig) -> Result<(), CloudFrontError> {
1469 if cfg.caller_reference.is_empty() {
1470 return Err(CloudFrontError::MissingArgument(
1471 "CallerReference is required".to_owned(),
1472 ));
1473 }
1474 if cfg.origins.is_empty() {
1475 return Err(CloudFrontError::MissingArgument(
1476 "At least one Origin is required".to_owned(),
1477 ));
1478 }
1479 if cfg.default_cache_behavior.target_origin_id.is_empty() {
1480 return Err(CloudFrontError::MissingArgument(
1481 "DefaultCacheBehavior.TargetOriginId is required".to_owned(),
1482 ));
1483 }
1484 let origin_ids: std::collections::HashSet<&str> =
1485 cfg.origins.iter().map(|o| o.id.as_str()).collect();
1486 if !origin_ids.contains(cfg.default_cache_behavior.target_origin_id.as_str())
1487 && !cfg
1488 .origin_groups
1489 .iter()
1490 .any(|g| g.id == cfg.default_cache_behavior.target_origin_id)
1491 {
1492 return Err(CloudFrontError::MalformedInput(format!(
1493 "DefaultCacheBehavior.TargetOriginId {} does not match any Origin.Id",
1494 cfg.default_cache_behavior.target_origin_id
1495 )));
1496 }
1497 for cb in &cfg.cache_behaviors {
1498 if !origin_ids.contains(cb.target_origin_id.as_str())
1499 && !cfg
1500 .origin_groups
1501 .iter()
1502 .any(|g| g.id == cb.target_origin_id)
1503 {
1504 return Err(CloudFrontError::MalformedInput(format!(
1505 "CacheBehavior.TargetOriginId {} does not match any Origin.Id",
1506 cb.target_origin_id
1507 )));
1508 }
1509 }
1510 Ok(())
1511}
1512
1513fn check_if_match(supplied: Option<&str>, current: &str) -> Result<(), CloudFrontError> {
1514 match supplied {
1515 None | Some("") => Err(CloudFrontError::InvalidIfMatchVersion(
1516 "The If-Match version is missing".to_owned(),
1517 )),
1518 Some(v) if v == current => Ok(()),
1519 Some(_) => Err(CloudFrontError::PreconditionFailed(
1520 "The If-Match version is not valid for the resource".to_owned(),
1521 )),
1522 }
1523}
1524
1525fn merge_tags(existing: &mut TagSet, new_tags: &[Tag]) {
1526 for t in new_tags {
1527 if let Some(existing_tag) = existing.iter_mut().find(|e| e.key == t.key) {
1528 existing_tag.value = t.value.clone();
1529 } else {
1530 existing.push(t.clone());
1531 }
1532 }
1533}