Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22    CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23    StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Report whether `action` names a state-changing CloudFront operation.
///
/// An action counts as mutating when its name begins with one of the verb
/// prefixes listed below; any other name (`Get*`, `List*`, `Describe*`,
/// `Test*`, `Verify*`, ...) is treated as read-only. The request handler
/// uses the answer to decide whether a handled request must trigger a
/// persistence snapshot.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_PREFIXES: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for prefix in MUTATING_PREFIXES.iter() {
        if action.starts_with(*prefix) {
            return true;
        }
    }
    false
}
45
/// Twelve-digit, all-zero account id.
// NOTE(review): presumably the fallback used when a request carries no
// account id; the consumer is outside this part of the file — confirm.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Every CloudFront action name this service advertises through
/// [`AwsService::supported_actions`]. Each entry has a matching arm in the
/// dispatch `match` inside `handle`; keep the two in sync when adding an
/// action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service: owns the shared CloudFront state, the simulated
/// propagation delay, and the optional snapshot persistence plumbing.
pub struct CloudFrontService {
    /// Shared, lock-protected CloudFront state; handlers take the read or
    /// write side of the lock as needed.
    pub(crate) state: SharedCloudFrontState,
    /// How long the propagation tick sleeps before flipping a freshly
    /// created or updated Distribution / DistributionTenant /
    /// ConnectionGroup / StreamingDistribution from `InProgress` to
    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
    /// keeps SDK-driven integration tests fast while still simulating the
    /// async transition. Tests can shrink it via
    /// [`CloudFrontService::with_propagation_delay`], and operators can
    /// override the default via the
    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
    pub(crate) propagation_delay: std::time::Duration,
    /// Where snapshots are written. `None` means memory mode: nothing is
    /// persisted.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held across each clone-serialize-write sequence so
    /// concurrent snapshot writes cannot interleave.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolve the default `InProgress` -> `Deployed` delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
/// second.
///
/// The value parses as either a non-negative integer (seconds) or a
/// floating-point number; `0` keeps the transition synchronous, which is
/// useful for fast unit tests. An unset, non-Unicode, or invalid value
/// (garbage, negative, NaN, infinite, or too large for a `Duration`)
/// falls back to 1 second, so a typo in the env var can neither hang nor
/// crash the process.
fn default_propagation_delay() -> std::time::Duration {
    const FALLBACK: std::time::Duration = std::time::Duration::from_secs(1);
    std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC")
        .ok()
        .and_then(|raw| parse_propagation_delay(&raw))
        .unwrap_or(FALLBACK)
}

/// Parse a delay in seconds from `raw`, returning `None` for any value that
/// cannot be represented as a `Duration`.
fn parse_propagation_delay(raw: &str) -> Option<std::time::Duration> {
    let trimmed = raw.trim();
    // Try the exact integer path first so whole seconds never go through
    // floating-point rounding.
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Some(std::time::Duration::from_secs(secs));
    }
    let secs = trimmed.parse::<f64>().ok()?;
    // `Duration::from_secs_f64` panics on negative, non-finite, or
    // overflowing input (e.g. `1e300`); the fallible variant reports those
    // as errors so the caller can fall back instead.
    std::time::Duration::try_from_secs_f64(secs).ok()
}
255
256impl CloudFrontService {
257    pub fn new(state: SharedCloudFrontState) -> Self {
258        Self {
259            state,
260            propagation_delay: default_propagation_delay(),
261            snapshot_store: None,
262            snapshot_lock: Arc::new(AsyncMutex::new(())),
263        }
264    }
265
266    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267        self.snapshot_store = Some(store);
268        self
269    }
270
271    pub fn shared_state(&self) -> SharedCloudFrontState {
272        Arc::clone(&self.state)
273    }
274
275    /// Persist current state as a snapshot. Held across the
276    /// clone-serialize-write sequence to prevent stale-last writes, with serde
277    /// + file I/O offloaded to the blocking pool.
278    async fn save_snapshot(&self) {
279        save_cloudfront_snapshot(
280            &self.state,
281            self.snapshot_store.clone(),
282            &self.snapshot_lock,
283        )
284        .await;
285    }
286
287    /// Build a hook that persists the current CloudFront state when invoked, or
288    /// `None` in memory mode. The CloudFormation provisioner mutates `state`
289    /// directly and uses this to write a CFN-provisioned resource through to
290    /// disk, the same way a direct mutating API call would.
291    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292        let store = self.snapshot_store.clone()?;
293        let state = self.state.clone();
294        let lock = self.snapshot_lock.clone();
295        Some(Arc::new(move || {
296            let state = state.clone();
297            let store = store.clone();
298            let lock = lock.clone();
299            Box::pin(async move {
300                save_cloudfront_snapshot(&state, Some(store), &lock).await;
301            })
302        }))
303    }
304
305    /// Re-arm the propagation tick for any Distribution / DistributionTenant /
306    /// ConnectionGroup / StreamingDistribution that was still `InProgress` when
307    /// the previous process exited, so they still transition to `Deployed`
308    /// after a restart. Called by the server after loading a snapshot.
309    pub fn rearm_in_progress(&self) {
310        let (dists, tenants, groups, streaming) = {
311            let s = self.state.read();
312            let mut dists = Vec::new();
313            let mut tenants = Vec::new();
314            let mut groups = Vec::new();
315            let mut streaming = Vec::new();
316            for account in s.accounts.values() {
317                for (id, d) in &account.distributions {
318                    if d.status == "InProgress" {
319                        dists.push(id.clone());
320                    }
321                }
322                for (id, t) in &account.distribution_tenants {
323                    if t.status == "InProgress" {
324                        tenants.push(id.clone());
325                    }
326                }
327                for (id, g) in &account.connection_groups {
328                    if g.status == "InProgress" {
329                        groups.push(id.clone());
330                    }
331                }
332                for (id, d) in &account.streaming_distributions {
333                    if d.status == "InProgress" {
334                        streaming.push(id.clone());
335                    }
336                }
337            }
338            (dists, tenants, groups, streaming)
339        };
340        for id in dists {
341            self.schedule_distribution_deploy(id);
342        }
343        for id in tenants {
344            self.schedule_distribution_tenant_deploy(id);
345        }
346        for id in groups {
347            self.schedule_connection_group_deploy(id);
348        }
349        for id in streaming {
350            self.schedule_streaming_distribution_deploy(id);
351        }
352    }
353
354    /// Override the propagation delay used by `CreateDistribution`,
355    /// `UpdateDistribution`, and the equivalent DistributionTenant /
356    /// ConnectionGroup ops. Tests pass a small duration so they don't
357    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
358    /// transition.
359    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360        self.propagation_delay = delay;
361        self
362    }
363
364    /// Flip a stored Distribution's status synchronously. Returns `false`
365    /// if no distribution matches `id` across any account. The admin
366    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
367    /// calls this so tests can force `InProgress` <-> `Deployed` without
368    /// waiting on the propagation tick.
369    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370        let mut state = self.state.write();
371        for account in state.accounts.values_mut() {
372            if let Some(dist) = account.distributions.get_mut(id) {
373                dist.status = status.to_string();
374                return true;
375            }
376        }
377        false
378    }
379
380    /// Admin-endpoint flavour of [`set_distribution_status`](Self::set_distribution_status)
381    /// that persists the forced status so it survives a restart.
382    pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383        let changed = self.set_distribution_status(id, status);
384        if changed {
385            self.save_snapshot().await;
386        }
387        changed
388    }
389}
390
391/// Persist the current CloudFront state as a snapshot. Offloads the serde +
392/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
393/// (memory mode). Shared by `CloudFrontService::save_snapshot`, the propagation
394/// ticks, and the CloudFormation provisioner persist hook so all route through
395/// the same serialize-and-write path.
396pub async fn save_cloudfront_snapshot(
397    state: &SharedCloudFrontState,
398    store: Option<Arc<dyn SnapshotStore>>,
399    lock: &AsyncMutex<()>,
400) {
401    let Some(store) = store else {
402        return;
403    };
404    let _guard = lock.lock().await;
405    let snapshot = CloudFrontSnapshot {
406        schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407        accounts: Some(state.read().clone()),
408    };
409    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410        let bytes = serde_json::to_vec(&snapshot)
411            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412        store.save(&bytes)
413    })
414    .await;
415    match join {
416        Ok(Ok(())) => {}
417        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418        Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419    }
420}
421
422impl Default for CloudFrontService {
423    fn default() -> Self {
424        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425    }
426}
427
428#[async_trait]
429impl AwsService for CloudFrontService {
430    fn service_name(&self) -> &str {
431        "cloudfront"
432    }
433
434    fn supported_actions(&self) -> &[&str] {
435        SUPPORTED_ACTIONS
436    }
437
438    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
440            Some(r) => r,
441            None => {
442                return Err(aws_error(
443                    StatusCode::NOT_FOUND,
444                    "InvalidArgument",
445                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
446                ));
447            }
448        };
449
450        let mutates = is_mutating_action(resolved.action);
451        let result = match resolved.action {
452            "CreateDistribution" => self.create_distribution(&req, false),
453            "CreateDistributionWithTags" => self.create_distribution(&req, true),
454            "GetDistribution" => self.get_distribution(&resolved),
455            "GetDistributionConfig" => self.get_distribution_config(&resolved),
456            "UpdateDistribution" => self.update_distribution(&req, &resolved),
457            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
458            "ListDistributions" => self.list_distributions(&req),
459            "CopyDistribution" => self.copy_distribution(&req, &resolved),
460            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
461            "GetInvalidation" => self.get_invalidation(&resolved),
462            "ListInvalidations" => self.list_invalidations(&resolved),
463            "TagResource" => self.tag_resource(&req),
464            "UntagResource" => self.untag_resource(&req),
465            "ListTagsForResource" => self.list_tags_for_resource(&req),
466            "AssociateAlias" => self.associate_alias(&req, &resolved),
467            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
468            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
469            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
470            "ListDistributionsByCachePolicyId"
471            | "ListDistributionsByOriginRequestPolicyId"
472            | "ListDistributionsByResponseHeadersPolicyId"
473            | "ListDistributionsByKeyGroup"
474            | "ListDistributionsByWebACLId"
475            | "ListDistributionsByVpcOriginId"
476            | "ListDistributionsByAnycastIpListId"
477            | "ListDistributionsByConnectionMode"
478            | "ListDistributionsByConnectionFunction"
479            | "ListDistributionsByOwnedResource"
480            | "ListDistributionsByTrustStore"
481            | "ListDistributionsByRealtimeLogConfig" => {
482                self.list_distributions_by(&req, &resolved, resolved.action)
483            }
484            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
485            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
486            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
487            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
488            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
489            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
490            "CreateCachePolicy" => self.create_cache_policy(&req),
491            "GetCachePolicy" => self.get_cache_policy(&resolved),
492            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
493            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
494            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
495            "ListCachePolicies" => self.list_cache_policies(&req),
496            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
497            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
498            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
499            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
500            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
501            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
502            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
503            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
504            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
505            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
506            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
507            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
508            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
509            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
510            "GetContinuousDeploymentPolicyConfig" => {
511                self.get_continuous_deployment_policy_config(&resolved)
512            }
513            "UpdateContinuousDeploymentPolicy" => {
514                self.update_continuous_deployment_policy(&req, &resolved)
515            }
516            "DeleteContinuousDeploymentPolicy" => {
517                self.delete_continuous_deployment_policy(&req, &resolved)
518            }
519            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
520            "CreateFunction" => self.create_function(&req),
521            "DescribeFunction" => self.describe_function(&req, &resolved),
522            "GetFunction" => self.get_function(&req, &resolved),
523            "UpdateFunction" => self.update_function(&req, &resolved),
524            "DeleteFunction" => self.delete_function(&req, &resolved),
525            "ListFunctions" => self.list_functions(&req),
526            "PublishFunction" => self.publish_function(&req, &resolved),
527            "TestFunction" => self.test_function(&req, &resolved),
528            "CreatePublicKey" => self.create_public_key(&req),
529            "GetPublicKey" => self.get_public_key(&resolved),
530            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
531            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
532            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
533            "ListPublicKeys" => self.list_public_keys(&req),
534            "CreateKeyGroup" => self.create_key_group(&req),
535            "GetKeyGroup" => self.get_key_group(&resolved),
536            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
537            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
538            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
539            "ListKeyGroups" => self.list_key_groups(&req),
540            "CreateKeyValueStore" => self.create_key_value_store(&req),
541            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
542            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
543            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
544            "ListKeyValueStores" => self.list_key_value_stores(&req),
545            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
546            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
547            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
548            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
549            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
550            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
551            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
552            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
553            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
554            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
555            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
556            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
557            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
558            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
559            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
560            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
561            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
562            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
563            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
564            "UpdateFieldLevelEncryptionConfig" => {
565                self.update_field_level_encryption_config(&req, &resolved)
566            }
567            "DeleteFieldLevelEncryptionConfig" => {
568                self.delete_field_level_encryption_config(&req, &resolved)
569            }
570            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
571            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
572            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
573            "GetFieldLevelEncryptionProfileConfig" => {
574                self.get_field_level_encryption_profile_config(&resolved)
575            }
576            "UpdateFieldLevelEncryptionProfile" => {
577                self.update_field_level_encryption_profile(&req, &resolved)
578            }
579            "DeleteFieldLevelEncryptionProfile" => {
580                self.delete_field_level_encryption_profile(&req, &resolved)
581            }
582            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
583            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
584            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
585            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
586            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
587            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
588            "CreateVpcOrigin" => self.create_vpc_origin(&req),
589            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
590            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
591            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
592            "ListVpcOrigins" => self.list_vpc_origins(&req),
593            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
594            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
595            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
596            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
597            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
598            "CreateTrustStore" => self.create_trust_store(&req),
599            "GetTrustStore" => self.get_trust_store(&resolved),
600            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
601            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
602            "ListTrustStores" => self.list_trust_stores(&req),
603            "GetResourcePolicy" => self.get_resource_policy(&req),
604            "PutResourcePolicy" => self.put_resource_policy(&req),
605            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
606            "CreateConnectionGroup" => self.create_connection_group(&req),
607            "GetConnectionGroup" => self.get_connection_group(&resolved),
608            "GetConnectionGroupByRoutingEndpoint" => {
609                self.get_connection_group_by_routing_endpoint(&req)
610            }
611            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
612            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
613            "ListConnectionGroups" => self.list_connection_groups(&req),
614            "ListDomainConflicts" => self.list_domain_conflicts(&req),
615            "UpdateDomainAssociation" => self.update_domain_association(&req),
616            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
617            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
618            "UpdateDistributionWithStagingConfig" => {
619                self.update_distribution_with_staging_config(&req, &resolved)
620            }
621            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
622            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
623            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
624            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
625            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
626            "ListDistributionTenants" => self.list_distribution_tenants(&req),
627            "ListDistributionTenantsByCustomization" => {
628                self.list_distribution_tenants_by_customization(&req)
629            }
630            "AssociateDistributionTenantWebACL" => {
631                self.associate_distribution_tenant_web_acl(&req, &resolved)
632            }
633            "DisassociateDistributionTenantWebACL" => {
634                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
635            }
636            "CreateInvalidationForDistributionTenant" => {
637                self.create_invalidation_for_distribution_tenant(&req, &resolved)
638            }
639            "GetInvalidationForDistributionTenant" => {
640                self.get_invalidation_for_distribution_tenant(&resolved)
641            }
642            "ListInvalidationsForDistributionTenant" => {
643                self.list_invalidations_for_distribution_tenant(&resolved)
644            }
645            "CreateConnectionFunction" => self.create_connection_function(&req),
646            "GetConnectionFunction" => self.get_connection_function(&resolved),
647            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
648            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
649            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
650            "ListConnectionFunctions" => self.list_connection_functions(&req),
651            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
652            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
653            other => Err(aws_error(
654                StatusCode::NOT_IMPLEMENTED,
655                "InvalidAction",
656                format!("CloudFront action {other} is not implemented yet"),
657            )),
658        };
659        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
660            self.save_snapshot().await;
661        }
662        result
663    }
664}
665
666// ─── Distribution handlers ────────────────────────────────────────────
667
668impl CloudFrontService {
669    fn create_distribution(
670        &self,
671        req: &AwsRequest,
672        with_tags: bool,
673    ) -> Result<AwsResponse, AwsServiceError> {
674        let (config, tags) = if with_tags {
675            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677            let tags = parsed
678                .tags
679                .items
680                .map(|i| {
681                    i.tag
682                        .into_iter()
683                        .map(|t| Tag {
684                            key: t.key,
685                            value: t.value,
686                        })
687                        .collect()
688                })
689                .unwrap_or_default();
690            (parsed.distribution_config, tags)
691        } else {
692            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694            (parsed, Vec::new())
695        };
696
697        validate_caller_reference(&config.caller_reference)?;
698        validate_origins(&config)?;
699
700        let mut state = self.state.write();
701        let account = state.entry(account_id(req));
702
703        if let Some(existing) = account
704            .distributions
705            .values()
706            .find(|d| d.config.caller_reference == config.caller_reference)
707        {
708            return Err(aws_error(
709                StatusCode::CONFLICT,
710                "DistributionAlreadyExists",
711                format!(
712                    "Distribution with the same CallerReference exists: {}",
713                    existing.id
714                ),
715            ));
716        }
717
718        let id = generate_distribution_id();
719        let now = Utc::now();
720        let etag = generate_etag();
721        let domain = format!("{}.cloudfront.net", id.to_lowercase());
722        let arn = format!(
723            "arn:aws:cloudfront::{}:distribution/{}",
724            account_id(req),
725            id
726        );
727
728        let stored = StoredDistribution {
729            id: id.clone(),
730            arn: arn.clone(),
731            // Real CloudFront returns InProgress immediately and flips to
732            // Deployed ~15 minutes later once the edge propagation completes.
733            // The spawn below mirrors that lifecycle on a configurable delay.
734            status: "InProgress".to_string(),
735            last_modified_time: now,
736            domain_name: domain,
737            in_progress_invalidation_batches: 0,
738            etag: etag.clone(),
739            config,
740        };
741        account.distributions.insert(id.clone(), stored.clone());
742        if !tags.is_empty() {
743            account.tags.insert(arn.clone(), tags);
744        }
745        drop(state);
746
747        self.schedule_distribution_deploy(id.clone());
748
749        let body = build_distribution_xml(&stored);
750        let mut headers = HeaderMap::new();
751        set_header(&mut headers, ETAG, &etag);
752        set_header(&mut headers, LOCATION, &stored.arn);
753        Ok(xml_response(StatusCode::CREATED, body, headers))
754    }
755
756    /// Spawn a tokio task that flips the named distribution from
757    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
758    /// `CreateDistribution` and `UpdateDistribution` to model the real
759    /// CloudFront edge propagation lifecycle.
760    fn schedule_distribution_deploy(&self, id: String) {
761        let state = Arc::clone(&self.state);
762        let delay = self.propagation_delay;
763        let store = self.snapshot_store.clone();
764        let lock = self.snapshot_lock.clone();
765        tokio::spawn(async move {
766            tokio::time::sleep(delay).await;
767            let flipped = {
768                let mut s = state.write();
769                let mut flipped = false;
770                for account in s.accounts.values_mut() {
771                    if let Some(d) = account.distributions.get_mut(&id) {
772                        if d.status == "InProgress" {
773                            d.status = "Deployed".to_string();
774                            flipped = true;
775                        }
776                        break;
777                    }
778                }
779                flipped
780            };
781            if flipped {
782                save_cloudfront_snapshot(&state, store, &lock).await;
783            }
784        });
785    }
786
787    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
788    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789        let state = Arc::clone(&self.state);
790        let delay = self.propagation_delay;
791        let store = self.snapshot_store.clone();
792        let lock = self.snapshot_lock.clone();
793        tokio::spawn(async move {
794            tokio::time::sleep(delay).await;
795            let flipped = {
796                let mut s = state.write();
797                let mut flipped = false;
798                for account in s.accounts.values_mut() {
799                    if let Some(t) = account.distribution_tenants.get_mut(&id) {
800                        if t.status == "InProgress" {
801                            t.status = "Deployed".to_string();
802                            flipped = true;
803                        }
804                        break;
805                    }
806                }
807                flipped
808            };
809            if flipped {
810                save_cloudfront_snapshot(&state, store, &lock).await;
811            }
812        });
813    }
814
815    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
816    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817        let state = Arc::clone(&self.state);
818        let delay = self.propagation_delay;
819        let store = self.snapshot_store.clone();
820        let lock = self.snapshot_lock.clone();
821        tokio::spawn(async move {
822            tokio::time::sleep(delay).await;
823            let flipped = {
824                let mut s = state.write();
825                let mut flipped = false;
826                for account in s.accounts.values_mut() {
827                    if let Some(g) = account.connection_groups.get_mut(&id) {
828                        if g.status == "InProgress" {
829                            g.status = "Deployed".to_string();
830                            flipped = true;
831                        }
832                        break;
833                    }
834                }
835                flipped
836            };
837            if flipped {
838                save_cloudfront_snapshot(&state, store, &lock).await;
839            }
840        });
841    }
842
843    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
844    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
845    /// RTMP streaming distributions, even though the resource is largely
846    /// deprecated.
847    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848        let state = Arc::clone(&self.state);
849        let delay = self.propagation_delay;
850        let store = self.snapshot_store.clone();
851        let lock = self.snapshot_lock.clone();
852        tokio::spawn(async move {
853            tokio::time::sleep(delay).await;
854            let flipped = {
855                let mut s = state.write();
856                let mut flipped = false;
857                for account in s.accounts.values_mut() {
858                    if let Some(d) = account.streaming_distributions.get_mut(&id) {
859                        if d.status == "InProgress" {
860                            d.status = "Deployed".to_string();
861                            flipped = true;
862                        }
863                        break;
864                    }
865                }
866                flipped
867            };
868            if flipped {
869                save_cloudfront_snapshot(&state, store, &lock).await;
870            }
871        });
872    }
873
874    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875        let id = route
876            .id
877            .as_deref()
878            .ok_or_else(|| invalid_argument("missing distribution id"))?;
879        let state = self.state.read();
880        let account = state
881            .accounts
882            .get(DEFAULT_ACCOUNT)
883            .ok_or_else(|| no_such_distribution(id))?;
884        let dist = account
885            .distributions
886            .get(id)
887            .ok_or_else(|| no_such_distribution(id))?
888            .clone();
889        drop(state);
890        let body = build_distribution_xml(&dist);
891        let mut headers = HeaderMap::new();
892        set_header(&mut headers, ETAG, &dist.etag);
893        Ok(xml_response(StatusCode::OK, body, headers))
894    }
895
896    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897        let id = route
898            .id
899            .as_deref()
900            .ok_or_else(|| invalid_argument("missing distribution id"))?;
901        let state = self.state.read();
902        let account = state
903            .accounts
904            .get(DEFAULT_ACCOUNT)
905            .ok_or_else(|| no_such_distribution(id))?;
906        let dist = account
907            .distributions
908            .get(id)
909            .ok_or_else(|| no_such_distribution(id))?
910            .clone();
911        drop(state);
912        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914        let mut headers = HeaderMap::new();
915        set_header(&mut headers, ETAG, &dist.etag);
916        Ok(xml_response(StatusCode::OK, body, headers))
917    }
918
919    fn update_distribution(
920        &self,
921        req: &AwsRequest,
922        route: &Route,
923    ) -> Result<AwsResponse, AwsServiceError> {
924        let id = route
925            .id
926            .as_deref()
927            .ok_or_else(|| invalid_argument("missing distribution id"))?;
928        let if_match = req
929            .headers
930            .get(IF_MATCH)
931            .and_then(|v| v.to_str().ok())
932            .ok_or_else(|| {
933                aws_error(
934                    StatusCode::BAD_REQUEST,
935                    "InvalidIfMatchVersion",
936                    "Missing If-Match header for UpdateDistribution",
937                )
938            })?
939            .to_string();
940        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942        validate_caller_reference(&new_config.caller_reference)?;
943        validate_origins(&new_config)?;
944
945        let mut state = self.state.write();
946        let account = state
947            .accounts
948            .get_mut(DEFAULT_ACCOUNT)
949            .ok_or_else(|| no_such_distribution(id))?;
950        let dist = account
951            .distributions
952            .get_mut(id)
953            .ok_or_else(|| no_such_distribution(id))?;
954        if dist.etag != if_match {
955            return Err(aws_error(
956                StatusCode::PRECONDITION_FAILED,
957                "PreconditionFailed",
958                "If-Match header does not match the current ETag",
959            ));
960        }
961        // ETag stability: only bump the ETag and flip status back to
962        // InProgress when the new config actually differs from what we
963        // have on disk. A no-op UpdateDistribution (PUT the same config
964        // back) leaves the ETag intact, matching AWS behavior.
965        let config_changed = !configs_equal(&dist.config, &new_config);
966        if config_changed {
967            dist.config = new_config;
968            dist.etag = generate_etag();
969            dist.last_modified_time = Utc::now();
970            // UpdateDistribution kicks off a fresh edge propagation; AWS
971            // flips the status back to InProgress until the new config
972            // lands.
973            dist.status = "InProgress".to_string();
974        }
975        let snapshot = dist.clone();
976        drop(state);
977
978        if config_changed {
979            self.schedule_distribution_deploy(id.to_string());
980        }
981
982        let body = build_distribution_xml(&snapshot);
983        let mut headers = HeaderMap::new();
984        set_header(&mut headers, ETAG, &snapshot.etag);
985        Ok(xml_response(StatusCode::OK, body, headers))
986    }
987
988    fn delete_distribution(
989        &self,
990        req: &AwsRequest,
991        route: &Route,
992    ) -> Result<AwsResponse, AwsServiceError> {
993        let id = route
994            .id
995            .as_deref()
996            .ok_or_else(|| invalid_argument("missing distribution id"))?;
997        let if_match = req
998            .headers
999            .get(IF_MATCH)
1000            .and_then(|v| v.to_str().ok())
1001            .ok_or_else(|| {
1002                aws_error(
1003                    StatusCode::BAD_REQUEST,
1004                    "InvalidIfMatchVersion",
1005                    "Missing If-Match header for DeleteDistribution",
1006                )
1007            })?
1008            .to_string();
1009        let mut state = self.state.write();
1010        let account = state
1011            .accounts
1012            .get_mut(DEFAULT_ACCOUNT)
1013            .ok_or_else(|| no_such_distribution(id))?;
1014        {
1015            let dist = account
1016                .distributions
1017                .get(id)
1018                .ok_or_else(|| no_such_distribution(id))?;
1019            if dist.etag != if_match {
1020                return Err(aws_error(
1021                    StatusCode::PRECONDITION_FAILED,
1022                    "PreconditionFailed",
1023                    "If-Match header does not match the current ETag",
1024                ));
1025            }
1026            if dist.config.enabled {
1027                return Err(aws_error(
1028                    StatusCode::PRECONDITION_FAILED,
1029                    "DistributionNotDisabled",
1030                    "Distribution must be disabled before delete",
1031                ));
1032            }
1033        }
1034        let removed = account.distributions.remove(id).unwrap();
1035        account.tags.remove(&removed.arn);
1036        Ok(empty_response(StatusCode::NO_CONTENT))
1037    }
1038
1039    fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040        let state = self.state.read();
1041        let mut dists: Vec<StoredDistribution> = state
1042            .accounts
1043            .values()
1044            .flat_map(|a| a.distributions.values().cloned())
1045            .collect();
1046        dists.sort_by_key(|a| a.last_modified_time);
1047        drop(state);
1048        let body = build_distribution_list_xml(&dists, "DistributionList");
1049        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1050    }
1051
1052    fn list_distributions_by(
1053        &self,
1054        req: &AwsRequest,
1055        route: &Route,
1056        action: &str,
1057    ) -> Result<AwsResponse, AwsServiceError> {
1058        // Each "by-X" listing has a Smithy-required identifier (path or query)
1059        // plus, for ConnectionMode, an enum-constrained discriminator. The
1060        // synthetic probe sends `negative_omit_*` variants without these, so
1061        // every handler that can short-circuit on a missing identifier must
1062        // do so up-front. Otherwise the empty-list 200 looks like an honest
1063        // pass to the probe and a 100% conformance number is unreachable.
1064        match action {
1065            // Path-id ops: route.id is the URL placeholder. If the probe
1066            // omitted the field, the substitution left the literal
1067            // `{Member}` braces in place — easy to detect.
1068            "ListDistributionsByCachePolicyId"
1069            | "ListDistributionsByOriginRequestPolicyId"
1070            | "ListDistributionsByResponseHeadersPolicyId"
1071            | "ListDistributionsByKeyGroup"
1072            | "ListDistributionsByWebACLId"
1073            | "ListDistributionsByVpcOriginId"
1074            | "ListDistributionsByAnycastIpListId"
1075            | "ListDistributionsByOwnedResource" => {
1076                let id = route.id.as_deref().unwrap_or("");
1077                if is_placeholder_label(id) {
1078                    return Err(invalid_argument(format!(
1079                        "Required URL identifier for {action} is missing or invalid"
1080                    )));
1081                }
1082            }
1083            "ListDistributionsByConnectionMode" => {
1084                let id = route.id.as_deref().unwrap_or("");
1085                if is_placeholder_label(id) {
1086                    return Err(invalid_argument(
1087                        "ConnectionMode is required for ListDistributionsByConnectionMode",
1088                    ));
1089                }
1090                if id != "direct" && id != "tenant-only" {
1091                    return Err(invalid_argument(format!(
1092                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1093                    )));
1094                }
1095            }
1096            "ListDistributionsByConnectionFunction"
1097                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1098            {
1099                return Err(invalid_argument(
1100                    "ConnectionFunctionIdentifier query parameter is required",
1101                ));
1102            }
1103            "ListDistributionsByTrustStore"
1104                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1105            {
1106                return Err(invalid_argument(
1107                    "TrustStoreIdentifier query parameter is required",
1108                ));
1109            }
1110            _ => {}
1111        }
1112
1113        // The "by-X" listings each have a distinct response root element.
1114        // We never index distributions by the predicate (that would require
1115        // shipping each policy/key-group/etc service first), so each
1116        // response is empty until those services land.
1117        let root = match action {
1118            "ListDistributionsByCachePolicyId"
1119            | "ListDistributionsByOriginRequestPolicyId"
1120            | "ListDistributionsByResponseHeadersPolicyId"
1121            | "ListDistributionsByKeyGroup"
1122            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1123            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1124            _ => "DistributionList",
1125        };
1126        let body = build_empty_distribution_id_list(root);
1127        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1128    }
1129
1130    fn copy_distribution(
1131        &self,
1132        req: &AwsRequest,
1133        route: &Route,
1134    ) -> Result<AwsResponse, AwsServiceError> {
1135        let primary_id = route
1136            .id
1137            .as_deref()
1138            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1139        let if_match = req
1140            .headers
1141            .get(IF_MATCH)
1142            .and_then(|v| v.to_str().ok())
1143            .ok_or_else(|| {
1144                aws_error(
1145                    StatusCode::BAD_REQUEST,
1146                    "InvalidIfMatchVersion",
1147                    "Missing If-Match header for CopyDistribution",
1148                )
1149            })?
1150            .to_string();
1151        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1152            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1153        validate_caller_reference(&parsed.caller_reference)?;
1154        let mut state = self.state.write();
1155        let account = state
1156            .accounts
1157            .get_mut(DEFAULT_ACCOUNT)
1158            .ok_or_else(|| no_such_distribution(primary_id))?;
1159        let primary = account
1160            .distributions
1161            .get(primary_id)
1162            .ok_or_else(|| no_such_distribution(primary_id))?
1163            .clone();
1164        if primary.etag != if_match {
1165            return Err(aws_error(
1166                StatusCode::PRECONDITION_FAILED,
1167                "PreconditionFailed",
1168                "If-Match header does not match the current ETag",
1169            ));
1170        }
1171        if account
1172            .distributions
1173            .values()
1174            .any(|d| d.config.caller_reference == parsed.caller_reference)
1175        {
1176            return Err(aws_error(
1177                StatusCode::CONFLICT,
1178                "DistributionAlreadyExists",
1179                "Distribution with the same CallerReference exists",
1180            ));
1181        }
1182        let new_id = generate_distribution_id();
1183        let mut config = primary.config.clone();
1184        config.caller_reference = parsed.caller_reference;
1185        config.enabled = parsed.enabled.unwrap_or(false);
1186        config.staging = parsed.staging;
1187        let now = Utc::now();
1188        let etag = generate_etag();
1189        let arn = format!(
1190            "arn:aws:cloudfront::{}:distribution/{}",
1191            account_id(req),
1192            new_id
1193        );
1194        let stored = StoredDistribution {
1195            id: new_id.clone(),
1196            arn: arn.clone(),
1197            status: "InProgress".to_string(),
1198            last_modified_time: now,
1199            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1200            in_progress_invalidation_batches: 0,
1201            etag: etag.clone(),
1202            config,
1203        };
1204        account.distributions.insert(new_id.clone(), stored.clone());
1205        drop(state);
1206        self.schedule_distribution_deploy(new_id);
1207        let body = build_distribution_xml(&stored);
1208        let mut headers = HeaderMap::new();
1209        set_header(&mut headers, ETAG, &etag);
1210        set_header(&mut headers, LOCATION, &stored.arn);
1211        Ok(xml_response(StatusCode::CREATED, body, headers))
1212    }
1213}
1214
/// Request body of `CopyDistribution`, deserialized from XML with
/// PascalCase element names (`CallerReference`, `Enabled`, `Staging`).
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller-supplied reference for the copy; validated and checked for
    /// uniqueness by `copy_distribution`.
    caller_reference: String,
    /// Whether the copy starts enabled; `copy_distribution` treats an
    /// absent value as `false`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Optional staging flag, copied as-is onto the new config.
    #[serde(default)]
    staging: Option<bool>,
}
1224
1225// ─── Invalidations ────────────────────────────────────────────────────
1226
1227impl CloudFrontService {
1228    fn create_invalidation(
1229        &self,
1230        req: &AwsRequest,
1231        route: &Route,
1232    ) -> Result<AwsResponse, AwsServiceError> {
1233        let dist_id = route
1234            .id
1235            .as_deref()
1236            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1237        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1238            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1239        if batch.caller_reference.is_empty() {
1240            return Err(invalid_argument("CallerReference is required"));
1241        }
1242        if batch.paths.quantity < 1 {
1243            return Err(invalid_argument(
1244                "InvalidationBatch.Paths must be non-empty",
1245            ));
1246        }
1247        let mut state = self.state.write();
1248        let account = state.entry(DEFAULT_ACCOUNT);
1249        if !account.distributions.contains_key(dist_id) {
1250            return Err(no_such_distribution(dist_id));
1251        }
1252        let id = generate_invalidation_id();
1253        let stored = StoredInvalidation {
1254            id: id.clone(),
1255            distribution_id: dist_id.to_string(),
1256            status: "Completed".to_string(),
1257            create_time: Utc::now(),
1258            batch: batch.clone(),
1259        };
1260        account.invalidations.insert(id.clone(), stored.clone());
1261        drop(state);
1262        let body = build_invalidation_xml(&stored);
1263        let mut headers = HeaderMap::new();
1264        set_header(
1265            &mut headers,
1266            LOCATION,
1267            &format!(
1268                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1269                stored.id
1270            ),
1271        );
1272        Ok(xml_response(StatusCode::CREATED, body, headers))
1273    }
1274
1275    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1276        let dist_id = route
1277            .id
1278            .as_deref()
1279            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1280        let inv_id = route
1281            .second_id
1282            .as_deref()
1283            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1284        let state = self.state.read();
1285        let account = state
1286            .accounts
1287            .get(DEFAULT_ACCOUNT)
1288            .ok_or_else(|| no_such_invalidation(inv_id))?;
1289        if !account.distributions.contains_key(dist_id) {
1290            return Err(no_such_distribution(dist_id));
1291        }
1292        let inv = account
1293            .invalidations
1294            .get(inv_id)
1295            .filter(|i| i.distribution_id == dist_id)
1296            .ok_or_else(|| no_such_invalidation(inv_id))?
1297            .clone();
1298        drop(state);
1299        let body = build_invalidation_xml(&inv);
1300        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1301    }
1302
1303    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1304        let dist_id = route
1305            .id
1306            .as_deref()
1307            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1308        let state = self.state.read();
1309        let account = state
1310            .accounts
1311            .get(DEFAULT_ACCOUNT)
1312            .ok_or_else(|| no_such_distribution(dist_id))?;
1313        if !account.distributions.contains_key(dist_id) {
1314            return Err(no_such_distribution(dist_id));
1315        }
1316        let mut items: Vec<&StoredInvalidation> = account
1317            .invalidations
1318            .values()
1319            .filter(|i| i.distribution_id == dist_id)
1320            .collect();
1321        items.sort_by_key(|a| a.create_time);
1322        let body = build_invalidation_list_xml(&items);
1323        drop(state);
1324        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1325    }
1326}
1327
1328// ─── Tags ─────────────────────────────────────────────────────────────
1329
1330impl CloudFrontService {
1331    fn parse_arn_query(query: &str) -> Option<String> {
1332        for pair in query.split('&').filter(|p| !p.is_empty()) {
1333            if let Some(rest) = pair.strip_prefix("Resource=") {
1334                return Some(percent_decode(rest));
1335            }
1336        }
1337        None
1338    }
1339
1340    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1341        let arn = Self::parse_arn_query(&req.raw_query)
1342            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1343        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1344            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1345        let new_tags: Vec<Tag> = parsed
1346            .items
1347            .map(|i| {
1348                i.tag
1349                    .into_iter()
1350                    .map(|t| Tag {
1351                        key: t.key,
1352                        value: t.value,
1353                    })
1354                    .collect()
1355            })
1356            .unwrap_or_default();
1357        let mut state = self.state.write();
1358        let account = state.entry(DEFAULT_ACCOUNT);
1359        let entry = account.tags.entry(arn).or_default();
1360        for tag in new_tags {
1361            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1362                existing.value = tag.value;
1363            } else {
1364                entry.push(tag);
1365            }
1366        }
1367        Ok(empty_response(StatusCode::NO_CONTENT))
1368    }
1369
1370    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371        let arn = Self::parse_arn_query(&req.raw_query)
1372            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1373        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1374            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1375        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1376        let mut state = self.state.write();
1377        let account = state.entry(DEFAULT_ACCOUNT);
1378        if let Some(existing) = account.tags.get_mut(&arn) {
1379            existing.retain(|t| !keys.contains(&t.key));
1380        }
1381        Ok(empty_response(StatusCode::NO_CONTENT))
1382    }
1383
1384    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385        let arn = Self::parse_arn_query(&req.raw_query)
1386            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1387        let state = self.state.read();
1388        let tags = state
1389            .accounts
1390            .get(DEFAULT_ACCOUNT)
1391            .and_then(|a| a.tags.get(&arn))
1392            .cloned()
1393            .unwrap_or_default();
1394        drop(state);
1395        let body = build_tags_xml(&tags);
1396        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1397    }
1398}
1399
1400// ─── Aliases / WebACL ─────────────────────────────────────────────────
1401
1402impl CloudFrontService {
1403    fn associate_alias(
1404        &self,
1405        req: &AwsRequest,
1406        route: &Route,
1407    ) -> Result<AwsResponse, AwsServiceError> {
1408        let id = route
1409            .id
1410            .as_deref()
1411            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1412        let alias = parse_query_value(&req.raw_query, "Alias")
1413            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1414        let mut state = self.state.write();
1415        let account = state
1416            .accounts
1417            .get_mut(DEFAULT_ACCOUNT)
1418            .ok_or_else(|| no_such_distribution(id))?;
1419        // Reject if the alias is already attached to a different distribution.
1420        if let Some(other) = account.distributions.values().find(|d| {
1421            d.id != id
1422                && d.config
1423                    .aliases
1424                    .as_ref()
1425                    .and_then(|a| a.items.as_ref())
1426                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1427        }) {
1428            return Err(aws_error(
1429                StatusCode::CONFLICT,
1430                "CNAMEAlreadyExists",
1431                format!(
1432                    "Alias {alias} is already associated with distribution {}",
1433                    other.id
1434                ),
1435            ));
1436        }
1437        let dist = account
1438            .distributions
1439            .get_mut(id)
1440            .ok_or_else(|| no_such_distribution(id))?;
1441        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1442        let items = aliases
1443            .items
1444            .get_or_insert_with(crate::model::AliasItems::default);
1445        if !items.cname.iter().any(|c| c == &alias) {
1446            items.cname.push(alias.clone());
1447            aliases.quantity = items.cname.len() as i32;
1448        }
1449        dist.etag = generate_etag();
1450        dist.last_modified_time = Utc::now();
1451        Ok(empty_response(StatusCode::OK))
1452    }
1453
1454    fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1455        let alias = parse_query_value(&req.raw_query, "Alias")
1456            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1457        let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1458            .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1459        // aliasString max 253, distributionIdString max 25 per the Smithy
1460        // model. Reject probe-generated boundary variants that overrun the
1461        // documented length so they produce the declared InvalidArgument
1462        // instead of an empty 200.
1463        if alias.len() > 253 {
1464            return Err(invalid_argument(format!(
1465                "Alias length {} exceeds maximum 253",
1466                alias.len()
1467            )));
1468        }
1469        if dist_id.len() > 25 {
1470            return Err(invalid_argument(format!(
1471                "DistributionId length {} exceeds maximum 25",
1472                dist_id.len()
1473            )));
1474        }
1475        if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1476            let n: i64 = max_items.parse().map_err(|_| {
1477                invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1478            })?;
1479            if n > 100 {
1480                return Err(invalid_argument(format!(
1481                    "MaxItems {n} exceeds maximum 100"
1482                )));
1483            }
1484        }
1485        // We never produce conflicts because every alias is owned by one
1486        // distribution at most. Return an empty list with the proper shape.
1487        let body = format!(
1488            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1489            NS = crate::NAMESPACE,
1490            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1491        );
1492        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1493    }
1494
1495    fn associate_web_acl(
1496        &self,
1497        req: &AwsRequest,
1498        route: &Route,
1499    ) -> Result<AwsResponse, AwsServiceError> {
1500        let id = route
1501            .id
1502            .as_deref()
1503            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1504        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1505            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1506        let mut state = self.state.write();
1507        let account = state
1508            .accounts
1509            .get_mut(DEFAULT_ACCOUNT)
1510            .ok_or_else(|| no_such_distribution(id))?;
1511        let dist = account
1512            .distributions
1513            .get_mut(id)
1514            .ok_or_else(|| no_such_distribution(id))?;
1515        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1516        dist.etag = generate_etag();
1517        dist.last_modified_time = Utc::now();
1518        let body = format!(
1519            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1520            esc(id), esc(&parsed.web_acl_arn),
1521            NS = crate::NAMESPACE,
1522            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1523        );
1524        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1525    }
1526
1527    fn disassociate_web_acl(
1528        &self,
1529        _req: &AwsRequest,
1530        route: &Route,
1531    ) -> Result<AwsResponse, AwsServiceError> {
1532        let id = route
1533            .id
1534            .as_deref()
1535            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1536        // DisassociateDistributionWebACL's Smithy model declares EntityNotFound
1537        // (not NoSuchDistribution) for unknown distribution IDs.
1538        let entity_not_found = || {
1539            aws_error(
1540                StatusCode::NOT_FOUND,
1541                "EntityNotFound",
1542                format!("The specified distribution does not exist: {id}"),
1543            )
1544        };
1545        let mut state = self.state.write();
1546        let account = state
1547            .accounts
1548            .get_mut(DEFAULT_ACCOUNT)
1549            .ok_or_else(entity_not_found)?;
1550        let dist = account
1551            .distributions
1552            .get_mut(id)
1553            .ok_or_else(entity_not_found)?;
1554        dist.config.web_acl_id = None;
1555        dist.etag = generate_etag();
1556        dist.last_modified_time = Utc::now();
1557        let body = format!(
1558            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1559            esc(id),
1560            NS = crate::NAMESPACE,
1561            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1562        );
1563        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1564    }
1565}
1566
/// XML request body deserialized by `associate_web_acl`.
///
/// NOTE(review): despite the `AssociateAlias` name, the only member is the
/// `WebACLArn` element; `associate_alias` reads its input from the query
/// string and does not use this type. Consider renaming together with its
/// single call site.
#[derive(serde::Deserialize, Default, Debug)]
#[serde(rename_all = "PascalCase")]
struct AssociateAliasRequest {
    // Explicit rename because PascalCase would yield `WebAclArn`; `default`
    // makes an absent element deserialize as an empty string.
    #[serde(rename = "WebACLArn", default)]
    web_acl_arn: String,
}
1573
1574// ─── XML body builders ────────────────────────────────────────────────
1575
/// Escape a string for safe inclusion in the hand-built XML responses.
///
/// Replaces the five XML metacharacters (`&`, `<`, `>`, `"`, `'`) with
/// their predefined entities and copies every other character verbatim.
/// Keep the table aligned with `quick_xml`'s entity set; its own escaping
/// primitive is not used because these bodies are written straight into a
/// `String`.
pub(crate) fn esc(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            let entity = match ch {
                '&' => "&amp;",
                '<' => "&lt;",
                '>' => "&gt;",
                '"' => "&quot;",
                '\'' => "&apos;",
                other => {
                    escaped.push(other);
                    return escaped;
                }
            };
            escaped.push_str(entity);
            escaped
        })
}
1595
1596pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1597    let mut out = String::with_capacity(2048);
1598    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1599    out.push_str(&format!(
1600        "<Distribution xmlns=\"{ns}\">",
1601        ns = crate::NAMESPACE
1602    ));
1603    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1604    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1605    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1606    out.push_str(&format!(
1607        "<LastModifiedTime>{}</LastModifiedTime>",
1608        rfc3339(&dist.last_modified_time)
1609    ));
1610    out.push_str(&format!(
1611        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1612        dist.in_progress_invalidation_batches
1613    ));
1614    out.push_str(&format!(
1615        "<DomainName>{}</DomainName>",
1616        esc(&dist.domain_name)
1617    ));
1618    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1619    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1620    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1621        .unwrap_or_else(|_| String::new());
1622    out.push_str(&inner);
1623    out.push_str("</Distribution>");
1624    out
1625}
1626
1627fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1628    let mut out = String::with_capacity(2048);
1629    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1630    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1631    out.push_str("<Marker></Marker>");
1632    out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1633    out.push_str("<IsTruncated>false</IsTruncated>");
1634    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1635    if dists.is_empty() {
1636        out.push_str(&format!("</{root}>"));
1637        return out;
1638    }
1639    out.push_str("<Items>");
1640    for d in dists {
1641        out.push_str("<DistributionSummary>");
1642        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1643        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1644        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1645        out.push_str(&format!(
1646            "<LastModifiedTime>{}</LastModifiedTime>",
1647            rfc3339(&d.last_modified_time)
1648        ));
1649        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1650        let aliases = d.config.aliases.clone().unwrap_or_default();
1651        out.push_str(&render_inline("Aliases", &aliases));
1652        let origins = d.config.origins.clone();
1653        out.push_str(&render_inline("Origins", &origins));
1654        let dcb = d.config.default_cache_behavior.clone();
1655        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1656        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1657        out.push_str(&render_inline("CacheBehaviors", &cb));
1658        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1659        out.push_str(&render_inline("CustomErrorResponses", &cer));
1660        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1661        out.push_str(&format!(
1662            "<PriceClass>{}</PriceClass>",
1663            esc(&d
1664                .config
1665                .price_class
1666                .clone()
1667                .unwrap_or_else(|| "PriceClass_All".to_string()))
1668        ));
1669        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1670        out.push_str(&render_inline(
1671            "ViewerCertificate",
1672            &d.config.viewer_certificate.clone().unwrap_or_default(),
1673        ));
1674        out.push_str(&render_inline(
1675            "Restrictions",
1676            &d.config.restrictions.clone().unwrap_or_default(),
1677        ));
1678        out.push_str(&format!(
1679            "<WebACLId>{}</WebACLId>",
1680            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1681        ));
1682        out.push_str(&format!(
1683            "<HttpVersion>{}</HttpVersion>",
1684            esc(&d
1685                .config
1686                .http_version
1687                .clone()
1688                .unwrap_or_else(|| "http2".to_string()))
1689        ));
1690        out.push_str(&format!(
1691            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1692            d.config.is_ipv6_enabled.unwrap_or(true)
1693        ));
1694        out.push_str(&format!(
1695            "<Staging>{}</Staging>",
1696            d.config.staging.unwrap_or(false)
1697        ));
1698        out.push_str("</DistributionSummary>");
1699    }
1700    out.push_str("</Items>");
1701    out.push_str(&format!("</{root}>"));
1702    out
1703}
1704
1705fn build_empty_distribution_id_list(root: &str) -> String {
1706    let mut out = String::with_capacity(256);
1707    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1708    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1709    out.push_str("<Marker></Marker>");
1710    out.push_str("<MaxItems>100</MaxItems>");
1711    out.push_str("<IsTruncated>false</IsTruncated>");
1712    out.push_str("<Quantity>0</Quantity>");
1713    out.push_str(&format!("</{root}>"));
1714    out
1715}
1716
1717fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1718    let mut out = String::with_capacity(512);
1719    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1720    out.push_str(&format!(
1721        "<Invalidation xmlns=\"{ns}\">",
1722        ns = crate::NAMESPACE
1723    ));
1724    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1725    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1726    out.push_str(&format!(
1727        "<CreateTime>{}</CreateTime>",
1728        rfc3339(&inv.create_time)
1729    ));
1730    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1731    out.push_str("</Invalidation>");
1732    out
1733}
1734
1735fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1736    let mut out = String::with_capacity(1024);
1737    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1738    out.push_str(&format!(
1739        "<InvalidationList xmlns=\"{ns}\">",
1740        ns = crate::NAMESPACE
1741    ));
1742    out.push_str("<Marker></Marker>");
1743    out.push_str("<MaxItems>100</MaxItems>");
1744    out.push_str("<IsTruncated>false</IsTruncated>");
1745    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1746    if !items.is_empty() {
1747        out.push_str("<Items>");
1748        for inv in items {
1749            out.push_str("<InvalidationSummary>");
1750            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1751            out.push_str(&format!(
1752                "<CreateTime>{}</CreateTime>",
1753                rfc3339(&inv.create_time)
1754            ));
1755            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1756            out.push_str("</InvalidationSummary>");
1757        }
1758        out.push_str("</Items>");
1759    }
1760    out.push_str("</InvalidationList>");
1761    out
1762}
1763
1764fn build_tags_xml(tags: &[Tag]) -> String {
1765    let mut out = String::with_capacity(256);
1766    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1767    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1768    out.push_str("<Items>");
1769    for t in tags {
1770        out.push_str("<Tag>");
1771        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1772        if let Some(v) = &t.value {
1773            out.push_str(&format!("<Value>{}</Value>", esc(v)));
1774        }
1775        out.push_str("</Tag>");
1776    }
1777    out.push_str("</Items>");
1778    out.push_str("</Tags>");
1779    out
1780}
1781
1782fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1783    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1784}
1785
1786// ─── Helpers ──────────────────────────────────────────────────────────
1787
1788fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1789    if s.is_empty() {
1790        return Err(invalid_argument("CallerReference is required"));
1791    }
1792    Ok(())
1793}
1794
1795fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1796    if config.origins.quantity < 1 {
1797        return Err(invalid_argument(
1798            "DistributionConfig.Origins must contain at least one origin",
1799        ));
1800    }
1801    Ok(())
1802}
1803
1804/// Compare two `DistributionConfig`s by serializing them to canonical XML
1805/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
1806/// so the ETag stays stable when the caller PUTs the same config back.
1807/// Falls back to "not equal" if either serialization fails so we still
1808/// honor the request rather than silently swallow a write.
1809fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1810    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1811        return false;
1812    };
1813    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1814        return false;
1815    };
1816    a == b
1817}
1818
1819fn account_id(_req: &AwsRequest) -> &'static str {
1820    // Multi-account is wired through AwsRequest.account_id elsewhere; the
1821    // CloudFront control plane only uses the resolved id for the ARN
1822    // suffix. Until that field stabilizes for REST-XML we use the default
1823    // account ID consistently with the rest of the registered services.
1824    DEFAULT_ACCOUNT
1825}
1826
1827fn generate_distribution_id() -> String {
1828    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
1829    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1830    format!("E{}", &raw[..13])
1831}
1832
1833fn generate_invalidation_id() -> String {
1834    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1835    format!("I{}", &raw[..13])
1836}
1837
1838pub(crate) fn generate_etag() -> String {
1839    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1840    format!("E{}", &raw[..13])
1841}
1842
1843/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
1844/// Used by Batch 2 policy resources (cache, origin request, response
1845/// headers, continuous deployment, OAC) so each gets a recognizable
1846/// alphabetic prefix in addition to the random suffix.
1847pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1848    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1849    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1850    format!("{prefix}{}", &raw[..suffix_len])
1851}
1852
1853fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1854    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1855}
1856
1857pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1858    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1859}
1860
1861fn no_such_distribution(id: &str) -> AwsServiceError {
1862    aws_error(
1863        StatusCode::NOT_FOUND,
1864        "NoSuchDistribution",
1865        format!("The specified distribution does not exist: {id}"),
1866    )
1867}
1868
1869fn no_such_invalidation(id: &str) -> AwsServiceError {
1870    aws_error(
1871        StatusCode::NOT_FOUND,
1872        "NoSuchInvalidation",
1873        format!("The specified invalidation does not exist: {id}"),
1874    )
1875}
1876
1877fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1878    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1879}
1880
1881pub(crate) fn aws_error(
1882    status: StatusCode,
1883    code: impl Into<String>,
1884    msg: impl Into<String>,
1885) -> AwsServiceError {
1886    AwsServiceError::aws_error(status, code.into(), msg)
1887}
1888
1889fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1890    if let Ok(v) = HeaderValue::from_str(value) {
1891        headers.insert(name, v);
1892    }
1893}
1894
1895pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1896    AwsResponse {
1897        status,
1898        content_type: "text/xml".to_string(),
1899        body: ResponseBody::Bytes(Bytes::from(body)),
1900        headers,
1901    }
1902}
1903
1904fn empty_response(status: StatusCode) -> AwsResponse {
1905    AwsResponse {
1906        status,
1907        content_type: "text/xml".to_string(),
1908        body: ResponseBody::Bytes(Bytes::new()),
1909        headers: HeaderMap::new(),
1910    }
1911}
1912
/// Decode a URL query component.
///
/// `%XX` escapes are turned into the byte they encode and `+` becomes a
/// space. A `%` that is not followed by two hex digits is kept literally.
///
/// Decoding happens at the byte level and the result is then interpreted
/// as UTF-8, so multi-byte characters such as `%C3%A9` (`é`) round-trip
/// correctly instead of being widened byte-by-byte into Latin-1 code
/// points. Byte sequences that are not valid UTF-8 are replaced with
/// U+FFFD rather than rejected, keeping the function infallible.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed or truncated escape: keep the '%' as-is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    match String::from_utf8(decoded) {
        Ok(text) => text,
        Err(err) => String::from_utf8_lossy(err.as_bytes()).into_owned(),
    }
}

