Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22    CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23    StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Reports whether `action` names a state-changing CloudFront operation.
///
/// An action counts as mutating when its name begins with one of the verb
/// prefixes listed below. Anything else (`Get*`, `List*`, `Describe*`,
/// `Test*`, `Verify*`, ...) is treated as read-only. The request handler
/// uses the answer to decide whether a handled request must trigger a
/// persistence snapshot.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in &MUTATING_VERBS {
        if action.starts_with(*verb) {
            return true;
        }
    }
    false
}
45
/// Crate-visible account id literal (twelve zeros).
/// NOTE(review): presumably the fallback account used when a request does
/// not carry one; its use sites are outside this section, so confirm.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Action names reported by `CloudFrontService::supported_actions`.
///
/// Keep this list in sync with the dispatch `match` in
/// `CloudFrontService::handle`: an action that is listed here but has no
/// arm there ends up in the "not implemented yet" fallback.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service front end: the shared state it operates on, the
/// simulated propagation delay, and the optional snapshot persistence
/// settings.
pub struct CloudFrontService {
    /// Shared, lock-protected CloudFront state; `shared_state` hands out
    /// additional handles to the same state.
    pub(crate) state: SharedCloudFrontState,
    /// How long the propagation tick sleeps before flipping a freshly
    /// created or updated Distribution / DistributionTenant /
    /// ConnectionGroup / StreamingDistribution from `InProgress` to
    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
    /// keeps SDK-driven integration tests fast while still simulating the
    /// async transition. Tests can shrink it via
    /// [`CloudFrontService::with_propagation_delay`], and operators can
    /// override the default via the
    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
    pub(crate) propagation_delay: std::time::Duration,
    /// Where snapshots are written. `None` means memory mode, in which
    /// saving a snapshot is a no-op.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex taken by `save_cloudfront_snapshot` around the
    /// clone-serialize-write sequence so snapshot writes do not interleave.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolve the default `InProgress` -> `Deployed` delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
/// second.
///
/// The (whitespace-trimmed) value is read as either a non-negative integer
/// number of seconds or a floating-point number of seconds. `0` keeps the
/// transition synchronous, which is useful for fast unit tests.
///
/// Every unusable value falls back to 1 second instead of failing: an
/// unset or non-UTF-8 variable, unparsable text, a negative number, `NaN`,
/// an infinity, or a finite number too large to fit in a `Duration`
/// (for example `1e300`). A typo in the env var therefore can neither hang
/// nor crash the process.
fn default_propagation_delay() -> std::time::Duration {
    use std::time::Duration;

    const FALLBACK: Duration = Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();

    // Whole seconds first, so integer input never goes through floating point.
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Duration::from_secs(secs);
    }

