Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22    CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23    StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Report whether `action` names a state-changing CloudFront operation.
///
/// An action counts as mutating when its name begins with one of the verb
/// prefixes below; any other name (`Get*`, `List*`, `Describe*`, `Test*`,
/// `Verify*`, ...) is treated as read-only. `handle` uses the answer to decide
/// whether a successful request must be followed by a persistence snapshot.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in MUTATING_VERBS.iter() {
        if action.starts_with(*verb) {
            return true;
        }
    }
    false
}
45
/// Twelve-zero placeholder AWS account id.
// NOTE(review): presumably the fallback used when a request carries no
// account id — confirm against the `account_id` helper.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// CloudFront action names returned by
/// [`AwsService::supported_actions`] for this service.
///
/// The entries are meant to mirror the arms of the dispatch `match` in
/// `handle`; when adding an action, update both places.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service front-end: owns the shared state handle plus the
/// settings that control status propagation and snapshot persistence. It
/// implements [`AwsService`], whose `handle` method dispatches each request
/// to a per-action handler.
pub struct CloudFrontService {
    /// Shared, lock-protected CloudFront state; also handed out via
    /// [`CloudFrontService::shared_state`].
    pub(crate) state: SharedCloudFrontState,
    /// How long the propagation tick sleeps before flipping a freshly
    /// created or updated Distribution / DistributionTenant /
    /// ConnectionGroup / StreamingDistribution from `InProgress` to
    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
    /// keeps SDK-driven integration tests fast while still simulating the
    /// async transition. Tests can shrink it via
    /// [`CloudFrontService::with_propagation_delay`], and operators can
    /// override the default via the
    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
    pub(crate) propagation_delay: std::time::Duration,
    /// Destination for persisted snapshots. `None` means memory mode, in
    /// which saving a snapshot is a no-op.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex taken by `save_cloudfront_snapshot` for the whole
    /// clone-serialize-write sequence, so snapshot writes never overlap.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolve the default `InProgress` -> `Deployed` delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
/// second.
///
/// The (whitespace-trimmed) value is parsed first as a non-negative integer
/// number of seconds and then as a floating-point number of seconds; `0`
/// keeps the transition synchronous, which is useful for fast unit tests.
///
/// Every unusable value falls back to 1 second: an unset or non-Unicode
/// variable, text that is not a number, a negative number, `NaN`, an
/// infinity, and a finite float too large to fit in a `Duration` (for
/// example `1e300`). A typo in the env var can therefore neither hang nor
/// crash the process.
fn default_propagation_delay() -> std::time::Duration {
    const FALLBACK: std::time::Duration = std::time::Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return std::time::Duration::from_secs(secs);
    }
    // `Duration::from_secs_f64` panics on negative, non-finite, or overflowing
    // input; the fallible variant turns every one of those into the fallback.
    trimmed
        .parse::<f64>()
        .ok()
        .and_then(|secs| std::time::Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
255
256impl CloudFrontService {
257    pub fn new(state: SharedCloudFrontState) -> Self {
258        Self {
259            state,
260            propagation_delay: default_propagation_delay(),
261            snapshot_store: None,
262            snapshot_lock: Arc::new(AsyncMutex::new(())),
263        }
264    }
265
266    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267        self.snapshot_store = Some(store);
268        self
269    }
270
271    pub fn shared_state(&self) -> SharedCloudFrontState {
272        Arc::clone(&self.state)
273    }
274
275    /// Persist current state as a snapshot. Held across the
276    /// clone-serialize-write sequence to prevent stale-last writes, with serde
277    /// + file I/O offloaded to the blocking pool.
278    async fn save_snapshot(&self) {
279        save_cloudfront_snapshot(
280            &self.state,
281            self.snapshot_store.clone(),
282            &self.snapshot_lock,
283        )
284        .await;
285    }
286
287    /// Build a hook that persists the current CloudFront state when invoked, or
288    /// `None` in memory mode. The CloudFormation provisioner mutates `state`
289    /// directly and uses this to write a CFN-provisioned resource through to
290    /// disk, the same way a direct mutating API call would.
291    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292        let store = self.snapshot_store.clone()?;
293        let state = self.state.clone();
294        let lock = self.snapshot_lock.clone();
295        Some(Arc::new(move || {
296            let state = state.clone();
297            let store = store.clone();
298            let lock = lock.clone();
299            Box::pin(async move {
300                save_cloudfront_snapshot(&state, Some(store), &lock).await;
301            })
302        }))
303    }
304
305    /// Re-arm the propagation tick for any Distribution / DistributionTenant /
306    /// ConnectionGroup / StreamingDistribution that was still `InProgress` when
307    /// the previous process exited, so they still transition to `Deployed`
308    /// after a restart. Called by the server after loading a snapshot.
309    pub fn rearm_in_progress(&self) {
310        let (dists, tenants, groups, streaming) = {
311            let s = self.state.read();
312            let mut dists = Vec::new();
313            let mut tenants = Vec::new();
314            let mut groups = Vec::new();
315            let mut streaming = Vec::new();
316            for account in s.accounts.values() {
317                for (id, d) in &account.distributions {
318                    if d.status == "InProgress" {
319                        dists.push(id.clone());
320                    }
321                }
322                for (id, t) in &account.distribution_tenants {
323                    if t.status == "InProgress" {
324                        tenants.push(id.clone());
325                    }
326                }
327                for (id, g) in &account.connection_groups {
328                    if g.status == "InProgress" {
329                        groups.push(id.clone());
330                    }
331                }
332                for (id, d) in &account.streaming_distributions {
333                    if d.status == "InProgress" {
334                        streaming.push(id.clone());
335                    }
336                }
337            }
338            (dists, tenants, groups, streaming)
339        };
340        for id in dists {
341            self.schedule_distribution_deploy(id);
342        }
343        for id in tenants {
344            self.schedule_distribution_tenant_deploy(id);
345        }
346        for id in groups {
347            self.schedule_connection_group_deploy(id);
348        }
349        for id in streaming {
350            self.schedule_streaming_distribution_deploy(id);
351        }
352    }
353
354    /// Override the propagation delay used by `CreateDistribution`,
355    /// `UpdateDistribution`, and the equivalent DistributionTenant /
356    /// ConnectionGroup ops. Tests pass a small duration so they don't
357    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
358    /// transition.
359    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360        self.propagation_delay = delay;
361        self
362    }
363
364    /// Flip a stored Distribution's status synchronously. Returns `false`
365    /// if no distribution matches `id` across any account. The admin
366    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
367    /// calls this so tests can force `InProgress` <-> `Deployed` without
368    /// waiting on the propagation tick.
369    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370        let mut state = self.state.write();
371        for account in state.accounts.values_mut() {
372            if let Some(dist) = account.distributions.get_mut(id) {
373                dist.status = status.to_string();
374                return true;
375            }
376        }
377        false
378    }
379
380    /// Admin-endpoint flavour of [`set_distribution_status`](Self::set_distribution_status)
381    /// that persists the forced status so it survives a restart.
382    pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383        let changed = self.set_distribution_status(id, status);
384        if changed {
385            self.save_snapshot().await;
386        }
387        changed
388    }
389}
390
391/// Persist the current CloudFront state as a snapshot. Offloads the serde +
392/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
393/// (memory mode). Shared by `CloudFrontService::save_snapshot`, the propagation
394/// ticks, and the CloudFormation provisioner persist hook so all route through
395/// the same serialize-and-write path.
396pub async fn save_cloudfront_snapshot(
397    state: &SharedCloudFrontState,
398    store: Option<Arc<dyn SnapshotStore>>,
399    lock: &AsyncMutex<()>,
400) {
401    let Some(store) = store else {
402        return;
403    };
404    let _guard = lock.lock().await;
405    let snapshot = CloudFrontSnapshot {
406        schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407        accounts: Some(state.read().clone()),
408    };
409    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410        let bytes = serde_json::to_vec(&snapshot)
411            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412        store.save(&bytes)
413    })
414    .await;
415    match join {
416        Ok(Ok(())) => {}
417        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418        Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419    }
420}
421
422impl Default for CloudFrontService {
423    fn default() -> Self {
424        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425    }
426}
427
/// `AwsService` implementation: resolves each request to a CloudFront action
/// via `route`, dispatches it to the matching handler method, and writes a
/// snapshot after successful mutating actions.
#[async_trait]
impl AwsService for CloudFrontService {
    /// Service name for this implementation: `"cloudfront"`.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// The static `SUPPORTED_ACTIONS` list.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Handle one CloudFront request.
    ///
    /// The request's method, raw path, and raw query are resolved to an
    /// action by `route`; an unresolvable request yields a `404`
    /// `InvalidArgument` error. The action name then selects a handler, and
    /// an action name with no arm yields a `501` `InvalidAction` error.
    ///
    /// When the action is mutating (see `is_mutating_action`) and the
    /// handler returned `Ok` with a success status, the state is persisted
    /// via `save_snapshot` before the result is returned. The handler's
    /// result is returned unchanged in every case.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Classified from the action name alone, before the handler runs.
        let mutates = is_mutating_action(resolved.action);
        // Handlers are called synchronously; the snapshot write below is the
        // only await point in this method.
        let result = match resolved.action {
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All `ListDistributionsBy*` variants share one handler, which
            // receives the action name to tell them apart.
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => {
                self.list_distributions_by(&req, &resolved, resolved.action)
            }
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router resolved an action that has no handler arm here.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        };
        // Persist only when a mutating action succeeded; handler errors and
        // non-success responses skip the snapshot.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }
}
665
666// ─── Distribution handlers ────────────────────────────────────────────
667
668impl CloudFrontService {
669    fn create_distribution(
670        &self,
671        req: &AwsRequest,
672        with_tags: bool,
673    ) -> Result<AwsResponse, AwsServiceError> {
674        let (config, tags) = if with_tags {
675            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677            let tags = parsed
678                .tags
679                .items
680                .map(|i| {
681                    i.tag
682                        .into_iter()
683                        .map(|t| Tag {
684                            key: t.key,
685                            value: t.value,
686                        })
687                        .collect()
688                })
689                .unwrap_or_default();
690            (parsed.distribution_config, tags)
691        } else {
692            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694            (parsed, Vec::new())
695        };
696
697        validate_caller_reference(&config.caller_reference)?;
698        validate_origins(&config)?;
699
700        let mut state = self.state.write();
701        let account = state.entry(account_id(req));
702
703        if let Some(existing) = account
704            .distributions
705            .values()
706            .find(|d| d.config.caller_reference == config.caller_reference)
707        {
708            return Err(aws_error(
709                StatusCode::CONFLICT,
710                "DistributionAlreadyExists",
711                format!(
712                    "Distribution with the same CallerReference exists: {}",
713                    existing.id
714                ),
715            ));
716        }
717
718        let id = generate_distribution_id();
719        let now = Utc::now();
720        let etag = generate_etag();
721        let domain = format!("{}.cloudfront.net", id.to_lowercase());
722        let arn = format!(
723            "arn:aws:cloudfront::{}:distribution/{}",
724            account_id(req),
725            id
726        );
727
728        let stored = StoredDistribution {
729            id: id.clone(),
730            arn: arn.clone(),
731            // Real CloudFront returns InProgress immediately and flips to
732            // Deployed ~15 minutes later once the edge propagation completes.
733            // The spawn below mirrors that lifecycle on a configurable delay.
734            status: "InProgress".to_string(),
735            last_modified_time: now,
736            domain_name: domain,
737            in_progress_invalidation_batches: 0,
738            etag: etag.clone(),
739            config,
740        };
741        account.distributions.insert(id.clone(), stored.clone());
742        if !tags.is_empty() {
743            account.tags.insert(arn.clone(), tags);
744        }
745        drop(state);
746
747        self.schedule_distribution_deploy(id.clone());
748
749        let body = build_distribution_xml(&stored);
750        let mut headers = HeaderMap::new();
751        set_header(&mut headers, ETAG, &etag);
752        set_header(&mut headers, LOCATION, &stored.arn);
753        Ok(xml_response(StatusCode::CREATED, body, headers))
754    }
755
756    /// Spawn a tokio task that flips the named distribution from
757    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
758    /// `CreateDistribution` and `UpdateDistribution` to model the real
759    /// CloudFront edge propagation lifecycle.
760    fn schedule_distribution_deploy(&self, id: String) {
761        let state = Arc::clone(&self.state);
762        let delay = self.propagation_delay;
763        let store = self.snapshot_store.clone();
764        let lock = self.snapshot_lock.clone();
765        tokio::spawn(async move {
766            tokio::time::sleep(delay).await;
767            let flipped = {
768                let mut s = state.write();
769                let mut flipped = false;
770                for account in s.accounts.values_mut() {
771                    if let Some(d) = account.distributions.get_mut(&id) {
772                        if d.status == "InProgress" {
773                            d.status = "Deployed".to_string();
774                            flipped = true;
775                        }
776                        break;
777                    }
778                }
779                flipped
780            };
781            if flipped {
782                save_cloudfront_snapshot(&state, store, &lock).await;
783            }
784        });
785    }
786
787    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
788    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789        let state = Arc::clone(&self.state);
790        let delay = self.propagation_delay;
791        let store = self.snapshot_store.clone();
792        let lock = self.snapshot_lock.clone();
793        tokio::spawn(async move {
794            tokio::time::sleep(delay).await;
795            let flipped = {
796                let mut s = state.write();
797                let mut flipped = false;
798                for account in s.accounts.values_mut() {
799                    if let Some(t) = account.distribution_tenants.get_mut(&id) {
800                        if t.status == "InProgress" {
801                            t.status = "Deployed".to_string();
802                            flipped = true;
803                        }
804                        break;
805                    }
806                }
807                flipped
808            };
809            if flipped {
810                save_cloudfront_snapshot(&state, store, &lock).await;
811            }
812        });
813    }
814
815    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
816    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817        let state = Arc::clone(&self.state);
818        let delay = self.propagation_delay;
819        let store = self.snapshot_store.clone();
820        let lock = self.snapshot_lock.clone();
821        tokio::spawn(async move {
822            tokio::time::sleep(delay).await;
823            let flipped = {
824                let mut s = state.write();
825                let mut flipped = false;
826                for account in s.accounts.values_mut() {
827                    if let Some(g) = account.connection_groups.get_mut(&id) {
828                        if g.status == "InProgress" {
829                            g.status = "Deployed".to_string();
830                            flipped = true;
831                        }
832                        break;
833                    }
834                }
835                flipped
836            };
837            if flipped {
838                save_cloudfront_snapshot(&state, store, &lock).await;
839            }
840        });
841    }
842
843    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
844    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
845    /// RTMP streaming distributions, even though the resource is largely
846    /// deprecated.
847    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848        let state = Arc::clone(&self.state);
849        let delay = self.propagation_delay;
850        let store = self.snapshot_store.clone();
851        let lock = self.snapshot_lock.clone();
852        tokio::spawn(async move {
853            tokio::time::sleep(delay).await;
854            let flipped = {
855                let mut s = state.write();
856                let mut flipped = false;
857                for account in s.accounts.values_mut() {
858                    if let Some(d) = account.streaming_distributions.get_mut(&id) {
859                        if d.status == "InProgress" {
860                            d.status = "Deployed".to_string();
861                            flipped = true;
862                        }
863                        break;
864                    }
865                }
866                flipped
867            };
868            if flipped {
869                save_cloudfront_snapshot(&state, store, &lock).await;
870            }
871        });
872    }
873
874    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875        let id = route
876            .id
877            .as_deref()
878            .ok_or_else(|| invalid_argument("missing distribution id"))?;
879        let state = self.state.read();
880        let account = state
881            .accounts
882            .get(DEFAULT_ACCOUNT)
883            .ok_or_else(|| no_such_distribution(id))?;
884        let dist = account
885            .distributions
886            .get(id)
887            .ok_or_else(|| no_such_distribution(id))?
888            .clone();
889        drop(state);
890        let body = build_distribution_xml(&dist);
891        let mut headers = HeaderMap::new();
892        set_header(&mut headers, ETAG, &dist.etag);
893        Ok(xml_response(StatusCode::OK, body, headers))
894    }
895
896    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897        let id = route
898            .id
899            .as_deref()
900            .ok_or_else(|| invalid_argument("missing distribution id"))?;
901        let state = self.state.read();
902        let account = state
903            .accounts
904            .get(DEFAULT_ACCOUNT)
905            .ok_or_else(|| no_such_distribution(id))?;
906        let dist = account
907            .distributions
908            .get(id)
909            .ok_or_else(|| no_such_distribution(id))?
910            .clone();
911        drop(state);
912        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914        let mut headers = HeaderMap::new();
915        set_header(&mut headers, ETAG, &dist.etag);
916        Ok(xml_response(StatusCode::OK, body, headers))
917    }
918
919    fn update_distribution(
920        &self,
921        req: &AwsRequest,
922        route: &Route,
923    ) -> Result<AwsResponse, AwsServiceError> {
924        let id = route
925            .id
926            .as_deref()
927            .ok_or_else(|| invalid_argument("missing distribution id"))?;
928        let if_match = req
929            .headers
930            .get(IF_MATCH)
931            .and_then(|v| v.to_str().ok())
932            .ok_or_else(|| {
933                aws_error(
934                    StatusCode::BAD_REQUEST,
935                    "InvalidIfMatchVersion",
936                    "Missing If-Match header for UpdateDistribution",
937                )
938            })?
939            .to_string();
940        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942        validate_caller_reference(&new_config.caller_reference)?;
943        validate_origins(&new_config)?;
944
945        let mut state = self.state.write();
946        let account = state
947            .accounts
948            .get_mut(DEFAULT_ACCOUNT)
949            .ok_or_else(|| no_such_distribution(id))?;
950        let dist = account
951            .distributions
952            .get_mut(id)
953            .ok_or_else(|| no_such_distribution(id))?;
954        if dist.etag != if_match {
955            return Err(aws_error(
956                StatusCode::PRECONDITION_FAILED,
957                "PreconditionFailed",
958                "If-Match header does not match the current ETag",
959            ));
960        }
961        // ETag stability: only bump the ETag and flip status back to
962        // InProgress when the new config actually differs from what we
963        // have on disk. A no-op UpdateDistribution (PUT the same config
964        // back) leaves the ETag intact, matching AWS behavior.
965        let config_changed = !configs_equal(&dist.config, &new_config);
966        if config_changed {
967            dist.config = new_config;
968            dist.etag = generate_etag();
969            dist.last_modified_time = Utc::now();
970            // UpdateDistribution kicks off a fresh edge propagation; AWS
971            // flips the status back to InProgress until the new config
972            // lands.
973            dist.status = "InProgress".to_string();
974        }
975        let snapshot = dist.clone();
976        drop(state);
977
978        if config_changed {
979            self.schedule_distribution_deploy(id.to_string());
980        }
981
982        let body = build_distribution_xml(&snapshot);
983        let mut headers = HeaderMap::new();
984        set_header(&mut headers, ETAG, &snapshot.etag);
985        Ok(xml_response(StatusCode::OK, body, headers))
986    }
987
988    fn delete_distribution(
989        &self,
990        req: &AwsRequest,
991        route: &Route,
992    ) -> Result<AwsResponse, AwsServiceError> {
993        let id = route
994            .id
995            .as_deref()
996            .ok_or_else(|| invalid_argument("missing distribution id"))?;
997        let if_match = req
998            .headers
999            .get(IF_MATCH)
1000            .and_then(|v| v.to_str().ok())
1001            .ok_or_else(|| {
1002                aws_error(
1003                    StatusCode::BAD_REQUEST,
1004                    "InvalidIfMatchVersion",
1005                    "Missing If-Match header for DeleteDistribution",
1006                )
1007            })?
1008            .to_string();
1009        let mut state = self.state.write();
1010        let account = state
1011            .accounts
1012            .get_mut(DEFAULT_ACCOUNT)
1013            .ok_or_else(|| no_such_distribution(id))?;
1014        {
1015            let dist = account
1016                .distributions
1017                .get(id)
1018                .ok_or_else(|| no_such_distribution(id))?;
1019            if dist.etag != if_match {
1020                return Err(aws_error(
1021                    StatusCode::PRECONDITION_FAILED,
1022                    "PreconditionFailed",
1023                    "If-Match header does not match the current ETag",
1024                ));
1025            }
1026            if dist.config.enabled {
1027                return Err(aws_error(
1028                    StatusCode::PRECONDITION_FAILED,
1029                    "DistributionNotDisabled",
1030                    "Distribution must be disabled before delete",
1031                ));
1032            }
1033        }
1034        let removed = account.distributions.remove(id).unwrap();
1035        account.tags.remove(&removed.arn);
1036        Ok(empty_response(StatusCode::NO_CONTENT))
1037    }
1038
1039    fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040        let state = self.state.read();
1041        let mut dists: Vec<StoredDistribution> = state
1042            .accounts
1043            .values()
1044            .flat_map(|a| a.distributions.values().cloned())
1045            .collect();
1046        dists.sort_by_key(|a| a.last_modified_time);
1047        drop(state);
1048        let body = build_distribution_list_xml(&dists, "DistributionList");
1049        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1050    }
1051
1052    fn list_distributions_by(
1053        &self,
1054        req: &AwsRequest,
1055        route: &Route,
1056        action: &str,
1057    ) -> Result<AwsResponse, AwsServiceError> {
1058        // Each "by-X" listing has a Smithy-required identifier (path or query)
1059        // plus, for ConnectionMode, an enum-constrained discriminator. The
1060        // synthetic probe sends `negative_omit_*` variants without these, so
1061        // every handler that can short-circuit on a missing identifier must
1062        // do so up-front. Otherwise the empty-list 200 looks like an honest
1063        // pass to the probe and a 100% conformance number is unreachable.
1064        match action {
1065            // Path-id ops: route.id is the URL placeholder. If the probe
1066            // omitted the field, the substitution left the literal
1067            // `{Member}` braces in place — easy to detect.
1068            "ListDistributionsByCachePolicyId"
1069            | "ListDistributionsByOriginRequestPolicyId"
1070            | "ListDistributionsByResponseHeadersPolicyId"
1071            | "ListDistributionsByKeyGroup"
1072            | "ListDistributionsByWebACLId"
1073            | "ListDistributionsByVpcOriginId"
1074            | "ListDistributionsByAnycastIpListId"
1075            | "ListDistributionsByOwnedResource" => {
1076                let id = route.id.as_deref().unwrap_or("");
1077                if is_placeholder_label(id) {
1078                    return Err(invalid_argument(format!(
1079                        "Required URL identifier for {action} is missing or invalid"
1080                    )));
1081                }
1082            }
1083            "ListDistributionsByConnectionMode" => {
1084                let id = route.id.as_deref().unwrap_or("");
1085                if is_placeholder_label(id) {
1086                    return Err(invalid_argument(
1087                        "ConnectionMode is required for ListDistributionsByConnectionMode",
1088                    ));
1089                }
1090                if id != "direct" && id != "tenant-only" {
1091                    return Err(invalid_argument(format!(
1092                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1093                    )));
1094                }
1095            }
1096            "ListDistributionsByConnectionFunction"
1097                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1098            {
1099                return Err(invalid_argument(
1100                    "ConnectionFunctionIdentifier query parameter is required",
1101                ));
1102            }
1103            "ListDistributionsByTrustStore"
1104                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1105            {
1106                return Err(invalid_argument(
1107                    "TrustStoreIdentifier query parameter is required",
1108                ));
1109            }
1110            _ => {}
1111        }
1112
1113        // The "by-X" listings each have a distinct response root element.
1114        // We never index distributions by the predicate (that would require
1115        // shipping each policy/key-group/etc service first), so each
1116        // response is empty until those services land.
1117        let root = match action {
1118            "ListDistributionsByCachePolicyId"
1119            | "ListDistributionsByOriginRequestPolicyId"
1120            | "ListDistributionsByResponseHeadersPolicyId"
1121            | "ListDistributionsByKeyGroup"
1122            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1123            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1124            _ => "DistributionList",
1125        };
1126        let body = build_empty_distribution_id_list(root);
1127        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1128    }
1129
1130    fn copy_distribution(
1131        &self,
1132        req: &AwsRequest,
1133        route: &Route,
1134    ) -> Result<AwsResponse, AwsServiceError> {
1135        let primary_id = route
1136            .id
1137            .as_deref()
1138            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1139        let if_match = req
1140            .headers
1141            .get(IF_MATCH)
1142            .and_then(|v| v.to_str().ok())
1143            .ok_or_else(|| {
1144                aws_error(
1145                    StatusCode::BAD_REQUEST,
1146                    "InvalidIfMatchVersion",
1147                    "Missing If-Match header for CopyDistribution",
1148                )
1149            })?
1150            .to_string();
1151        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1152            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1153        validate_caller_reference(&parsed.caller_reference)?;
1154        let mut state = self.state.write();
1155        let account = state
1156            .accounts
1157            .get_mut(DEFAULT_ACCOUNT)
1158            .ok_or_else(|| no_such_distribution(primary_id))?;
1159        let primary = account
1160            .distributions
1161            .get(primary_id)
1162            .ok_or_else(|| no_such_distribution(primary_id))?
1163            .clone();
1164        if primary.etag != if_match {
1165            return Err(aws_error(
1166                StatusCode::PRECONDITION_FAILED,
1167                "PreconditionFailed",
1168                "If-Match header does not match the current ETag",
1169            ));
1170        }
1171        if account
1172            .distributions
1173            .values()
1174            .any(|d| d.config.caller_reference == parsed.caller_reference)
1175        {
1176            return Err(aws_error(
1177                StatusCode::CONFLICT,
1178                "DistributionAlreadyExists",
1179                "Distribution with the same CallerReference exists",
1180            ));
1181        }
1182        let new_id = generate_distribution_id();
1183        let mut config = primary.config.clone();
1184        config.caller_reference = parsed.caller_reference;
1185        config.enabled = parsed.enabled.unwrap_or(false);
1186        config.staging = parsed.staging;
1187        let now = Utc::now();
1188        let etag = generate_etag();
1189        let arn = format!(
1190            "arn:aws:cloudfront::{}:distribution/{}",
1191            account_id(req),
1192            new_id
1193        );
1194        let stored = StoredDistribution {
1195            id: new_id.clone(),
1196            arn: arn.clone(),
1197            status: "InProgress".to_string(),
1198            last_modified_time: now,
1199            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1200            in_progress_invalidation_batches: 0,
1201            etag: etag.clone(),
1202            config,
1203        };
1204        account.distributions.insert(new_id.clone(), stored.clone());
1205        drop(state);
1206        self.schedule_distribution_deploy(new_id);
1207        let body = build_distribution_xml(&stored);
1208        let mut headers = HeaderMap::new();
1209        set_header(&mut headers, ETAG, &etag);
1210        set_header(&mut headers, LOCATION, &stored.arn);
1211        Ok(xml_response(StatusCode::CREATED, body, headers))
1212    }
1213}
1214
1215#[derive(Debug, serde::Deserialize, Default)]
1216#[serde(rename_all = "PascalCase")]
1217struct CopyDistributionRequest {
1218    caller_reference: String,
1219    #[serde(default)]
1220    enabled: Option<bool>,
1221    #[serde(default)]
1222    staging: Option<bool>,
1223}
1224
1225// ─── Invalidations ────────────────────────────────────────────────────
1226
1227impl CloudFrontService {
1228    fn create_invalidation(
1229        &self,
1230        req: &AwsRequest,
1231        route: &Route,
1232    ) -> Result<AwsResponse, AwsServiceError> {
1233        let dist_id = route
1234            .id
1235            .as_deref()
1236            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1237        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1238            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1239        if batch.caller_reference.is_empty() {
1240            return Err(invalid_argument("CallerReference is required"));
1241        }
1242        if batch.paths.quantity < 1 {
1243            return Err(invalid_argument(
1244                "InvalidationBatch.Paths must be non-empty",
1245            ));
1246        }
1247        let mut state = self.state.write();
1248        let account = state.entry(DEFAULT_ACCOUNT);
1249        if !account.distributions.contains_key(dist_id) {
1250            return Err(no_such_distribution(dist_id));
1251        }
1252        let id = generate_invalidation_id();
1253        let stored = StoredInvalidation {
1254            id: id.clone(),
1255            distribution_id: dist_id.to_string(),
1256            status: "Completed".to_string(),
1257            create_time: Utc::now(),
1258            batch: batch.clone(),
1259        };
1260        account.invalidations.insert(id.clone(), stored.clone());
1261        drop(state);
1262        let body = build_invalidation_xml(&stored);
1263        let mut headers = HeaderMap::new();
1264        set_header(
1265            &mut headers,
1266            LOCATION,
1267            &format!(
1268                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1269                stored.id
1270            ),
1271        );
1272        Ok(xml_response(StatusCode::CREATED, body, headers))
1273    }
1274
1275    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1276        let dist_id = route
1277            .id
1278            .as_deref()
1279            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1280        let inv_id = route
1281            .second_id
1282            .as_deref()
1283            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1284        let state = self.state.read();
1285        let account = state
1286            .accounts
1287            .get(DEFAULT_ACCOUNT)
1288            .ok_or_else(|| no_such_invalidation(inv_id))?;
1289        if !account.distributions.contains_key(dist_id) {
1290            return Err(no_such_distribution(dist_id));
1291        }
1292        let inv = account
1293            .invalidations
1294            .get(inv_id)
1295            .filter(|i| i.distribution_id == dist_id)
1296            .ok_or_else(|| no_such_invalidation(inv_id))?
1297            .clone();
1298        drop(state);
1299        let body = build_invalidation_xml(&inv);
1300        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1301    }
1302
1303    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1304        let dist_id = route
1305            .id
1306            .as_deref()
1307            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1308        let state = self.state.read();
1309        let account = state
1310            .accounts
1311            .get(DEFAULT_ACCOUNT)
1312            .ok_or_else(|| no_such_distribution(dist_id))?;
1313        if !account.distributions.contains_key(dist_id) {
1314            return Err(no_such_distribution(dist_id));
1315        }
1316        let mut items: Vec<&StoredInvalidation> = account
1317            .invalidations
1318            .values()
1319            .filter(|i| i.distribution_id == dist_id)
1320            .collect();
1321        items.sort_by_key(|a| a.create_time);
1322        let body = build_invalidation_list_xml(&items);
1323        drop(state);
1324        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1325    }
1326}
1327
1328// ─── Tags ─────────────────────────────────────────────────────────────
1329
1330impl CloudFrontService {
1331    fn parse_arn_query(query: &str) -> Option<String> {
1332        for pair in query.split('&').filter(|p| !p.is_empty()) {
1333            if let Some(rest) = pair.strip_prefix("Resource=") {
1334                return Some(percent_decode(rest));
1335            }
1336        }
1337        None
1338    }
1339
1340    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1341        let arn = Self::parse_arn_query(&req.raw_query)
1342            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1343        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1344            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1345        let new_tags: Vec<Tag> = parsed
1346            .items
1347            .map(|i| {
1348                i.tag
1349                    .into_iter()
1350                    .map(|t| Tag {
1351                        key: t.key,
1352                        value: t.value,
1353                    })
1354                    .collect()
1355            })
1356            .unwrap_or_default();
1357        let mut state = self.state.write();
1358        let account = state.entry(DEFAULT_ACCOUNT);
1359        let entry = account.tags.entry(arn).or_default();
1360        for tag in new_tags {
1361            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1362                existing.value = tag.value;
1363            } else {
1364                entry.push(tag);
1365            }
1366        }
1367        Ok(empty_response(StatusCode::NO_CONTENT))
1368    }
1369
1370    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371        let arn = Self::parse_arn_query(&req.raw_query)
1372            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1373        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1374            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1375        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1376        let mut state = self.state.write();
1377        let account = state.entry(DEFAULT_ACCOUNT);
1378        if let Some(existing) = account.tags.get_mut(&arn) {
1379            existing.retain(|t| !keys.contains(&t.key));
1380        }
1381        Ok(empty_response(StatusCode::NO_CONTENT))
1382    }
1383
1384    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385        let arn = Self::parse_arn_query(&req.raw_query)
1386            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1387        let state = self.state.read();
1388        let tags = state
1389            .accounts
1390            .get(DEFAULT_ACCOUNT)
1391            .and_then(|a| a.tags.get(&arn))
1392            .cloned()
1393            .unwrap_or_default();
1394        drop(state);
1395        let body = build_tags_xml(&tags);
1396        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1397    }
1398}
1399
1400// ─── Aliases / WebACL ─────────────────────────────────────────────────
1401
1402impl CloudFrontService {
1403    fn associate_alias(
1404        &self,
1405        req: &AwsRequest,
1406        route: &Route,
1407    ) -> Result<AwsResponse, AwsServiceError> {
1408        let id = route
1409            .id
1410            .as_deref()
1411            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1412        let alias = parse_query_value(&req.raw_query, "Alias")
1413            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1414        let mut state = self.state.write();
1415        let account = state
1416            .accounts
1417            .get_mut(DEFAULT_ACCOUNT)
1418            .ok_or_else(|| no_such_distribution(id))?;
1419        // Reject if the alias is already attached to a different distribution.
1420        if let Some(other) = account.distributions.values().find(|d| {
1421            d.id != id
1422                && d.config
1423                    .aliases
1424                    .as_ref()
1425                    .and_then(|a| a.items.as_ref())
1426                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1427        }) {
1428            return Err(aws_error(
1429                StatusCode::CONFLICT,
1430                "CNAMEAlreadyExists",
1431                format!(
1432                    "Alias {alias} is already associated with distribution {}",
1433                    other.id
1434                ),
1435            ));
1436        }
1437        let dist = account
1438            .distributions
1439            .get_mut(id)
1440            .ok_or_else(|| no_such_distribution(id))?;
1441        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1442        let items = aliases
1443            .items
1444            .get_or_insert_with(crate::model::AliasItems::default);
1445        if !items.cname.iter().any(|c| c == &alias) {
1446            items.cname.push(alias.clone());
1447            aliases.quantity = items.cname.len() as i32;
1448        }
1449        dist.etag = generate_etag();
1450        dist.last_modified_time = Utc::now();
1451        Ok(empty_response(StatusCode::OK))
1452    }
1453
1454    fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1455        let alias = parse_query_value(&req.raw_query, "Alias")
1456            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1457        let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1458            .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1459        // aliasString max 253, distributionIdString max 25 per the Smithy
1460        // model. Reject probe-generated boundary variants that overrun the
1461        // documented length so they produce the declared InvalidArgument
1462        // instead of an empty 200.
1463        if alias.len() > 253 {
1464            return Err(invalid_argument(format!(
1465                "Alias length {} exceeds maximum 253",
1466                alias.len()
1467            )));
1468        }
1469        if dist_id.len() > 25 {
1470            return Err(invalid_argument(format!(
1471                "DistributionId length {} exceeds maximum 25",
1472                dist_id.len()
1473            )));
1474        }
1475        if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1476            let n: i64 = max_items.parse().map_err(|_| {
1477                invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1478            })?;
1479            if n > 100 {
1480                return Err(invalid_argument(format!(
1481                    "MaxItems {n} exceeds maximum 100"
1482                )));
1483            }
1484        }
1485        // We never produce conflicts because every alias is owned by one
1486        // distribution at most. Return an empty list with the proper shape.
1487        let body = format!(
1488            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1489            NS = crate::NAMESPACE,
1490            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1491        );
1492        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1493    }
1494
1495    fn associate_web_acl(
1496        &self,
1497        req: &AwsRequest,
1498        route: &Route,
1499    ) -> Result<AwsResponse, AwsServiceError> {
1500        let id = route
1501            .id
1502            .as_deref()
1503            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1504        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1505            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1506        let mut state = self.state.write();
1507        let account = state
1508            .accounts
1509            .get_mut(DEFAULT_ACCOUNT)
1510            .ok_or_else(|| no_such_distribution(id))?;
1511        let dist = account
1512            .distributions
1513            .get_mut(id)
1514            .ok_or_else(|| no_such_distribution(id))?;
1515        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1516        dist.etag = generate_etag();
1517        dist.last_modified_time = Utc::now();
1518        let body = format!(
1519            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1520            esc(id), esc(&parsed.web_acl_arn),
1521            NS = crate::NAMESPACE,
1522            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1523        );
1524        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1525    }
1526
1527    fn disassociate_web_acl(
1528        &self,
1529        _req: &AwsRequest,
1530        route: &Route,
1531    ) -> Result<AwsResponse, AwsServiceError> {
1532        let id = route
1533            .id
1534            .as_deref()
1535            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1536        // DisassociateDistributionWebACL's Smithy model declares EntityNotFound
1537        // (not NoSuchDistribution) for unknown distribution IDs.
1538        let entity_not_found = || {
1539            aws_error(
1540                StatusCode::NOT_FOUND,
1541                "EntityNotFound",
1542                format!("The specified distribution does not exist: {id}"),
1543            )
1544        };
1545        let mut state = self.state.write();
1546        let account = state
1547            .accounts
1548            .get_mut(DEFAULT_ACCOUNT)
1549            .ok_or_else(entity_not_found)?;
1550        let dist = account
1551            .distributions
1552            .get_mut(id)
1553            .ok_or_else(entity_not_found)?;
1554        dist.config.web_acl_id = None;
1555        dist.etag = generate_etag();
1556        dist.last_modified_time = Utc::now();
1557        let body = format!(
1558            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1559            esc(id),
1560            NS = crate::NAMESPACE,
1561            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1562        );
1563        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1564    }
1565}
1566
1567#[derive(serde::Deserialize, Default, Debug)]
1568#[serde(rename_all = "PascalCase")]
1569struct AssociateAliasRequest {
1570    #[serde(rename = "WebACLArn", default)]
1571    web_acl_arn: String,
1572}
1573
1574// ─── XML body builders ────────────────────────────────────────────────
1575
/// Escape a user-provided string for inclusion in the hand-built XML
/// response bodies.
///
/// Only the five standard XML metacharacters (`&`, `<`, `>`, `"`, `'`) are
/// replaced by their named entities; every other character is copied
/// through untouched. Keep the table in sync with `quick_xml`'s entity
/// table — using their primitive would mean a `Writer` per call, whereas
/// these builders append straight into a `String`.
pub(crate) fn esc(s: &str) -> String {
    /// Named entity for `ch`, or `None` when `ch` needs no escaping.
    fn entity_for(ch: char) -> Option<&'static str> {
        match ch {
            '&' => Some("&amp;"),
            '<' => Some("&lt;"),
            '>' => Some("&gt;"),
            '"' => Some("&quot;"),
            '\'' => Some("&apos;"),
            _ => None,
        }
    }

    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match entity_for(ch) {
            Some(entity) => escaped.push_str(entity),
            None => escaped.push(ch),
        }
    }
    escaped
}
1595
1596pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1597    let mut out = String::with_capacity(2048);
1598    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1599    out.push_str(&format!(
1600        "<Distribution xmlns=\"{ns}\">",
1601        ns = crate::NAMESPACE
1602    ));
1603    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1604    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1605    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1606    out.push_str(&format!(
1607        "<LastModifiedTime>{}</LastModifiedTime>",
1608        rfc3339(&dist.last_modified_time)
1609    ));
1610    out.push_str(&format!(
1611        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1612        dist.in_progress_invalidation_batches
1613    ));
1614    out.push_str(&format!(
1615        "<DomainName>{}</DomainName>",
1616        esc(&dist.domain_name)
1617    ));
1618    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1619    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1620    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1621        .unwrap_or_else(|_| String::new());
1622    out.push_str(&inner);
1623    out.push_str("</Distribution>");
1624    out
1625}
1626
1627fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1628    let mut out = String::with_capacity(2048);
1629    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1630    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1631    out.push_str("<Marker></Marker>");
1632    out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1633    out.push_str("<IsTruncated>false</IsTruncated>");
1634    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1635    if dists.is_empty() {
1636        out.push_str(&format!("</{root}>"));
1637        return out;
1638    }
1639    out.push_str("<Items>");
1640    for d in dists {
1641        out.push_str("<DistributionSummary>");
1642        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1643        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1644        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1645        out.push_str(&format!(
1646            "<LastModifiedTime>{}</LastModifiedTime>",
1647            rfc3339(&d.last_modified_time)
1648        ));
1649        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1650        let aliases = d.config.aliases.clone().unwrap_or_default();
1651        out.push_str(&render_inline("Aliases", &aliases));
1652        let origins = d.config.origins.clone();
1653        out.push_str(&render_inline("Origins", &origins));
1654        let dcb = d.config.default_cache_behavior.clone();
1655        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1656        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1657        out.push_str(&render_inline("CacheBehaviors", &cb));
1658        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1659        out.push_str(&render_inline("CustomErrorResponses", &cer));
1660        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1661        out.push_str(&format!(
1662            "<PriceClass>{}</PriceClass>",
1663            esc(&d
1664                .config
1665                .price_class
1666                .clone()
1667                .unwrap_or_else(|| "PriceClass_All".to_string()))
1668        ));
1669        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1670        out.push_str(&render_inline(
1671            "ViewerCertificate",
1672            &d.config.viewer_certificate.clone().unwrap_or_default(),
1673        ));
1674        out.push_str(&render_inline(
1675            "Restrictions",
1676            &d.config.restrictions.clone().unwrap_or_default(),
1677        ));
1678        out.push_str(&format!(
1679            "<WebACLId>{}</WebACLId>",
1680            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1681        ));
1682        out.push_str(&format!(
1683            "<HttpVersion>{}</HttpVersion>",
1684            esc(&d
1685                .config
1686                .http_version
1687                .clone()
1688                .unwrap_or_else(|| "http2".to_string()))
1689        ));
1690        out.push_str(&format!(
1691            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1692            d.config.is_ipv6_enabled.unwrap_or(true)
1693        ));
1694        out.push_str("<Staging>false</Staging>");
1695        out.push_str("</DistributionSummary>");
1696    }
1697    out.push_str("</Items>");
1698    out.push_str(&format!("</{root}>"));
1699    out
1700}
1701
1702fn build_empty_distribution_id_list(root: &str) -> String {
1703    let mut out = String::with_capacity(256);
1704    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1705    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1706    out.push_str("<Marker></Marker>");
1707    out.push_str("<MaxItems>100</MaxItems>");
1708    out.push_str("<IsTruncated>false</IsTruncated>");
1709    out.push_str("<Quantity>0</Quantity>");
1710    out.push_str(&format!("</{root}>"));
1711    out
1712}
1713
1714fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1715    let mut out = String::with_capacity(512);
1716    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1717    out.push_str(&format!(
1718        "<Invalidation xmlns=\"{ns}\">",
1719        ns = crate::NAMESPACE
1720    ));
1721    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1722    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1723    out.push_str(&format!(
1724        "<CreateTime>{}</CreateTime>",
1725        rfc3339(&inv.create_time)
1726    ));
1727    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1728    out.push_str("</Invalidation>");
1729    out
1730}
1731
1732fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1733    let mut out = String::with_capacity(1024);
1734    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1735    out.push_str(&format!(
1736        "<InvalidationList xmlns=\"{ns}\">",
1737        ns = crate::NAMESPACE
1738    ));
1739    out.push_str("<Marker></Marker>");
1740    out.push_str("<MaxItems>100</MaxItems>");
1741    out.push_str("<IsTruncated>false</IsTruncated>");
1742    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1743    if !items.is_empty() {
1744        out.push_str("<Items>");
1745        for inv in items {
1746            out.push_str("<InvalidationSummary>");
1747            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1748            out.push_str(&format!(
1749                "<CreateTime>{}</CreateTime>",
1750                rfc3339(&inv.create_time)
1751            ));
1752            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1753            out.push_str("</InvalidationSummary>");
1754        }
1755        out.push_str("</Items>");
1756    }
1757    out.push_str("</InvalidationList>");
1758    out
1759}
1760
1761fn build_tags_xml(tags: &[Tag]) -> String {
1762    let mut out = String::with_capacity(256);
1763    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1764    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1765    out.push_str("<Items>");
1766    for t in tags {
1767        out.push_str("<Tag>");
1768        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1769        if let Some(v) = &t.value {
1770            out.push_str(&format!("<Value>{}</Value>", esc(v)));
1771        }
1772        out.push_str("</Tag>");
1773    }
1774    out.push_str("</Items>");
1775    out.push_str("</Tags>");
1776    out
1777}
1778
1779fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1780    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1781}
1782
1783// ─── Helpers ──────────────────────────────────────────────────────────
1784
1785fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1786    if s.is_empty() {
1787        return Err(invalid_argument("CallerReference is required"));
1788    }
1789    Ok(())
1790}
1791
1792fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1793    if config.origins.quantity < 1 {
1794        return Err(invalid_argument(
1795            "DistributionConfig.Origins must contain at least one origin",
1796        ));
1797    }
1798    Ok(())
1799}
1800
1801/// Compare two `DistributionConfig`s by serializing them to canonical XML
1802/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
1803/// so the ETag stays stable when the caller PUTs the same config back.
1804/// Falls back to "not equal" if either serialization fails so we still
1805/// honor the request rather than silently swallow a write.
1806fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1807    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1808        return false;
1809    };
1810    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1811        return false;
1812    };
1813    a == b
1814}
1815
/// Resolve the account id to use for a request.
///
/// The request is currently ignored and `DEFAULT_ACCOUNT` is always
/// returned.
fn account_id(_req: &AwsRequest) -> &'static str {
    // Multi-account is wired through AwsRequest.account_id elsewhere; the
    // CloudFront control plane only uses the resolved id for the ARN
    // suffix. Until that field stabilizes for REST-XML we use the default
    // account ID consistently with the rest of the registered services.
    DEFAULT_ACCOUNT
}
1823
1824fn generate_distribution_id() -> String {
1825    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
1826    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1827    format!("E{}", &raw[..13])
1828}
1829
1830fn generate_invalidation_id() -> String {
1831    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1832    format!("I{}", &raw[..13])
1833}
1834
1835pub(crate) fn generate_etag() -> String {
1836    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1837    format!("E{}", &raw[..13])
1838}
1839
1840/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
1841/// Used by Batch 2 policy resources (cache, origin request, response
1842/// headers, continuous deployment, OAC) so each gets a recognizable
1843/// alphabetic prefix in addition to the random suffix.
1844pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1845    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1846    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1847    format!("{prefix}{}", &raw[..suffix_len])
1848}
1849
1850fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1851    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1852}
1853
/// Build a `400 Bad Request` service error with code `InvalidArgument` and
/// the given message.
pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
}
1857
1858fn no_such_distribution(id: &str) -> AwsServiceError {
1859    aws_error(
1860        StatusCode::NOT_FOUND,
1861        "NoSuchDistribution",
1862        format!("The specified distribution does not exist: {id}"),
1863    )
1864}
1865
1866fn no_such_invalidation(id: &str) -> AwsServiceError {
1867    aws_error(
1868        StatusCode::NOT_FOUND,
1869        "NoSuchInvalidation",
1870        format!("The specified invalidation does not exist: {id}"),
1871    )
1872}
1873
/// Build a `500 Internal Server Error` service error with code
/// `InternalError` and the given message.
fn internal_error(msg: impl Into<String>) -> AwsServiceError {
    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
}
1877
/// Build a service error from an HTTP status, an error code and a message.
///
/// Thin wrapper over `AwsServiceError::aws_error` that converts `code` into
/// an owned `String` first; `msg` is passed through as given.
pub(crate) fn aws_error(
    status: StatusCode,
    code: impl Into<String>,
    msg: impl Into<String>,
) -> AwsServiceError {
    AwsServiceError::aws_error(status, code.into(), msg)
}
1885
1886fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1887    if let Ok(v) = HeaderValue::from_str(value) {
1888        headers.insert(name, v);
1889    }
1890}
1891
1892pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1893    AwsResponse {
1894        status,
1895        content_type: "text/xml".to_string(),
1896        body: ResponseBody::Bytes(Bytes::from(body)),
1897        headers,
1898    }
1899}
1900
1901fn empty_response(status: StatusCode) -> AwsResponse {
1902    AwsResponse {
1903        status,
1904        content_type: "text/xml".to_string(),
1905        body: ResponseBody::Bytes(Bytes::new()),
1906        headers: HeaderMap::new(),
1907    }
1908}
1909
/// Decode a percent-encoded query-string component.
///
/// * `%XX` (two hex digits, either case) becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally.
///
/// Decoding happens at the byte level and the result is then interpreted
/// as UTF-8, so multi-byte sequences such as `%C3%A9` come out as the single
/// character `é`. Byte sequences that are not valid UTF-8 are replaced with
/// U+FFFD instead of being reinterpreted one byte per character.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the '%' as-is and move on.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Numeric value of an ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None` for
/// any other byte.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1941
/// Report whether a URL label segment looks like an unsubstituted Smithy
/// URI placeholder.
///
/// A segment counts as a placeholder when it is empty, starts with `{`
/// (e.g. `{Identifier}`), or starts with the percent-encoded brace `%7B` in
/// either case (e.g. `%7BIdentifier%7D`). Conformance probes that omit a
/// required `@httpLabel` input leave the literal placeholder in the URL;
/// handlers that can short-circuit on it return the declared validation
/// error instead of a fabricated 200.
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    matches!(
        value.as_bytes(),
        [] | [b'{', ..] | [b'%', b'7', b'b' | b'B', ..]
    )
}
1955
1956/// Best-effort field extractor for request bodies the probe sends as JSON
1957/// even when the service is REST-XML. Walks the body trying JSON first,
1958/// then a naive XML element scan. Returns the value for the first occurrence
1959/// of `key`. Used by handlers that need to validate optional/required body
1960/// members up-front (enum bounds, length, presence) without committing to a
1961/// full strongly-typed parse — the actual handler still uses the typed XML
1962/// parser for the structured fields it consumes.
1963pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
1964    if let Ok(s) = std::str::from_utf8(body) {
1965        let trimmed = s.trim_start();
1966        if trimmed.starts_with('{') {
1967            if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1968                if let Some(field) = v.get(key) {
1969                    return match field {
1970                        serde_json::Value::String(s) => Some(s.clone()),
1971                        serde_json::Value::Number(n) => Some(n.to_string()),
1972                        serde_json::Value::Bool(b) => Some(b.to_string()),
1973                        _ => None,
1974                    };
1975                }
1976                return None;
1977            }
1978        }
1979        // Fall back to a naive XML extraction for genuine XML bodies.
1980        let open = format!("<{key}>");
1981        let close = format!("</{key}>");
1982        if let Some(start) = s.find(&open) {
1983            let after = start + open.len();
1984            if let Some(end_rel) = s[after..].find(&close) {
1985                return Some(s[after..after + end_rel].to_string());
1986            }
1987        }
1988    }
1989    None
1990}
1991
1992fn parse_query_value(query: &str, key: &str) -> Option<String> {
1993    let prefix = format!("{key}=");
1994    for pair in query.split('&').filter(|p| !p.is_empty()) {
1995        if let Some(rest) = pair.strip_prefix(&prefix) {
1996            return Some(percent_decode(rest));
1997        }
1998    }
1999    None
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004    use super::*;
2005
2006    #[test]
2007    fn placeholder_label_detects_braces_and_percent_encoding() {
2008        assert!(is_placeholder_label(""));
2009        assert!(is_placeholder_label("{Identifier}"));
2010        assert!(is_placeholder_label("%7BIdentifier%7D"));
2011        assert!(is_placeholder_label("%7bidentifier%7d"));
2012        assert!(!is_placeholder_label("E1234567890ABC"));
2013        assert!(!is_placeholder_label(
2014            "arn:aws:cloudfront::000:distribution/E1"
2015        ));
2016    }
2017
2018    #[test]
2019    fn extract_body_field_handles_json_and_xml() {
2020        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2021        assert_eq!(
2022            extract_body_field(json, "Stage"),
2023            Some("BROKEN".to_string())
2024        );
2025        assert_eq!(extract_body_field(json, "MaxItems"), None);
2026
2027        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2028        assert_eq!(
2029            extract_body_field(xml, "Domain"),
2030            Some("example.com".to_string())
2031        );
2032        assert_eq!(extract_body_field(xml, "Missing"), None);
2033
2034        assert_eq!(extract_body_field(b"", "x"), None);
2035    }
2036
2037    fn make_state() -> SharedCloudFrontState {
2038        Arc::new(RwLock::new(CloudFrontAccounts::new()))
2039    }
2040
2041    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2042        AwsRequest {
2043            service: "cloudfront".into(),
2044            action: String::new(),
2045            region: "us-east-1".into(),
2046            account_id: DEFAULT_ACCOUNT.into(),
2047            request_id: Uuid::new_v4().to_string(),
2048            headers: HeaderMap::new(),
2049            query_params: std::collections::HashMap::new(),
2050            body_stream: parking_lot::Mutex::new(None),
2051            body: Bytes::from(body.to_string()),
2052            path_segments: path
2053                .split('/')
2054                .filter(|s| !s.is_empty())
2055                .map(String::from)
2056                .collect(),
2057            raw_path: path.into(),
2058            raw_query: query.into(),
2059            method,
2060            is_query_protocol: false,
2061            access_key_id: None,
2062            principal: None,
2063        }
2064    }
2065
2066    fn minimal_dist_config_xml(caller_ref: &str) -> String {
2067        format!(
2068            r#"<?xml version="1.0" encoding="UTF-8"?>
2069<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2070  <CallerReference>{caller_ref}</CallerReference>
2071  <Origins>
2072    <Quantity>1</Quantity>
2073    <Items>
2074      <Origin>
2075        <Id>primary</Id>
2076        <DomainName>example.com</DomainName>
2077      </Origin>
2078    </Items>
2079  </Origins>
2080  <DefaultCacheBehavior>
2081    <TargetOriginId>primary</TargetOriginId>
2082    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2083  </DefaultCacheBehavior>
2084  <Comment></Comment>
2085  <Enabled>true</Enabled>
2086</DistributionConfig>"#
2087        )
2088    }
2089
2090    #[tokio::test]
2091    async fn create_then_get_then_delete_distribution() {
2092        let svc = CloudFrontService::new(make_state());
2093        let body = minimal_dist_config_xml("ref-1");
2094        let create = svc
2095            .handle(make_request(
2096                http::Method::POST,
2097                "/2020-05-31/distribution",
2098                "",
2099                &body,
2100            ))
2101            .await
2102            .unwrap();
2103        assert_eq!(create.status, StatusCode::CREATED);
2104        let etag = create
2105            .headers
2106            .get(ETAG)
2107            .unwrap()
2108            .to_str()
2109            .unwrap()
2110            .to_string();
2111        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2112        let id = xml
2113            .split("<Id>")
2114            .nth(1)
2115            .unwrap()
2116            .split("</Id>")
2117            .next()
2118            .unwrap()
2119            .to_string();
2120
2121        let get = svc
2122            .handle(make_request(
2123                http::Method::GET,
2124                &format!("/2020-05-31/distribution/{id}"),
2125                "",
2126                "",
2127            ))
2128            .await
2129            .unwrap();
2130        assert_eq!(get.status, StatusCode::OK);
2131
2132        // Disable then delete (CloudFront requires Disabled before delete).
2133        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2134        let mut update_req = make_request(
2135            http::Method::PUT,
2136            &format!("/2020-05-31/distribution/{id}/config"),
2137            "",
2138            &disable_body,
2139        );
2140        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2141        let updated = svc.handle(update_req).await.unwrap();
2142        assert_eq!(updated.status, StatusCode::OK);
2143        let new_etag = updated
2144            .headers
2145            .get(ETAG)
2146            .unwrap()
2147            .to_str()
2148            .unwrap()
2149            .to_string();
2150
2151        let mut del_req = make_request(
2152            http::Method::DELETE,
2153            &format!("/2020-05-31/distribution/{id}"),
2154            "",
2155            "",
2156        );
2157        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2158        let del = svc.handle(del_req).await.unwrap();
2159        assert_eq!(del.status, StatusCode::NO_CONTENT);
2160    }
2161
2162    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2163        let body = minimal_dist_config_xml(caller_ref);
2164        let create = svc
2165            .handle(make_request(
2166                http::Method::POST,
2167                "/2020-05-31/distribution",
2168                "",
2169                &body,
2170            ))
2171            .await
2172            .unwrap();
2173        let xml = std::str::from_utf8(create.body.expect_bytes())
2174            .unwrap()
2175            .to_string();
2176        xml.split("<Id>")
2177            .nth(1)
2178            .unwrap()
2179            .split("</Id>")
2180            .next()
2181            .unwrap()
2182            .to_string()
2183    }
2184
2185    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2186        let state = svc.state.read();
2187        state
2188            .accounts
2189            .get(DEFAULT_ACCOUNT)
2190            .and_then(|a| a.distributions.get(id))
2191            .map(|d| d.status.clone())
2192            .unwrap_or_default()
2193    }
2194
2195    #[tokio::test]
2196    async fn create_distribution_starts_in_progress() {
2197        // Use a long delay so the auto-tick can't race the assertion.
2198        let svc = CloudFrontService::new(make_state())
2199            .with_propagation_delay(std::time::Duration::from_secs(60));
2200        let body = minimal_dist_config_xml("status-ref");
2201        let create = svc
2202            .handle(make_request(
2203                http::Method::POST,
2204                "/2020-05-31/distribution",
2205                "",
2206                &body,
2207            ))
2208            .await
2209            .unwrap();
2210        let xml = std::str::from_utf8(create.body.expect_bytes())
2211            .unwrap()
2212            .to_string();
2213        assert!(
2214            xml.contains("<Status>InProgress</Status>"),
2215            "expected initial status InProgress, got: {xml}"
2216        );
2217    }
2218
2219    #[tokio::test]
2220    async fn auto_transition_after_tick_marks_deployed() {
2221        let svc = CloudFrontService::new(make_state())
2222            .with_propagation_delay(std::time::Duration::from_millis(50));
2223        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2224        assert_eq!(distribution_status(&svc, &id), "InProgress");
2225        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2226        assert_eq!(distribution_status(&svc, &id), "Deployed");
2227    }
2228
2229    #[tokio::test]
2230    async fn set_distribution_status_via_admin_flips_synchronously() {
2231        let svc = CloudFrontService::new(make_state())
2232            .with_propagation_delay(std::time::Duration::from_secs(60));
2233        let id = create_distribution_returning_id(&svc, "admin-ref").await;
2234        assert_eq!(distribution_status(&svc, &id), "InProgress");
2235        assert!(svc.set_distribution_status(&id, "Deployed"));
2236        assert_eq!(distribution_status(&svc, &id), "Deployed");
2237        assert!(svc.set_distribution_status(&id, "InProgress"));
2238        assert_eq!(distribution_status(&svc, &id), "InProgress");
2239    }
2240
2241    #[tokio::test]
2242    async fn set_distribution_status_unknown_id_returns_false() {
2243        let svc = CloudFrontService::new(make_state());
2244        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2245    }
2246
2247    #[tokio::test]
2248    async fn update_distribution_resets_to_in_progress() {
2249        let svc = CloudFrontService::new(make_state())
2250            .with_propagation_delay(std::time::Duration::from_secs(60));
2251        let body = minimal_dist_config_xml("update-reset-ref");
2252        let create = svc
2253            .handle(make_request(
2254                http::Method::POST,
2255                "/2020-05-31/distribution",
2256                "",
2257                &body,
2258            ))
2259            .await
2260            .unwrap();
2261        let etag = create
2262            .headers
2263            .get(ETAG)
2264            .unwrap()
2265            .to_str()
2266            .unwrap()
2267            .to_string();
2268        let xml = std::str::from_utf8(create.body.expect_bytes())
2269            .unwrap()
2270            .to_string();
2271        let id = xml
2272            .split("<Id>")
2273            .nth(1)
2274            .unwrap()
2275            .split("</Id>")
2276            .next()
2277            .unwrap()
2278            .to_string();
2279        // Force the distribution to Deployed via the admin mutator so we can
2280        // observe the UpdateDistribution flip back to InProgress.
2281        assert!(svc.set_distribution_status(&id, "Deployed"));
2282        assert_eq!(distribution_status(&svc, &id), "Deployed");
2283
2284        let updated_body = body.replace(
2285            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2286            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2287        );
2288        let mut update_req = make_request(
2289            http::Method::PUT,
2290            &format!("/2020-05-31/distribution/{id}/config"),
2291            "",
2292            &updated_body,
2293        );
2294        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2295        let updated = svc.handle(update_req).await.unwrap();
2296        assert_eq!(updated.status, StatusCode::OK);
2297        assert_eq!(distribution_status(&svc, &id), "InProgress");
2298    }
2299
2300    #[tokio::test]
2301    async fn duplicate_caller_reference_is_rejected() {
2302        let svc = CloudFrontService::new(make_state());
2303        let body = minimal_dist_config_xml("dup-ref");
2304        svc.handle(make_request(
2305            http::Method::POST,
2306            "/2020-05-31/distribution",
2307            "",
2308            &body,
2309        ))
2310        .await
2311        .unwrap();
2312        let result = svc
2313            .handle(make_request(
2314                http::Method::POST,
2315                "/2020-05-31/distribution",
2316                "",
2317                &body,
2318            ))
2319            .await;
2320        let err = match result {
2321            Ok(_) => panic!("expected duplicate caller-reference to fail"),
2322            Err(e) => e,
2323        };
2324        assert_eq!(err.code(), "DistributionAlreadyExists");
2325        assert_eq!(err.status(), StatusCode::CONFLICT);
2326    }
2327
2328    #[tokio::test]
2329    async fn invalidation_lifecycle() {
2330        let svc = CloudFrontService::new(make_state());
2331        let body = minimal_dist_config_xml("inv-ref");
2332        let create = svc
2333            .handle(make_request(
2334                http::Method::POST,
2335                "/2020-05-31/distribution",
2336                "",
2337                &body,
2338            ))
2339            .await
2340            .unwrap();
2341        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2342        let dist_id = xml
2343            .split("<Id>")
2344            .nth(1)
2345            .unwrap()
2346            .split("</Id>")
2347            .next()
2348            .unwrap();
2349        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2350<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2351  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2352  <CallerReference>inv-1</CallerReference>
2353</InvalidationBatch>"#;
2354        let inv_resp = svc
2355            .handle(make_request(
2356                http::Method::POST,
2357                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2358                "",
2359                inv_body,
2360            ))
2361            .await
2362            .unwrap();
2363        assert_eq!(inv_resp.status, StatusCode::CREATED);
2364        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2365        let inv_id = inv_xml
2366            .split("<Id>")
2367            .nth(1)
2368            .unwrap()
2369            .split("</Id>")
2370            .next()
2371            .unwrap();
2372        let get = svc
2373            .handle(make_request(
2374                http::Method::GET,
2375                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2376                "",
2377                "",
2378            ))
2379            .await
2380            .unwrap();
2381        assert_eq!(get.status, StatusCode::OK);
2382        let list = svc
2383            .handle(make_request(
2384                http::Method::GET,
2385                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2386                "",
2387                "",
2388            ))
2389            .await
2390            .unwrap();
2391        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2392        assert!(xml.contains("<Quantity>1</Quantity>"));
2393    }
2394
2395    #[tokio::test]
2396    async fn tags_roundtrip() {
2397        let svc = CloudFrontService::new(make_state());
2398        let body = minimal_dist_config_xml("tag-ref");
2399        let create = svc
2400            .handle(make_request(
2401                http::Method::POST,
2402                "/2020-05-31/distribution",
2403                "",
2404                &body,
2405            ))
2406            .await
2407            .unwrap();
2408        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2409        let arn = xml
2410            .split("<ARN>")
2411            .nth(1)
2412            .unwrap()
2413            .split("</ARN>")
2414            .next()
2415            .unwrap();
2416        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2417<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2418  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2419</Tags>"#;
2420        let arn_q = format!("Operation=Tag&Resource={}", arn);
2421        let resp = svc
2422            .handle(make_request(
2423                http::Method::POST,
2424                "/2020-05-31/tagging",
2425                &arn_q,
2426                tag_body,
2427            ))
2428            .await
2429            .unwrap();
2430        assert_eq!(resp.status, StatusCode::NO_CONTENT);
2431        let list = svc
2432            .handle(make_request(
2433                http::Method::GET,
2434                "/2020-05-31/tagging",
2435                &format!("Resource={}", arn),
2436                "",
2437            ))
2438            .await
2439            .unwrap();
2440        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2441        assert!(xml.contains("<Key>env</Key>"));
2442        assert!(xml.contains("<Value>prod</Value>"));
2443    }
2444
2445    #[tokio::test]
2446    async fn xml_metacharacters_in_user_input_are_escaped() {
2447        let svc = CloudFrontService::new(make_state());
2448        let body = minimal_dist_config_xml("escape-ref").replace(
2449            "<Comment></Comment>",
2450            "<Comment><![CDATA[a&b<c>d]]></Comment>",
2451        );
2452        let create = svc
2453            .handle(make_request(
2454                http::Method::POST,
2455                "/2020-05-31/distribution",
2456                "",
2457                &body,
2458            ))
2459            .await
2460            .unwrap();
2461        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2462        let dist_id = xml
2463            .split("<Id>")
2464            .nth(1)
2465            .unwrap()
2466            .split("</Id>")
2467            .next()
2468            .unwrap();
2469        let arn = xml
2470            .split("<ARN>")
2471            .nth(1)
2472            .unwrap()
2473            .split("</ARN>")
2474            .next()
2475            .unwrap();
2476
2477        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2478<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2479  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
2480</Tags>"#;
2481        let arn_q = format!("Operation=Tag&Resource={}", arn);
2482        svc.handle(make_request(
2483            http::Method::POST,
2484            "/2020-05-31/tagging",
2485            &arn_q,
2486            tag_body,
2487        ))
2488        .await
2489        .unwrap();
2490
2491        let list = svc
2492            .handle(make_request(
2493                http::Method::GET,
2494                "/2020-05-31/tagging",
2495                &format!("Resource={}", arn),
2496                "",
2497            ))
2498            .await
2499            .unwrap();
2500        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2501        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
2502        assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2503
2504        // Force the distribution into the list-rendering path so the
2505        // unescaped Comment field would surface there.
2506        let list_resp = svc
2507            .handle(make_request(
2508                http::Method::GET,
2509                "/2020-05-31/distribution",
2510                "",
2511                "",
2512            ))
2513            .await
2514            .unwrap();
2515        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2516        // Ensure raw `<` from the user-supplied comment never lands inside
2517        // a Comment element on the wire.
2518        assert!(!xml.contains("<Comment>a&b<c>d"));
2519        assert!(xml.contains(dist_id));
2520    }
2521}