/// Numeric value of an ASCII hex digit (either case), or `None` for any
/// other byte.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1944
/// Report whether a URI label segment is empty or still an unsubstituted
/// Smithy placeholder.
///
/// A placeholder is recognized by its opening brace, either literal
/// (`{Identifier}`) or percent-encoded in any case (`%7BIdentifier%7D`).
/// Conformance probes that omit a required `@httpLabel` input leave the
/// placeholder in the URL; handlers use this check to answer with the
/// declared validation error instead of a fabricated 200.
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    matches!(
        value.as_bytes(),
        [] | [b'{', ..] | [b'%', b'7', b'b' | b'B', ..]
    )
}
1958
1959/// Best-effort field extractor for request bodies the probe sends as JSON
1960/// even when the service is REST-XML. Walks the body trying JSON first,
1961/// then a naive XML element scan. Returns the value for the first occurrence
1962/// of `key`. Used by handlers that need to validate optional/required body
1963/// members up-front (enum bounds, length, presence) without committing to a
1964/// full strongly-typed parse — the actual handler still uses the typed XML
1965/// parser for the structured fields it consumes.
1966pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
1967    if let Ok(s) = std::str::from_utf8(body) {
1968        let trimmed = s.trim_start();
1969        if trimmed.starts_with('{') {
1970            if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1971                if let Some(field) = v.get(key) {
1972                    return match field {
1973                        serde_json::Value::String(s) => Some(s.clone()),
1974                        serde_json::Value::Number(n) => Some(n.to_string()),
1975                        serde_json::Value::Bool(b) => Some(b.to_string()),
1976                        _ => None,
1977                    };
1978                }
1979                return None;
1980            }
1981        }
1982        // Fall back to a naive XML extraction for genuine XML bodies.
1983        let open = format!("<{key}>");
1984        let close = format!("</{key}>");
1985        if let Some(start) = s.find(&open) {
1986            let after = start + open.len();
1987            if let Some(end_rel) = s[after..].find(&close) {
1988                return Some(s[after..after + end_rel].to_string());
1989            }
1990        }
1991    }
1992    None
1993}
1994
1995fn parse_query_value(query: &str, key: &str) -> Option<String> {
1996    let prefix = format!("{key}=");
1997    for pair in query.split('&').filter(|p| !p.is_empty()) {
1998        if let Some(rest) = pair.strip_prefix(&prefix) {
1999            return Some(percent_decode(rest));
2000        }
2001    }
2002    None
2003}
2004
2005#[cfg(test)]
2006mod tests {
2007    use super::*;
2008
2009    #[test]
2010    fn placeholder_label_detects_braces_and_percent_encoding() {
2011        assert!(is_placeholder_label(""));
2012        assert!(is_placeholder_label("{Identifier}"));
2013        assert!(is_placeholder_label("%7BIdentifier%7D"));
2014        assert!(is_placeholder_label("%7bidentifier%7d"));
2015        assert!(!is_placeholder_label("E1234567890ABC"));
2016        assert!(!is_placeholder_label(
2017            "arn:aws:cloudfront::000:distribution/E1"
2018        ));
2019    }
2020
2021    #[test]
2022    fn extract_body_field_handles_json_and_xml() {
2023        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2024        assert_eq!(
2025            extract_body_field(json, "Stage"),
2026            Some("BROKEN".to_string())
2027        );
2028        assert_eq!(extract_body_field(json, "MaxItems"), None);
2029
2030        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2031        assert_eq!(
2032            extract_body_field(xml, "Domain"),
2033            Some("example.com".to_string())
2034        );
2035        assert_eq!(extract_body_field(xml, "Missing"), None);
2036
2037        assert_eq!(extract_body_field(b"", "x"), None);
2038    }
2039
2040    fn make_state() -> SharedCloudFrontState {
2041        Arc::new(RwLock::new(CloudFrontAccounts::new()))
2042    }
2043
2044    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2045        AwsRequest {
2046            service: "cloudfront".into(),
2047            action: String::new(),
2048            region: "us-east-1".into(),
2049            account_id: DEFAULT_ACCOUNT.into(),
2050            request_id: Uuid::new_v4().to_string(),
2051            headers: HeaderMap::new(),
2052            query_params: std::collections::HashMap::new(),
2053            body_stream: parking_lot::Mutex::new(None),
2054            body: Bytes::from(body.to_string()),
2055            path_segments: path
2056                .split('/')
2057                .filter(|s| !s.is_empty())
2058                .map(String::from)
2059                .collect(),
2060            raw_path: path.into(),
2061            raw_query: query.into(),
2062            method,
2063            is_query_protocol: false,
2064            access_key_id: None,
2065            principal: None,
2066        }
2067    }
2068
    /// Build a small `DistributionConfig` XML body for tests.
    ///
    /// The document has one origin (`primary` -> `example.com`), a default
    /// cache behavior targeting it, an empty comment, and `Enabled` set to
    /// `true`. `caller_ref` is interpolated verbatim (not XML-escaped) into
    /// `<CallerReference>`.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }
2092
2093    #[tokio::test]
2094    async fn create_then_get_then_delete_distribution() {
2095        let svc = CloudFrontService::new(make_state());
2096        let body = minimal_dist_config_xml("ref-1");
2097        let create = svc
2098            .handle(make_request(
2099                http::Method::POST,
2100                "/2020-05-31/distribution",
2101                "",
2102                &body,
2103            ))
2104            .await
2105            .unwrap();
2106        assert_eq!(create.status, StatusCode::CREATED);
2107        let etag = create
2108            .headers
2109            .get(ETAG)
2110            .unwrap()
2111            .to_str()
2112            .unwrap()
2113            .to_string();
2114        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2115        let id = xml
2116            .split("<Id>")
2117            .nth(1)
2118            .unwrap()
2119            .split("</Id>")
2120            .next()
2121            .unwrap()
2122            .to_string();
2123
2124        let get = svc
2125            .handle(make_request(
2126                http::Method::GET,
2127                &format!("/2020-05-31/distribution/{id}"),
2128                "",
2129                "",
2130            ))
2131            .await
2132            .unwrap();
2133        assert_eq!(get.status, StatusCode::OK);
2134
2135        // Disable then delete (CloudFront requires Disabled before delete).
2136        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2137        let mut update_req = make_request(
2138            http::Method::PUT,
2139            &format!("/2020-05-31/distribution/{id}/config"),
2140            "",
2141            &disable_body,
2142        );
2143        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2144        let updated = svc.handle(update_req).await.unwrap();
2145        assert_eq!(updated.status, StatusCode::OK);
2146        let new_etag = updated
2147            .headers
2148            .get(ETAG)
2149            .unwrap()
2150            .to_str()
2151            .unwrap()
2152            .to_string();
2153
2154        let mut del_req = make_request(
2155            http::Method::DELETE,
2156            &format!("/2020-05-31/distribution/{id}"),
2157            "",
2158            "",
2159        );
2160        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2161        let del = svc.handle(del_req).await.unwrap();
2162        assert_eq!(del.status, StatusCode::NO_CONTENT);
2163    }
2164
2165    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2166        let body = minimal_dist_config_xml(caller_ref);
2167        let create = svc
2168            .handle(make_request(
2169                http::Method::POST,
2170                "/2020-05-31/distribution",
2171                "",
2172                &body,
2173            ))
2174            .await
2175            .unwrap();
2176        let xml = std::str::from_utf8(create.body.expect_bytes())
2177            .unwrap()
2178            .to_string();
2179        xml.split("<Id>")
2180            .nth(1)
2181            .unwrap()
2182            .split("</Id>")
2183            .next()
2184            .unwrap()
2185            .to_string()
2186    }
2187
2188    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2189        let state = svc.state.read();
2190        state
2191            .accounts
2192            .get(DEFAULT_ACCOUNT)
2193            .and_then(|a| a.distributions.get(id))
2194            .map(|d| d.status.clone())
2195            .unwrap_or_default()
2196    }
2197
2198    #[tokio::test]
2199    async fn create_distribution_starts_in_progress() {
2200        // Use a long delay so the auto-tick can't race the assertion.
2201        let svc = CloudFrontService::new(make_state())
2202            .with_propagation_delay(std::time::Duration::from_secs(60));
2203        let body = minimal_dist_config_xml("status-ref");
2204        let create = svc
2205            .handle(make_request(
2206                http::Method::POST,
2207                "/2020-05-31/distribution",
2208                "",
2209                &body,
2210            ))
2211            .await
2212            .unwrap();
2213        let xml = std::str::from_utf8(create.body.expect_bytes())
2214            .unwrap()
2215            .to_string();
2216        assert!(
2217            xml.contains("<Status>InProgress</Status>"),
2218            "expected initial status InProgress, got: {xml}"
2219        );
2220    }
2221
2222    #[tokio::test]
2223    async fn auto_transition_after_tick_marks_deployed() {
2224        let svc = CloudFrontService::new(make_state())
2225            .with_propagation_delay(std::time::Duration::from_millis(50));
2226        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2227        assert_eq!(distribution_status(&svc, &id), "InProgress");
2228        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2229        assert_eq!(distribution_status(&svc, &id), "Deployed");
2230    }
2231
2232    #[tokio::test]
2233    async fn set_distribution_status_via_admin_flips_synchronously() {
2234        let svc = CloudFrontService::new(make_state())
2235            .with_propagation_delay(std::time::Duration::from_secs(60));
2236        let id = create_distribution_returning_id(&svc, "admin-ref").await;
2237        assert_eq!(distribution_status(&svc, &id), "InProgress");
2238        assert!(svc.set_distribution_status(&id, "Deployed"));
2239        assert_eq!(distribution_status(&svc, &id), "Deployed");
2240        assert!(svc.set_distribution_status(&id, "InProgress"));
2241        assert_eq!(distribution_status(&svc, &id), "InProgress");
2242    }
2243
2244    #[tokio::test]
2245    async fn set_distribution_status_unknown_id_returns_false() {
2246        let svc = CloudFrontService::new(make_state());
2247        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2248    }
2249
2250    #[tokio::test]
2251    async fn update_distribution_resets_to_in_progress() {
2252        let svc = CloudFrontService::new(make_state())
2253            .with_propagation_delay(std::time::Duration::from_secs(60));
2254        let body = minimal_dist_config_xml("update-reset-ref");
2255        let create = svc
2256            .handle(make_request(
2257                http::Method::POST,
2258                "/2020-05-31/distribution",
2259                "",
2260                &body,
2261            ))
2262            .await
2263            .unwrap();
2264        let etag = create
2265            .headers
2266            .get(ETAG)
2267            .unwrap()
2268            .to_str()
2269            .unwrap()
2270            .to_string();
2271        let xml = std::str::from_utf8(create.body.expect_bytes())
2272            .unwrap()
2273            .to_string();
2274        let id = xml
2275            .split("<Id>")
2276            .nth(1)
2277            .unwrap()
2278            .split("</Id>")
2279            .next()
2280            .unwrap()
2281            .to_string();
2282        // Force the distribution to Deployed via the admin mutator so we can
2283        // observe the UpdateDistribution flip back to InProgress.
2284        assert!(svc.set_distribution_status(&id, "Deployed"));
2285        assert_eq!(distribution_status(&svc, &id), "Deployed");
2286
2287        let updated_body = body.replace(
2288            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2289            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2290        );
2291        let mut update_req = make_request(
2292            http::Method::PUT,
2293            &format!("/2020-05-31/distribution/{id}/config"),
2294            "",
2295            &updated_body,
2296        );
2297        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2298        let updated = svc.handle(update_req).await.unwrap();
2299        assert_eq!(updated.status, StatusCode::OK);
2300        assert_eq!(distribution_status(&svc, &id), "InProgress");
2301    }
2302
2303    #[tokio::test]
2304    async fn duplicate_caller_reference_is_rejected() {
2305        let svc = CloudFrontService::new(make_state());
2306        let body = minimal_dist_config_xml("dup-ref");
2307        svc.handle(make_request(
2308            http::Method::POST,
2309            "/2020-05-31/distribution",
2310            "",
2311            &body,
2312        ))
2313        .await
2314        .unwrap();
2315        let result = svc
2316            .handle(make_request(
2317                http::Method::POST,
2318                "/2020-05-31/distribution",
2319                "",
2320                &body,
2321            ))
2322            .await;
2323        let err = match result {
2324            Ok(_) => panic!("expected duplicate caller-reference to fail"),
2325            Err(e) => e,
2326        };
2327        assert_eq!(err.code(), "DistributionAlreadyExists");
2328        assert_eq!(err.status(), StatusCode::CONFLICT);
2329    }
2330
2331    #[tokio::test]
2332    async fn invalidation_lifecycle() {
2333        let svc = CloudFrontService::new(make_state());
2334        let body = minimal_dist_config_xml("inv-ref");
2335        let create = svc
2336            .handle(make_request(
2337                http::Method::POST,
2338                "/2020-05-31/distribution",
2339                "",
2340                &body,
2341            ))
2342            .await
2343            .unwrap();
2344        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2345        let dist_id = xml
2346            .split("<Id>")
2347            .nth(1)
2348            .unwrap()
2349            .split("</Id>")
2350            .next()
2351            .unwrap();
2352        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2353<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2354  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2355  <CallerReference>inv-1</CallerReference>
2356</InvalidationBatch>"#;
2357        let inv_resp = svc
2358            .handle(make_request(
2359                http::Method::POST,
2360                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2361                "",
2362                inv_body,
2363            ))
2364            .await
2365            .unwrap();
2366        assert_eq!(inv_resp.status, StatusCode::CREATED);
2367        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2368        let inv_id = inv_xml
2369            .split("<Id>")
2370            .nth(1)
2371            .unwrap()
2372            .split("</Id>")
2373            .next()
2374            .unwrap();
2375        let get = svc
2376            .handle(make_request(
2377                http::Method::GET,
2378                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2379                "",
2380                "",
2381            ))
2382            .await
2383            .unwrap();
2384        assert_eq!(get.status, StatusCode::OK);
2385        let list = svc
2386            .handle(make_request(
2387                http::Method::GET,
2388                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2389                "",
2390                "",
2391            ))
2392            .await
2393            .unwrap();
2394        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2395        assert!(xml.contains("<Quantity>1</Quantity>"));
2396    }
2397
2398    #[tokio::test]
2399    async fn tags_roundtrip() {
2400        let svc = CloudFrontService::new(make_state());
2401        let body = minimal_dist_config_xml("tag-ref");
2402        let create = svc
2403            .handle(make_request(
2404                http::Method::POST,
2405                "/2020-05-31/distribution",
2406                "",
2407                &body,
2408            ))
2409            .await
2410            .unwrap();
2411        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2412        let arn = xml
2413            .split("<ARN>")
2414            .nth(1)
2415            .unwrap()
2416            .split("</ARN>")
2417            .next()
2418            .unwrap();
2419        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2420<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2421  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2422</Tags>"#;
2423        let arn_q = format!("Operation=Tag&Resource={}", arn);
2424        let resp = svc
2425            .handle(make_request(
2426                http::Method::POST,
2427                "/2020-05-31/tagging",
2428                &arn_q,
2429                tag_body,
2430            ))
2431            .await
2432            .unwrap();
2433        assert_eq!(resp.status, StatusCode::NO_CONTENT);
2434        let list = svc
2435            .handle(make_request(
2436                http::Method::GET,
2437                "/2020-05-31/tagging",
2438                &format!("Resource={}", arn),
2439                "",
2440            ))
2441            .await
2442            .unwrap();
2443        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2444        assert!(xml.contains("<Key>env</Key>"));
2445        assert!(xml.contains("<Value>prod</Value>"));
2446    }
2447
2448    #[tokio::test]
2449    async fn xml_metacharacters_in_user_input_are_escaped() {
2450        let svc = CloudFrontService::new(make_state());
2451        let body = minimal_dist_config_xml("escape-ref").replace(
2452            "<Comment></Comment>",
2453            "<Comment><![CDATA[a&b<c>d]]></Comment>",
2454        );
2455        let create = svc
2456            .handle(make_request(
2457                http::Method::POST,
2458                "/2020-05-31/distribution",
2459                "",
2460                &body,
2461            ))
2462            .await
2463            .unwrap();
2464        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2465        let dist_id = xml
2466            .split("<Id>")
2467            .nth(1)
2468            .unwrap()
2469            .split("</Id>")
2470            .next()
2471            .unwrap();
2472        let arn = xml
2473            .split("<ARN>")
2474            .nth(1)
2475            .unwrap()
2476            .split("</ARN>")
2477            .next()
2478            .unwrap();
2479
2480        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2481<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2482  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
2483</Tags>"#;
2484        let arn_q = format!("Operation=Tag&Resource={}", arn);
2485        svc.handle(make_request(
2486            http::Method::POST,
2487            "/2020-05-31/tagging",
2488            &arn_q,
2489            tag_body,
2490        ))
2491        .await
2492        .unwrap();
2493
2494        let list = svc
2495            .handle(make_request(
2496                http::Method::GET,
2497                "/2020-05-31/tagging",
2498                &format!("Resource={}", arn),
2499                "",
2500            ))
2501            .await
2502            .unwrap();
2503        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2504        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
2505        assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2506
2507        // Force the distribution into the list-rendering path so the
2508        // unescaped Comment field would surface there.
2509        let list_resp = svc
2510            .handle(make_request(
2511                http::Method::GET,
2512                "/2020-05-31/distribution",
2513                "",
2514                "",
2515            ))
2516            .await
2517            .unwrap();
2518        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2519        // Ensure raw `<` from the user-supplied comment never lands inside
2520        // a Comment element on the wire.
2521        assert!(!xml.contains("<Comment>a&b<c>d"));
2522        assert!(xml.contains(dist_id));
2523    }
2524}