    // `try_from_secs_f64` rejects negative, non-finite and out-of-range
    // values; `from_secs_f64` would panic on the out-of-range ones.
    trimmed
        .parse::<f64>()
        .ok()
        .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
255
256impl CloudFrontService {
257    pub fn new(state: SharedCloudFrontState) -> Self {
258        Self {
259            state,
260            propagation_delay: default_propagation_delay(),
261            snapshot_store: None,
262            snapshot_lock: Arc::new(AsyncMutex::new(())),
263        }
264    }
265
266    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267        self.snapshot_store = Some(store);
268        self
269    }
270
271    pub fn shared_state(&self) -> SharedCloudFrontState {
272        Arc::clone(&self.state)
273    }
274
275    /// Persist current state as a snapshot. Held across the
276    /// clone-serialize-write sequence to prevent stale-last writes, with serde
277    /// + file I/O offloaded to the blocking pool.
278    async fn save_snapshot(&self) {
279        save_cloudfront_snapshot(
280            &self.state,
281            self.snapshot_store.clone(),
282            &self.snapshot_lock,
283        )
284        .await;
285    }
286
287    /// Build a hook that persists the current CloudFront state when invoked, or
288    /// `None` in memory mode. The CloudFormation provisioner mutates `state`
289    /// directly and uses this to write a CFN-provisioned resource through to
290    /// disk, the same way a direct mutating API call would.
291    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292        let store = self.snapshot_store.clone()?;
293        let state = self.state.clone();
294        let lock = self.snapshot_lock.clone();
295        Some(Arc::new(move || {
296            let state = state.clone();
297            let store = store.clone();
298            let lock = lock.clone();
299            Box::pin(async move {
300                save_cloudfront_snapshot(&state, Some(store), &lock).await;
301            })
302        }))
303    }
304
305    /// Re-arm the propagation tick for any Distribution / DistributionTenant /
306    /// ConnectionGroup / StreamingDistribution that was still `InProgress` when
307    /// the previous process exited, so they still transition to `Deployed`
308    /// after a restart. Called by the server after loading a snapshot.
309    pub fn rearm_in_progress(&self) {
310        let (dists, tenants, groups, streaming) = {
311            let s = self.state.read();
312            let mut dists = Vec::new();
313            let mut tenants = Vec::new();
314            let mut groups = Vec::new();
315            let mut streaming = Vec::new();
316            for account in s.accounts.values() {
317                for (id, d) in &account.distributions {
318                    if d.status == "InProgress" {
319                        dists.push(id.clone());
320                    }
321                }
322                for (id, t) in &account.distribution_tenants {
323                    if t.status == "InProgress" {
324                        tenants.push(id.clone());
325                    }
326                }
327                for (id, g) in &account.connection_groups {
328                    if g.status == "InProgress" {
329                        groups.push(id.clone());
330                    }
331                }
332                for (id, d) in &account.streaming_distributions {
333                    if d.status == "InProgress" {
334                        streaming.push(id.clone());
335                    }
336                }
337            }
338            (dists, tenants, groups, streaming)
339        };
340        for id in dists {
341            self.schedule_distribution_deploy(id);
342        }
343        for id in tenants {
344            self.schedule_distribution_tenant_deploy(id);
345        }
346        for id in groups {
347            self.schedule_connection_group_deploy(id);
348        }
349        for id in streaming {
350            self.schedule_streaming_distribution_deploy(id);
351        }
352    }
353
354    /// Override the propagation delay used by `CreateDistribution`,
355    /// `UpdateDistribution`, and the equivalent DistributionTenant /
356    /// ConnectionGroup ops. Tests pass a small duration so they don't
357    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
358    /// transition.
359    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360        self.propagation_delay = delay;
361        self
362    }
363
364    /// Flip a stored Distribution's status synchronously. Returns `false`
365    /// if no distribution matches `id` across any account. The admin
366    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
367    /// calls this so tests can force `InProgress` <-> `Deployed` without
368    /// waiting on the propagation tick.
369    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370        let mut state = self.state.write();
371        for account in state.accounts.values_mut() {
372            if let Some(dist) = account.distributions.get_mut(id) {
373                dist.status = status.to_string();
374                return true;
375            }
376        }
377        false
378    }
379
380    /// Admin-endpoint flavour of [`set_distribution_status`](Self::set_distribution_status)
381    /// that persists the forced status so it survives a restart.
382    pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383        let changed = self.set_distribution_status(id, status);
384        if changed {
385            self.save_snapshot().await;
386        }
387        changed
388    }
389}
390
391/// Persist the current CloudFront state as a snapshot. Offloads the serde +
392/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
393/// (memory mode). Shared by `CloudFrontService::save_snapshot`, the propagation
394/// ticks, and the CloudFormation provisioner persist hook so all route through
395/// the same serialize-and-write path.
396pub async fn save_cloudfront_snapshot(
397    state: &SharedCloudFrontState,
398    store: Option<Arc<dyn SnapshotStore>>,
399    lock: &AsyncMutex<()>,
400) {
401    let Some(store) = store else {
402        return;
403    };
404    let _guard = lock.lock().await;
405    let snapshot = CloudFrontSnapshot {
406        schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407        accounts: Some(state.read().clone()),
408    };
409    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410        let bytes = serde_json::to_vec(&snapshot)
411            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412        store.save(&bytes)
413    })
414    .await;
415    match join {
416        Ok(Ok(())) => {}
417        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418        Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419    }
420}
421
422impl Default for CloudFrontService {
423    fn default() -> Self {
424        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425    }
426}
427
/// `AwsService` implementation: resolves each request to a CloudFront
/// action name, dispatches it to the matching handler method, and saves a
/// snapshot after successful mutating actions.
#[async_trait]
impl AwsService for CloudFrontService {
    /// Service identifier: always `"cloudfront"`.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// Returns the static `SUPPORTED_ACTIONS` list.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Routes `req` to a CloudFront action and runs its handler.
    ///
    /// # Errors
    ///
    /// Returns a `404` `InvalidArgument` error when the method/path/query
    /// does not resolve to a route, a `501` `InvalidAction` error when the
    /// resolved action has no handler arm, and otherwise whatever error the
    /// selected handler returns.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        // Resolve method + path + query to an action; bail out if unknown.
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Classified by action-name prefix; consulted after the handler ran.
        let mutates = is_mutating_action(resolved.action);
        let result = match resolved.action {
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All `ListDistributionsBy*` variants share one handler, which
            // receives the action name to tell them apart.
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => {
                self.list_distributions_by(&req, &resolved, resolved.action)
            }
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router produced an action name that has no handler arm.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        };
        // Persist only after a mutating action whose response status is a
        // success code; errors and read-only actions never write a snapshot.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }
}
665
666// ─── Distribution handlers ────────────────────────────────────────────
667
668impl CloudFrontService {
669    fn create_distribution(
670        &self,
671        req: &AwsRequest,
672        with_tags: bool,
673    ) -> Result<AwsResponse, AwsServiceError> {
674        let (config, tags) = if with_tags {
675            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677            let tags = parsed
678                .tags
679                .items
680                .map(|i| {
681                    i.tag
682                        .into_iter()
683                        .map(|t| Tag {
684                            key: t.key,
685                            value: t.value,
686                        })
687                        .collect()
688                })
689                .unwrap_or_default();
690            (parsed.distribution_config, tags)
691        } else {
692            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694            (parsed, Vec::new())
695        };
696
697        validate_caller_reference(&config.caller_reference)?;
698        validate_origins(&config)?;
699
700        let mut state = self.state.write();
701        let account = state.entry(account_id(req));
702
703        if let Some(existing) = account
704            .distributions
705            .values()
706            .find(|d| d.config.caller_reference == config.caller_reference)
707        {
708            return Err(aws_error(
709                StatusCode::CONFLICT,
710                "DistributionAlreadyExists",
711                format!(
712                    "Distribution with the same CallerReference exists: {}",
713                    existing.id
714                ),
715            ));
716        }
717
718        let id = generate_distribution_id();
719        let now = Utc::now();
720        let etag = generate_etag();
721        let domain = format!("{}.cloudfront.net", id.to_lowercase());
722        let arn = format!(
723            "arn:aws:cloudfront::{}:distribution/{}",
724            account_id(req),
725            id
726        );
727
728        let stored = StoredDistribution {
729            id: id.clone(),
730            arn: arn.clone(),
731            // Real CloudFront returns InProgress immediately and flips to
732            // Deployed ~15 minutes later once the edge propagation completes.
733            // The spawn below mirrors that lifecycle on a configurable delay.
734            status: "InProgress".to_string(),
735            last_modified_time: now,
736            domain_name: domain,
737            in_progress_invalidation_batches: 0,
738            etag: etag.clone(),
739            config,
740        };
741        account.distributions.insert(id.clone(), stored.clone());
742        if !tags.is_empty() {
743            account.tags.insert(arn.clone(), tags);
744        }
745        drop(state);
746
747        self.schedule_distribution_deploy(id.clone());
748
749        let body = build_distribution_xml(&stored);
750        let mut headers = HeaderMap::new();
751        set_header(&mut headers, ETAG, &etag);
752        set_header(&mut headers, LOCATION, &stored.arn);
753        Ok(xml_response(StatusCode::CREATED, body, headers))
754    }
755
756    /// Spawn a tokio task that flips the named distribution from
757    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
758    /// `CreateDistribution` and `UpdateDistribution` to model the real
759    /// CloudFront edge propagation lifecycle.
760    fn schedule_distribution_deploy(&self, id: String) {
761        let state = Arc::clone(&self.state);
762        let delay = self.propagation_delay;
763        let store = self.snapshot_store.clone();
764        let lock = self.snapshot_lock.clone();
765        tokio::spawn(async move {
766            tokio::time::sleep(delay).await;
767            let flipped = {
768                let mut s = state.write();
769                let mut flipped = false;
770                for account in s.accounts.values_mut() {
771                    if let Some(d) = account.distributions.get_mut(&id) {
772                        if d.status == "InProgress" {
773                            d.status = "Deployed".to_string();
774                            flipped = true;
775                        }
776                        break;
777                    }
778                }
779                flipped
780            };
781            if flipped {
782                save_cloudfront_snapshot(&state, store, &lock).await;
783            }
784        });
785    }
786
787    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
788    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789        let state = Arc::clone(&self.state);
790        let delay = self.propagation_delay;
791        let store = self.snapshot_store.clone();
792        let lock = self.snapshot_lock.clone();
793        tokio::spawn(async move {
794            tokio::time::sleep(delay).await;
795            let flipped = {
796                let mut s = state.write();
797                let mut flipped = false;
798                for account in s.accounts.values_mut() {
799                    if let Some(t) = account.distribution_tenants.get_mut(&id) {
800                        if t.status == "InProgress" {
801                            t.status = "Deployed".to_string();
802                            flipped = true;
803                        }
804                        break;
805                    }
806                }
807                flipped
808            };
809            if flipped {
810                save_cloudfront_snapshot(&state, store, &lock).await;
811            }
812        });
813    }
814
815    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
816    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817        let state = Arc::clone(&self.state);
818        let delay = self.propagation_delay;
819        let store = self.snapshot_store.clone();
820        let lock = self.snapshot_lock.clone();
821        tokio::spawn(async move {
822            tokio::time::sleep(delay).await;
823            let flipped = {
824                let mut s = state.write();
825                let mut flipped = false;
826                for account in s.accounts.values_mut() {
827                    if let Some(g) = account.connection_groups.get_mut(&id) {
828                        if g.status == "InProgress" {
829                            g.status = "Deployed".to_string();
830                            flipped = true;
831                        }
832                        break;
833                    }
834                }
835                flipped
836            };
837            if flipped {
838                save_cloudfront_snapshot(&state, store, &lock).await;
839            }
840        });
841    }
842
843    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
844    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
845    /// RTMP streaming distributions, even though the resource is largely
846    /// deprecated.
847    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848        let state = Arc::clone(&self.state);
849        let delay = self.propagation_delay;
850        let store = self.snapshot_store.clone();
851        let lock = self.snapshot_lock.clone();
852        tokio::spawn(async move {
853            tokio::time::sleep(delay).await;
854            let flipped = {
855                let mut s = state.write();
856                let mut flipped = false;
857                for account in s.accounts.values_mut() {
858                    if let Some(d) = account.streaming_distributions.get_mut(&id) {
859                        if d.status == "InProgress" {
860                            d.status = "Deployed".to_string();
861                            flipped = true;
862                        }
863                        break;
864                    }
865                }
866                flipped
867            };
868            if flipped {
869                save_cloudfront_snapshot(&state, store, &lock).await;
870            }
871        });
872    }
873
874    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875        let id = route
876            .id
877            .as_deref()
878            .ok_or_else(|| invalid_argument("missing distribution id"))?;
879        let state = self.state.read();
880        let account = state
881            .accounts
882            .get(DEFAULT_ACCOUNT)
883            .ok_or_else(|| no_such_distribution(id))?;
884        let dist = account
885            .distributions
886            .get(id)
887            .ok_or_else(|| no_such_distribution(id))?
888            .clone();
889        drop(state);
890        let body = build_distribution_xml(&dist);
891        let mut headers = HeaderMap::new();
892        set_header(&mut headers, ETAG, &dist.etag);
893        Ok(xml_response(StatusCode::OK, body, headers))
894    }
895
896    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897        let id = route
898            .id
899            .as_deref()
900            .ok_or_else(|| invalid_argument("missing distribution id"))?;
901        let state = self.state.read();
902        let account = state
903            .accounts
904            .get(DEFAULT_ACCOUNT)
905            .ok_or_else(|| no_such_distribution(id))?;
906        let dist = account
907            .distributions
908            .get(id)
909            .ok_or_else(|| no_such_distribution(id))?
910            .clone();
911        drop(state);
912        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914        let mut headers = HeaderMap::new();
915        set_header(&mut headers, ETAG, &dist.etag);
916        Ok(xml_response(StatusCode::OK, body, headers))
917    }
918
919    fn update_distribution(
920        &self,
921        req: &AwsRequest,
922        route: &Route,
923    ) -> Result<AwsResponse, AwsServiceError> {
924        let id = route
925            .id
926            .as_deref()
927            .ok_or_else(|| invalid_argument("missing distribution id"))?;
928        let if_match = req
929            .headers
930            .get(IF_MATCH)
931            .and_then(|v| v.to_str().ok())
932            .ok_or_else(|| {
933                aws_error(
934                    StatusCode::BAD_REQUEST,
935                    "InvalidIfMatchVersion",
936                    "Missing If-Match header for UpdateDistribution",
937                )
938            })?
939            .to_string();
940        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942        validate_caller_reference(&new_config.caller_reference)?;
943        validate_origins(&new_config)?;
944
945        let mut state = self.state.write();
946        let account = state
947            .accounts
948            .get_mut(DEFAULT_ACCOUNT)
949            .ok_or_else(|| no_such_distribution(id))?;
950        let dist = account
951            .distributions
952            .get_mut(id)
953            .ok_or_else(|| no_such_distribution(id))?;
954        if dist.etag != if_match {
955            return Err(aws_error(
956                StatusCode::PRECONDITION_FAILED,
957                "PreconditionFailed",
958                "If-Match header does not match the current ETag",
959            ));
960        }
961        // ETag stability: only bump the ETag and flip status back to
962        // InProgress when the new config actually differs from what we
963        // have on disk. A no-op UpdateDistribution (PUT the same config
964        // back) leaves the ETag intact, matching AWS behavior.
965        let config_changed = !configs_equal(&dist.config, &new_config);
966        if config_changed {
967            dist.config = new_config;
968            dist.etag = generate_etag();
969            dist.last_modified_time = Utc::now();
970            // UpdateDistribution kicks off a fresh edge propagation; AWS
971            // flips the status back to InProgress until the new config
972            // lands.
973            dist.status = "InProgress".to_string();
974        }
975        let snapshot = dist.clone();
976        drop(state);
977
978        if config_changed {
979            self.schedule_distribution_deploy(id.to_string());
980        }
981
982        let body = build_distribution_xml(&snapshot);
983        let mut headers = HeaderMap::new();
984        set_header(&mut headers, ETAG, &snapshot.etag);
985        Ok(xml_response(StatusCode::OK, body, headers))
986    }
987
988    fn delete_distribution(
989        &self,
990        req: &AwsRequest,
991        route: &Route,
992    ) -> Result<AwsResponse, AwsServiceError> {
993        let id = route
994            .id
995            .as_deref()
996            .ok_or_else(|| invalid_argument("missing distribution id"))?;
997        let if_match = req
998            .headers
999            .get(IF_MATCH)
1000            .and_then(|v| v.to_str().ok())
1001            .ok_or_else(|| {
1002                aws_error(
1003                    StatusCode::BAD_REQUEST,
1004                    "InvalidIfMatchVersion",
1005                    "Missing If-Match header for DeleteDistribution",
1006                )
1007            })?
1008            .to_string();
1009        let mut state = self.state.write();
1010        let account = state
1011            .accounts
1012            .get_mut(DEFAULT_ACCOUNT)
1013            .ok_or_else(|| no_such_distribution(id))?;
1014        {
1015            let dist = account
1016                .distributions
1017                .get(id)
1018                .ok_or_else(|| no_such_distribution(id))?;
1019            if dist.etag != if_match {
1020                return Err(aws_error(
1021                    StatusCode::PRECONDITION_FAILED,
1022                    "PreconditionFailed",
1023                    "If-Match header does not match the current ETag",
1024                ));
1025            }
1026            if dist.config.enabled {
1027                return Err(aws_error(
1028                    StatusCode::PRECONDITION_FAILED,
1029                    "DistributionNotDisabled",
1030                    "Distribution must be disabled before delete",
1031                ));
1032            }
1033        }
1034        let removed = account.distributions.remove(id).unwrap();
1035        account.tags.remove(&removed.arn);
1036        Ok(empty_response(StatusCode::NO_CONTENT))
1037    }
1038
1039    fn list_distributions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040        let state = self.state.read();
1041        let mut dists: Vec<StoredDistribution> = state
1042            .accounts
1043            .values()
1044            .flat_map(|a| a.distributions.values().cloned())
1045            .collect();
1046        dists.sort_by_key(|a| a.last_modified_time);
1047        drop(state);
1048
1049        // CloudFront paginates with Marker (the next distribution Id) + MaxItems.
1050        let max_items = req
1051            .query_params
1052            .get("MaxItems")
1053            .or_else(|| req.query_params.get("maxitems"))
1054            .and_then(|v| v.parse::<usize>().ok())
1055            .filter(|n| *n > 0)
1056            .unwrap_or(100);
1057        let marker = req
1058            .query_params
1059            .get("Marker")
1060            .or_else(|| req.query_params.get("marker"));
1061        let start_idx = match marker {
1062            Some(m) if !m.is_empty() => {
1063                dists.iter().position(|d| &d.id == m).unwrap_or(dists.len())
1064            }
1065            _ => 0,
1066        };
1067        let page: Vec<StoredDistribution> = dists
1068            .iter()
1069            .skip(start_idx)
1070            .take(max_items)
1071            .cloned()
1072            .collect();
1073        let next_marker = dists.get(start_idx + page.len()).map(|d| d.id.clone());
1074
1075        let body = build_distribution_list_xml(
1076            &page,
1077            "DistributionList",
1078            marker.map(String::as_str).unwrap_or(""),
1079            max_items,
1080            next_marker.as_deref(),
1081        );
1082        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1083    }
1084
1085    fn list_distributions_by(
1086        &self,
1087        req: &AwsRequest,
1088        route: &Route,
1089        action: &str,
1090    ) -> Result<AwsResponse, AwsServiceError> {
1091        // Each "by-X" listing has a Smithy-required identifier (path or query)
1092        // plus, for ConnectionMode, an enum-constrained discriminator. The
1093        // synthetic probe sends `negative_omit_*` variants without these, so
1094        // every handler that can short-circuit on a missing identifier must
1095        // do so up-front. Otherwise the empty-list 200 looks like an honest
1096        // pass to the probe and a 100% conformance number is unreachable.
1097        match action {
1098            // Path-id ops: route.id is the URL placeholder. If the probe
1099            // omitted the field, the substitution left the literal
1100            // `{Member}` braces in place — easy to detect.
1101            "ListDistributionsByCachePolicyId"
1102            | "ListDistributionsByOriginRequestPolicyId"
1103            | "ListDistributionsByResponseHeadersPolicyId"
1104            | "ListDistributionsByKeyGroup"
1105            | "ListDistributionsByWebACLId"
1106            | "ListDistributionsByVpcOriginId"
1107            | "ListDistributionsByAnycastIpListId"
1108            | "ListDistributionsByOwnedResource" => {
1109                let id = route.id.as_deref().unwrap_or("");
1110                if is_placeholder_label(id) {
1111                    return Err(invalid_argument(format!(
1112                        "Required URL identifier for {action} is missing or invalid"
1113                    )));
1114                }
1115            }
1116            "ListDistributionsByConnectionMode" => {
1117                let id = route.id.as_deref().unwrap_or("");
1118                if is_placeholder_label(id) {
1119                    return Err(invalid_argument(
1120                        "ConnectionMode is required for ListDistributionsByConnectionMode",
1121                    ));
1122                }
1123                if id != "direct" && id != "tenant-only" {
1124                    return Err(invalid_argument(format!(
1125                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1126                    )));
1127                }
1128            }
1129            "ListDistributionsByConnectionFunction"
1130                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1131            {
1132                return Err(invalid_argument(
1133                    "ConnectionFunctionIdentifier query parameter is required",
1134                ));
1135            }
1136            "ListDistributionsByTrustStore"
1137                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1138            {
1139                return Err(invalid_argument(
1140                    "TrustStoreIdentifier query parameter is required",
1141                ));
1142            }
1143            _ => {}
1144        }
1145
1146        // The "by-X" listings each have a distinct response root element.
1147        // We never index distributions by the predicate (that would require
1148        // shipping each policy/key-group/etc service first), so each
1149        // response is empty until those services land.
1150        let root = match action {
1151            "ListDistributionsByCachePolicyId"
1152            | "ListDistributionsByOriginRequestPolicyId"
1153            | "ListDistributionsByResponseHeadersPolicyId"
1154            | "ListDistributionsByKeyGroup"
1155            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1156            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1157            _ => "DistributionList",
1158        };
1159        let body = build_empty_distribution_id_list(root);
1160        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1161    }
1162
1163    fn copy_distribution(
1164        &self,
1165        req: &AwsRequest,
1166        route: &Route,
1167    ) -> Result<AwsResponse, AwsServiceError> {
1168        let primary_id = route
1169            .id
1170            .as_deref()
1171            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1172        let if_match = req
1173            .headers
1174            .get(IF_MATCH)
1175            .and_then(|v| v.to_str().ok())
1176            .ok_or_else(|| {
1177                aws_error(
1178                    StatusCode::BAD_REQUEST,
1179                    "InvalidIfMatchVersion",
1180                    "Missing If-Match header for CopyDistribution",
1181                )
1182            })?
1183            .to_string();
1184        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1185            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1186        validate_caller_reference(&parsed.caller_reference)?;
1187        let mut state = self.state.write();
1188        let account = state
1189            .accounts
1190            .get_mut(DEFAULT_ACCOUNT)
1191            .ok_or_else(|| no_such_distribution(primary_id))?;
1192        let primary = account
1193            .distributions
1194            .get(primary_id)
1195            .ok_or_else(|| no_such_distribution(primary_id))?
1196            .clone();
1197        if primary.etag != if_match {
1198            return Err(aws_error(
1199                StatusCode::PRECONDITION_FAILED,
1200                "PreconditionFailed",
1201                "If-Match header does not match the current ETag",
1202            ));
1203        }
1204        if account
1205            .distributions
1206            .values()
1207            .any(|d| d.config.caller_reference == parsed.caller_reference)
1208        {
1209            return Err(aws_error(
1210                StatusCode::CONFLICT,
1211                "DistributionAlreadyExists",
1212                "Distribution with the same CallerReference exists",
1213            ));
1214        }
1215        let new_id = generate_distribution_id();
1216        let mut config = primary.config.clone();
1217        config.caller_reference = parsed.caller_reference;
1218        config.enabled = parsed.enabled.unwrap_or(false);
1219        config.staging = parsed.staging;
1220        let now = Utc::now();
1221        let etag = generate_etag();
1222        let arn = format!(
1223            "arn:aws:cloudfront::{}:distribution/{}",
1224            account_id(req),
1225            new_id
1226        );
1227        let stored = StoredDistribution {
1228            id: new_id.clone(),
1229            arn: arn.clone(),
1230            status: "InProgress".to_string(),
1231            last_modified_time: now,
1232            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1233            in_progress_invalidation_batches: 0,
1234            etag: etag.clone(),
1235            config,
1236        };
1237        account.distributions.insert(new_id.clone(), stored.clone());
1238        drop(state);
1239        self.schedule_distribution_deploy(new_id);
1240        let body = build_distribution_xml(&stored);
1241        let mut headers = HeaderMap::new();
1242        set_header(&mut headers, ETAG, &etag);
1243        set_header(&mut headers, LOCATION, &stored.arn);
1244        Ok(xml_response(StatusCode::CREATED, body, headers))
1245    }
1246}
1247
/// Request body of `CopyDistribution`, deserialized from XML.
///
/// Element names are the PascalCase forms of the field names
/// (`CallerReference`, `Enabled`, `Staging`).
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller-supplied reference string for the copy.
    caller_reference: String,
    /// Requested enabled state of the copy; `None` when the element is absent.
    #[serde(default)]
    enabled: Option<bool>,
    /// Requested staging flag of the copy; `None` when the element is absent.
    #[serde(default)]
    staging: Option<bool>,
}
1257
1258// ─── Invalidations ────────────────────────────────────────────────────
1259
1260impl CloudFrontService {
1261    fn create_invalidation(
1262        &self,
1263        req: &AwsRequest,
1264        route: &Route,
1265    ) -> Result<AwsResponse, AwsServiceError> {
1266        let dist_id = route
1267            .id
1268            .as_deref()
1269            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1270        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1271            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1272        if batch.caller_reference.is_empty() {
1273            return Err(invalid_argument("CallerReference is required"));
1274        }
1275        if batch.paths.quantity < 1 {
1276            return Err(invalid_argument(
1277                "InvalidationBatch.Paths must be non-empty",
1278            ));
1279        }
1280        let mut state = self.state.write();
1281        let account = state.entry(DEFAULT_ACCOUNT);
1282        if !account.distributions.contains_key(dist_id) {
1283            return Err(no_such_distribution(dist_id));
1284        }
1285        let id = generate_invalidation_id();
1286        let stored = StoredInvalidation {
1287            id: id.clone(),
1288            distribution_id: dist_id.to_string(),
1289            status: "Completed".to_string(),
1290            create_time: Utc::now(),
1291            batch: batch.clone(),
1292        };
1293        account.invalidations.insert(id.clone(), stored.clone());
1294        drop(state);
1295        let body = build_invalidation_xml(&stored);
1296        let mut headers = HeaderMap::new();
1297        set_header(
1298            &mut headers,
1299            LOCATION,
1300            &format!(
1301                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1302                stored.id
1303            ),
1304        );
1305        Ok(xml_response(StatusCode::CREATED, body, headers))
1306    }
1307
1308    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1309        let dist_id = route
1310            .id
1311            .as_deref()
1312            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1313        let inv_id = route
1314            .second_id
1315            .as_deref()
1316            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1317        let state = self.state.read();
1318        let account = state
1319            .accounts
1320            .get(DEFAULT_ACCOUNT)
1321            .ok_or_else(|| no_such_invalidation(inv_id))?;
1322        if !account.distributions.contains_key(dist_id) {
1323            return Err(no_such_distribution(dist_id));
1324        }
1325        let inv = account
1326            .invalidations
1327            .get(inv_id)
1328            .filter(|i| i.distribution_id == dist_id)
1329            .ok_or_else(|| no_such_invalidation(inv_id))?
1330            .clone();
1331        drop(state);
1332        let body = build_invalidation_xml(&inv);
1333        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1334    }
1335
1336    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1337        let dist_id = route
1338            .id
1339            .as_deref()
1340            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1341        let state = self.state.read();
1342        let account = state
1343            .accounts
1344            .get(DEFAULT_ACCOUNT)
1345            .ok_or_else(|| no_such_distribution(dist_id))?;
1346        if !account.distributions.contains_key(dist_id) {
1347            return Err(no_such_distribution(dist_id));
1348        }
1349        let mut items: Vec<&StoredInvalidation> = account
1350            .invalidations
1351            .values()
1352            .filter(|i| i.distribution_id == dist_id)
1353            .collect();
1354        items.sort_by_key(|a| a.create_time);
1355        let body = build_invalidation_list_xml(&items);
1356        drop(state);
1357        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1358    }
1359}
1360
1361// ─── Tags ─────────────────────────────────────────────────────────────
1362
1363impl CloudFrontService {
1364    fn parse_arn_query(query: &str) -> Option<String> {
1365        for pair in query.split('&').filter(|p| !p.is_empty()) {
1366            if let Some(rest) = pair.strip_prefix("Resource=") {
1367                return Some(percent_decode(rest));
1368            }
1369        }
1370        None
1371    }
1372
1373    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1374        let arn = Self::parse_arn_query(&req.raw_query)
1375            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1376        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1377            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1378        let new_tags: Vec<Tag> = parsed
1379            .items
1380            .map(|i| {
1381                i.tag
1382                    .into_iter()
1383                    .map(|t| Tag {
1384                        key: t.key,
1385                        value: t.value,
1386                    })
1387                    .collect()
1388            })
1389            .unwrap_or_default();
1390        let mut state = self.state.write();
1391        let account = state.entry(DEFAULT_ACCOUNT);
1392        let entry = account.tags.entry(arn).or_default();
1393        for tag in new_tags {
1394            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1395                existing.value = tag.value;
1396            } else {
1397                entry.push(tag);
1398            }
1399        }
1400        Ok(empty_response(StatusCode::NO_CONTENT))
1401    }
1402
1403    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1404        let arn = Self::parse_arn_query(&req.raw_query)
1405            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1406        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1407            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1408        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1409        let mut state = self.state.write();
1410        let account = state.entry(DEFAULT_ACCOUNT);
1411        if let Some(existing) = account.tags.get_mut(&arn) {
1412            existing.retain(|t| !keys.contains(&t.key));
1413        }
1414        Ok(empty_response(StatusCode::NO_CONTENT))
1415    }
1416
1417    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1418        let arn = Self::parse_arn_query(&req.raw_query)
1419            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1420        let state = self.state.read();
1421        let tags = state
1422            .accounts
1423            .get(DEFAULT_ACCOUNT)
1424            .and_then(|a| a.tags.get(&arn))
1425            .cloned()
1426            .unwrap_or_default();
1427        drop(state);
1428        let body = build_tags_xml(&tags);
1429        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1430    }
1431}
1432
1433// ─── Aliases / WebACL ─────────────────────────────────────────────────
1434
1435impl CloudFrontService {
1436    fn associate_alias(
1437        &self,
1438        req: &AwsRequest,
1439        route: &Route,
1440    ) -> Result<AwsResponse, AwsServiceError> {
1441        let id = route
1442            .id
1443            .as_deref()
1444            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1445        let alias = parse_query_value(&req.raw_query, "Alias")
1446            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1447        let mut state = self.state.write();
1448        let account = state
1449            .accounts
1450            .get_mut(DEFAULT_ACCOUNT)
1451            .ok_or_else(|| no_such_distribution(id))?;
1452        // Reject if the alias is already attached to a different distribution.
1453        if let Some(other) = account.distributions.values().find(|d| {
1454            d.id != id
1455                && d.config
1456                    .aliases
1457                    .as_ref()
1458                    .and_then(|a| a.items.as_ref())
1459                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1460        }) {
1461            return Err(aws_error(
1462                StatusCode::CONFLICT,
1463                "CNAMEAlreadyExists",
1464                format!(
1465                    "Alias {alias} is already associated with distribution {}",
1466                    other.id
1467                ),
1468            ));
1469        }
1470        let dist = account
1471            .distributions
1472            .get_mut(id)
1473            .ok_or_else(|| no_such_distribution(id))?;
1474        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1475        let items = aliases
1476            .items
1477            .get_or_insert_with(crate::model::AliasItems::default);
1478        if !items.cname.iter().any(|c| c == &alias) {
1479            items.cname.push(alias.clone());
1480            aliases.quantity = items.cname.len() as i32;
1481        }
1482        dist.etag = generate_etag();
1483        dist.last_modified_time = Utc::now();
1484        Ok(empty_response(StatusCode::OK))
1485    }
1486
1487    fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1488        let alias = parse_query_value(&req.raw_query, "Alias")
1489            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1490        let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1491            .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1492        // aliasString max 253, distributionIdString max 25 per the Smithy
1493        // model. Reject probe-generated boundary variants that overrun the
1494        // documented length so they produce the declared InvalidArgument
1495        // instead of an empty 200.
1496        if alias.len() > 253 {
1497            return Err(invalid_argument(format!(
1498                "Alias length {} exceeds maximum 253",
1499                alias.len()
1500            )));
1501        }
1502        if dist_id.len() > 25 {
1503            return Err(invalid_argument(format!(
1504                "DistributionId length {} exceeds maximum 25",
1505                dist_id.len()
1506            )));
1507        }
1508        if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1509            let n: i64 = max_items.parse().map_err(|_| {
1510                invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1511            })?;
1512            if n > 100 {
1513                return Err(invalid_argument(format!(
1514                    "MaxItems {n} exceeds maximum 100"
1515                )));
1516            }
1517        }
1518        // We never produce conflicts because every alias is owned by one
1519        // distribution at most. Return an empty list with the proper shape.
1520        let body = format!(
1521            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1522            NS = crate::NAMESPACE,
1523            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1524        );
1525        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1526    }
1527
1528    fn associate_web_acl(
1529        &self,
1530        req: &AwsRequest,
1531        route: &Route,
1532    ) -> Result<AwsResponse, AwsServiceError> {
1533        let id = route
1534            .id
1535            .as_deref()
1536            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1537        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1538            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1539        let mut state = self.state.write();
1540        let account = state
1541            .accounts
1542            .get_mut(DEFAULT_ACCOUNT)
1543            .ok_or_else(|| no_such_distribution(id))?;
1544        let dist = account
1545            .distributions
1546            .get_mut(id)
1547            .ok_or_else(|| no_such_distribution(id))?;
1548        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1549        dist.etag = generate_etag();
1550        dist.last_modified_time = Utc::now();
1551        let body = format!(
1552            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1553            esc(id), esc(&parsed.web_acl_arn),
1554            NS = crate::NAMESPACE,
1555            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1556        );
1557        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1558    }
1559
1560    fn disassociate_web_acl(
1561        &self,
1562        _req: &AwsRequest,
1563        route: &Route,
1564    ) -> Result<AwsResponse, AwsServiceError> {
1565        let id = route
1566            .id
1567            .as_deref()
1568            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1569        // DisassociateDistributionWebACL's Smithy model declares EntityNotFound
1570        // (not NoSuchDistribution) for unknown distribution IDs.
1571        let entity_not_found = || {
1572            aws_error(
1573                StatusCode::NOT_FOUND,
1574                "EntityNotFound",
1575                format!("The specified distribution does not exist: {id}"),
1576            )
1577        };
1578        let mut state = self.state.write();
1579        let account = state
1580            .accounts
1581            .get_mut(DEFAULT_ACCOUNT)
1582            .ok_or_else(entity_not_found)?;
1583        let dist = account
1584            .distributions
1585            .get_mut(id)
1586            .ok_or_else(entity_not_found)?;
1587        dist.config.web_acl_id = None;
1588        dist.etag = generate_etag();
1589        dist.last_modified_time = Utc::now();
1590        let body = format!(
1591            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1592            esc(id),
1593            NS = crate::NAMESPACE,
1594            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1595        );
1596        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1597    }
1598}
1599
1600#[derive(serde::Deserialize, Default, Debug)]
1601#[serde(rename_all = "PascalCase")]
1602struct AssociateAliasRequest {
1603    #[serde(rename = "WebACLArn", default)]
1604    web_acl_arn: String,
1605}
1606
1607// ─── XML body builders ────────────────────────────────────────────────
1608
/// Escape a user-supplied string for inclusion in the hand-built XML
/// response bodies.
///
/// The five XML metacharacters (`&`, `<`, `>`, `"`, `'`) are replaced by
/// their named entities; every other character is copied through untouched.
/// Keep the table aligned with `quick_xml`'s entities — its own escaping
/// primitive would need a `Writer` per call, whereas the builders here
/// append straight into a `String`.
pub(crate) fn esc(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '&' => "&amp;",
            '<' => "&lt;",
            '>' => "&gt;",
            '"' => "&quot;",
            '\'' => "&apos;",
            other => {
                escaped.push(other);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
1628
1629pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1630    let mut out = String::with_capacity(2048);
1631    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1632    out.push_str(&format!(
1633        "<Distribution xmlns=\"{ns}\">",
1634        ns = crate::NAMESPACE
1635    ));
1636    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1637    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1638    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1639    out.push_str(&format!(
1640        "<LastModifiedTime>{}</LastModifiedTime>",
1641        rfc3339(&dist.last_modified_time)
1642    ));
1643    out.push_str(&format!(
1644        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1645        dist.in_progress_invalidation_batches
1646    ));
1647    out.push_str(&format!(
1648        "<DomainName>{}</DomainName>",
1649        esc(&dist.domain_name)
1650    ));
1651    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1652    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1653    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1654        .unwrap_or_else(|_| String::new());
1655    out.push_str(&inner);
1656    out.push_str("</Distribution>");
1657    out
1658}
1659
1660fn build_distribution_list_xml(
1661    dists: &[StoredDistribution],
1662    root: &str,
1663    marker: &str,
1664    max_items: usize,
1665    next_marker: Option<&str>,
1666) -> String {
1667    let mut out = String::with_capacity(2048);
1668    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1669    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1670    out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1671    if let Some(nm) = next_marker {
1672        out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1673    }
1674    out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1675    out.push_str(&format!(
1676        "<IsTruncated>{}</IsTruncated>",
1677        next_marker.is_some()
1678    ));
1679    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1680    if dists.is_empty() {
1681        out.push_str(&format!("</{root}>"));
1682        return out;
1683    }
1684    out.push_str("<Items>");
1685    for d in dists {
1686        out.push_str("<DistributionSummary>");
1687        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1688        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1689        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1690        out.push_str(&format!(
1691            "<LastModifiedTime>{}</LastModifiedTime>",
1692            rfc3339(&d.last_modified_time)
1693        ));
1694        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1695        let aliases = d.config.aliases.clone().unwrap_or_default();
1696        out.push_str(&render_inline("Aliases", &aliases));
1697        let origins = d.config.origins.clone();
1698        out.push_str(&render_inline("Origins", &origins));
1699        let dcb = d.config.default_cache_behavior.clone();
1700        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1701        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1702        out.push_str(&render_inline("CacheBehaviors", &cb));
1703        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1704        out.push_str(&render_inline("CustomErrorResponses", &cer));
1705        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1706        out.push_str(&format!(
1707            "<PriceClass>{}</PriceClass>",
1708            esc(&d
1709                .config
1710                .price_class
1711                .clone()
1712                .unwrap_or_else(|| "PriceClass_All".to_string()))
1713        ));
1714        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1715        out.push_str(&render_inline(
1716            "ViewerCertificate",
1717            &d.config.viewer_certificate.clone().unwrap_or_default(),
1718        ));
1719        out.push_str(&render_inline(
1720            "Restrictions",
1721            &d.config.restrictions.clone().unwrap_or_default(),
1722        ));
1723        out.push_str(&format!(
1724            "<WebACLId>{}</WebACLId>",
1725            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1726        ));
1727        out.push_str(&format!(
1728            "<HttpVersion>{}</HttpVersion>",
1729            esc(&d
1730                .config
1731                .http_version
1732                .clone()
1733                .unwrap_or_else(|| "http2".to_string()))
1734        ));
1735        out.push_str(&format!(
1736            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1737            d.config.is_ipv6_enabled.unwrap_or(true)
1738        ));
1739        out.push_str(&format!(
1740            "<Staging>{}</Staging>",
1741            d.config.staging.unwrap_or(false)
1742        ));
1743        out.push_str("</DistributionSummary>");
1744    }
1745    out.push_str("</Items>");
1746    out.push_str(&format!("</{root}>"));
1747    out
1748}
1749
1750fn build_empty_distribution_id_list(root: &str) -> String {
1751    let mut out = String::with_capacity(256);
1752    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1753    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1754    out.push_str("<Marker></Marker>");
1755    out.push_str("<MaxItems>100</MaxItems>");
1756    out.push_str("<IsTruncated>false</IsTruncated>");
1757    out.push_str("<Quantity>0</Quantity>");
1758    out.push_str(&format!("</{root}>"));
1759    out
1760}
1761
1762fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1763    let mut out = String::with_capacity(512);
1764    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1765    out.push_str(&format!(
1766        "<Invalidation xmlns=\"{ns}\">",
1767        ns = crate::NAMESPACE
1768    ));
1769    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1770    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1771    out.push_str(&format!(
1772        "<CreateTime>{}</CreateTime>",
1773        rfc3339(&inv.create_time)
1774    ));
1775    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1776    out.push_str("</Invalidation>");
1777    out
1778}
1779
1780fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1781    let mut out = String::with_capacity(1024);
1782    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1783    out.push_str(&format!(
1784        "<InvalidationList xmlns=\"{ns}\">",
1785        ns = crate::NAMESPACE
1786    ));
1787    out.push_str("<Marker></Marker>");
1788    out.push_str("<MaxItems>100</MaxItems>");
1789    out.push_str("<IsTruncated>false</IsTruncated>");
1790    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1791    if !items.is_empty() {
1792        out.push_str("<Items>");
1793        for inv in items {
1794            out.push_str("<InvalidationSummary>");
1795            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1796            out.push_str(&format!(
1797                "<CreateTime>{}</CreateTime>",
1798                rfc3339(&inv.create_time)
1799            ));
1800            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1801            out.push_str("</InvalidationSummary>");
1802        }
1803        out.push_str("</Items>");
1804    }
1805    out.push_str("</InvalidationList>");
1806    out
1807}
1808
1809fn build_tags_xml(tags: &[Tag]) -> String {
1810    let mut out = String::with_capacity(256);
1811    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1812    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1813    out.push_str("<Items>");
1814    for t in tags {
1815        out.push_str("<Tag>");
1816        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1817        if let Some(v) = &t.value {
1818            out.push_str(&format!("<Value>{}</Value>", esc(v)));
1819        }
1820        out.push_str("</Tag>");
1821    }
1822    out.push_str("</Items>");
1823    out.push_str("</Tags>");
1824    out
1825}
1826
1827fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1828    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1829}
1830
1831// ─── Helpers ──────────────────────────────────────────────────────────
1832
1833fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1834    if s.is_empty() {
1835        return Err(invalid_argument("CallerReference is required"));
1836    }
1837    Ok(())
1838}
1839
1840fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1841    if config.origins.quantity < 1 {
1842        return Err(invalid_argument(
1843            "DistributionConfig.Origins must contain at least one origin",
1844        ));
1845    }
1846    Ok(())
1847}
1848
1849/// Compare two `DistributionConfig`s by serializing them to canonical XML
1850/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
1851/// so the ETag stays stable when the caller PUTs the same config back.
1852/// Falls back to "not equal" if either serialization fails so we still
1853/// honor the request rather than silently swallow a write.
1854fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1855    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1856        return false;
1857    };
1858    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1859        return false;
1860    };
1861    a == b
1862}
1863
1864fn account_id(_req: &AwsRequest) -> &'static str {
1865    // Multi-account is wired through AwsRequest.account_id elsewhere; the
1866    // CloudFront control plane only uses the resolved id for the ARN
1867    // suffix. Until that field stabilizes for REST-XML we use the default
1868    // account ID consistently with the rest of the registered services.
1869    DEFAULT_ACCOUNT
1870}
1871
1872fn generate_distribution_id() -> String {
1873    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
1874    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1875    format!("E{}", &raw[..13])
1876}
1877
1878fn generate_invalidation_id() -> String {
1879    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1880    format!("I{}", &raw[..13])
1881}
1882
1883pub(crate) fn generate_etag() -> String {
1884    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1885    format!("E{}", &raw[..13])
1886}
1887
1888/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
1889/// Used by Batch 2 policy resources (cache, origin request, response
1890/// headers, continuous deployment, OAC) so each gets a recognizable
1891/// alphabetic prefix in addition to the random suffix.
1892pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1893    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1894    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1895    format!("{prefix}{}", &raw[..suffix_len])
1896}
1897
1898fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1899    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1900}
1901
1902pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1903    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1904}
1905
1906fn no_such_distribution(id: &str) -> AwsServiceError {
1907    aws_error(
1908        StatusCode::NOT_FOUND,
1909        "NoSuchDistribution",
1910        format!("The specified distribution does not exist: {id}"),
1911    )
1912}
1913
1914fn no_such_invalidation(id: &str) -> AwsServiceError {
1915    aws_error(
1916        StatusCode::NOT_FOUND,
1917        "NoSuchInvalidation",
1918        format!("The specified invalidation does not exist: {id}"),
1919    )
1920}
1921
1922fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1923    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1924}
1925
1926pub(crate) fn aws_error(
1927    status: StatusCode,
1928    code: impl Into<String>,
1929    msg: impl Into<String>,
1930) -> AwsServiceError {
1931    AwsServiceError::aws_error(status, code.into(), msg)
1932}
1933
1934fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1935    if let Ok(v) = HeaderValue::from_str(value) {
1936        headers.insert(name, v);
1937    }
1938}
1939
1940pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1941    AwsResponse {
1942        status,
1943        content_type: "text/xml".to_string(),
1944        body: ResponseBody::Bytes(Bytes::from(body)),
1945        headers,
1946    }
1947}
1948
1949fn empty_response(status: StatusCode) -> AwsResponse {
1950    AwsResponse {
1951        status,
1952        content_type: "text/xml".to_string(),
1953        body: ResponseBody::Bytes(Bytes::new()),
1954        headers: HeaderMap::new(),
1955    }
1956}
1957
/// Decode a URL query component: `%XX` escapes become the byte they encode,
/// `+` becomes a space, and everything else is copied through.
///
/// Decoding works on bytes and the result is interpreted as UTF-8, so
/// multi-byte sequences such as `%C3%A9` (and raw non-ASCII text already in
/// `input`) come out as the intended characters. Pushing each byte as its
/// own `char` would turn them into Latin-1 mojibake. Bytes that do not form
/// valid UTF-8 are replaced with U+FFFD.
///
/// A `%` that is not followed by two hex digits is kept literally.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed or truncated escape: keep the '%' as-is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    // Fast path for valid UTF-8; fall back to lossy replacement otherwise.
    String::from_utf8(decoded)
        .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
}

