Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use uuid::Uuid;
12
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
14
15use crate::model::{
16    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
17};
18use crate::router::{route, Route};
19use crate::state::{
20    CloudFrontAccounts, SharedCloudFrontState, StoredDistribution, StoredInvalidation, Tag,
21};
22use crate::xml_io;
23
/// Account id whose state the read handlers in this file (`get_distribution`,
/// `get_distribution_config`) look up.
// NOTE(review): presumably also what `account_id` falls back to for requests
// that carry no account — confirm against its definition.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
25
/// Action names this service advertises through
/// `AwsService::supported_actions`.
///
/// These are the same strings that `handle` matches `resolved.action`
/// against. Nothing in this file ties the two lists together, so an action
/// added here also needs a dispatch arm in `handle` (and vice versa).
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
195
/// CloudFront service front-end.
///
/// Implements `AwsService`: `handle` resolves each request to an action name
/// and forwards it to the matching handler method, all of which operate on
/// the shared `state`.
pub struct CloudFrontService {
    /// Shared, lock-guarded CloudFront state. Handlers take `read()` /
    /// `write()` guards on it; `shared_state` hands out further clones of
    /// the same `Arc`.
    pub(crate) state: SharedCloudFrontState,
    /// How long the propagation tick sleeps before flipping a freshly
    /// created or updated Distribution / DistributionTenant /
    /// ConnectionGroup / StreamingDistribution from `InProgress` to
    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
    /// keeps SDK-driven integration tests fast while still simulating the
    /// async transition. Tests can shrink it via
    /// [`CloudFrontService::with_propagation_delay`], and operators can
    /// override the default via the
    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
    pub(crate) propagation_delay: std::time::Duration,
}
209
/// Resolve the default `InProgress` -> `Deployed` delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
/// second.
///
/// The trimmed value is tried first as a non-negative integer number of
/// seconds, then as a floating-point number of seconds; `0` requests no
/// delay at all. Everything else falls back to 1 second: an unset or
/// non-Unicode variable, unparsable text, a negative or non-finite number,
/// or a finite number too large to fit in a `Duration`. A typo in the env
/// var therefore can neither panic nor hang the process.
fn default_propagation_delay() -> std::time::Duration {
    const FALLBACK: std::time::Duration = std::time::Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return std::time::Duration::from_secs(secs);
    }
    match trimmed.parse::<f64>() {
        // `from_secs_f64` panics when the value overflows `Duration`
        // (e.g. `1e30`, or an integer literal too long for `u64`); the
        // fallible constructor turns that case into the documented fallback.
        Ok(secs) if secs.is_finite() && secs >= 0.0 => {
            std::time::Duration::try_from_secs_f64(secs).unwrap_or(FALLBACK)
        }
        _ => FALLBACK,
    }
}
231
232impl CloudFrontService {
233    pub fn new(state: SharedCloudFrontState) -> Self {
234        Self {
235            state,
236            propagation_delay: default_propagation_delay(),
237        }
238    }
239
240    pub fn shared_state(&self) -> SharedCloudFrontState {
241        Arc::clone(&self.state)
242    }
243
244    /// Override the propagation delay used by `CreateDistribution`,
245    /// `UpdateDistribution`, and the equivalent DistributionTenant /
246    /// ConnectionGroup ops. Tests pass a small duration so they don't
247    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
248    /// transition.
249    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
250        self.propagation_delay = delay;
251        self
252    }
253
254    /// Flip a stored Distribution's status synchronously. Returns `false`
255    /// if no distribution matches `id` across any account. The admin
256    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
257    /// calls this so tests can force `InProgress` <-> `Deployed` without
258    /// waiting on the propagation tick.
259    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
260        let mut state = self.state.write();
261        for account in state.accounts.values_mut() {
262            if let Some(dist) = account.distributions.get_mut(id) {
263                dist.status = status.to_string();
264                return true;
265            }
266        }
267        false
268    }
269}
270
271impl Default for CloudFrontService {
272    fn default() -> Self {
273        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
274    }
275}
276
#[async_trait]
impl AwsService for CloudFrontService {
    /// Name this service is identified by: `"cloudfront"`.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// The static `SUPPORTED_ACTIONS` table.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Entry point for every CloudFront request.
    ///
    /// The request's method, raw path and raw query are handed to `route`,
    /// which yields the action name (plus whatever else `Route` carries,
    /// e.g. the resource id the handlers read). The action name is then
    /// matched to a handler method.
    ///
    /// # Errors
    ///
    /// * `InvalidArgument` with `StatusCode::NOT_FOUND` when `route` does
    ///   not recognise the method/path combination.
    /// * `InvalidAction` with `StatusCode::NOT_IMPLEMENTED` when the router
    ///   produced an action that has no arm below.
    /// * Otherwise whatever the selected handler returns.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Handlers receive the request, the resolved route, or both,
        // depending on what each one needs to read.
        match resolved.action {
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All "list distributions by <X>" variants share one handler,
            // which is told which variant was requested.
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => self.list_distributions_by(resolved.action),
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router resolved an action name that has no arm above.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        }
    }
}
507
508// ─── Distribution handlers ────────────────────────────────────────────
509
510impl CloudFrontService {
511    fn create_distribution(
512        &self,
513        req: &AwsRequest,
514        with_tags: bool,
515    ) -> Result<AwsResponse, AwsServiceError> {
516        let (config, tags) = if with_tags {
517            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
518                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
519            let tags = parsed
520                .tags
521                .items
522                .map(|i| {
523                    i.tag
524                        .into_iter()
525                        .map(|t| Tag {
526                            key: t.key,
527                            value: t.value,
528                        })
529                        .collect()
530                })
531                .unwrap_or_default();
532            (parsed.distribution_config, tags)
533        } else {
534            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
535                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
536            (parsed, Vec::new())
537        };
538
539        validate_caller_reference(&config.caller_reference)?;
540        validate_origins(&config)?;
541
542        let mut state = self.state.write();
543        let account = state.entry(account_id(req));
544
545        if let Some(existing) = account
546            .distributions
547            .values()
548            .find(|d| d.config.caller_reference == config.caller_reference)
549        {
550            return Err(aws_error(
551                StatusCode::CONFLICT,
552                "DistributionAlreadyExists",
553                format!(
554                    "Distribution with the same CallerReference exists: {}",
555                    existing.id
556                ),
557            ));
558        }
559
560        let id = generate_distribution_id();
561        let now = Utc::now();
562        let etag = generate_etag();
563        let domain = format!("{}.cloudfront.net", id.to_lowercase());
564        let arn = format!(
565            "arn:aws:cloudfront::{}:distribution/{}",
566            account_id(req),
567            id
568        );
569
570        let stored = StoredDistribution {
571            id: id.clone(),
572            arn: arn.clone(),
573            // Real CloudFront returns InProgress immediately and flips to
574            // Deployed ~15 minutes later once the edge propagation completes.
575            // The spawn below mirrors that lifecycle on a configurable delay.
576            status: "InProgress".to_string(),
577            last_modified_time: now,
578            domain_name: domain,
579            in_progress_invalidation_batches: 0,
580            etag: etag.clone(),
581            config,
582        };
583        account.distributions.insert(id.clone(), stored.clone());
584        if !tags.is_empty() {
585            account.tags.insert(arn.clone(), tags);
586        }
587        drop(state);
588
589        self.schedule_distribution_deploy(id.clone());
590
591        let body = build_distribution_xml(&stored);
592        let mut headers = HeaderMap::new();
593        set_header(&mut headers, ETAG, &etag);
594        set_header(&mut headers, LOCATION, &stored.arn);
595        Ok(xml_response(StatusCode::CREATED, body, headers))
596    }
597
598    /// Spawn a tokio task that flips the named distribution from
599    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
600    /// `CreateDistribution` and `UpdateDistribution` to model the real
601    /// CloudFront edge propagation lifecycle.
602    fn schedule_distribution_deploy(&self, id: String) {
603        let state = Arc::clone(&self.state);
604        let delay = self.propagation_delay;
605        tokio::spawn(async move {
606            tokio::time::sleep(delay).await;
607            let mut s = state.write();
608            for account in s.accounts.values_mut() {
609                if let Some(d) = account.distributions.get_mut(&id) {
610                    if d.status == "InProgress" {
611                        d.status = "Deployed".to_string();
612                    }
613                    return;
614                }
615            }
616        });
617    }
618
619    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
620    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
621        let state = Arc::clone(&self.state);
622        let delay = self.propagation_delay;
623        tokio::spawn(async move {
624            tokio::time::sleep(delay).await;
625            let mut s = state.write();
626            for account in s.accounts.values_mut() {
627                if let Some(t) = account.distribution_tenants.get_mut(&id) {
628                    if t.status == "InProgress" {
629                        t.status = "Deployed".to_string();
630                    }
631                    return;
632                }
633            }
634        });
635    }
636
637    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
638    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
639        let state = Arc::clone(&self.state);
640        let delay = self.propagation_delay;
641        tokio::spawn(async move {
642            tokio::time::sleep(delay).await;
643            let mut s = state.write();
644            for account in s.accounts.values_mut() {
645                if let Some(g) = account.connection_groups.get_mut(&id) {
646                    if g.status == "InProgress" {
647                        g.status = "Deployed".to_string();
648                    }
649                    return;
650                }
651            }
652        });
653    }
654
655    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
656    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
657    /// RTMP streaming distributions, even though the resource is largely
658    /// deprecated.
659    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
660        let state = Arc::clone(&self.state);
661        let delay = self.propagation_delay;
662        tokio::spawn(async move {
663            tokio::time::sleep(delay).await;
664            let mut s = state.write();
665            for account in s.accounts.values_mut() {
666                if let Some(d) = account.streaming_distributions.get_mut(&id) {
667                    if d.status == "InProgress" {
668                        d.status = "Deployed".to_string();
669                    }
670                    return;
671                }
672            }
673        });
674    }
675
676    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
677        let id = route
678            .id
679            .as_deref()
680            .ok_or_else(|| invalid_argument("missing distribution id"))?;
681        let state = self.state.read();
682        let account = state
683            .accounts
684            .get(DEFAULT_ACCOUNT)
685            .ok_or_else(|| no_such_distribution(id))?;
686        let dist = account
687            .distributions
688            .get(id)
689            .ok_or_else(|| no_such_distribution(id))?
690            .clone();
691        drop(state);
692        let body = build_distribution_xml(&dist);
693        let mut headers = HeaderMap::new();
694        set_header(&mut headers, ETAG, &dist.etag);
695        Ok(xml_response(StatusCode::OK, body, headers))
696    }
697
698    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
699        let id = route
700            .id
701            .as_deref()
702            .ok_or_else(|| invalid_argument("missing distribution id"))?;
703        let state = self.state.read();
704        let account = state
705            .accounts
706            .get(DEFAULT_ACCOUNT)
707            .ok_or_else(|| no_such_distribution(id))?;
708        let dist = account
709            .distributions
710            .get(id)
711            .ok_or_else(|| no_such_distribution(id))?
712            .clone();
713        drop(state);
714        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
715            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
716        let mut headers = HeaderMap::new();
717        set_header(&mut headers, ETAG, &dist.etag);
718        Ok(xml_response(StatusCode::OK, body, headers))
719    }
720
721    fn update_distribution(
722        &self,
723        req: &AwsRequest,
724        route: &Route,
725    ) -> Result<AwsResponse, AwsServiceError> {
726        let id = route
727            .id
728            .as_deref()
729            .ok_or_else(|| invalid_argument("missing distribution id"))?;
730        let if_match = req
731            .headers
732            .get(IF_MATCH)
733            .and_then(|v| v.to_str().ok())
734            .ok_or_else(|| {
735                aws_error(
736                    StatusCode::BAD_REQUEST,
737                    "InvalidIfMatchVersion",
738                    "Missing If-Match header for UpdateDistribution",
739                )
740            })?
741            .to_string();
742        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
743            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
744        validate_caller_reference(&new_config.caller_reference)?;
745        validate_origins(&new_config)?;
746
747        let mut state = self.state.write();
748        let account = state
749            .accounts
750            .get_mut(DEFAULT_ACCOUNT)
751            .ok_or_else(|| no_such_distribution(id))?;
752        let dist = account
753            .distributions
754            .get_mut(id)
755            .ok_or_else(|| no_such_distribution(id))?;
756        if dist.etag != if_match {
757            return Err(aws_error(
758                StatusCode::PRECONDITION_FAILED,
759                "PreconditionFailed",
760                "If-Match header does not match the current ETag",
761            ));
762        }
763        // ETag stability: only bump the ETag and flip status back to
764        // InProgress when the new config actually differs from what we
765        // have on disk. A no-op UpdateDistribution (PUT the same config
766        // back) leaves the ETag intact, matching AWS behavior.
767        let config_changed = !configs_equal(&dist.config, &new_config);
768        if config_changed {
769            dist.config = new_config;
770            dist.etag = generate_etag();
771            dist.last_modified_time = Utc::now();
772            // UpdateDistribution kicks off a fresh edge propagation; AWS
773            // flips the status back to InProgress until the new config
774            // lands.
775            dist.status = "InProgress".to_string();
776        }
777        let snapshot = dist.clone();
778        drop(state);
779
780        if config_changed {
781            self.schedule_distribution_deploy(id.to_string());
782        }
783
784        let body = build_distribution_xml(&snapshot);
785        let mut headers = HeaderMap::new();
786        set_header(&mut headers, ETAG, &snapshot.etag);
787        Ok(xml_response(StatusCode::OK, body, headers))
788    }
789
790    fn delete_distribution(
791        &self,
792        req: &AwsRequest,
793        route: &Route,
794    ) -> Result<AwsResponse, AwsServiceError> {
795        let id = route
796            .id
797            .as_deref()
798            .ok_or_else(|| invalid_argument("missing distribution id"))?;
799        let if_match = req
800            .headers
801            .get(IF_MATCH)
802            .and_then(|v| v.to_str().ok())
803            .ok_or_else(|| {
804                aws_error(
805                    StatusCode::BAD_REQUEST,
806                    "InvalidIfMatchVersion",
807                    "Missing If-Match header for DeleteDistribution",
808                )
809            })?
810            .to_string();
811        let mut state = self.state.write();
812        let account = state
813            .accounts
814            .get_mut(DEFAULT_ACCOUNT)
815            .ok_or_else(|| no_such_distribution(id))?;
816        {
817            let dist = account
818                .distributions
819                .get(id)
820                .ok_or_else(|| no_such_distribution(id))?;
821            if dist.etag != if_match {
822                return Err(aws_error(
823                    StatusCode::PRECONDITION_FAILED,
824                    "PreconditionFailed",
825                    "If-Match header does not match the current ETag",
826                ));
827            }
828            if dist.config.enabled {
829                return Err(aws_error(
830                    StatusCode::PRECONDITION_FAILED,
831                    "DistributionNotDisabled",
832                    "Distribution must be disabled before delete",
833                ));
834            }
835        }
836        let removed = account.distributions.remove(id).unwrap();
837        account.tags.remove(&removed.arn);
838        Ok(empty_response(StatusCode::NO_CONTENT))
839    }
840
841    fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
842        let state = self.state.read();
843        let mut dists: Vec<StoredDistribution> = state
844            .accounts
845            .values()
846            .flat_map(|a| a.distributions.values().cloned())
847            .collect();
848        dists.sort_by_key(|a| a.last_modified_time);
849        drop(state);
850        let body = build_distribution_list_xml(&dists, "DistributionList");
851        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
852    }
853
854    fn list_distributions_by(&self, action: &str) -> Result<AwsResponse, AwsServiceError> {
855        // The "by-X" listings each have a distinct response root element.
856        // We never index distributions by the predicate (that would require
857        // shipping each policy/key-group/etc service first), so each
858        // response is empty until those services land.
859        let root = match action {
860            "ListDistributionsByCachePolicyId"
861            | "ListDistributionsByOriginRequestPolicyId"
862            | "ListDistributionsByResponseHeadersPolicyId"
863            | "ListDistributionsByKeyGroup"
864            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
865            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
866            _ => "DistributionList",
867        };
868        let body = build_empty_distribution_id_list(root);
869        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
870    }
871
872    fn copy_distribution(
873        &self,
874        req: &AwsRequest,
875        route: &Route,
876    ) -> Result<AwsResponse, AwsServiceError> {
877        let primary_id = route
878            .id
879            .as_deref()
880            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
881        let if_match = req
882            .headers
883            .get(IF_MATCH)
884            .and_then(|v| v.to_str().ok())
885            .ok_or_else(|| {
886                aws_error(
887                    StatusCode::BAD_REQUEST,
888                    "InvalidIfMatchVersion",
889                    "Missing If-Match header for CopyDistribution",
890                )
891            })?
892            .to_string();
893        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
894            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
895        validate_caller_reference(&parsed.caller_reference)?;
896        let mut state = self.state.write();
897        let account = state
898            .accounts
899            .get_mut(DEFAULT_ACCOUNT)
900            .ok_or_else(|| no_such_distribution(primary_id))?;
901        let primary = account
902            .distributions
903            .get(primary_id)
904            .ok_or_else(|| no_such_distribution(primary_id))?
905            .clone();
906        if primary.etag != if_match {
907            return Err(aws_error(
908                StatusCode::PRECONDITION_FAILED,
909                "PreconditionFailed",
910                "If-Match header does not match the current ETag",
911            ));
912        }
913        if account
914            .distributions
915            .values()
916            .any(|d| d.config.caller_reference == parsed.caller_reference)
917        {
918            return Err(aws_error(
919                StatusCode::CONFLICT,
920                "DistributionAlreadyExists",
921                "Distribution with the same CallerReference exists",
922            ));
923        }
924        let new_id = generate_distribution_id();
925        let mut config = primary.config.clone();
926        config.caller_reference = parsed.caller_reference;
927        config.enabled = parsed.enabled.unwrap_or(false);
928        config.staging = parsed.staging;
929        let now = Utc::now();
930        let etag = generate_etag();
931        let arn = format!(
932            "arn:aws:cloudfront::{}:distribution/{}",
933            account_id(req),
934            new_id
935        );
936        let stored = StoredDistribution {
937            id: new_id.clone(),
938            arn: arn.clone(),
939            status: "InProgress".to_string(),
940            last_modified_time: now,
941            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
942            in_progress_invalidation_batches: 0,
943            etag: etag.clone(),
944            config,
945        };
946        account.distributions.insert(new_id.clone(), stored.clone());
947        drop(state);
948        self.schedule_distribution_deploy(new_id);
949        let body = build_distribution_xml(&stored);
950        let mut headers = HeaderMap::new();
951        set_header(&mut headers, ETAG, &etag);
952        set_header(&mut headers, LOCATION, &stored.arn);
953        Ok(xml_response(StatusCode::CREATED, body, headers))
954    }
955}
956
/// Request body of `CopyDistribution`, deserialized from XML with
/// PascalCase element names by `copy_distribution`.
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller reference for the copy; validated by the handler and rejected
    /// when another distribution in the account already uses it.
    caller_reference: String,
    /// Whether the copy starts enabled; the handler treats an absent value
    /// as `false`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Staging flag copied verbatim (including `None`) into the new
    /// distribution's config.
    #[serde(default)]
    staging: Option<bool>,
}
966
967// ─── Invalidations ────────────────────────────────────────────────────
968
969impl CloudFrontService {
970    fn create_invalidation(
971        &self,
972        req: &AwsRequest,
973        route: &Route,
974    ) -> Result<AwsResponse, AwsServiceError> {
975        let dist_id = route
976            .id
977            .as_deref()
978            .ok_or_else(|| invalid_argument("missing distribution id"))?;
979        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
980            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
981        if batch.caller_reference.is_empty() {
982            return Err(invalid_argument("CallerReference is required"));
983        }
984        if batch.paths.quantity < 1 {
985            return Err(invalid_argument(
986                "InvalidationBatch.Paths must be non-empty",
987            ));
988        }
989        let mut state = self.state.write();
990        let account = state.entry(DEFAULT_ACCOUNT);
991        if !account.distributions.contains_key(dist_id) {
992            return Err(no_such_distribution(dist_id));
993        }
994        let id = generate_invalidation_id();
995        let stored = StoredInvalidation {
996            id: id.clone(),
997            distribution_id: dist_id.to_string(),
998            status: "Completed".to_string(),
999            create_time: Utc::now(),
1000            batch: batch.clone(),
1001        };
1002        account.invalidations.insert(id.clone(), stored.clone());
1003        drop(state);
1004        let body = build_invalidation_xml(&stored);
1005        let mut headers = HeaderMap::new();
1006        set_header(
1007            &mut headers,
1008            LOCATION,
1009            &format!(
1010                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1011                stored.id
1012            ),
1013        );
1014        Ok(xml_response(StatusCode::CREATED, body, headers))
1015    }
1016
1017    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1018        let dist_id = route
1019            .id
1020            .as_deref()
1021            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1022        let inv_id = route
1023            .second_id
1024            .as_deref()
1025            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1026        let state = self.state.read();
1027        let account = state
1028            .accounts
1029            .get(DEFAULT_ACCOUNT)
1030            .ok_or_else(|| no_such_invalidation(inv_id))?;
1031        if !account.distributions.contains_key(dist_id) {
1032            return Err(no_such_distribution(dist_id));
1033        }
1034        let inv = account
1035            .invalidations
1036            .get(inv_id)
1037            .filter(|i| i.distribution_id == dist_id)
1038            .ok_or_else(|| no_such_invalidation(inv_id))?
1039            .clone();
1040        drop(state);
1041        let body = build_invalidation_xml(&inv);
1042        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1043    }
1044
1045    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1046        let dist_id = route
1047            .id
1048            .as_deref()
1049            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1050        let state = self.state.read();
1051        let account = state
1052            .accounts
1053            .get(DEFAULT_ACCOUNT)
1054            .ok_or_else(|| no_such_distribution(dist_id))?;
1055        if !account.distributions.contains_key(dist_id) {
1056            return Err(no_such_distribution(dist_id));
1057        }
1058        let mut items: Vec<&StoredInvalidation> = account
1059            .invalidations
1060            .values()
1061            .filter(|i| i.distribution_id == dist_id)
1062            .collect();
1063        items.sort_by_key(|a| a.create_time);
1064        let body = build_invalidation_list_xml(&items);
1065        drop(state);
1066        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1067    }
1068}
1069
1070// ─── Tags ─────────────────────────────────────────────────────────────
1071
1072impl CloudFrontService {
1073    fn parse_arn_query(query: &str) -> Option<String> {
1074        for pair in query.split('&').filter(|p| !p.is_empty()) {
1075            if let Some(rest) = pair.strip_prefix("Resource=") {
1076                return Some(percent_decode(rest));
1077            }
1078        }
1079        None
1080    }
1081
1082    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1083        let arn = Self::parse_arn_query(&req.raw_query)
1084            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1085        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1086            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1087        let new_tags: Vec<Tag> = parsed
1088            .items
1089            .map(|i| {
1090                i.tag
1091                    .into_iter()
1092                    .map(|t| Tag {
1093                        key: t.key,
1094                        value: t.value,
1095                    })
1096                    .collect()
1097            })
1098            .unwrap_or_default();
1099        let mut state = self.state.write();
1100        let account = state.entry(DEFAULT_ACCOUNT);
1101        let entry = account.tags.entry(arn).or_default();
1102        for tag in new_tags {
1103            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1104                existing.value = tag.value;
1105            } else {
1106                entry.push(tag);
1107            }
1108        }
1109        Ok(empty_response(StatusCode::NO_CONTENT))
1110    }
1111
1112    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1113        let arn = Self::parse_arn_query(&req.raw_query)
1114            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1115        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1116            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1117        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1118        let mut state = self.state.write();
1119        let account = state.entry(DEFAULT_ACCOUNT);
1120        if let Some(existing) = account.tags.get_mut(&arn) {
1121            existing.retain(|t| !keys.contains(&t.key));
1122        }
1123        Ok(empty_response(StatusCode::NO_CONTENT))
1124    }
1125
1126    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1127        let arn = Self::parse_arn_query(&req.raw_query)
1128            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1129        let state = self.state.read();
1130        let tags = state
1131            .accounts
1132            .get(DEFAULT_ACCOUNT)
1133            .and_then(|a| a.tags.get(&arn))
1134            .cloned()
1135            .unwrap_or_default();
1136        drop(state);
1137        let body = build_tags_xml(&tags);
1138        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1139    }
1140}
1141
1142// ─── Aliases / WebACL ─────────────────────────────────────────────────
1143
1144impl CloudFrontService {
1145    fn associate_alias(
1146        &self,
1147        req: &AwsRequest,
1148        route: &Route,
1149    ) -> Result<AwsResponse, AwsServiceError> {
1150        let id = route
1151            .id
1152            .as_deref()
1153            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1154        let alias = parse_query_value(&req.raw_query, "Alias")
1155            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1156        let mut state = self.state.write();
1157        let account = state
1158            .accounts
1159            .get_mut(DEFAULT_ACCOUNT)
1160            .ok_or_else(|| no_such_distribution(id))?;
1161        // Reject if the alias is already attached to a different distribution.
1162        if let Some(other) = account.distributions.values().find(|d| {
1163            d.id != id
1164                && d.config
1165                    .aliases
1166                    .as_ref()
1167                    .and_then(|a| a.items.as_ref())
1168                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1169        }) {
1170            return Err(aws_error(
1171                StatusCode::CONFLICT,
1172                "CNAMEAlreadyExists",
1173                format!(
1174                    "Alias {alias} is already associated with distribution {}",
1175                    other.id
1176                ),
1177            ));
1178        }
1179        let dist = account
1180            .distributions
1181            .get_mut(id)
1182            .ok_or_else(|| no_such_distribution(id))?;
1183        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1184        let items = aliases
1185            .items
1186            .get_or_insert_with(crate::model::AliasItems::default);
1187        if !items.cname.iter().any(|c| c == &alias) {
1188            items.cname.push(alias.clone());
1189            aliases.quantity = items.cname.len() as i32;
1190        }
1191        dist.etag = generate_etag();
1192        dist.last_modified_time = Utc::now();
1193        Ok(empty_response(StatusCode::OK))
1194    }
1195
1196    fn list_conflicting_aliases(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1197        // We never produce conflicts because every alias is owned by one
1198        // distribution at most. Return an empty list with the proper shape.
1199        let body = format!(
1200            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1201            NS = crate::NAMESPACE,
1202            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1203        );
1204        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1205    }
1206
1207    fn associate_web_acl(
1208        &self,
1209        req: &AwsRequest,
1210        route: &Route,
1211    ) -> Result<AwsResponse, AwsServiceError> {
1212        let id = route
1213            .id
1214            .as_deref()
1215            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1216        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1217            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1218        let mut state = self.state.write();
1219        let account = state
1220            .accounts
1221            .get_mut(DEFAULT_ACCOUNT)
1222            .ok_or_else(|| no_such_distribution(id))?;
1223        let dist = account
1224            .distributions
1225            .get_mut(id)
1226            .ok_or_else(|| no_such_distribution(id))?;
1227        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1228        dist.etag = generate_etag();
1229        dist.last_modified_time = Utc::now();
1230        let body = format!(
1231            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1232            esc(id), esc(&parsed.web_acl_arn),
1233            NS = crate::NAMESPACE,
1234            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1235        );
1236        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1237    }
1238
1239    fn disassociate_web_acl(
1240        &self,
1241        _req: &AwsRequest,
1242        route: &Route,
1243    ) -> Result<AwsResponse, AwsServiceError> {
1244        let id = route
1245            .id
1246            .as_deref()
1247            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1248        let mut state = self.state.write();
1249        let account = state
1250            .accounts
1251            .get_mut(DEFAULT_ACCOUNT)
1252            .ok_or_else(|| no_such_distribution(id))?;
1253        let dist = account
1254            .distributions
1255            .get_mut(id)
1256            .ok_or_else(|| no_such_distribution(id))?;
1257        dist.config.web_acl_id = None;
1258        dist.etag = generate_etag();
1259        dist.last_modified_time = Utc::now();
1260        let body = format!(
1261            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1262            esc(id),
1263            NS = crate::NAMESPACE,
1264            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1265        );
1266        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1267    }
1268}
1269
/// Request body deserialized by `associate_web_acl`.
///
/// NOTE(review): despite the name, this carries the WebACL ARN for the
/// WebACL association handler, not anything alias-related.
#[derive(serde::Deserialize, Default, Debug)]
#[serde(rename_all = "PascalCase")]
struct AssociateAliasRequest {
    /// ARN of the WebACL to attach. Read from the `WebACLArn` element
    /// (explicit rename, since PascalCase conversion would not produce that
    /// spelling); an absent element deserializes to the empty string.
    #[serde(rename = "WebACLArn", default)]
    web_acl_arn: String,
}
1276
1277// ─── XML body builders ────────────────────────────────────────────────
1278
/// Escapes a user-provided string for inclusion in the hand-rolled XML
/// response bodies.
///
/// The five standard XML metacharacters (`&`, `<`, `>`, `"`, `'`) are
/// replaced by their named entities; every other character is copied
/// unchanged. Keep this in sync with `quick_xml`'s entity table — using
/// their primitive would mean a `Writer` per call, while these builders
/// serialize straight into a `String`.
pub(crate) fn esc(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '&' => "&amp;",
            '<' => "&lt;",
            '>' => "&gt;",
            '"' => "&quot;",
            '\'' => "&apos;",
            other => {
                escaped.push(other);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
1298
1299pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1300    let mut out = String::with_capacity(2048);
1301    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1302    out.push_str(&format!(
1303        "<Distribution xmlns=\"{ns}\">",
1304        ns = crate::NAMESPACE
1305    ));
1306    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1307    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1308    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1309    out.push_str(&format!(
1310        "<LastModifiedTime>{}</LastModifiedTime>",
1311        rfc3339(&dist.last_modified_time)
1312    ));
1313    out.push_str(&format!(
1314        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1315        dist.in_progress_invalidation_batches
1316    ));
1317    out.push_str(&format!(
1318        "<DomainName>{}</DomainName>",
1319        esc(&dist.domain_name)
1320    ));
1321    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1322    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1323    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1324        .unwrap_or_else(|_| String::new());
1325    out.push_str(&inner);
1326    out.push_str("</Distribution>");
1327    out
1328}
1329
1330fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1331    let mut out = String::with_capacity(2048);
1332    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1333    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1334    out.push_str("<Marker></Marker>");
1335    out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1336    out.push_str("<IsTruncated>false</IsTruncated>");
1337    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1338    if dists.is_empty() {
1339        out.push_str(&format!("</{root}>"));
1340        return out;
1341    }
1342    out.push_str("<Items>");
1343    for d in dists {
1344        out.push_str("<DistributionSummary>");
1345        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1346        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1347        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1348        out.push_str(&format!(
1349            "<LastModifiedTime>{}</LastModifiedTime>",
1350            rfc3339(&d.last_modified_time)
1351        ));
1352        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1353        let aliases = d.config.aliases.clone().unwrap_or_default();
1354        out.push_str(&render_inline("Aliases", &aliases));
1355        let origins = d.config.origins.clone();
1356        out.push_str(&render_inline("Origins", &origins));
1357        let dcb = d.config.default_cache_behavior.clone();
1358        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1359        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1360        out.push_str(&render_inline("CacheBehaviors", &cb));
1361        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1362        out.push_str(&render_inline("CustomErrorResponses", &cer));
1363        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1364        out.push_str(&format!(
1365            "<PriceClass>{}</PriceClass>",
1366            esc(&d
1367                .config
1368                .price_class
1369                .clone()
1370                .unwrap_or_else(|| "PriceClass_All".to_string()))
1371        ));
1372        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1373        out.push_str(&render_inline(
1374            "ViewerCertificate",
1375            &d.config.viewer_certificate.clone().unwrap_or_default(),
1376        ));
1377        out.push_str(&render_inline(
1378            "Restrictions",
1379            &d.config.restrictions.clone().unwrap_or_default(),
1380        ));
1381        out.push_str(&format!(
1382            "<WebACLId>{}</WebACLId>",
1383            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1384        ));
1385        out.push_str(&format!(
1386            "<HttpVersion>{}</HttpVersion>",
1387            esc(&d
1388                .config
1389                .http_version
1390                .clone()
1391                .unwrap_or_else(|| "http2".to_string()))
1392        ));
1393        out.push_str(&format!(
1394            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1395            d.config.is_ipv6_enabled.unwrap_or(true)
1396        ));
1397        out.push_str("<Staging>false</Staging>");
1398        out.push_str("</DistributionSummary>");
1399    }
1400    out.push_str("</Items>");
1401    out.push_str(&format!("</{root}>"));
1402    out
1403}
1404
1405fn build_empty_distribution_id_list(root: &str) -> String {
1406    let mut out = String::with_capacity(256);
1407    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1408    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1409    out.push_str("<Marker></Marker>");
1410    out.push_str("<MaxItems>100</MaxItems>");
1411    out.push_str("<IsTruncated>false</IsTruncated>");
1412    out.push_str("<Quantity>0</Quantity>");
1413    out.push_str(&format!("</{root}>"));
1414    out
1415}
1416
1417fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1418    let mut out = String::with_capacity(512);
1419    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1420    out.push_str(&format!(
1421        "<Invalidation xmlns=\"{ns}\">",
1422        ns = crate::NAMESPACE
1423    ));
1424    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1425    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1426    out.push_str(&format!(
1427        "<CreateTime>{}</CreateTime>",
1428        rfc3339(&inv.create_time)
1429    ));
1430    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1431    out.push_str("</Invalidation>");
1432    out
1433}
1434
1435fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1436    let mut out = String::with_capacity(1024);
1437    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1438    out.push_str(&format!(
1439        "<InvalidationList xmlns=\"{ns}\">",
1440        ns = crate::NAMESPACE
1441    ));
1442    out.push_str("<Marker></Marker>");
1443    out.push_str("<MaxItems>100</MaxItems>");
1444    out.push_str("<IsTruncated>false</IsTruncated>");
1445    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1446    if !items.is_empty() {
1447        out.push_str("<Items>");
1448        for inv in items {
1449            out.push_str("<InvalidationSummary>");
1450            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1451            out.push_str(&format!(
1452                "<CreateTime>{}</CreateTime>",
1453                rfc3339(&inv.create_time)
1454            ));
1455            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1456            out.push_str("</InvalidationSummary>");
1457        }
1458        out.push_str("</Items>");
1459    }
1460    out.push_str("</InvalidationList>");
1461    out
1462}
1463
1464fn build_tags_xml(tags: &[Tag]) -> String {
1465    let mut out = String::with_capacity(256);
1466    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1467    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1468    out.push_str("<Items>");
1469    for t in tags {
1470        out.push_str("<Tag>");
1471        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1472        if let Some(v) = &t.value {
1473            out.push_str(&format!("<Value>{}</Value>", esc(v)));
1474        }
1475        out.push_str("</Tag>");
1476    }
1477    out.push_str("</Items>");
1478    out.push_str("</Tags>");
1479    out
1480}
1481
1482fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1483    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1484}
1485
1486// ─── Helpers ──────────────────────────────────────────────────────────
1487
1488fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1489    if s.is_empty() {
1490        return Err(invalid_argument("CallerReference is required"));
1491    }
1492    Ok(())
1493}
1494
1495fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1496    if config.origins.quantity < 1 {
1497        return Err(invalid_argument(
1498            "DistributionConfig.Origins must contain at least one origin",
1499        ));
1500    }
1501    Ok(())
1502}
1503
1504/// Compare two `DistributionConfig`s by serializing them to canonical XML
1505/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
1506/// so the ETag stays stable when the caller PUTs the same config back.
1507/// Falls back to "not equal" if either serialization fails so we still
1508/// honor the request rather than silently swallow a write.
1509fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1510    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1511        return false;
1512    };
1513    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1514        return false;
1515    };
1516    a == b
1517}
1518
1519fn account_id(_req: &AwsRequest) -> &'static str {
1520    // Multi-account is wired through AwsRequest.account_id elsewhere; the
1521    // CloudFront control plane only uses the resolved id for the ARN
1522    // suffix. Until that field stabilizes for REST-XML we use the default
1523    // account ID consistently with the rest of the registered services.
1524    DEFAULT_ACCOUNT
1525}
1526
1527fn generate_distribution_id() -> String {
1528    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
1529    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1530    format!("E{}", &raw[..13])
1531}
1532
1533fn generate_invalidation_id() -> String {
1534    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1535    format!("I{}", &raw[..13])
1536}
1537
1538pub(crate) fn generate_etag() -> String {
1539    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1540    format!("E{}", &raw[..13])
1541}
1542
1543/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
1544/// Used by Batch 2 policy resources (cache, origin request, response
1545/// headers, continuous deployment, OAC) so each gets a recognizable
1546/// alphabetic prefix in addition to the random suffix.
1547pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1548    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1549    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1550    format!("{prefix}{}", &raw[..suffix_len])
1551}
1552
1553fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1554    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1555}
1556
1557pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1558    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1559}
1560
1561fn no_such_distribution(id: &str) -> AwsServiceError {
1562    aws_error(
1563        StatusCode::NOT_FOUND,
1564        "NoSuchDistribution",
1565        format!("The specified distribution does not exist: {id}"),
1566    )
1567}
1568
1569fn no_such_invalidation(id: &str) -> AwsServiceError {
1570    aws_error(
1571        StatusCode::NOT_FOUND,
1572        "NoSuchInvalidation",
1573        format!("The specified invalidation does not exist: {id}"),
1574    )
1575}
1576
1577fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1578    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1579}
1580
1581pub(crate) fn aws_error(
1582    status: StatusCode,
1583    code: impl Into<String>,
1584    msg: impl Into<String>,
1585) -> AwsServiceError {
1586    AwsServiceError::aws_error(status, code.into(), msg)
1587}
1588
1589fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1590    if let Ok(v) = HeaderValue::from_str(value) {
1591        headers.insert(name, v);
1592    }
1593}
1594
1595pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1596    AwsResponse {
1597        status,
1598        content_type: "text/xml".to_string(),
1599        body: ResponseBody::Bytes(Bytes::from(body)),
1600        headers,
1601    }
1602}
1603
1604fn empty_response(status: StatusCode) -> AwsResponse {
1605    AwsResponse {
1606        status,
1607        content_type: "text/xml".to_string(),
1608        body: ResponseBody::Bytes(Bytes::new()),
1609        headers: HeaderMap::new(),
1610    }
1611}
1612
/// Decode a percent-encoded query-string component.
///
/// * `%XX` (two hex digits, either case) becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally.
///
/// Decoded bytes are gathered first and interpreted as UTF-8 as a whole,
/// so multi-byte sequences such as `%C3%A9` decode to a single character
/// (`é`) instead of one Latin-1 character per byte. Byte sequences that
/// are not valid UTF-8 are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed or truncated escape: pass the `%` through.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }

    // Reuse the buffer when it is already valid UTF-8; only fall back to
    // the lossy (copying) conversion for invalid sequences.
    match String::from_utf8(decoded) {
        Ok(text) => text,
        Err(err) => String::from_utf8_lossy(err.as_bytes()).into_owned(),
    }
}

