Skip to main content

rustack_cloudfront_core/
provider.rs

1//! CloudFront provider — owns the store and implements every operation.
2//!
3//! Operations are grouped by resource kind. Each mutating op follows the same
4//! skeleton:
5//!
6//! 1. Look up the resource (or fail with the appropriate `NoSuch*`).
7//! 2. Check `If-Match` for Update/Delete (or fail `PreconditionFailed`).
8//! 3. Mutate, bump ETag, update `last_modified_time`.
9//! 4. Spawn propagation simulator if this is a distribution/invalidation.
10//!
11//! The ETag model is monotonic: every successful mutation generates a brand
12//! new opaque token. ETags are never reused.
13
14#![allow(clippy::too_many_lines)]
15#![allow(clippy::too_many_arguments)]
16
17use std::sync::Arc;
18
19use chrono::Utc;
20use rustack_cloudfront_model::{
21    CachePolicy, CachePolicyConfig, CloudFrontError, CloudFrontFunction,
22    CloudFrontOriginAccessIdentity, CloudFrontOriginAccessIdentityConfig, Distribution,
23    DistributionConfig, FieldLevelEncryption, FieldLevelEncryptionConfig,
24    FieldLevelEncryptionProfile, FieldLevelEncryptionProfileConfig, FunctionConfig,
25    FunctionMetadata, Invalidation, InvalidationBatch, KeyGroup, KeyGroupConfig, KeyValueStore,
26    MonitoringSubscription, OriginAccessControl, OriginAccessControlConfig, OriginRequestPolicy,
27    OriginRequestPolicyConfig, PublicKey, PublicKeyConfig, RealtimeLogConfig, ResourceStatus,
28    ResponseHeadersPolicy, ResponseHeadersPolicyConfig, Tag, TagSet,
29};
30use tracing::info;
31
32use crate::{
33    arn::{
34        cache_policy_arn, distribution_arn, function_arn, key_group_arn, kvs_arn, oai_arn,
35        origin_access_control_arn, origin_request_policy_arn, public_key_arn, realtime_log_arn,
36        response_headers_policy_arn,
37    },
38    config::CloudFrontConfig,
39    id_gen::{
40        deterministic_id_with_prefix, distribution_domain_name, new_distribution_id, new_etag,
41        new_id_with_prefix, new_invalidation_id, new_s3_canonical_user_id,
42    },
43    managed::{
44        managed_cache_policies, managed_origin_request_policies, managed_response_headers_policies,
45    },
46    store::CloudFrontStore,
47};
48
/// Main provider.
///
/// Bundles the shared resource store with the runtime configuration that the
/// operation methods read from.
#[derive(Debug)]
pub struct RustackCloudFront {
    /// Shared resource store; exposed by reference through `store()`.
    store: Arc<CloudFrontStore>,
    /// Runtime configuration (account id, domain suffix, deterministic-id flag).
    config: Arc<CloudFrontConfig>,
}
55
56impl RustackCloudFront {
57    /// Build a new provider with managed policies pre-seeded.
58    #[must_use]
59    pub fn new(config: CloudFrontConfig) -> Self {
60        let store = CloudFrontStore::new();
61        for p in managed_cache_policies() {
62            store.cache_policies.insert(p.id.clone(), p);
63        }
64        for p in managed_origin_request_policies() {
65            store.origin_request_policies.insert(p.id.clone(), p);
66        }
67        for p in managed_response_headers_policies() {
68            store.response_headers_policies.insert(p.id.clone(), p);
69        }
70        Self {
71            store,
72            config: Arc::new(config),
73        }
74    }
75
76    /// Shared store handle.
77    #[must_use]
78    pub fn store(&self) -> &Arc<CloudFrontStore> {
79        &self.store
80    }
81
82    /// Runtime configuration.
83    #[must_use]
84    pub fn config(&self) -> &CloudFrontConfig {
85        &self.config
86    }
87
88    // ---------------------------------------------------------------------
89    // Distribution operations
90    // ---------------------------------------------------------------------
91
92    /// CreateDistribution / CreateDistributionWithTags.
93    pub fn create_distribution(
94        self: &Arc<Self>,
95        config: DistributionConfig,
96        tags: TagSet,
97    ) -> Result<Distribution, CloudFrontError> {
98        validate_distribution_config(&config)?;
99
100        let id = if self.config.deterministic_ids {
101            deterministic_id_with_prefix('E', &config.caller_reference)
102        } else {
103            new_distribution_id()
104        };
105        let arn = distribution_arn(&self.config.account_id, &id);
106        let domain_name = distribution_domain_name(&id, &self.config.domain_suffix);
107        let etag = new_etag();
108        let dist = Distribution {
109            id: id.clone(),
110            arn: arn.clone(),
111            status: ResourceStatus::InProgress,
112            last_modified_time: Utc::now(),
113            domain_name,
114            in_progress_invalidation_batches: 0,
115            active_trusted_signers_enabled: false,
116            active_trusted_key_groups_enabled: false,
117            config,
118            tags: tags.clone(),
119            etag,
120            alias_icp_recordal: Vec::new(),
121        };
122
123        if !tags.is_empty() {
124            self.store.tags.insert(arn.clone(), tags);
125        }
126        self.store.distributions.insert(id.clone(), dist.clone());
127        self.spawn_distribution_deployment(&id);
128        info!(distribution_id = %id, "created distribution");
129        Ok(dist)
130    }
131
132    /// GetDistribution returns the full `Distribution` record.
133    pub fn get_distribution(&self, id: &str) -> Result<Distribution, CloudFrontError> {
134        self.store
135            .distributions
136            .get(id)
137            .map(|r| r.value().clone())
138            .ok_or_else(|| CloudFrontError::no_such_distribution(id))
139    }
140
141    /// UpdateDistribution.
142    pub fn update_distribution(
143        self: &Arc<Self>,
144        id: &str,
145        if_match: Option<&str>,
146        new_config: DistributionConfig,
147    ) -> Result<Distribution, CloudFrontError> {
148        validate_distribution_config(&new_config)?;
149        let mut entry = self
150            .store
151            .distributions
152            .get_mut(id)
153            .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
154        check_if_match(if_match, &entry.etag)?;
155        entry.config = new_config;
156        entry.etag = new_etag();
157        entry.status = ResourceStatus::InProgress;
158        entry.last_modified_time = Utc::now();
159        let clone = entry.value().clone();
160        drop(entry);
161        self.spawn_distribution_deployment(id);
162        Ok(clone)
163    }
164
165    /// DeleteDistribution.
166    pub fn delete_distribution(
167        &self,
168        id: &str,
169        if_match: Option<&str>,
170    ) -> Result<(), CloudFrontError> {
171        let dist = self
172            .store
173            .distributions
174            .get(id)
175            .ok_or_else(|| CloudFrontError::no_such_distribution(id))?;
176        check_if_match(if_match, &dist.etag)?;
177        if dist.config.enabled {
178            return Err(CloudFrontError::DistributionNotDisabled(format!(
179                "Distribution {id} is enabled; disable it before deleting."
180            )));
181        }
182        drop(dist);
183        self.store.distributions.remove(id);
184        self.store
185            .tags
186            .remove(&distribution_arn(&self.config.account_id, id));
187        Ok(())
188    }
189
190    /// ListDistributions (unpaginated — Rustack scale makes this fine).
191    pub fn list_distributions(&self) -> Vec<Distribution> {
192        let mut v: Vec<_> = self
193            .store
194            .distributions
195            .iter()
196            .map(|e| e.value().clone())
197            .collect();
198        v.sort_by(|a, b| a.id.cmp(&b.id));
199        v
200    }
201
202    /// CopyDistribution: clone an existing distribution under a new ID.
203    pub fn copy_distribution(
204        self: &Arc<Self>,
205        primary_id: &str,
206        caller_reference: &str,
207        staging: bool,
208    ) -> Result<Distribution, CloudFrontError> {
209        let primary = self
210            .store
211            .distributions
212            .get(primary_id)
213            .ok_or_else(|| CloudFrontError::no_such_distribution(primary_id))?;
214        let mut new_cfg = primary.config.clone();
215        new_cfg.caller_reference = caller_reference.to_owned();
216        new_cfg.staging = staging;
217        drop(primary);
218        self.create_distribution(new_cfg, Vec::new())
219    }
220
221    // ---------------------------------------------------------------------
222    // Invalidation operations
223    // ---------------------------------------------------------------------
224
225    /// CreateInvalidation.
226    pub fn create_invalidation(
227        self: &Arc<Self>,
228        distribution_id: &str,
229        batch: InvalidationBatch,
230    ) -> Result<Invalidation, CloudFrontError> {
231        if !self.store.distributions.contains_key(distribution_id) {
232            return Err(CloudFrontError::no_such_distribution(distribution_id));
233        }
234        if batch.paths.is_empty() {
235            return Err(CloudFrontError::InvalidArgument(
236                "Invalidation batch must contain at least one path".to_owned(),
237            ));
238        }
239        let id = if self.config.deterministic_ids {
240            deterministic_id_with_prefix('I', &batch.caller_reference)
241        } else {
242            new_invalidation_id()
243        };
244        let inv = Invalidation {
245            id: id.clone(),
246            status: ResourceStatus::InProgress,
247            create_time: Utc::now(),
248            distribution_id: distribution_id.to_owned(),
249            batch,
250        };
251        self.store
252            .invalidations
253            .insert((distribution_id.to_owned(), id.clone()), inv.clone());
254        if let Some(mut d) = self.store.distributions.get_mut(distribution_id) {
255            d.in_progress_invalidation_batches += 1;
256        }
257        self.spawn_invalidation_completion(distribution_id, &id);
258        Ok(inv)
259    }
260
261    /// GetInvalidation.
262    pub fn get_invalidation(
263        &self,
264        distribution_id: &str,
265        invalidation_id: &str,
266    ) -> Result<Invalidation, CloudFrontError> {
267        self.store
268            .invalidations
269            .get(&(distribution_id.to_owned(), invalidation_id.to_owned()))
270            .map(|r| r.value().clone())
271            .ok_or_else(|| CloudFrontError::no_such_invalidation(invalidation_id))
272    }
273
274    /// ListInvalidations for a distribution.
275    pub fn list_invalidations(&self, distribution_id: &str) -> Vec<Invalidation> {
276        let mut v: Vec<_> = self
277            .store
278            .invalidations
279            .iter()
280            .filter(|e| e.key().0 == distribution_id)
281            .map(|e| e.value().clone())
282            .collect();
283        v.sort_by(|a, b| b.create_time.cmp(&a.create_time));
284        v
285    }
286
287    // ---------------------------------------------------------------------
288    // Origin Access Control (OAC)
289    // ---------------------------------------------------------------------
290
291    /// CreateOriginAccessControl.
292    pub fn create_oac(
293        &self,
294        cfg: OriginAccessControlConfig,
295    ) -> Result<OriginAccessControl, CloudFrontError> {
296        if cfg.name.is_empty() {
297            return Err(CloudFrontError::InvalidArgument(
298                "OriginAccessControl Name is required".to_owned(),
299            ));
300        }
301        let id = new_id_with_prefix('E');
302        let oac = OriginAccessControl {
303            id: id.clone(),
304            config: cfg,
305            etag: new_etag(),
306        };
307        self.store.origin_access_controls.insert(id, oac.clone());
308        Ok(oac)
309    }
310
311    /// GetOriginAccessControl.
312    pub fn get_oac(&self, id: &str) -> Result<OriginAccessControl, CloudFrontError> {
313        self.store
314            .origin_access_controls
315            .get(id)
316            .map(|r| r.value().clone())
317            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))
318    }
319
320    /// UpdateOriginAccessControl.
321    pub fn update_oac(
322        &self,
323        id: &str,
324        if_match: Option<&str>,
325        cfg: OriginAccessControlConfig,
326    ) -> Result<OriginAccessControl, CloudFrontError> {
327        let mut entry = self
328            .store
329            .origin_access_controls
330            .get_mut(id)
331            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
332        check_if_match(if_match, &entry.etag)?;
333        entry.config = cfg;
334        entry.etag = new_etag();
335        Ok(entry.value().clone())
336    }
337
338    /// DeleteOriginAccessControl.
339    pub fn delete_oac(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
340        let entry = self
341            .store
342            .origin_access_controls
343            .get(id)
344            .ok_or_else(|| CloudFrontError::no_such_origin_access_control(id))?;
345        check_if_match(if_match, &entry.etag)?;
346        drop(entry);
347        self.store.origin_access_controls.remove(id);
348        self.store
349            .tags
350            .remove(&origin_access_control_arn(&self.config.account_id, id));
351        Ok(())
352    }
353
354    /// ListOriginAccessControls.
355    pub fn list_oacs(&self) -> Vec<OriginAccessControl> {
356        let mut v: Vec<_> = self
357            .store
358            .origin_access_controls
359            .iter()
360            .map(|e| e.value().clone())
361            .collect();
362        v.sort_by(|a, b| a.id.cmp(&b.id));
363        v
364    }
365
366    // ---------------------------------------------------------------------
    // CloudFront Origin Access Identity (OAI, legacy)