/// Value (0–15) of an ASCII hex digit, accepting both cases; `None` for any
/// other byte.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1989
/// Report whether a URI label segment looks like an unsubstituted Smithy
/// placeholder.
///
/// A segment counts as a placeholder when it is empty, starts with `{`
/// (e.g. `{Identifier}`), or starts with the percent-encoded brace `%7B` in
/// either case (e.g. `%7BIdentifier%7D`). Conformance probes that omit a
/// required `@httpLabel` input leave the literal placeholder in the URL;
/// handlers use this check to return the declared validation error instead
/// of a fabricated 200.
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    let bytes = value.as_bytes();
    match bytes.first() {
        None => true,
        Some(b'{') => true,
        Some(_) => bytes
            .get(..3)
            .is_some_and(|head| head.eq_ignore_ascii_case(b"%7b")),
    }
}
2003
2004/// Best-effort field extractor for request bodies the probe sends as JSON
2005/// even when the service is REST-XML. Walks the body trying JSON first,
2006/// then a naive XML element scan. Returns the value for the first occurrence
2007/// of `key`. Used by handlers that need to validate optional/required body
2008/// members up-front (enum bounds, length, presence) without committing to a
2009/// full strongly-typed parse — the actual handler still uses the typed XML
2010/// parser for the structured fields it consumes.
2011pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
2012    if let Ok(s) = std::str::from_utf8(body) {
2013        let trimmed = s.trim_start();
2014        if trimmed.starts_with('{') {
2015            if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
2016                if let Some(field) = v.get(key) {
2017                    return match field {
2018                        serde_json::Value::String(s) => Some(s.clone()),
2019                        serde_json::Value::Number(n) => Some(n.to_string()),
2020                        serde_json::Value::Bool(b) => Some(b.to_string()),
2021                        _ => None,
2022                    };
2023                }
2024                return None;
2025            }
2026        }
2027        // Fall back to a naive XML extraction for genuine XML bodies.
2028        let open = format!("<{key}>");
2029        let close = format!("</{key}>");
2030        if let Some(start) = s.find(&open) {
2031            let after = start + open.len();
2032            if let Some(end_rel) = s[after..].find(&close) {
2033                return Some(s[after..after + end_rel].to_string());
2034            }
2035        }
2036    }
2037    None
2038}
2039
2040fn parse_query_value(query: &str, key: &str) -> Option<String> {
2041    let prefix = format!("{key}=");
2042    for pair in query.split('&').filter(|p| !p.is_empty()) {
2043        if let Some(rest) = pair.strip_prefix(&prefix) {
2044            return Some(percent_decode(rest));
2045        }
2046    }
2047    None
2048}
2049
2050#[cfg(test)]
2051mod tests {
2052    use super::*;
2053
2054    #[test]
2055    fn distribution_list_xml_renders_pagination_fields() {
2056        // Truncated page: echoes the request marker, the requested MaxItems,
2057        // IsTruncated=true, and the NextMarker cursor.
2058        let xml =
2059            build_distribution_list_xml(&[], "DistributionList", "EDFDVBD6", 2, Some("E2NEXT"));
2060        assert!(xml.contains("<Marker>EDFDVBD6</Marker>"));
2061        assert!(xml.contains("<MaxItems>2</MaxItems>"));
2062        assert!(xml.contains("<IsTruncated>true</IsTruncated>"));
2063        assert!(xml.contains("<NextMarker>E2NEXT</NextMarker>"));
2064        // Final page: no NextMarker, IsTruncated=false.
2065        let xml = build_distribution_list_xml(&[], "DistributionList", "", 100, None);
2066        assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
2067        assert!(!xml.contains("<NextMarker>"));
2068    }
2069
2070    #[test]
2071    fn placeholder_label_detects_braces_and_percent_encoding() {
2072        assert!(is_placeholder_label(""));
2073        assert!(is_placeholder_label("{Identifier}"));
2074        assert!(is_placeholder_label("%7BIdentifier%7D"));
2075        assert!(is_placeholder_label("%7bidentifier%7d"));
2076        assert!(!is_placeholder_label("E1234567890ABC"));
2077        assert!(!is_placeholder_label(
2078            "arn:aws:cloudfront::000:distribution/E1"
2079        ));
2080    }
2081
2082    #[test]
2083    fn extract_body_field_handles_json_and_xml() {
2084        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2085        assert_eq!(
2086            extract_body_field(json, "Stage"),
2087            Some("BROKEN".to_string())
2088        );
2089        assert_eq!(extract_body_field(json, "MaxItems"), None);
2090
2091        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2092        assert_eq!(
2093            extract_body_field(xml, "Domain"),
2094            Some("example.com".to_string())
2095        );
2096        assert_eq!(extract_body_field(xml, "Missing"), None);
2097
2098        assert_eq!(extract_body_field(b"", "x"), None);
2099    }
2100
2101    fn make_state() -> SharedCloudFrontState {
2102        Arc::new(RwLock::new(CloudFrontAccounts::new()))
2103    }
2104
2105    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2106        AwsRequest {
2107            service: "cloudfront".into(),
2108            action: String::new(),
2109            region: "us-east-1".into(),
2110            account_id: DEFAULT_ACCOUNT.into(),
2111            request_id: Uuid::new_v4().to_string(),
2112            headers: HeaderMap::new(),
2113            query_params: std::collections::HashMap::new(),
2114            body_stream: parking_lot::Mutex::new(None),
2115            body: Bytes::from(body.to_string()),
2116            path_segments: path
2117                .split('/')
2118                .filter(|s| !s.is_empty())
2119                .map(String::from)
2120                .collect(),
2121            raw_path: path.into(),
2122            raw_query: query.into(),
2123            method,
2124            is_query_protocol: false,
2125            access_key_id: None,
2126            principal: None,
2127        }
2128    }
2129
2130    fn minimal_dist_config_xml(caller_ref: &str) -> String {
2131        format!(
2132            r#"<?xml version="1.0" encoding="UTF-8"?>
2133<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2134  <CallerReference>{caller_ref}</CallerReference>
2135  <Origins>
2136    <Quantity>1</Quantity>
2137    <Items>
2138      <Origin>
2139        <Id>primary</Id>
2140        <DomainName>example.com</DomainName>
2141      </Origin>
2142    </Items>
2143  </Origins>
2144  <DefaultCacheBehavior>
2145    <TargetOriginId>primary</TargetOriginId>
2146    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2147  </DefaultCacheBehavior>
2148  <Comment></Comment>
2149  <Enabled>true</Enabled>
2150</DistributionConfig>"#
2151        )
2152    }
2153
2154    #[tokio::test]
2155    async fn create_then_get_then_delete_distribution() {
2156        let svc = CloudFrontService::new(make_state());
2157        let body = minimal_dist_config_xml("ref-1");
2158        let create = svc
2159            .handle(make_request(
2160                http::Method::POST,
2161                "/2020-05-31/distribution",
2162                "",
2163                &body,
2164            ))
2165            .await
2166            .unwrap();
2167        assert_eq!(create.status, StatusCode::CREATED);
2168        let etag = create
2169            .headers
2170            .get(ETAG)
2171            .unwrap()
2172            .to_str()
2173            .unwrap()
2174            .to_string();
2175        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2176        let id = xml
2177            .split("<Id>")
2178            .nth(1)
2179            .unwrap()
2180            .split("</Id>")
2181            .next()
2182            .unwrap()
2183            .to_string();
2184
2185        let get = svc
2186            .handle(make_request(
2187                http::Method::GET,
2188                &format!("/2020-05-31/distribution/{id}"),
2189                "",
2190                "",
2191            ))
2192            .await
2193            .unwrap();
2194        assert_eq!(get.status, StatusCode::OK);
2195
2196        // Disable then delete (CloudFront requires Disabled before delete).
2197        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2198        let mut update_req = make_request(
2199            http::Method::PUT,
2200            &format!("/2020-05-31/distribution/{id}/config"),
2201            "",
2202            &disable_body,
2203        );
2204        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2205        let updated = svc.handle(update_req).await.unwrap();
2206        assert_eq!(updated.status, StatusCode::OK);
2207        let new_etag = updated
2208            .headers
2209            .get(ETAG)
2210            .unwrap()
2211            .to_str()
2212            .unwrap()
2213            .to_string();
2214
2215        let mut del_req = make_request(
2216            http::Method::DELETE,
2217            &format!("/2020-05-31/distribution/{id}"),
2218            "",
2219            "",
2220        );
2221        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2222        let del = svc.handle(del_req).await.unwrap();
2223        assert_eq!(del.status, StatusCode::NO_CONTENT);
2224    }
2225
2226    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2227        let body = minimal_dist_config_xml(caller_ref);
2228        let create = svc
2229            .handle(make_request(
2230                http::Method::POST,
2231                "/2020-05-31/distribution",
2232                "",
2233                &body,
2234            ))
2235            .await
2236            .unwrap();
2237        let xml = std::str::from_utf8(create.body.expect_bytes())
2238            .unwrap()
2239            .to_string();
2240        xml.split("<Id>")
2241            .nth(1)
2242            .unwrap()
2243            .split("</Id>")
2244            .next()
2245            .unwrap()
2246            .to_string()
2247    }
2248
2249    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2250        let state = svc.state.read();
2251        state
2252            .accounts
2253            .get(DEFAULT_ACCOUNT)
2254            .and_then(|a| a.distributions.get(id))
2255            .map(|d| d.status.clone())
2256            .unwrap_or_default()
2257    }
2258
2259    #[tokio::test]
2260    async fn create_distribution_starts_in_progress() {
2261        // Use a long delay so the auto-tick can't race the assertion.
2262        let svc = CloudFrontService::new(make_state())
2263            .with_propagation_delay(std::time::Duration::from_secs(60));
2264        let body = minimal_dist_config_xml("status-ref");
2265        let create = svc
2266            .handle(make_request(
2267                http::Method::POST,
2268                "/2020-05-31/distribution",
2269                "",
2270                &body,
2271            ))
2272            .await
2273            .unwrap();
2274        let xml = std::str::from_utf8(create.body.expect_bytes())
2275            .unwrap()
2276            .to_string();
2277        assert!(
2278            xml.contains("<Status>InProgress</Status>"),
2279            "expected initial status InProgress, got: {xml}"
2280        );
2281    }
2282
2283    #[tokio::test]
2284    async fn auto_transition_after_tick_marks_deployed() {
2285        let svc = CloudFrontService::new(make_state())
2286            .with_propagation_delay(std::time::Duration::from_millis(50));
2287        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2288        assert_eq!(distribution_status(&svc, &id), "InProgress");
2289        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2290        assert_eq!(distribution_status(&svc, &id), "Deployed");
2291    }
2292
2293    #[tokio::test]
2294    async fn set_distribution_status_via_admin_flips_synchronously() {
2295        let svc = CloudFrontService::new(make_state())
2296            .with_propagation_delay(std::time::Duration::from_secs(60));
2297        let id = create_distribution_returning_id(&svc, "admin-ref").await;
2298        assert_eq!(distribution_status(&svc, &id), "InProgress");
2299        assert!(svc.set_distribution_status(&id, "Deployed"));
2300        assert_eq!(distribution_status(&svc, &id), "Deployed");
2301        assert!(svc.set_distribution_status(&id, "InProgress"));
2302        assert_eq!(distribution_status(&svc, &id), "InProgress");
2303    }
2304
2305    #[tokio::test]
2306    async fn set_distribution_status_unknown_id_returns_false() {
2307        let svc = CloudFrontService::new(make_state());
2308        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2309    }
2310
2311    #[tokio::test]
2312    async fn update_distribution_resets_to_in_progress() {
2313        let svc = CloudFrontService::new(make_state())
2314            .with_propagation_delay(std::time::Duration::from_secs(60));
2315        let body = minimal_dist_config_xml("update-reset-ref");
2316        let create = svc
2317            .handle(make_request(
2318                http::Method::POST,
2319                "/2020-05-31/distribution",
2320                "",
2321                &body,
2322            ))
2323            .await
2324            .unwrap();
2325        let etag = create
2326            .headers
2327            .get(ETAG)
2328            .unwrap()
2329            .to_str()
2330            .unwrap()
2331            .to_string();
2332        let xml = std::str::from_utf8(create.body.expect_bytes())
2333            .unwrap()
2334            .to_string();
2335        let id = xml
2336            .split("<Id>")
2337            .nth(1)
2338            .unwrap()
2339            .split("</Id>")
2340            .next()
2341            .unwrap()
2342            .to_string();
2343        // Force the distribution to Deployed via the admin mutator so we can
2344        // observe the UpdateDistribution flip back to InProgress.
2345        assert!(svc.set_distribution_status(&id, "Deployed"));
2346        assert_eq!(distribution_status(&svc, &id), "Deployed");
2347
2348        let updated_body = body.replace(
2349            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2350            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2351        );
2352        let mut update_req = make_request(
2353            http::Method::PUT,
2354            &format!("/2020-05-31/distribution/{id}/config"),
2355            "",
2356            &updated_body,
2357        );
2358        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2359        let updated = svc.handle(update_req).await.unwrap();
2360        assert_eq!(updated.status, StatusCode::OK);
2361        assert_eq!(distribution_status(&svc, &id), "InProgress");
2362    }
2363
2364    #[tokio::test]
2365    async fn duplicate_caller_reference_is_rejected() {
2366        let svc = CloudFrontService::new(make_state());
2367        let body = minimal_dist_config_xml("dup-ref");
2368        svc.handle(make_request(
2369            http::Method::POST,
2370            "/2020-05-31/distribution",
2371            "",
2372            &body,
2373        ))
2374        .await
2375        .unwrap();
2376        let result = svc
2377            .handle(make_request(
2378                http::Method::POST,
2379                "/2020-05-31/distribution",
2380                "",
2381                &body,
2382            ))
2383            .await;
2384        let err = match result {
2385            Ok(_) => panic!("expected duplicate caller-reference to fail"),
2386            Err(e) => e,
2387        };
2388        assert_eq!(err.code(), "DistributionAlreadyExists");
2389        assert_eq!(err.status(), StatusCode::CONFLICT);
2390    }
2391
2392    #[tokio::test]
2393    async fn invalidation_lifecycle() {
2394        let svc = CloudFrontService::new(make_state());
2395        let body = minimal_dist_config_xml("inv-ref");
2396        let create = svc
2397            .handle(make_request(
2398                http::Method::POST,
2399                "/2020-05-31/distribution",
2400                "",
2401                &body,
2402            ))
2403            .await
2404            .unwrap();
2405        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2406        let dist_id = xml
2407            .split("<Id>")
2408            .nth(1)
2409            .unwrap()
2410            .split("</Id>")
2411            .next()
2412            .unwrap();
2413        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2414<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2415  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2416  <CallerReference>inv-1</CallerReference>
2417</InvalidationBatch>"#;
2418        let inv_resp = svc
2419            .handle(make_request(
2420                http::Method::POST,
2421                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2422                "",
2423                inv_body,
2424            ))
2425            .await
2426            .unwrap();
2427        assert_eq!(inv_resp.status, StatusCode::CREATED);
2428        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2429        let inv_id = inv_xml
2430            .split("<Id>")
2431            .nth(1)
2432            .unwrap()
2433            .split("</Id>")
2434            .next()
2435            .unwrap();
2436        let get = svc
2437            .handle(make_request(
2438                http::Method::GET,
2439                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2440                "",
2441                "",
2442            ))
2443            .await
2444            .unwrap();
2445        assert_eq!(get.status, StatusCode::OK);
2446        let list = svc
2447            .handle(make_request(
2448                http::Method::GET,
2449                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2450                "",
2451                "",
2452            ))
2453            .await
2454            .unwrap();
2455        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2456        assert!(xml.contains("<Quantity>1</Quantity>"));
2457    }
2458
2459    #[tokio::test]
2460    async fn tags_roundtrip() {
2461        let svc = CloudFrontService::new(make_state());
2462        let body = minimal_dist_config_xml("tag-ref");
2463        let create = svc
2464            .handle(make_request(
2465                http::Method::POST,
2466                "/2020-05-31/distribution",
2467                "",
2468                &body,
2469            ))
2470            .await
2471            .unwrap();
2472        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2473        let arn = xml
2474            .split("<ARN>")
2475            .nth(1)
2476            .unwrap()
2477            .split("</ARN>")
2478            .next()
2479            .unwrap();
2480        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2481<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2482  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2483</Tags>"#;
2484        let arn_q = format!("Operation=Tag&Resource={}", arn);
2485        let resp = svc
2486            .handle(make_request(
2487                http::Method::POST,
2488                "/2020-05-31/tagging",
2489                &arn_q,
2490                tag_body,
2491            ))
2492            .await
2493            .unwrap();
2494        assert_eq!(resp.status, StatusCode::NO_CONTENT);
2495        let list = svc
2496            .handle(make_request(
2497                http::Method::GET,
2498                "/2020-05-31/tagging",
2499                &format!("Resource={}", arn),
2500                "",
2501            ))
2502            .await
2503            .unwrap();
2504        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2505        assert!(xml.contains("<Key>env</Key>"));
2506        assert!(xml.contains("<Value>prod</Value>"));
2507    }
2508
2509    #[tokio::test]
2510    async fn xml_metacharacters_in_user_input_are_escaped() {
2511        let svc = CloudFrontService::new(make_state());
2512        let body = minimal_dist_config_xml("escape-ref").replace(
2513            "<Comment></Comment>",
2514            "<Comment><![CDATA[a&b<c>d]]></Comment>",
2515        );
2516        let create = svc
2517            .handle(make_request(
2518                http::Method::POST,
2519                "/2020-05-31/distribution",
2520                "",
2521                &body,
2522            ))
2523            .await
2524            .unwrap();
2525        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2526        let dist_id = xml
2527            .split("<Id>")
2528            .nth(1)
2529            .unwrap()
2530            .split("</Id>")
2531            .next()
2532            .unwrap();
2533        let arn = xml
2534            .split("<ARN>")
2535            .nth(1)
2536            .unwrap()
2537            .split("</ARN>")
2538            .next()
2539            .unwrap();
2540
2541        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2542<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2543  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
2544</Tags>"#;
2545        let arn_q = format!("Operation=Tag&Resource={}", arn);
2546        svc.handle(make_request(
2547            http::Method::POST,
2548            "/2020-05-31/tagging",
2549            &arn_q,
2550            tag_body,
2551        ))
2552        .await
2553        .unwrap();
2554
2555        let list = svc
2556            .handle(make_request(
2557                http::Method::GET,
2558                "/2020-05-31/tagging",
2559                &format!("Resource={}", arn),
2560                "",
2561            ))
2562            .await
2563            .unwrap();
2564        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2565        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
2566        assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2567
2568        // Force the distribution into the list-rendering path so the
2569        // unescaped Comment field would surface there.
2570        let list_resp = svc
2571            .handle(make_request(
2572                http::Method::GET,
2573                "/2020-05-31/distribution",
2574                "",
2575                "",
2576            ))
2577            .await
2578            .unwrap();
2579        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2580        // Ensure raw `<` from the user-supplied comment never lands inside
2581        // a Comment element on the wire.
2582        assert!(!xml.contains("<Comment>a&b<c>d"));
2583        assert!(xml.contains(dist_id));
2584    }
2585}