/// Return the numeric value of an ASCII hex digit (either case), or `None`
/// if `b` is not a hex digit.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1644
1645fn parse_query_value(query: &str, key: &str) -> Option<String> {
1646    let prefix = format!("{key}=");
1647    for pair in query.split('&').filter(|p| !p.is_empty()) {
1648        if let Some(rest) = pair.strip_prefix(&prefix) {
1649            return Some(percent_decode(rest));
1650        }
1651    }
1652    None
1653}
1654
#[cfg(test)]
mod tests {
    //! End-to-end tests that drive `CloudFrontService::handle` with
    //! hand-built REST-XML requests and inspect the raw responses.

    use super::*;

    /// Fresh, empty shared state for one test.
    fn make_state() -> SharedCloudFrontState {
        Arc::new(RwLock::new(CloudFrontAccounts::new()))
    }

    /// Build a CloudFront request for the default account with no headers.
    /// `path_segments` is derived from `path` by splitting on `/`.
    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: Uuid::new_v4().to_string(),
            headers: HeaderMap::new(),
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: query.into(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Smallest `DistributionConfig` body the tests use: one origin, one
    /// default cache behavior, empty comment, enabled.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    /// Send one request through the service, panicking if it is rejected.
    async fn send(
        svc: &CloudFrontService,
        method: http::Method,
        path: &str,
        query: &str,
        body: &str,
    ) -> AwsResponse {
        svc.handle(make_request(method, path, query, body))
            .await
            .unwrap()
    }

    /// Like [`send`] with an empty query, but with an `If-Match` header set
    /// to `etag`.
    async fn send_if_match(
        svc: &CloudFrontService,
        method: http::Method,
        path: &str,
        body: &str,
        etag: &str,
    ) -> AwsResponse {
        let mut req = make_request(method, path, "", body);
        req.headers.insert(IF_MATCH, etag.parse().unwrap());
        svc.handle(req).await.unwrap()
    }

    /// POST `body` to the CreateDistribution endpoint.
    async fn post_distribution(svc: &CloudFrontService, body: &str) -> AwsResponse {
        send(svc, http::Method::POST, "/2020-05-31/distribution", "", body).await
    }

    /// View the response body as UTF-8 text.
    fn body_text(resp: &AwsResponse) -> &str {
        std::str::from_utf8(resp.body.expect_bytes()).expect("response body should be UTF-8")
    }

    /// Text content of the first `<tag>…</tag>` element in `xml`.
    /// Panics, naming the tag, when no opening tag is present.
    fn element_text<'a>(xml: &'a str, tag: &str) -> &'a str {
        let open = format!("<{tag}>");
        let close = format!("</{tag}>");
        let (_, rest) = xml
            .split_once(open.as_str())
            .unwrap_or_else(|| panic!("no <{tag}> element in: {xml}"));
        rest.split_once(close.as_str())
            .map_or(rest, |(inner, _)| inner)
    }

    /// Value of the response's `ETag` header.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .expect("response should carry an ETag header")
            .to_str()
            .expect("ETag should be visible ASCII")
            .to_string()
    }

    /// Create a minimal distribution and return its id.
    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
        let create = post_distribution(svc, &minimal_dist_config_xml(caller_ref)).await;
        element_text(body_text(&create), "Id").to_string()
    }

    /// Read a distribution's status straight from the shared state; yields
    /// an empty string when the distribution is unknown.
    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
        let state = svc.state.read();
        state
            .accounts
            .get(DEFAULT_ACCOUNT)
            .and_then(|a| a.distributions.get(id))
            .map(|d| d.status.clone())
            .unwrap_or_default()
    }

    #[tokio::test]
    async fn create_then_get_then_delete_distribution() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("ref-1");
        let create = post_distribution(&svc, &body).await;
        assert_eq!(create.status, StatusCode::CREATED);
        let etag = etag_of(&create);
        let id = element_text(body_text(&create), "Id").to_string();
        let dist_path = format!("/2020-05-31/distribution/{id}");

        let get = send(&svc, http::Method::GET, &dist_path, "", "").await;
        assert_eq!(get.status, StatusCode::OK);

        // Disable then delete (CloudFront requires Disabled before delete).
        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
        let updated = send_if_match(
            &svc,
            http::Method::PUT,
            &format!("{dist_path}/config"),
            &disable_body,
            &etag,
        )
        .await;
        assert_eq!(updated.status, StatusCode::OK);
        let new_etag = etag_of(&updated);

        let del = send_if_match(&svc, http::Method::DELETE, &dist_path, "", &new_etag).await;
        assert_eq!(del.status, StatusCode::NO_CONTENT);
    }

    #[tokio::test]
    async fn create_distribution_starts_in_progress() {
        // Use a long delay so the auto-tick can't race the assertion.
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let create = post_distribution(&svc, &minimal_dist_config_xml("status-ref")).await;
        let xml = body_text(&create);
        assert!(
            xml.contains("<Status>InProgress</Status>"),
            "expected initial status InProgress, got: {xml}"
        );
    }

    #[tokio::test]
    async fn auto_transition_after_tick_marks_deployed() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_millis(50));
        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        assert_eq!(distribution_status(&svc, &id), "Deployed");
    }

    #[tokio::test]
    async fn set_distribution_status_via_admin_flips_synchronously() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let id = create_distribution_returning_id(&svc, "admin-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");
        assert!(svc.set_distribution_status(&id, "InProgress"));
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn set_distribution_status_unknown_id_returns_false() {
        let svc = CloudFrontService::new(make_state());
        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
    }

    #[tokio::test]
    async fn update_distribution_resets_to_in_progress() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("update-reset-ref");
        let create = post_distribution(&svc, &body).await;
        let etag = etag_of(&create);
        let id = element_text(body_text(&create), "Id").to_string();
        // Force the distribution to Deployed via the admin mutator so we can
        // observe the UpdateDistribution flip back to InProgress.
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");

        let updated_body = body.replace(
            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
        );
        let updated = send_if_match(
            &svc,
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            &updated_body,
            &etag,
        )
        .await;
        assert_eq!(updated.status, StatusCode::OK);
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn duplicate_caller_reference_is_rejected() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("dup-ref");
        post_distribution(&svc, &body).await;
        // The second create must fail, so bypass the unwrapping helper.
        let result = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await;
        let err = match result {
            Ok(_) => panic!("expected duplicate caller-reference to fail"),
            Err(e) => e,
        };
        assert_eq!(err.code(), "DistributionAlreadyExists");
        assert_eq!(err.status(), StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn invalidation_lifecycle() {
        let svc = CloudFrontService::new(make_state());
        let dist_id = create_distribution_returning_id(&svc, "inv-ref").await;
        let invalidation_path = format!("/2020-05-31/distribution/{dist_id}/invalidation");
        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
  <CallerReference>inv-1</CallerReference>
</InvalidationBatch>"#;

        let inv_resp = send(&svc, http::Method::POST, &invalidation_path, "", inv_body).await;
        assert_eq!(inv_resp.status, StatusCode::CREATED);
        let inv_id = element_text(body_text(&inv_resp), "Id");

        let get = send(
            &svc,
            http::Method::GET,
            &format!("{invalidation_path}/{inv_id}"),
            "",
            "",
        )
        .await;
        assert_eq!(get.status, StatusCode::OK);

        let list = send(&svc, http::Method::GET, &invalidation_path, "", "").await;
        assert!(body_text(&list).contains("<Quantity>1</Quantity>"));
    }

    #[tokio::test]
    async fn tags_roundtrip() {
        let svc = CloudFrontService::new(make_state());
        let create = post_distribution(&svc, &minimal_dist_config_xml("tag-ref")).await;
        let arn = element_text(body_text(&create), "ARN");
        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
</Tags>"#;

        let resp = send(
            &svc,
            http::Method::POST,
            "/2020-05-31/tagging",
            &format!("Operation=Tag&Resource={arn}"),
            tag_body,
        )
        .await;
        assert_eq!(resp.status, StatusCode::NO_CONTENT);

        let list = send(
            &svc,
            http::Method::GET,
            "/2020-05-31/tagging",
            &format!("Resource={arn}"),
            "",
        )
        .await;
        let xml = body_text(&list);
        assert!(xml.contains("<Key>env</Key>"));
        assert!(xml.contains("<Value>prod</Value>"));
    }

    #[tokio::test]
    async fn xml_metacharacters_in_user_input_are_escaped() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("escape-ref").replace(
            "<Comment></Comment>",
            "<Comment><![CDATA[a&b<c>d]]></Comment>",
        );
        let create = post_distribution(&svc, &body).await;
        let created_xml = body_text(&create);
        let dist_id = element_text(created_xml, "Id");
        let arn = element_text(created_xml, "ARN");

        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
</Tags>"#;
        send(
            &svc,
            http::Method::POST,
            "/2020-05-31/tagging",
            &format!("Operation=Tag&Resource={arn}"),
            tag_body,
        )
        .await;

        let list = send(
            &svc,
            http::Method::GET,
            "/2020-05-31/tagging",
            &format!("Resource={arn}"),
            "",
        )
        .await;
        let xml = body_text(&list);
        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
        assert!(!xml.contains("<Value>a&b<c>d</Value>"));

        // Force the distribution into the list-rendering path so the
        // unescaped Comment field would surface there.
        let list_resp = send(&svc, http::Method::GET, "/2020-05-31/distribution", "", "").await;
        let xml = body_text(&list_resp);
        // Ensure raw `<` from the user-supplied comment never lands inside
        // a Comment element on the wire.
        assert!(!xml.contains("<Comment>a&b<c>d"));
        assert!(xml.contains(dist_id));
    }
}