368    // ---------------------------------------------------------------------
369
370    /// CreateCloudFrontOriginAccessIdentity.
371    pub fn create_oai(
372        &self,
373        cfg: CloudFrontOriginAccessIdentityConfig,
374    ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
375        let id = new_id_with_prefix('E');
376        let oai = CloudFrontOriginAccessIdentity {
377            id: id.clone(),
378            s3_canonical_user_id: new_s3_canonical_user_id(),
379            config: cfg,
380            etag: new_etag(),
381        };
382        self.store.origin_access_identities.insert(id, oai.clone());
383        Ok(oai)
384    }
385
386    /// GetCloudFrontOriginAccessIdentity.
387    pub fn get_oai(&self, id: &str) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
388        self.store
389            .origin_access_identities
390            .get(id)
391            .map(|r| r.value().clone())
392            .ok_or_else(|| CloudFrontError::no_such_oai(id))
393    }
394
395    /// UpdateCloudFrontOriginAccessIdentity.
396    pub fn update_oai(
397        &self,
398        id: &str,
399        if_match: Option<&str>,
400        cfg: CloudFrontOriginAccessIdentityConfig,
401    ) -> Result<CloudFrontOriginAccessIdentity, CloudFrontError> {
402        let mut entry = self
403            .store
404            .origin_access_identities
405            .get_mut(id)
406            .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
407        check_if_match(if_match, &entry.etag)?;
408        entry.config = cfg;
409        entry.etag = new_etag();
410        Ok(entry.value().clone())
411    }
412
413    /// DeleteCloudFrontOriginAccessIdentity.
414    pub fn delete_oai(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
415        let entry = self
416            .store
417            .origin_access_identities
418            .get(id)
419            .ok_or_else(|| CloudFrontError::no_such_oai(id))?;
420        check_if_match(if_match, &entry.etag)?;
421        drop(entry);
422        self.store.origin_access_identities.remove(id);
423        self.store
424            .tags
425            .remove(&oai_arn(&self.config.account_id, id));
426        Ok(())
427    }
428
429    /// ListCloudFrontOriginAccessIdentities.
430    pub fn list_oais(&self) -> Vec<CloudFrontOriginAccessIdentity> {
431        let mut v: Vec<_> = self
432            .store
433            .origin_access_identities
434            .iter()
435            .map(|e| e.value().clone())
436            .collect();
437        v.sort_by(|a, b| a.id.cmp(&b.id));
438        v
439    }
440
441    // ---------------------------------------------------------------------
442    // Cache policy
443    // ---------------------------------------------------------------------
444
445    /// CreateCachePolicy.
446    pub fn create_cache_policy(
447        &self,
448        cfg: CachePolicyConfig,
449    ) -> Result<CachePolicy, CloudFrontError> {
450        if cfg.name.is_empty() {
451            return Err(CloudFrontError::InvalidArgument(
452                "CachePolicyConfig.Name is required".to_owned(),
453            ));
454        }
455        let id = uuid::Uuid::new_v4().to_string();
456        let p = CachePolicy {
457            id: id.clone(),
458            last_modified_time: Utc::now(),
459            config: cfg,
460            etag: new_etag(),
461            managed: false,
462        };
463        self.store.cache_policies.insert(id, p.clone());
464        Ok(p)
465    }
466
467    /// GetCachePolicy.
468    pub fn get_cache_policy(&self, id: &str) -> Result<CachePolicy, CloudFrontError> {
469        self.store
470            .cache_policies
471            .get(id)
472            .map(|r| r.value().clone())
473            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))
474    }
475
476    /// UpdateCachePolicy — managed policies are immutable.
477    pub fn update_cache_policy(
478        &self,
479        id: &str,
480        if_match: Option<&str>,
481        cfg: CachePolicyConfig,
482    ) -> Result<CachePolicy, CloudFrontError> {
483        let mut entry = self
484            .store
485            .cache_policies
486            .get_mut(id)
487            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
488        if entry.managed {
489            return Err(CloudFrontError::AccessDenied(
490                "AWS-managed cache policies cannot be modified".to_owned(),
491            ));
492        }
493        check_if_match(if_match, &entry.etag)?;
494        entry.config = cfg;
495        entry.etag = new_etag();
496        entry.last_modified_time = Utc::now();
497        Ok(entry.value().clone())
498    }
499
500    /// DeleteCachePolicy.
501    pub fn delete_cache_policy(
502        &self,
503        id: &str,
504        if_match: Option<&str>,
505    ) -> Result<(), CloudFrontError> {
506        let entry = self
507            .store
508            .cache_policies
509            .get(id)
510            .ok_or_else(|| CloudFrontError::no_such_cache_policy(id))?;
511        if entry.managed {
512            return Err(CloudFrontError::AccessDenied(
513                "AWS-managed cache policies cannot be deleted".to_owned(),
514            ));
515        }
516        check_if_match(if_match, &entry.etag)?;
517        drop(entry);
518        self.store.cache_policies.remove(id);
519        self.store
520            .tags
521            .remove(&cache_policy_arn(&self.config.account_id, id));
522        Ok(())
523    }
524
525    /// ListCachePolicies.
526    pub fn list_cache_policies(&self) -> Vec<CachePolicy> {
527        let mut v: Vec<_> = self
528            .store
529            .cache_policies
530            .iter()
531            .map(|e| e.value().clone())
532            .collect();
533        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
534        v
535    }
536
537    // ---------------------------------------------------------------------
538    // Origin request policy
539    // ---------------------------------------------------------------------
540
541    /// CreateOriginRequestPolicy.
542    pub fn create_origin_request_policy(
543        &self,
544        cfg: OriginRequestPolicyConfig,
545    ) -> Result<OriginRequestPolicy, CloudFrontError> {
546        if cfg.name.is_empty() {
547            return Err(CloudFrontError::InvalidArgument(
548                "OriginRequestPolicyConfig.Name is required".to_owned(),
549            ));
550        }
551        let id = uuid::Uuid::new_v4().to_string();
552        let p = OriginRequestPolicy {
553            id: id.clone(),
554            last_modified_time: Utc::now(),
555            config: cfg,
556            etag: new_etag(),
557            managed: false,
558        };
559        self.store.origin_request_policies.insert(id, p.clone());
560        Ok(p)
561    }
562
563    /// GetOriginRequestPolicy.
564    pub fn get_origin_request_policy(
565        &self,
566        id: &str,
567    ) -> Result<OriginRequestPolicy, CloudFrontError> {
568        self.store
569            .origin_request_policies
570            .get(id)
571            .map(|r| r.value().clone())
572            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))
573    }
574
575    /// UpdateOriginRequestPolicy.
576    pub fn update_origin_request_policy(
577        &self,
578        id: &str,
579        if_match: Option<&str>,
580        cfg: OriginRequestPolicyConfig,
581    ) -> Result<OriginRequestPolicy, CloudFrontError> {
582        let mut entry = self
583            .store
584            .origin_request_policies
585            .get_mut(id)
586            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
587        if entry.managed {
588            return Err(CloudFrontError::AccessDenied(
589                "AWS-managed origin request policies cannot be modified".to_owned(),
590            ));
591        }
592        check_if_match(if_match, &entry.etag)?;
593        entry.config = cfg;
594        entry.etag = new_etag();
595        entry.last_modified_time = Utc::now();
596        Ok(entry.value().clone())
597    }
598
599    /// DeleteOriginRequestPolicy.
600    pub fn delete_origin_request_policy(
601        &self,
602        id: &str,
603        if_match: Option<&str>,
604    ) -> Result<(), CloudFrontError> {
605        let entry = self
606            .store
607            .origin_request_policies
608            .get(id)
609            .ok_or_else(|| CloudFrontError::no_such_origin_request_policy(id))?;
610        if entry.managed {
611            return Err(CloudFrontError::AccessDenied(
612                "AWS-managed origin request policies cannot be deleted".to_owned(),
613            ));
614        }
615        check_if_match(if_match, &entry.etag)?;
616        drop(entry);
617        self.store.origin_request_policies.remove(id);
618        self.store
619            .tags
620            .remove(&origin_request_policy_arn(&self.config.account_id, id));
621        Ok(())
622    }
623
624    /// ListOriginRequestPolicies.
625    pub fn list_origin_request_policies(&self) -> Vec<OriginRequestPolicy> {
626        let mut v: Vec<_> = self
627            .store
628            .origin_request_policies
629            .iter()
630            .map(|e| e.value().clone())
631            .collect();
632        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
633        v
634    }
635
636    // ---------------------------------------------------------------------
637    // Response headers policy
638    // ---------------------------------------------------------------------
639
640    /// CreateResponseHeadersPolicy.
641    pub fn create_response_headers_policy(
642        &self,
643        cfg: ResponseHeadersPolicyConfig,
644    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
645        if cfg.name.is_empty() {
646            return Err(CloudFrontError::InvalidArgument(
647                "ResponseHeadersPolicyConfig.Name is required".to_owned(),
648            ));
649        }
650        let id = uuid::Uuid::new_v4().to_string();
651        let p = ResponseHeadersPolicy {
652            id: id.clone(),
653            last_modified_time: Utc::now(),
654            config: cfg,
655            etag: new_etag(),
656            managed: false,
657        };
658        self.store.response_headers_policies.insert(id, p.clone());
659        Ok(p)
660    }
661
662    /// GetResponseHeadersPolicy.
663    pub fn get_response_headers_policy(
664        &self,
665        id: &str,
666    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
667        self.store
668            .response_headers_policies
669            .get(id)
670            .map(|r| r.value().clone())
671            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))
672    }
673
674    /// UpdateResponseHeadersPolicy.
675    pub fn update_response_headers_policy(
676        &self,
677        id: &str,
678        if_match: Option<&str>,
679        cfg: ResponseHeadersPolicyConfig,
680    ) -> Result<ResponseHeadersPolicy, CloudFrontError> {
681        let mut entry = self
682            .store
683            .response_headers_policies
684            .get_mut(id)
685            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
686        if entry.managed {
687            return Err(CloudFrontError::AccessDenied(
688                "AWS-managed response headers policies cannot be modified".to_owned(),
689            ));
690        }
691        check_if_match(if_match, &entry.etag)?;
692        entry.config = cfg;
693        entry.etag = new_etag();
694        entry.last_modified_time = Utc::now();
695        Ok(entry.value().clone())
696    }
697
698    /// DeleteResponseHeadersPolicy.
699    pub fn delete_response_headers_policy(
700        &self,
701        id: &str,
702        if_match: Option<&str>,
703    ) -> Result<(), CloudFrontError> {
704        let entry = self
705            .store
706            .response_headers_policies
707            .get(id)
708            .ok_or_else(|| CloudFrontError::no_such_response_headers_policy(id))?;
709        if entry.managed {
710            return Err(CloudFrontError::AccessDenied(
711                "AWS-managed response headers policies cannot be deleted".to_owned(),
712            ));
713        }
714        check_if_match(if_match, &entry.etag)?;
715        drop(entry);
716        self.store.response_headers_policies.remove(id);
717        self.store
718            .tags
719            .remove(&response_headers_policy_arn(&self.config.account_id, id));
720        Ok(())
721    }
722
723    /// ListResponseHeadersPolicies.
724    pub fn list_response_headers_policies(&self) -> Vec<ResponseHeadersPolicy> {
725        let mut v: Vec<_> = self
726            .store
727            .response_headers_policies
728            .iter()
729            .map(|e| e.value().clone())
730            .collect();
731        v.sort_by(|a, b| a.config.name.cmp(&b.config.name));
732        v
733    }
734
735    // ---------------------------------------------------------------------
736    // Key group / public key
737    // ---------------------------------------------------------------------
738
739    /// CreateKeyGroup.
740    pub fn create_key_group(&self, cfg: KeyGroupConfig) -> Result<KeyGroup, CloudFrontError> {
741        if cfg.name.is_empty() {
742            return Err(CloudFrontError::InvalidArgument(
743                "KeyGroupConfig.Name is required".to_owned(),
744            ));
745        }
746        let id = new_id_with_prefix('K');
747        let kg = KeyGroup {
748            id: id.clone(),
749            last_modified_time: Utc::now(),
750            config: cfg,
751            etag: new_etag(),
752        };
753        self.store.key_groups.insert(id, kg.clone());
754        Ok(kg)
755    }
756
757    /// GetKeyGroup.
758    pub fn get_key_group(&self, id: &str) -> Result<KeyGroup, CloudFrontError> {
759        self.store
760            .key_groups
761            .get(id)
762            .map(|r| r.value().clone())
763            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))
764    }
765
766    /// UpdateKeyGroup.
767    pub fn update_key_group(
768        &self,
769        id: &str,
770        if_match: Option<&str>,
771        cfg: KeyGroupConfig,
772    ) -> Result<KeyGroup, CloudFrontError> {
773        let mut entry = self
774            .store
775            .key_groups
776            .get_mut(id)
777            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
778        check_if_match(if_match, &entry.etag)?;
779        entry.config = cfg;
780        entry.etag = new_etag();
781        entry.last_modified_time = Utc::now();
782        Ok(entry.value().clone())
783    }
784
785    /// DeleteKeyGroup.
786    pub fn delete_key_group(
787        &self,
788        id: &str,
789        if_match: Option<&str>,
790    ) -> Result<(), CloudFrontError> {
791        let entry = self
792            .store
793            .key_groups
794            .get(id)
795            .ok_or_else(|| CloudFrontError::no_such_resource("KeyGroup", id))?;
796        check_if_match(if_match, &entry.etag)?;
797        drop(entry);
798        self.store.key_groups.remove(id);
799        self.store
800            .tags
801            .remove(&key_group_arn(&self.config.account_id, id));
802        Ok(())
803    }
804
805    /// ListKeyGroups.
806    pub fn list_key_groups(&self) -> Vec<KeyGroup> {
807        let mut v: Vec<_> = self
808            .store
809            .key_groups
810            .iter()
811            .map(|e| e.value().clone())
812            .collect();
813        v.sort_by(|a, b| a.id.cmp(&b.id));
814        v
815    }
816
817    /// CreatePublicKey.
818    pub fn create_public_key(&self, cfg: PublicKeyConfig) -> Result<PublicKey, CloudFrontError> {
819        if cfg.name.is_empty() || cfg.encoded_key.is_empty() {
820            return Err(CloudFrontError::InvalidArgument(
821                "PublicKeyConfig Name and EncodedKey are required".to_owned(),
822            ));
823        }
824        let id = new_id_with_prefix('K');
825        let pk = PublicKey {
826            id: id.clone(),
827            created_time: Utc::now(),
828            config: cfg,
829            etag: new_etag(),
830        };
831        self.store.public_keys.insert(id, pk.clone());
832        Ok(pk)
833    }
834
835    /// GetPublicKey.
836    pub fn get_public_key(&self, id: &str) -> Result<PublicKey, CloudFrontError> {
837        self.store
838            .public_keys
839            .get(id)
840            .map(|r| r.value().clone())
841            .ok_or_else(|| CloudFrontError::no_such_public_key(id))
842    }
843
844    /// UpdatePublicKey.
845    pub fn update_public_key(
846        &self,
847        id: &str,
848        if_match: Option<&str>,
849        cfg: PublicKeyConfig,
850    ) -> Result<PublicKey, CloudFrontError> {
851        let mut entry = self
852            .store
853            .public_keys
854            .get_mut(id)
855            .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
856        check_if_match(if_match, &entry.etag)?;
857        entry.config = cfg;
858        entry.etag = new_etag();
859        Ok(entry.value().clone())
860    }
861
862    /// DeletePublicKey.
863    pub fn delete_public_key(
864        &self,
865        id: &str,
866        if_match: Option<&str>,
867    ) -> Result<(), CloudFrontError> {
868        let entry = self
869            .store
870            .public_keys
871            .get(id)
872            .ok_or_else(|| CloudFrontError::no_such_public_key(id))?;
873        check_if_match(if_match, &entry.etag)?;
874        drop(entry);
875        self.store.public_keys.remove(id);
876        self.store
877            .tags
878            .remove(&public_key_arn(&self.config.account_id, id));
879        Ok(())
880    }
881
882    /// ListPublicKeys.
883    pub fn list_public_keys(&self) -> Vec<PublicKey> {
884        let mut v: Vec<_> = self
885            .store
886            .public_keys
887            .iter()
888            .map(|e| e.value().clone())
889            .collect();
890        v.sort_by(|a, b| a.id.cmp(&b.id));
891        v
892    }
893
894    // ---------------------------------------------------------------------
895    // Functions (store-only; no JS runtime)
896    // ---------------------------------------------------------------------
897
898    /// CreateFunction.
899    pub fn create_function(
900        &self,
901        name: String,
902        cfg: FunctionConfig,
903        code: Vec<u8>,
904    ) -> Result<CloudFrontFunction, CloudFrontError> {
905        if name.is_empty() {
906            return Err(CloudFrontError::InvalidArgument(
907                "Function Name is required".to_owned(),
908            ));
909        }
910        if self.store.functions.contains_key(&name) {
911            return Err(CloudFrontError::AlreadyExists {
912                code: "FunctionAlreadyExists",
913                message: format!("Function {name} already exists"),
914            });
915        }
916        let arn = function_arn(&self.config.account_id, &name);
917        let now = Utc::now();
918        let f = CloudFrontFunction {
919            name: name.clone(),
920            arn: arn.clone(),
921            last_modified_time: now,
922            stage: "DEVELOPMENT".to_owned(),
923            metadata: FunctionMetadata {
924                function_arn: arn,
925                stage: "DEVELOPMENT".to_owned(),
926                created_time: now,
927                last_modified_time: now,
928            },
929            config: cfg,
930            code,
931            etag: new_etag(),
932            status: "UNPUBLISHED".to_owned(),
933        };
934        self.store.functions.insert(name, f.clone());
935        Ok(f)
936    }
937
938    /// DescribeFunction / GetFunction (code is the distinction — same storage).
939    pub fn get_function(&self, name: &str) -> Result<CloudFrontFunction, CloudFrontError> {
940        self.store
941            .functions
942            .get(name)
943            .map(|r| r.value().clone())
944            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))
945    }
946
947    /// UpdateFunction.
948    pub fn update_function(
949        &self,
950        name: &str,
951        if_match: Option<&str>,
952        cfg: FunctionConfig,
953        code: Vec<u8>,
954    ) -> Result<CloudFrontFunction, CloudFrontError> {
955        let mut entry = self
956            .store
957            .functions
958            .get_mut(name)
959            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
960        check_if_match(if_match, &entry.etag)?;
961        entry.config = cfg;
962        entry.code = code;
963        entry.etag = new_etag();
964        entry.last_modified_time = Utc::now();
965        Ok(entry.value().clone())
966    }
967
968    /// DeleteFunction.
969    pub fn delete_function(
970        &self,
971        name: &str,
972        if_match: Option<&str>,
973    ) -> Result<(), CloudFrontError> {
974        let entry = self
975            .store
976            .functions
977            .get(name)
978            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
979        check_if_match(if_match, &entry.etag)?;
980        drop(entry);
981        self.store.functions.remove(name);
982        Ok(())
983    }
984
985    /// PublishFunction: flips stage from DEVELOPMENT to LIVE.
986    pub fn publish_function(
987        &self,
988        name: &str,
989        if_match: Option<&str>,
990    ) -> Result<CloudFrontFunction, CloudFrontError> {
991        let mut entry = self
992            .store
993            .functions
994            .get_mut(name)
995            .ok_or_else(|| CloudFrontError::no_such_resource("Function", name))?;
996        check_if_match(if_match, &entry.etag)?;
997        entry.stage = "LIVE".to_owned();
998        entry.metadata.stage = "LIVE".to_owned();
999        entry.status = "PUBLISHED".to_owned();
1000        entry.etag = new_etag();
1001        Ok(entry.value().clone())
1002    }
1003
1004    /// TestFunction: returns canned success.
1005    pub fn test_function(
1006        &self,
1007        name: &str,
1008        event_object: &[u8],
1009    ) -> Result<(Vec<u8>, String), CloudFrontError> {
1010        let _ = self.get_function(name)?;
1011        let result = br#"{"status":"success","testStatus":"OK"}"#.to_vec();
1012        let compute_util = format!("compute_utilization_percent={}", event_object.len());
1013        Ok((result, compute_util))
1014    }
1015
1016    /// ListFunctions.
1017    pub fn list_functions(&self) -> Vec<CloudFrontFunction> {
1018        let mut v: Vec<_> = self
1019            .store
1020            .functions
1021            .iter()
1022            .map(|e| e.value().clone())
1023            .collect();
1024        v.sort_by(|a, b| a.name.cmp(&b.name));
1025        v
1026    }
1027
1028    // ---------------------------------------------------------------------
1029    // Field-Level Encryption (store-only)
1030    // ---------------------------------------------------------------------
1031
1032    /// CreateFieldLevelEncryptionConfig.
1033    pub fn create_fle_config(
1034        &self,
1035        cfg: FieldLevelEncryptionConfig,
1036    ) -> Result<FieldLevelEncryption, CloudFrontError> {
1037        let id = new_id_with_prefix('F');
1038        let f = FieldLevelEncryption {
1039            id: id.clone(),
1040            last_modified_time: Utc::now(),
1041            config: cfg,
1042            etag: new_etag(),
1043        };
1044        self.store.fle_configs.insert(id, f.clone());
1045        Ok(f)
1046    }
1047
1048    /// GetFieldLevelEncryption / GetFieldLevelEncryptionConfig.
1049    pub fn get_fle_config(&self, id: &str) -> Result<FieldLevelEncryption, CloudFrontError> {
1050        self.store
1051            .fle_configs
1052            .get(id)
1053            .map(|r| r.value().clone())
1054            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))
1055    }
1056
1057    /// UpdateFieldLevelEncryptionConfig.
1058    pub fn update_fle_config(
1059        &self,
1060        id: &str,
1061        if_match: Option<&str>,
1062        cfg: FieldLevelEncryptionConfig,
1063    ) -> Result<FieldLevelEncryption, CloudFrontError> {
1064        let mut entry = self
1065            .store
1066            .fle_configs
1067            .get_mut(id)
1068            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1069        check_if_match(if_match, &entry.etag)?;
1070        entry.config = cfg;
1071        entry.etag = new_etag();
1072        entry.last_modified_time = Utc::now();
1073        Ok(entry.value().clone())
1074    }
1075
1076    /// DeleteFieldLevelEncryptionConfig.
1077    pub fn delete_fle_config(
1078        &self,
1079        id: &str,
1080        if_match: Option<&str>,
1081    ) -> Result<(), CloudFrontError> {
1082        let entry = self
1083            .store
1084            .fle_configs
1085            .get(id)
1086            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryption", id))?;
1087        check_if_match(if_match, &entry.etag)?;
1088        drop(entry);
1089        self.store.fle_configs.remove(id);
1090        Ok(())
1091    }
1092
1093    /// ListFieldLevelEncryptionConfigs.
1094    pub fn list_fle_configs(&self) -> Vec<FieldLevelEncryption> {
1095        self.store
1096            .fle_configs
1097            .iter()
1098            .map(|e| e.value().clone())
1099            .collect()
1100    }
1101
1102    /// CreateFieldLevelEncryptionProfile.
1103    pub fn create_fle_profile(
1104        &self,
1105        cfg: FieldLevelEncryptionProfileConfig,
1106    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1107        let id = new_id_with_prefix('P');
1108        let p = FieldLevelEncryptionProfile {
1109            id: id.clone(),
1110            last_modified_time: Utc::now(),
1111            config: cfg,
1112            etag: new_etag(),
1113        };
1114        self.store.fle_profiles.insert(id, p.clone());
1115        Ok(p)
1116    }
1117
1118    /// GetFieldLevelEncryptionProfile.
1119    pub fn get_fle_profile(
1120        &self,
1121        id: &str,
1122    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1123        self.store
1124            .fle_profiles
1125            .get(id)
1126            .map(|r| r.value().clone())
1127            .ok_or_else(|| CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id))
1128    }
1129
1130    /// UpdateFieldLevelEncryptionProfile.
1131    pub fn update_fle_profile(
1132        &self,
1133        id: &str,
1134        if_match: Option<&str>,
1135        cfg: FieldLevelEncryptionProfileConfig,
1136    ) -> Result<FieldLevelEncryptionProfile, CloudFrontError> {
1137        let mut entry =
1138            self.store.fle_profiles.get_mut(id).ok_or_else(|| {
1139                CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1140            })?;
1141        check_if_match(if_match, &entry.etag)?;
1142        entry.config = cfg;
1143        entry.etag = new_etag();
1144        entry.last_modified_time = Utc::now();
1145        Ok(entry.value().clone())
1146    }
1147
1148    /// DeleteFieldLevelEncryptionProfile.
1149    pub fn delete_fle_profile(
1150        &self,
1151        id: &str,
1152        if_match: Option<&str>,
1153    ) -> Result<(), CloudFrontError> {
1154        let entry =
1155            self.store.fle_profiles.get(id).ok_or_else(|| {
1156                CloudFrontError::no_such_resource("FieldLevelEncryptionProfile", id)
1157            })?;
1158        check_if_match(if_match, &entry.etag)?;
1159        drop(entry);
1160        self.store.fle_profiles.remove(id);
1161        Ok(())
1162    }
1163
1164    /// ListFieldLevelEncryptionProfiles.
1165    pub fn list_fle_profiles(&self) -> Vec<FieldLevelEncryptionProfile> {
1166        self.store
1167            .fle_profiles
1168            .iter()
1169            .map(|e| e.value().clone())
1170            .collect()
1171    }
1172
1173    // ---------------------------------------------------------------------
1174    // Monitoring subscription
1175    // ---------------------------------------------------------------------
1176
1177    /// CreateMonitoringSubscription.
1178    pub fn create_monitoring_subscription(
1179        &self,
1180        distribution_id: &str,
1181        enabled: bool,
1182    ) -> Result<MonitoringSubscription, CloudFrontError> {
1183        if !self.store.distributions.contains_key(distribution_id) {
1184            return Err(CloudFrontError::no_such_distribution(distribution_id));
1185        }
1186        let sub = MonitoringSubscription {
1187            distribution_id: distribution_id.to_owned(),
1188            realtime_metrics_subscription_status: if enabled {
1189                "Enabled".to_owned()
1190            } else {
1191                "Disabled".to_owned()
1192            },
1193        };
1194        self.store
1195            .monitoring_subscriptions
1196            .insert(distribution_id.to_owned(), sub.clone());
1197        Ok(sub)
1198    }
1199
1200    /// GetMonitoringSubscription.
1201    pub fn get_monitoring_subscription(
1202        &self,
1203        distribution_id: &str,
1204    ) -> Result<MonitoringSubscription, CloudFrontError> {
1205        self.store
1206            .monitoring_subscriptions
1207            .get(distribution_id)
1208            .map(|r| r.value().clone())
1209            .ok_or_else(|| {
1210                CloudFrontError::no_such_resource("MonitoringSubscription", distribution_id)
1211            })
1212    }
1213
1214    /// DeleteMonitoringSubscription.
1215    pub fn delete_monitoring_subscription(
1216        &self,
1217        distribution_id: &str,
1218    ) -> Result<(), CloudFrontError> {
1219        if self
1220            .store
1221            .monitoring_subscriptions
1222            .remove(distribution_id)
1223            .is_none()
1224        {
1225            return Err(CloudFrontError::no_such_resource(
1226                "MonitoringSubscription",
1227                distribution_id,
1228            ));
1229        }
1230        Ok(())
1231    }
1232
1233    // ---------------------------------------------------------------------
1234    // KeyValueStore
1235    // ---------------------------------------------------------------------
1236
1237    /// CreateKeyValueStore.
1238    pub fn create_kvs(
1239        &self,
1240        name: String,
1241        comment: String,
1242    ) -> Result<KeyValueStore, CloudFrontError> {
1243        let id = uuid::Uuid::new_v4().to_string();
1244        let arn = kvs_arn(&self.config.account_id, &id);
1245        let kvs = KeyValueStore {
1246            id: id.clone(),
1247            name,
1248            arn,
1249            comment,
1250            status: "PROVISIONING".to_owned(),
1251            last_modified_time: Utc::now(),
1252            etag: new_etag(),
1253        };
1254        self.store.key_value_stores.insert(id, kvs.clone());
1255        Ok(kvs)
1256    }
1257
1258    /// DescribeKeyValueStore.
1259    pub fn get_kvs(&self, id: &str) -> Result<KeyValueStore, CloudFrontError> {
1260        self.store
1261            .key_value_stores
1262            .get(id)
1263            .map(|r| r.value().clone())
1264            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))
1265    }
1266
1267    /// UpdateKeyValueStore.
1268    pub fn update_kvs(
1269        &self,
1270        id: &str,
1271        if_match: Option<&str>,
1272        comment: String,
1273    ) -> Result<KeyValueStore, CloudFrontError> {
1274        let mut entry = self
1275            .store
1276            .key_value_stores
1277            .get_mut(id)
1278            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1279        check_if_match(if_match, &entry.etag)?;
1280        entry.comment = comment;
1281        entry.etag = new_etag();
1282        entry.last_modified_time = Utc::now();
1283        Ok(entry.value().clone())
1284    }
1285
1286    /// DeleteKeyValueStore.
1287    pub fn delete_kvs(&self, id: &str, if_match: Option<&str>) -> Result<(), CloudFrontError> {
1288        let entry = self
1289            .store
1290            .key_value_stores
1291            .get(id)
1292            .ok_or_else(|| CloudFrontError::no_such_resource("KeyValueStore", id))?;
1293        check_if_match(if_match, &entry.etag)?;
1294        drop(entry);
1295        self.store.key_value_stores.remove(id);
1296        Ok(())
1297    }
1298
1299    /// ListKeyValueStores.
1300    pub fn list_kvs(&self) -> Vec<KeyValueStore> {
1301        self.store
1302            .key_value_stores
1303            .iter()
1304            .map(|e| e.value().clone())
1305            .collect()
1306    }
1307
1308    // ---------------------------------------------------------------------
1309    // Realtime log configs
1310    // ---------------------------------------------------------------------
1311
1312    /// CreateRealtimeLogConfig.
1313    pub fn create_realtime_log_config(
1314        &self,
1315        cfg: RealtimeLogConfig,
1316    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1317        if cfg.name.is_empty() {
1318            return Err(CloudFrontError::InvalidArgument(
1319                "RealtimeLogConfig Name is required".to_owned(),
1320            ));
1321        }
1322        let mut final_cfg = cfg;
1323        if final_cfg.arn.is_empty() {
1324            final_cfg.arn = realtime_log_arn(&self.config.account_id, &final_cfg.name);
1325        }
1326        self.store
1327            .realtime_log_configs
1328            .insert(final_cfg.name.clone(), final_cfg.clone());
1329        Ok(final_cfg)
1330    }
1331
1332    /// GetRealtimeLogConfig.
1333    pub fn get_realtime_log_config(
1334        &self,
1335        name: &str,
1336    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1337        self.store
1338            .realtime_log_configs
1339            .get(name)
1340            .map(|r| r.value().clone())
1341            .ok_or_else(|| CloudFrontError::no_such_resource("RealtimeLogConfig", name))
1342    }
1343
1344    /// UpdateRealtimeLogConfig.
1345    pub fn update_realtime_log_config(
1346        &self,
1347        cfg: RealtimeLogConfig,
1348    ) -> Result<RealtimeLogConfig, CloudFrontError> {
1349        if !self.store.realtime_log_configs.contains_key(&cfg.name) {
1350            return Err(CloudFrontError::no_such_resource(
1351                "RealtimeLogConfig",
1352                &cfg.name,
1353            ));
1354        }
1355        self.store
1356            .realtime_log_configs
1357            .insert(cfg.name.clone(), cfg.clone());
1358        Ok(cfg)
1359    }
1360
1361    /// DeleteRealtimeLogConfig.
1362    pub fn delete_realtime_log_config(&self, name: &str) -> Result<(), CloudFrontError> {
1363        if self.store.realtime_log_configs.remove(name).is_none() {
1364            return Err(CloudFrontError::no_such_resource("RealtimeLogConfig", name));
1365        }
1366        Ok(())
1367    }
1368
1369    /// ListRealtimeLogConfigs.
1370    pub fn list_realtime_log_configs(&self) -> Vec<RealtimeLogConfig> {
1371        self.store
1372            .realtime_log_configs
1373            .iter()
1374            .map(|e| e.value().clone())
1375            .collect()
1376    }
1377
1378    // ---------------------------------------------------------------------
1379    // Tagging (uniform across all taggable resources)
1380    // ---------------------------------------------------------------------
1381
1382    /// TagResource.
1383    pub fn tag_resource(&self, arn: &str, new_tags: &[Tag]) -> Result<(), CloudFrontError> {
1384        let mut entry = self.store.tags.entry(arn.to_owned()).or_default();
1385        merge_tags(entry.value_mut(), new_tags);
1386        Ok(())
1387    }
1388
1389    /// UntagResource.
1390    pub fn untag_resource(&self, arn: &str, keys: &[String]) -> Result<(), CloudFrontError> {
1391        if let Some(mut entry) = self.store.tags.get_mut(arn) {
1392            entry.retain(|t| !keys.iter().any(|k| k == &t.key));
1393        }
1394        Ok(())
1395    }
1396
1397    /// ListTagsForResource.
1398    pub fn list_tags_for_resource(&self, arn: &str) -> Result<TagSet, CloudFrontError> {
1399        Ok(self
1400            .store
1401            .tags
1402            .get(arn)
1403            .map(|r| r.value().clone())
1404            .unwrap_or_default())
1405    }
1406
1407    // ---------------------------------------------------------------------
1408    // Lifecycle simulators
1409    // ---------------------------------------------------------------------
1410
1411    fn spawn_distribution_deployment(self: &Arc<Self>, id: &str) {
1412        let delay = self.config.distribution_propagation;
1413        let id = id.to_owned();
1414        let provider = Arc::clone(self);
1415        tokio::spawn(async move {
1416            if !delay.is_zero() {
1417                tokio::time::sleep(delay).await;
1418            }
1419            if let Some(mut d) = provider.store.distributions.get_mut(&id) {
1420                d.status = ResourceStatus::Deployed;
1421            }
1422        });
1423    }
1424
1425    fn spawn_invalidation_completion(
1426        self: &Arc<Self>,
1427        distribution_id: &str,
1428        invalidation_id: &str,
1429    ) {
1430        let delay = self.config.invalidation_propagation;
1431        let dist_id = distribution_id.to_owned();
1432        let inv_id = invalidation_id.to_owned();
1433        let provider = Arc::clone(self);
1434        tokio::spawn(async move {
1435            if !delay.is_zero() {
1436                tokio::time::sleep(delay).await;
1437            }
1438            if let Some(mut inv) = provider
1439                .store
1440                .invalidations
1441                .get_mut(&(dist_id.clone(), inv_id))
1442            {
1443                inv.status = ResourceStatus::Completed;
1444            }
1445            if let Some(mut d) = provider.store.distributions.get_mut(&dist_id) {
1446                d.in_progress_invalidation_batches =
1447                    (d.in_progress_invalidation_batches - 1).max(0);
1448            }
1449        });
1450    }
1451}
1452
1453// ---------------------------------------------------------------------------
1454// Validation helpers
1455// ---------------------------------------------------------------------------
1456
1457fn validate_distribution_config(cfg: &DistributionConfig) -> Result<(), CloudFrontError> {
1458    if cfg.caller_reference.is_empty() {
1459        return Err(CloudFrontError::MissingArgument(
1460            "CallerReference is required".to_owned(),
1461        ));
1462    }
1463    if cfg.origins.is_empty() {
1464        return Err(CloudFrontError::MissingArgument(
1465            "At least one Origin is required".to_owned(),
1466        ));
1467    }
1468    if cfg.default_cache_behavior.target_origin_id.is_empty() {
1469        return Err(CloudFrontError::MissingArgument(
1470            "DefaultCacheBehavior.TargetOriginId is required".to_owned(),
1471        ));
1472    }
1473    let origin_ids: std::collections::HashSet<&str> =
1474        cfg.origins.iter().map(|o| o.id.as_str()).collect();
1475    if !origin_ids.contains(cfg.default_cache_behavior.target_origin_id.as_str())
1476        && !cfg
1477            .origin_groups
1478            .iter()
1479            .any(|g| g.id == cfg.default_cache_behavior.target_origin_id)
1480    {
1481        return Err(CloudFrontError::MalformedInput(format!(
1482            "DefaultCacheBehavior.TargetOriginId {} does not match any Origin.Id",
1483            cfg.default_cache_behavior.target_origin_id
1484        )));
1485    }
1486    for cb in &cfg.cache_behaviors {
1487        if !origin_ids.contains(cb.target_origin_id.as_str())
1488            && !cfg
1489                .origin_groups
1490                .iter()
1491                .any(|g| g.id == cb.target_origin_id)
1492        {
1493            return Err(CloudFrontError::MalformedInput(format!(
1494                "CacheBehavior.TargetOriginId {} does not match any Origin.Id",
1495                cb.target_origin_id
1496            )));
1497        }
1498    }
1499    Ok(())
1500}
1501
1502fn check_if_match(supplied: Option<&str>, current: &str) -> Result<(), CloudFrontError> {
1503    match supplied {
1504        None | Some("") => Err(CloudFrontError::InvalidIfMatchVersion(
1505            "The If-Match version is missing".to_owned(),
1506        )),
1507        Some(v) if v == current => Ok(()),
1508        Some(_) => Err(CloudFrontError::PreconditionFailed(
1509            "The If-Match version is not valid for the resource".to_owned(),
1510        )),
1511    }
1512}
1513
1514fn merge_tags(existing: &mut TagSet, new_tags: &[Tag]) {
1515    for t in new_tags {
1516        if let Some(existing_tag) = existing.iter_mut().find(|e| e.key == t.key) {
1517            existing_tag.value = t.value.clone();
1518        } else {
1519            existing.push(t.clone());
1520        }
1521    }
1522}