Skip to main content

rustack_cloudfront_core/
provider.rs

1//! CloudFront provider — owns the store and implements every operation.
2//!
3//! Operations are grouped by resource kind. Each mutating op follows the same
4//! skeleton:
5//!
6//! 1. Look up the resource (or fail with the appropriate `NoSuch*`).
7//! 2. Check `If-Match` for Update/Delete (or fail `PreconditionFailed`).
8//! 3. Mutate, bump ETag, update `last_modified_time`.
9//! 4. Spawn propagation simulator if this is a distribution/invalidation.
10//!
11//! The ETag model is monotonic: every successful mutation generates a brand
12//! new opaque token. ETags are never reused.
13
14#![allow(clippy::too_many_lines)]
15#![allow(clippy::too_many_arguments)]
16
17use std::{cmp::Reverse, sync::Arc};
18
19use chrono::Utc;
20use rustack_cloudfront_model::{
21    CachePolicy, CachePolicyConfig, CloudFrontError, CloudFrontFunction,
22    CloudFrontOriginAccessIdentity, CloudFrontOriginAccessIdentityConfig, Distribution,
23    DistributionConfig, FieldLevelEncryption, FieldLevelEncryptionConfig,
24    FieldLevelEncryptionProfile, FieldLevelEncryptionProfileConfig, FunctionConfig,
25    FunctionMetadata, Invalidation, InvalidationBatch, KeyGroup, KeyGroupConfig, KeyValueStore,
26    MonitoringSubscription, OriginAccessControl, OriginAccessControlConfig, OriginRequestPolicy,
27    OriginRequestPolicyConfig, PublicKey, PublicKeyConfig, RealtimeLogConfig, ResourceStatus,
28    ResponseHeadersPolicy, ResponseHeadersPolicyConfig, Tag, TagSet,
29};
30use tracing::info;
31
32use crate::{
33    arn::{
34        cache_policy_arn, distribution_arn, function_arn, key_group_arn, kvs_arn, oai_arn,
35        origin_access_control_arn, origin_request_policy_arn, public_key_arn, realtime_log_arn,
36        response_headers_policy_arn,
37    },
38    config::CloudFrontConfig,
39    id_gen::{
40        deterministic_id_with_prefix, distribution_domain_name, new_distribution_id, new_etag,
41        new_id_with_prefix, new_invalidation_id, new_s3_canonical_user_id,
42    },
43    managed::{
44        managed_cache_policies, managed_origin_request_policies, managed_response_headers_policies,
45    },
46    store::{CloudFrontStore, CloudFrontStoreSnapshot},
47};
48
49/// Main provider.
50#[derive(Debug)]
51pub struct RustackCloudFront {
52    store: Arc<CloudFrontStore>,
53    config: Arc<CloudFrontConfig>,
54}
55
56impl RustackCloudFront {
57    /// Build a new provider with managed policies pre-seeded.
58    #[must_use]
59    pub fn new(config: CloudFrontConfig) -> Self {
60        let store = CloudFrontStore::new();
61        for p in managed_cache_policies() {
62            store.cache_policies.insert(p.id.clone(), p);
63        }
64        for p in managed_origin_request_policies() {
65            store.origin_request_policies.insert(p.id.clone(), p);
66        }
67        for p in managed_response_headers_policies() {
68            store.response_headers_policies.insert(p.id.clone(), p);
69        }
70        Self {
71            store,
72            config: Arc::new(config),
73        }
74    }
75
76    /// Shared store handle.
77    #[must_use]
78    pub fn store(&self) -> &Arc<CloudFrontStore> {
79        &self.store
80    }
81
82    /// Runtime configuration.
83    #[must_use]
84    pub fn config(&self) -> &CloudFrontConfig {
85        &self.config
86    }
87
88    /// Export a point-in-time snapshot of CloudFront management resources.
89    #[must_use]
90    pub fn export_snapshot(&self) -> CloudFrontStoreSnapshot {
91        self.store.export_snapshot()
92    }
93
94    /// Replace CloudFront management resources from a snapshot.
95    pub fn import_snapshot(&self, snapshot: CloudFrontStoreSnapshot) {
96        self.store.import_snapshot(snapshot);
97    }
98
99    // ---------------------------------------------------------------------
100    // Distribution operations
101    // ---------------------------------------------------------------------
102
103    /// CreateDistribution / CreateDistributionWithTags.
104    pub fn create_distribution(
105        self: &Arc<Self>,
106        config: DistributionConfig,
107        tags: TagSet,
108    ) -> Result<Distribution, CloudFrontError> {
109        validate_distribution_config(&config)?;
110
111        let id = if self.config.deterministic_ids {
112            deterministic_id_with_prefix('E', &config.caller_reference)
113        } else {
114            new_distribution_id()
115        };
116        let arn = distribution_arn(&self.config.account_id, &id);
117        let domain_name = distribution_domain_name(&id, &self.config.domain_suffix);
118        let etag = new_etag();
119        let dist = Distribution {
120            id: id.clone(),
121            arn: arn.clone(),
122            status: ResourceStatus::InProgress,
123            last_modified_time: Utc::now(),
124            domain_name,
125            in_progress_invalidation_batches: 0,
126            active_trusted_signers_enabled: false,
127            active_trusted_key_groups_enabled: false,
128            config,
129            tags: tags.clone(),
130            etag,
131            alias_icp_recordal: Vec::new(),
132        };
133
134        if !tags.is_empty() {
135            self.store.tags.insert(arn.clone(), tags);
136        }
137        self.store.distributions.insert(id.clone(), dist.clone());
138        self.spawn_distribution_deployment(&id);
139        info!(distribution_id = %id, "created distribution");
140        Ok(dist)
141    }
142
143    /// GetDistribution returns the full `Distribution` record.
144    pub fn get_distribution(&self, id: &str) -> Result<Distribution, CloudFrontError> {
145        self.store
146            .distributions
147            .get(id)
148            .map(|r| r.value().clone())
149            .ok_or_else(|| CloudFrontError::no_such_distribution(id))
150    }
151
152    /// UpdateDistribution.
153    pub fn update_distribution(
154        self: &Arc<Self>,
155        id: &str,
156        if_match: Option<&str>,
157        new_config: DistributionConfig,
158    ) -> Result<Distribution, CloudFrontError> {
159        validate_distribution_config(&new_config)?;
160        let mut entry = self
161            .store
162            .distributions
163            .get_mut(id)
164            .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
165        check_if_match(if_match, &entry.etag)?;
166        entry.config = new_config;
167        entry.etag = new_etag();
168        entry.status = ResourceStatus::InProgress;
169        entry.last_modified_time = Utc::now();
170        let clone = entry.value().clone();
171        drop(entry);
172        self.spawn_distribution_deployment(id);
173        Ok(clone)
174    }
175
176    /// DeleteDistribution.
177    pub fn delete_distribution(
178        &self,
179        id: &str,
180        if_match: Option<&str>,
181    ) -> Result<(), CloudFrontError> {
182        let dist = self
183            .store
184            .distributions
185            .get(id)
186            .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
187        check_if_match(if_match, &dist.etag)?;
188        if dist.config.enabled {
189            return Err(CloudFrontError::DistributionNotDisabled(format!(
190                "Distribution {id} is enabled; disable it before deleting."
191            )));
192        }
193        drop(dist);
194        self.store.distributions.remove(id);
195        self.store
196            .tags
197            .remove(&distribution_arn(&self.config.account_id, id));
198        Ok(())
199    }
200
201    /// ListDistributions (unpaginated — Rustack scale makes this fine).
202    pub fn list_distributions(&self) -> Vec<Distribution> {
203        let mut v: Vec<_> = self
204            .store
205            .distributions
206            .iter()
207            .map(|e| e.value().clone())
208            .collect();
209        v.sort_by(|a, b| a.id.cmp(&b.id));
210        v
211    }
212
213    /// CopyDistribution: clone an existing distribution under a new ID.
214    pub fn copy_distribution(
215        self: &Arc<Self>,
216        primary_id: &str,
217        caller_reference: &str,
218        staging: bool,
219    ) -> Result<Distribution, CloudFrontError> {
220        let primary = self
221            .store
222            .distributions
223            .get(primary_id)
224            .ok_or_else(|| CloudFrontError::no_such_distribution(primary_id))?;
225        let mut new_cfg = primary.config.clone();
226        new_cfg.caller_reference = caller_reference.to_owned();
227        new_cfg.staging = staging;
228        drop(primary);
229        self.create_distribution(new_cfg, Vec::new())
230    }
231
232    // ---------------------------------------------------------------------
233    // Invalidation operations
234    // ---------------------------------------------------------------------
235
236    /// CreateInvalidation.
237    pub fn create_invalidation(
238        self: &Arc<Self>,
239        distribution_id: &str,
240        batch: InvalidationBatch,
241    ) -> Result<Invalidation, CloudFrontError> {
242        if !self.store.distributions.contains_key(distribution_id) {
243            return Err(CloudFrontError::no_such_distribution(distribution_id));
244        }
245        if batch.paths.is_empty() {
246            return Err(CloudFrontError::InvalidArgument(
247                "Invalidation batch must contain at least one path".to_owned(),
248            ));
249        }
250        let id = if self.config.deterministic_ids {
251            deterministic_id_with_prefix('I', &batch.caller_reference)
252        } else {
253            new_invalidation_id()
254        };
255        let inv = Invalidation {
256            id: id.clone(),
257            status: ResourceStatus::InProgress,
258            create_time: Utc::now(),
259            distribution_id: distribution_id.to_owned(),
260            batch,
261        };
262        self.store
263            .invalidations
264            .insert((distribution_id.to_owned(), id.clone()), inv.clone());
265        if let Some(mut d) = self.store.distributions.get_mut(distribution_id) {
266            d.in_progress_invalidation_batches += 1;
267        }
268        self.spawn_invalidation_completion(distribution_id, &id);
269        Ok(inv)
270    }
271
272    /// GetInvalidation.
273    pub fn get_invalidation(
274        &self,
275        distribution_id: &str,
276        invalidation_id: &str,
277    ) -> Result<Invalidation, CloudFrontError> {
278        self.store
279            .invalidations
280            .get(&(distribution_id.to_owned(), invalidation_id.to_owned()))
281            .map(|r| r.value().clone())
282            .ok_or_else(|| CloudFrontError::no_such_invalidation(invalidation_id))
283    }
284
285    /// ListInvalidations for a distribution.
286    pub fn list_invalidations(&self, distribution_id: &str) -> Vec<Invalidation> {
287        let mut v: Vec<_> = self
288            .store
289            .invalidations
290            .iter()
291            .filter(|e| e.key().0 == distribution_id)
292            .map(|e| e.value().clone())
293            .collect();
294        v.sort_by_key(|invalidation| Reverse(invalidation.create_time));
295        v
296    }
297
298    // ---------------------------------------------------------------------
299    // Origin Access Control (OAC)
300    // ---------------------------------------------------------------------
301
302    /// CreateOriginAccessControl.
303    pub fn create_oac(
304        &self,
305        cfg: OriginAccessControlConfig,
306    ) -> Result<OriginAccessControl, CloudFrontError> {
307        if cfg.name.is_empty() {
308            return Err(CloudFrontError::InvalidArgument(
309                "OriginAccessControl Name is required".to_owned(),
310            ));
311        }
312        let id = new_id_with_prefix('E');
313        let oac = OriginAccessControl {
314            id: id.clone(),
315            config: cfg,
316            etag: new_etag(),
317        };
318        self.store.origin_access_controls.insert(id, oac.clone());
319        Ok(oac)
320    }
321
322    /// GetOriginAccessControl.
323    pub fn get_oac(&self, id: &str) -> Result<OriginAccessControl, CloudFrontError> {
324        self.store
325            .origin_access_controls
326            .get(id)
327            .map(|r| r.value().clone())
328            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))
329    }
330
331    /// UpdateOriginAccessControl.
332    pub fn update_oac(
333        &self,
334        id: &str,
335        if_match: Option<&str>,
336        cfg: OriginAccessControlConfig,
337    ) -> Result<OriginAccessControl, CloudFrontError> {
338        let mut entry = self
339            .store
340            .origin_access_controls
341            .get_mut(id)
342            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
343        check_if_match(if_match, &entry.etag)?;
344        entry.config = cfg;
345        entry.etag = new_etag();
346        Ok(entry.value().clone())
347    }
348
349    /// DeleteOriginAccessControl.
350    pub fn delete_oac(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
351        let entry = self
352            .store
353            .origin_access_controls
354            .get(id)
355            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
356        check_if_match(if_match, &entry.etag)?;
357        drop(entry);
358        self.store.origin_access_controls.remove(id);
359        self.store
360            .tags
361            .remove(&origin_access_control_arn(&self.config.account_id, id));
362        Ok(())
363    }
364
365    /// ListOriginAccessControls.
366    pub fn list_oacs(&self) -> Vec<OriginAccessControl> {
367        let mut v: Vec<_> = self
368            .store
369            .origin_access_controls
370            .iter()
371            .map(|e| e.value().clone())
372            .collect();
373        v.sort_by(|a, b| a.id.cmp(&b.id));
374        v
375    }
376
377    // ---------------------------------------------------------------------
378    // Cloudfront Origin Access Identity (OAI, legacy)
379    // ---------------------------------------------------------------------
380
381    /// CreateCloudFrontOriginAccessIdentity.
382    pub fn create_oai(
383        &self,
384        cfg: CloudFrontOriginAccessIdentityConfig,
385    ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
386        let id = new_id_with_prefix('E');
387        let oai = CloudFrontOriginAccessIdentity {
388            id: id.clone(),
389            s3_canonical_user_id: new_s3_canonical_user_id(),
390            config: cfg,
391            etag: new_etag(),
392        };
393        self.store.origin_access_identities.insert(id, oai.clone());
394        Ok(oai)
395    }
396
397    /// GetCloudFrontOriginAccessIdentity.
398    pub fn get_oai(&self, id: &str) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
399        self.store
400            .origin_access_identities
401            .get(id)
402            .map(|r| r.value().clone())
403            .ok_or_else(|| CloudFrontError::no_such_oai(id))
404    }
405
406    /// UpdateCloudFrontOriginAccessIdentity.
407    pub fn update_oai(
408        &self,
409        id: &str,
410        if_match: Option<&str>,
411        cfg: CloudFrontOriginAccessIdentityConfig,
412    ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
413        let mut entry = self
414            .store
415            .origin_access_identities
416            .get_mut(id)
417            .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
418        check_if_match(if_match, &entry.etag)?;
419        entry.config = cfg;
420        entry.etag = new_etag();
421        Ok(entry.value().clone())
422    }
423
424    /// DeleteCloudFrontOriginAccessIdentity.
425    pub fn delete_oai(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
426        let entry = self
427            .store
428            .origin_access_identities
429            .get(id)
430            .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
431        check_if_match(if_match, &entry.etag)?;
432        drop(entry);
433        self.store.origin_access_identities.remove(id);
434        self.store
435            .tags
436            .remove(&oai_arn(&self.config.account_id, id));
437        Ok(())
438    }
439
440    /// ListCloudFrontOriginAccessIdentities.
441    pub fn list_oais(&self) -> Vec<CloudFrontOriginAccessIdentity> {
442        let mut v: Vec<_> = self
443            .store
444            .origin_access_identities
445            .iter()
446            .map(|e| e.value().clone())
447            .collect();
448        v.sort_by(|a, b| a.id.cmp(&b.id));
449        v
450    }
451
452    // ---------------------------------------------------------------------
453    // Cache policy
454    // ---------------------------------------------------------------------
455
456    /// CreateCachePolicy.
457    pub fn create_cache_policy(
458        &self,
459        cfg: CachePolicyConfig,
460    ) -> Result<CachePolicy, CloudFrontError> {
461        if cfg.name.is_empty() {
462            return Err(CloudFrontError::InvalidArgument(
463                "CachePolicyConfig.Name is required".to_owned(),
464            ));
465        }
466        let id = uuid::Uuid::new_v4().to_string();
467        let p = CachePolicy {
468            id: id.clone(),
469            last_modified_time: Utc::now(),
470            config: cfg,
471            etag: new_etag(),
472            managed: false,
473        };
474        self.store.cache_policies.insert(id, p.clone());
475        Ok(p)
476    }
477
478    /// GetCachePolicy.
479    pub fn get_cache_policy(&self, id: &str) -> Result<CachePolicy, CloudFrontError> {
480        self.store
481            .cache_policies
482            .get(id)
483            .map(|r| r.value().clone())
484            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))
485    }
486
487    /// UpdateCachePolicy — managed policies are immutable.
488    pub fn update_cache_policy(
489        &self,
490        id: &str,
491        if_match: Option<&str>,
492        cfg: CachePolicyConfig,
493    ) -> Result<CachePolicy, CloudFrontError> {
494        let mut entry = self
495            .store
496            .cache_policies
497            .get_mut(id)
498            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
499        if entry.managed {
500            return Err(CloudFrontError::AccessDenied(
501                "AWS-managed cache policies cannot be modified".to_owned(),
502            ));
503        }
504        check_if_match(if_match, &entry.etag)?;
505        entry.config = cfg;
506        entry.etag = new_etag();
507        entry.last_modified_time = Utc::now();
508        Ok(entry.value().clone())
509    }
510
511    /// DeleteCachePolicy.
512    pub fn delete_cache_policy(
513        &self,
514        id: &str,
515        if_match: Option<&str>,
516    ) -> Result<(), CloudFrontError> {
517        let entry = self
518            .store
519            .cache_policies
520            .get(id)
521            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
522        if entry.managed {
523            return Err(CloudFrontError::AccessDenied(
524                "AWS-managed cache policies cannot be deleted".to_owned(),
525            ));
526        }
527        check_if_match(if_match, &entry.etag)?;
528        drop(entry);
529        self.store.cache_policies.remove(id);
530        self.store
531            .tags
532            .remove(&cache_policy_arn(&self.config.account_id, id));
533        Ok(())
534    }
535
536    /// ListCachePolicies.
537    pub fn list_cache_policies(&self) -> Vec<CachePolicy> {
538        let mut v: Vec<_> = self
539            .store
540            .cache_policies
541            .iter()
542            .map(|e| e.value().clone())
543            .collect();
544        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
545        v
546    }
547
548    // ---------------------------------------------------------------------
549    // Origin request policy
550    // ---------------------------------------------------------------------
551
552    /// CreateOriginRequestPolicy.
553    pub fn create_origin_request_policy(
554        &self,
555        cfg: OriginRequestPolicyConfig,
556    ) -> Result<OriginRequestPolicy, CloudFrontError> {
557        if cfg.name.is_empty() {
558            return Err(CloudFrontError::InvalidArgument(
559                "OriginRequestPolicyConfig.Name is required".to_owned(),
560            ));
561        }
562        let id = uuid::Uuid::new_v4().to_string();
563        let p = OriginRequestPolicy {
564            id: id.clone(),
565            last_modified_time: Utc::now(),
566            config: cfg,
567            etag: new_etag(),
568            managed: false,
569        };
570        self.store.origin_request_policies.insert(id, p.clone());
571        Ok(p)
572    }
573
574    /// GetOriginRequestPolicy.
575    pub fn get_origin_request_policy(
576        &self,
577        id: &str,
578    ) -> Result<OriginRequestPolicy, CloudFrontError> {
579        self.store
580            .origin_request_policies
581            .get(id)
582            .map(|r| r.value().clone())
583            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))
584    }
585
586    /// UpdateOriginRequestPolicy.
587    pub fn update_origin_request_policy(
588        &self,
589        id: &str,
590        if_match: Option<&str>,
591        cfg: OriginRequestPolicyConfig,
592    ) -> Result<OriginRequestPolicy, CloudFrontError> {
593        let mut entry = self
594            .store
595            .origin_request_policies
596            .get_mut(id)
597            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
598        if entry.managed {
599            return Err(CloudFrontError::AccessDenied(
600                "AWS-managed origin request policies cannot be modified".to_owned(),
601            ));
602        }
603        check_if_match(if_match, &entry.etag)?;
604        entry.config = cfg;
605        entry.etag = new_etag();
606        entry.last_modified_time = Utc::now();
607        Ok(entry.value().clone())
608    }
609
610    /// DeleteOriginRequestPolicy.
611    pub fn delete_origin_request_policy(
612        &self,
613        id: &str,
614        if_match: Option<&str>,
615    ) -> Result<(), CloudFrontError> {
616        let entry = self
617            .store
618            .origin_request_policies
619            .get(id)
620            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
621        if entry.managed {
622            return Err(CloudFrontError::AccessDenied(
623                "AWS-managed origin request policies cannot be deleted".to_owned(),
624            ));
625        }
626        check_if_match(if_match, &entry.etag)?;
627        drop(entry);
628        self.store.origin_request_policies.remove(id);
629        self.store
630            .tags
631            .remove(&origin_request_policy_arn(&self.config.account_id, id));
632        Ok(())
633    }
634
635    /// ListOriginRequestPolicies.
636    pub fn list_origin_request_policies(&self) -> Vec<OriginRequestPolicy> {
637        let mut v: Vec<_> = self
638            .store
639            .origin_request_policies
640            .iter()
641            .map(|e| e.value().clone())
642            .collect();
643        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
644        v
645    }
646
647    // ---------------------------------------------------------------------
648    // Response headers policy
649    // ---------------------------------------------------------------------
650
651    /// CreateResponseHeadersPolicy.
652    pub fn create_response_headers_policy(
653        &self,
654        cfg: ResponseHeadersPolicyConfig,
655    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
656        if cfg.name.is_empty() {
657            return Err(CloudFrontError::InvalidArgument(
658                "ResponseHeadersPolicyConfig.Name is required".to_owned(),
659            ));
660        }
661        let id = uuid::Uuid::new_v4().to_string();
662        let p = ResponseHeadersPolicy {
663            id: id.clone(),
664            last_modified_time: Utc::now(),
665            config: cfg,
666            etag: new_etag(),
667            managed: false,
668        };
669        self.store.response_headers_policies.insert(id, p.clone());
670        Ok(p)
671    }
672
673    /// GetResponseHeadersPolicy.
674    pub fn get_response_headers_policy(
675        &self,
676        id: &str,
677    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
678        self.store
679            .response_headers_policies
680            .get(id)
681            .map(|r| r.value().clone())
682            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))
683    }
684
685    /// UpdateResponseHeadersPolicy.
686    pub fn update_response_headers_policy(
687        &self,
688        id: &str,
689        if_match: Option<&str>,
690        cfg: ResponseHeadersPolicyConfig,
691    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
692        let mut entry = self
693            .store
694            .response_headers_policies
695            .get_mut(id)
696            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
697        if entry.managed {
698            return Err(CloudFrontError::AccessDenied(
699                "AWS-managed response headers policies cannot be modified".to_owned(),
700            ));
701        }
702        check_if_match(if_match, &entry.etag)?;
703        entry.config = cfg;
704        entry.etag = new_etag();
705        entry.last_modified_time = Utc::now();
706        Ok(entry.value().clone())
707    }
708
709    /// DeleteResponseHeadersPolicy.
710    pub fn delete_response_headers_policy(
711        &self,
712        id: &str,
713        if_match: Option<&str>,
714    ) -> Result<(), CloudFrontError> {
715        let entry = self
716            .store
717            .response_headers_policies
718            .get(id)
719            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
720        if entry.managed {
721            return Err(CloudFrontError::AccessDenied(
722                "AWS-managed response headers policies cannot be deleted".to_owned(),
723            ));
724        }
725        check_if_match(if_match, &entry.etag)?;
726        drop(entry);
727        self.store.response_headers_policies.remove(id);
728        self.store
729            .tags
730            .remove(&response_headers_policy_arn(&self.config.account_id, id));
731        Ok(())
732    }
733
734    /// ListResponseHeadersPolicies.
735    pub fn list_response_headers_policies(&self) -> Vec<ResponseHeadersPolicy> {
736        let mut v: Vec<_> = self
737            .store
738            .response_headers_policies
739            .iter()
740            .map(|e| e.value().clone())
741            .collect();
742        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
743        v
744    }
745
746    // ---------------------------------------------------------------------
747    // Key group / public key
748    // ---------------------------------------------------------------------
749
750    /// CreateKeyGroup.
751    pub fn create_key_group(&self, cfg: KeyGroupConfig) -> Result<KeyGroup, CloudFrontError> {
752        if cfg.name.is_empty() {
753            return Err(CloudFrontError::InvalidArgument(
754                "KeyGroupConfig.Name is required".to_owned(),
755            ));
756        }
757        let id = new_id_with_prefix('K');
758        let kg = KeyGroup {
759            id: id.clone(),
760            last_modified_time: Utc::now(),
761            config: cfg,
762            etag: new_etag(),
763        };
764        self.store.key_groups.insert(id, kg.clone());
765        Ok(kg)
766    }
767
768    /// GetKeyGroup.
769    pub fn get_key_group(&self, id: &str) -> Result<KeyGroup, CloudFrontError> {
770        self.store
771            .key_groups
772            .get(id)
773            .map(|r| r.value().clone())
774            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))
775    }
776
777    /// UpdateKeyGroup.
778    pub fn update_key_group(
779        &self,
780        id: &str,
781        if_match: Option<&str>,
782        cfg: KeyGroupConfig,
783    ) -> Result<KeyGroup, CloudFrontError> {
784        let mut entry = self
785            .store
786            .key_groups
787            .get_mut(id)
788            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
789        check_if_match(if_match, &entry.etag)?;
790        entry.config = cfg;
791        entry.etag = new_etag();
792        entry.last_modified_time = Utc::now();
793        Ok(entry.value().clone())
794    }
795
796    /// DeleteKeyGroup.
797    pub fn delete_key_group(
798        &self,
799        id: &str,
800        if_match: Option<&str>,
801    ) -> Result<(), CloudFrontError> {
802        let entry = self
803            .store
804            .key_groups
805            .get(id)
806            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
807        check_if_match(if_match, &entry.etag)?;
808        drop(entry);
809        self.store.key_groups.remove(id);
810        self.store
811            .tags
812            .remove(&key_group_arn(&self.config.account_id, id));
813        Ok(())
814    }
815
816    /// ListKeyGroups.
817    pub fn list_key_groups(&self) -> Vec<KeyGroup> {
818        let mut v: Vec<_> = self
819            .store
820            .key_groups
821            .iter()
822            .map(|e| e.value().clone())
823            .collect();
824        v.sort_by(|a, b| a.id.cmp(&b.id));
825        v
826    }
827
828    /// CreatePublicKey.
829    pub fn create_public_key(&self, cfg: PublicKeyConfig) -> Result<PublicKey, CloudFrontError> {
830        if cfg.name.is_empty() || cfg.encoded_key.is_empty() {
831            return Err(CloudFrontError::InvalidArgument(
832                "PublicKeyConfig Name and EncodedKey are required".to_owned(),
833            ));
834        }
835        let id = new_id_with_prefix('K');
836        let pk = PublicKey {
837            id: id.clone(),
838            created_time: Utc::now(),
839            config: cfg,
840            etag: new_etag(),
841        };
842        self.store.public_keys.insert(id, pk.clone());
843        Ok(pk)
844    }
845
846    /// GetPublicKey.
847    pub fn get_public_key(&self, id: &str) -> Result<PublicKey, CloudFrontError> {
848        self.store
849            .public_keys
850            .get(id)
851            .map(|r| r.value().clone())
852            .ok_or_else(|| CloudFrontError::no_such_public_key(id))
853    }
854
855    /// UpdatePublicKey.
856    pub fn update_public_key(
857        &self,
858        id: &str,
859        if_match: Option<&str>,
860        cfg: PublicKeyConfig,
861    ) -> Result<PublicKey, CloudFrontError> {
862        let mut entry = self
863            .store
864            .public_keys
865            .get_mut(id)
866            .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
867        check_if_match(if_match, &entry.etag)?;
868        entry.config = cfg;
869        entry.etag = new_etag();
870        Ok(entry.value().clone())
871    }
872
873    /// DeletePublicKey.
874    pub fn delete_public_key(
875        &self,
876        id: &str,
877        if_match: Option<&str>,
878    ) -> Result<(), CloudFrontError> {
879        let entry = self
880            .store
881            .public_keys
882            .get(id)
883            .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
884        check_if_match(if_match, &entry.etag)?;
885        drop(entry);
886        self.store.public_keys.remove(id);
887        self.store
888            .tags
889            .remove(&public_key_arn(&self.config.account_id, id));
890        Ok(())
891    }
892
893    /// ListPublicKeys.
894    pub fn list_public_keys(&self) -> Vec<PublicKey> {
895        let mut v: Vec<_> = self
896            .store
897            .public_keys
898            .iter()
899            .map(|e| e.value().clone())
900            .collect();
901        v.sort_by(|a, b| a.id.cmp(&b.id));
902        v
903    }
904
905    // ---------------------------------------------------------------------
906    // Functions (store-only; no JS runtime)
907    // ---------------------------------------------------------------------
908
909    /// CreateFunction.
910    pub fn create_function(
911        &self,
912        name: String,
913        cfg: FunctionConfig,
914        code: Vec<u8>,
915    ) -> Result<CloudFrontFunction, CloudFrontError> {
916        if name.is_empty() {
917            return Err(CloudFrontError::InvalidArgument(
918                "Function Name is required".to_owned(),
919            ));
920        }
921        if self.store.functions.contains_key(&name) {
922            return Err(CloudFrontError::AlreadyExists {
923                code: "FunctionAlreadyExists",
924                message: format!("Function {name} already exists"),
925            });
926        }
927        let arn = function_arn(&self.config.account_id, &name);
928        let now = Utc::now();
929        let f = CloudFrontFunction {
930            name: name.clone(),
931            arn: arn.clone(),
932            last_modified_time: now,
933            stage: "DEVELOPMENT".to_owned(),
934            metadata: FunctionMetadata {
935                function_arn: arn,
936                stage: "DEVELOPMENT".to_owned(),
937                created_time: now,
938                last_modified_time: now,
939            },
940            config: cfg,
941            code,
942            etag: new_etag(),
943            status: "UNPUBLISHED".to_owned(),
944        };
945        self.store.functions.insert(name, f.clone());
946        Ok(f)
947    }
948
949    /// DescribeFunction / GetFunction (code is the distinction — same storage).
950    pub fn get_function(&self, name: &str) -> Result<CloudFrontFunction, CloudFrontError> {
951        self.store
952            .functions
953            .get(name)
954            .map(|r| r.value().clone())
955            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))
956    }
957
958    /// UpdateFunction.
959    pub fn update_function(
960        &self,
961        name: &str,
962        if_match: Option<&str>,
963        cfg: FunctionConfig,
964        code: Vec<u8>,
965    ) -> Result<CloudFrontFunction, CloudFrontError> {
966        let mut entry = self
967            .store
968            .functions
969            .get_mut(name)
970            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
971        check_if_match(if_match, &entry.etag)?;
972        entry.config = cfg;
973        entry.code = code;
974        entry.etag = new_etag();
975        entry.last_modified_time = Utc::now();
976        Ok(entry.value().clone())
977    }
978
979    /// DeleteFunction.
980    pub fn delete_function(
981        &self,
982        name: &str,
983        if_match: Option<&str>,
984    ) -> Result<(), CloudFrontError> {
985        let entry = self
986            .store
987            .functions
988            .get(name)
989            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
990        check_if_match(if_match, &entry.etag)?;
991        drop(entry);
992        self.store.functions.remove(name);
993        Ok(())
994    }
995
996    /// PublishFunction: flips stage from DEVELOPMENT to LIVE.
997    pub fn publish_function(
998        &self,
999        name: &str,
1000        if_match: Option<&str>,
1001    ) -> Result<CloudFrontFunction, CloudFrontError> {
1002        let mut entry = self
1003            .store
1004            .functions
1005            .get_mut(name)
1006            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
1007        check_if_match(if_match, &entry.etag)?;
1008        entry.stage = "LIVE".to_owned();
1009        entry.metadata.stage = "LIVE".to_owned();
1010        entry.status = "PUBLISHED".to_owned();
1011        entry.etag = new_etag();
1012        Ok(entry.value().clone())
1013    }
1014
1015    /// TestFunction: returns canned success.
1016    pub fn test_function(
1017        &self,
1018        name: &str,
1019        event_object: &[u8],
1020    ) -> Result<(Vec<u8>, String), CloudFrontError> {
1021        let _ = self.get_function(name)?;
1022        let result = br#"{"status":"success","testStatus":"OK"}"#.to_vec();
1023        let compute_util = format!("compute_utilization_percent={}", event_object.len());
1024        Ok((result, compute_util))
1025    }
1026
1027    /// ListFunctions.
1028    pub fn list_functions(&self) -> Vec<CloudFrontFunction> {
1029        let mut v: Vec<_> = self
1030            .store
1031            .functions
1032            .iter()
1033            .map(|e| e.value().clone())
1034            .collect();
1035        v.sort_by(|a, b| a.name.cmp(&b.name));
1036        v
1037    }
1038
1039    // ---------------------------------------------------------------------
1040    // Field-Level Encryption (store-only)
1041    // ---------------------------------------------------------------------
1042
1043    /// CreateFieldLevelEncryptionConfig.
1044    pub fn create_fle_config(
1045        &self,
1046        cfg: FieldLevelEncryptionConfig,
1047    ) -> Result<FieldLevelEncryption, CloudFrontError> {
1048        let id = new_id_with_prefix('F');
1049        let f = FieldLevelEncryption {
1050            id: id.clone(),
1051            last_modified_time: Utc::now(),
1052            config: cfg,
1053            etag: new_etag(),
1054        };
1055        self.store.fle_configs.insert(id, f.clone());
1056        Ok(f)
1057    }
1058
1059    /// GetFieldLevelEncryption / GetFieldLevelEncryptionConfig.
1060    pub fn get_fle_config(&self, id: &str) -> Result<FieldLevelEncryption, CloudFrontError> {
1061        self.store
1062            .fle_configs
1063            .get(id)
1064            .map(|r| r.value().clone())
1065            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))
1066    }
1067
1068    /// UpdateFieldLevelEncryptionConfig.
1069    pub fn update_fle_config(
1070        &self,
1071        id: &str,
1072        if_match: Option<&str>,
1073        cfg: FieldLevelEncryptionConfig,
1074    ) -> Result<FieldLevelEncryption, CloudFrontError> {
1075        let mut entry = self
1076            .store
1077            .fle_configs
1078            .get_mut(id)
1079            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1080        check_if_match(if_match, &entry.etag)?;
1081        entry.config = cfg;
1082        entry.etag = new_etag();
1083        entry.last_modified_time = Utc::now();
1084        Ok(entry.value().clone())
1085    }
1086
1087    /// DeleteFieldLevelEncryptionConfig.
1088    pub fn delete_fle_config(
1089        &self,
1090        id: &str,
1091        if_match: Option<&str>,
1092    ) -> Result<(), CloudFrontError> {
1093        let entry = self
1094            .store
1095            .fle_configs
1096            .get(id)
1097            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1098        check_if_match(if_match, &entry.etag)?;
1099        drop(entry);
1100        self.store.fle_configs.remove(id);
1101        Ok(())
1102    }
1103
1104    /// ListFieldLevelEncryptionConfigs.
1105    pub fn list_fle_configs(&self) -> Vec<FieldLevelEncryption> {
1106        self.store
1107            .fle_configs
1108            .iter()
1109            .map(|e| e.value().clone())
1110            .collect()
1111    }
1112
1113    /// CreateFieldLevelEncryptionProfile.
1114    pub fn create_fle_profile(
1115        &self,
1116        cfg: FieldLevelEncryptionProfileConfig,
1117    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1118        let id = new_id_with_prefix('P');
1119        let p = FieldLevelEncryptionProfile {
1120            id: id.clone(),
1121            last_modified_time: Utc::now(),
1122            config: cfg,
1123            etag: new_etag(),
1124        };
1125        self.store.fle_profiles.insert(id, p.clone());
1126        Ok(p)
1127    }
1128
1129    /// GetFieldLevelEncryptionProfile.
1130    pub fn get_fle_profile(
1131        &self,
1132        id: &str,
1133    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1134        self.store
1135            .fle_profiles
1136            .get(id)
1137            .map(|r| r.value().clone())
1138            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id))
1139    }
1140
1141    /// UpdateFieldLevelEncryptionProfile.
1142    pub fn update_fle_profile(
1143        &self,
1144        id: &str,
1145        if_match: Option<&str>,
1146        cfg: FieldLevelEncryptionProfileConfig,
1147    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1148        let mut entry =
1149            self.store.fle_profiles.get_mut(id).ok_or_else(|| {
1150                CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1151            })?;
1152        check_if_match(if_match, &entry.etag)?;
1153        entry.config = cfg;
1154        entry.etag = new_etag();
1155        entry.last_modified_time = Utc::now();
1156        Ok(entry.value().clone())
1157    }
1158
1159    /// DeleteFieldLevelEncryptionProfile.
1160    pub fn delete_fle_profile(
1161        &self,
1162        id: &str,
1163        if_match: Option<&str>,
1164    ) -> Result<(), CloudFrontError> {
1165        let entry =
1166            self.store.fle_profiles.get(id).ok_or_else(|| {
1167                CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1168            })?;
1169        check_if_match(if_match, &entry.etag)?;
1170        drop(entry);
1171        self.store.fle_profiles.remove(id);
1172        Ok(())
1173    }
1174
1175    /// ListFieldLevelEncryptionProfiles.
1176    pub fn list_fle_profiles(&self) -> Vec<FieldLevelEncryptionProfile> {
1177        self.store
1178            .fle_profiles
1179            .iter()
1180            .map(|e| e.value().clone())
1181            .collect()
1182    }
1183
1184    // ---------------------------------------------------------------------
1185    // Monitoring subscription
1186    // ---------------------------------------------------------------------
1187
1188    /// CreateMonitoringSubscription.
1189    pub fn create_monitoring_subscription(
1190        &self,
1191        distribution_id: &str,
1192        enabled: bool,
1193    ) -> Result<MonitoringSubscription, CloudFrontError> {
1194        if !self.store.distributions.contains_key(distribution_id) {
1195            return Err(CloudFrontError::no_such_distribution(distribution_id));
1196        }
1197        let sub = MonitoringSubscription {
1198            distribution_id: distribution_id.to_owned(),
1199            realtime_metrics_subscription_status: if enabled {
1200                "Enabled".to_owned()
1201            } else {
1202                "Disabled".to_owned()
1203            },
1204        };
1205        self.store
1206            .monitoring_subscriptions
1207            .insert(distribution_id.to_owned(), sub.clone());
1208        Ok(sub)
1209    }
1210
1211    /// GetMonitoringSubscription.
1212    pub fn get_monitoring_subscription(
1213        &self,
1214        distribution_id: &str,
1215    ) -> Result<MonitoringSubscription, CloudFrontError> {
1216        self.store
1217            .monitoring_subscriptions
1218            .get(distribution_id)
1219            .map(|r| r.value().clone())
1220            .ok_or_else(|| {
1221                CloudFrontError::no_such_resource("MonitoringSubscription", distribution_id)
1222            })
1223    }
1224
1225    /// DeleteMonitoringSubscription.
1226    pub fn delete_monitoring_subscription(
1227        &self,
1228        distribution_id: &str,
1229    ) -> Result<(), CloudFrontError> {
1230        if self
1231            .store
1232            .monitoring_subscriptions
1233            .remove(distribution_id)
1234            .is_none()
1235        {
1236            return Err(CloudFrontError::no_such_resource(
1237                "MonitoringSubscription",
1238                distribution_id,
1239            ));
1240        }
1241        Ok(())
1242    }
1243
1244    // ---------------------------------------------------------------------
1245    // KeyValueStore
1246    // ---------------------------------------------------------------------
1247
1248    /// CreateKeyValueStore.
1249    pub fn create_kvs(
1250        &self,
1251        name: String,
1252        comment: String,
1253    ) -> Result<KeyValueStore, CloudFrontError> {
1254        let id = uuid::Uuid::new_v4().to_string();
1255        let arn = kvs_arn(&self.config.account_id, &id);
1256        let kvs = KeyValueStore {
1257            id: id.clone(),
1258            name,
1259            arn,
1260            comment,
1261            status: "PROVISIONING".to_owned(),
1262            last_modified_time: Utc::now(),
1263            etag: new_etag(),
1264        };
1265        self.store.key_value_stores.insert(id, kvs.clone());
1266        Ok(kvs)
1267    }
1268
1269    /// DescribeKeyValueStore.
1270    pub fn get_kvs(&self, id: &str) -> Result<KeyValueStore, CloudFrontError> {
1271        self.store
1272            .key_value_stores
1273            .get(id)
1274            .map(|r| r.value().clone())
1275            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))
1276    }
1277
1278    /// UpdateKeyValueStore.
1279    pub fn update_kvs(
1280        &self,
1281        id: &str,
1282        if_match: Option<&str>,
1283        comment: String,
1284    ) -> Result<KeyValueStore, CloudFrontError> {
1285        let mut entry = self
1286            .store
1287            .key_value_stores
1288            .get_mut(id)
1289            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1290        check_if_match(if_match, &entry.etag)?;
1291        entry.comment = comment;
1292        entry.etag = new_etag();
1293        entry.last_modified_time = Utc::now();
1294        Ok(entry.value().clone())
1295    }
1296
1297    /// DeleteKeyValueStore.
1298    pub fn delete_kvs(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
1299        let entry = self
1300            .store
1301            .key_value_stores
1302            .get(id)
1303            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1304        check_if_match(if_match, &entry.etag)?;
1305        drop(entry);
1306        self.store.key_value_stores.remove(id);
1307        Ok(())
1308    }
1309
1310    /// ListKeyValueStores.
1311    pub fn list_kvs(&self) -> Vec<KeyValueStore> {
1312        self.store
1313            .key_value_stores
1314            .iter()
1315            .map(|e| e.value().clone())
1316            .collect()
1317    }
1318
1319    // ---------------------------------------------------------------------
1320    // Realtime log configs
1321    // ---------------------------------------------------------------------
1322
1323    /// CreateRealtimeLogConfig.
1324    pub fn create_realtime_log_config(
1325        &self,
1326        cfg: RealtimeLogConfig,
1327    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1328        if cfg.name.is_empty() {
1329            return Err(CloudFrontError::InvalidArgument(
1330                "RealtimeLogConfig Name is required".to_owned(),
1331            ));
1332        }
1333        let mut final_cfg = cfg;
1334        if final_cfg.arn.is_empty() {
1335            final_cfg.arn = realtime_log_arn(&self.config.account_id, &final_cfg.name);
1336        }
1337        self.store
1338            .realtime_log_configs
1339            .insert(final_cfg.name.clone(), final_cfg.clone());
1340        Ok(final_cfg)
1341    }
1342
1343    /// GetRealtimeLogConfig.
1344    pub fn get_realtime_log_config(
1345        &self,
1346        name: &str,
1347    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1348        self.store
1349            .realtime_log_configs
1350            .get(name)
1351            .map(|r| r.value().clone())
1352            .ok_or_else(|| CloudFrontError::no_such_resource("RealtimeLogConfig", name))
1353    }
1354
1355    /// UpdateRealtimeLogConfig.
1356    pub fn update_realtime_log_config(
1357        &self,
1358        cfg: RealtimeLogConfig,
1359    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1360        if !self.store.realtime_log_configs.contains_key(&cfg.name) {
1361            return Err(CloudFrontError::no_such_resource(
1362                "RealtimeLogConfig",
1363                &cfg.name,
1364            ));
1365        }
1366        self.store
1367            .realtime_log_configs
1368            .insert(cfg.name.clone(), cfg.clone());
1369        Ok(cfg)
1370    }
1371
1372    /// DeleteRealtimeLogConfig.
1373    pub fn delete_realtime_log_config(&self, name: &str) -> Result<(), CloudFrontError> {
1374        if self.store.realtime_log_configs.remove(name).is_none() {
1375            return Err(CloudFrontError::no_such_resource("RealtimeLogConfig", name));
1376        }
1377        Ok(())
1378    }
1379
1380    /// ListRealtimeLogConfigs.
1381    pub fn list_realtime_log_configs(&self) -> Vec<RealtimeLogConfig> {
1382        self.store
1383            .realtime_log_configs
1384            .iter()
1385            .map(|e| e.value().clone())
1386            .collect()
1387    }
1388
1389    // ---------------------------------------------------------------------
1390    // Tagging (uniform across all taggable resources)
1391    // ---------------------------------------------------------------------
1392
1393    /// TagResource.
1394    pub fn tag_resource(&self, arn: &str, new_tags: &[Tag]) -> Result<(), CloudFrontError> {
1395        let mut entry = self.store.tags.entry(arn.to_owned()).or_default();
1396        merge_tags(entry.value_mut(), new_tags);
1397        Ok(())
1398    }
1399
1400    /// UntagResource.
1401    pub fn untag_resource(&self, arn: &str, keys: &[String]) -> Result<(), CloudFrontError> {
1402        if let Some(mut entry) = self.store.tags.get_mut(arn) {
1403            entry.retain(|t| !keys.iter().any(|k| k == &t.key));
1404        }
1405        Ok(())
1406    }
1407
1408    /// ListTagsForResource.
1409    pub fn list_tags_for_resource(&self, arn: &str) -> Result<TagSet, CloudFrontError> {
1410        Ok(self
1411            .store
1412            .tags
1413            .get(arn)
1414            .map(|r| r.value().clone())
1415            .unwrap_or_default())
1416    }
1417
1418    // ---------------------------------------------------------------------
1419    // Lifecycle simulators
1420    // ---------------------------------------------------------------------
1421
1422    fn spawn_distribution_deployment(self: &Arc<Self>, id: &str) {
1423        let delay = self.config.distribution_propagation;
1424        let id = id.to_owned();
1425        let provider = Arc::clone(self);
1426        tokio::spawn(async move {
1427            if !delay.is_zero() {
1428                tokio::time::sleep(delay).await;
1429            }
1430            if let Some(mut d) = provider.store.distributions.get_mut(&id) {
1431                d.status = ResourceStatus::Deployed;
1432            }
1433        });
1434    }
1435
1436    fn spawn_invalidation_completion(
1437        self: &Arc<Self>,
1438        distribution_id: &str,
1439        invalidation_id: &str,
1440    ) {
1441        let delay = self.config.invalidation_propagation;
1442        let dist_id = distribution_id.to_owned();
1443        let inv_id = invalidation_id.to_owned();
1444        let provider = Arc::clone(self);
1445        tokio::spawn(async move {
1446            if !delay.is_zero() {
1447                tokio::time::sleep(delay).await;
1448            }
1449            if let Some(mut inv) = provider
1450                .store
1451                .invalidations
1452                .get_mut(&(dist_id.clone(), inv_id))
1453            {
1454                inv.status = ResourceStatus::Completed;
1455            }
1456            if let Some(mut d) = provider.store.distributions.get_mut(&dist_id) {
1457                d.in_progress_invalidation_batches =
1458                    (d.in_progress_invalidation_batches - 1).max(0);
1459            }
1460        });
1461    }
1462}
1463
1464// ---------------------------------------------------------------------------
1465// Validation helpers
1466// ---------------------------------------------------------------------------
1467
1468fn validate_distribution_config(cfg: &DistributionConfig) -> Result<(), CloudFrontError> {
1469    if cfg.caller_reference.is_empty() {
1470        return Err(CloudFrontError::MissingArgument(
1471            "CallerReference is required".to_owned(),
1472        ));
1473    }
1474    if cfg.origins.is_empty() {
1475        return Err(CloudFrontError::MissingArgument(
1476            "At least one Origin is required".to_owned(),
1477        ));
1478    }
1479    if cfg.default_cache_behavior.target_origin_id.is_empty() {
1480        return Err(CloudFrontError::MissingArgument(
1481            "DefaultCacheBehavior.TargetOriginId is required".to_owned(),
1482        ));
1483    }
1484    let origin_ids: std::collections::HashSet<&str> =
1485        cfg.origins.iter().map(|o| o.id.as_str()).collect();
1486    if !origin_ids.contains(cfg.default_cache_behavior.target_origin_id.as_str())
1487        && !cfg
1488            .origin_groups
1489            .iter()
1490            .any(|g| g.id == cfg.default_cache_behavior.target_origin_id)
1491    {
1492        return Err(CloudFrontError::MalformedInput(format!(
1493            "DefaultCacheBehavior.TargetOriginId {} does not match any Origin.Id",
1494            cfg.default_cache_behavior.target_origin_id
1495        )));
1496    }
1497    for cb in &cfg.cache_behaviors {
1498        if !origin_ids.contains(cb.target_origin_id.as_str())
1499            && !cfg
1500                .origin_groups
1501                .iter()
1502                .any(|g| g.id == cb.target_origin_id)
1503        {
1504            return Err(CloudFrontError::MalformedInput(format!(
1505                "CacheBehavior.TargetOriginId {} does not match any Origin.Id",
1506                cb.target_origin_id
1507            )));
1508        }
1509    }
1510    Ok(())
1511}
1512
1513fn check_if_match(supplied: Option<&str>, current: &str) -> Result<(), CloudFrontError> {
1514    match supplied {
1515        None | Some("") => Err(CloudFrontError::InvalidIfMatchVersion(
1516            "The If-Match version is missing".to_owned(),
1517        )),
1518        Some(v) if v == current => Ok(()),
1519        Some(_) => Err(CloudFrontError::PreconditionFailed(
1520            "The If-Match version is not valid for the resource".to_owned(),
1521        )),
1522    }
1523}
1524
1525fn merge_tags(existing: &mut TagSet, new_tags: &[Tag]) {
1526    for t in new_tags {
1527        if let Some(existing_tag) = existing.iter_mut().find(|e| e.key == t.key) {
1528            existing_tag.value = t.value.clone();
1529        } else {
1530            existing.push(t.clone());
1531        }
1532    }
1533}