Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use uuid::Uuid;
12
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
14
15use crate::model::{
16    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
17};
18use crate::router::{route, Route};
19use crate::state::{
20    CloudFrontAccounts, SharedCloudFrontState, StoredDistribution, StoredInvalidation, Tag,
21};
22use crate::xml_io;
23
/// Account id used as the lookup key by the read-side distribution handlers
/// in this file (`get_distribution` indexes `CloudFrontAccounts::accounts`
/// with it).
// NOTE(review): presumably also the id `account_id` falls back to when a
// request carries no explicit account — confirm in that helper.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
25
/// Action names reported by [`AwsService::supported_actions`] for this
/// service.
///
/// The strings are the same action names `handle` matches on after routing.
// NOTE(review): this list and the `match` in `handle` are maintained by hand;
// an action added to one but not the other is either advertised without a
// handler or handled without being advertised — keep them in sync.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Distributions, invalidations, tags and aliases.
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    // Filtered distribution listings (all routed to one handler in `handle`).
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    // Origin access controls.
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    // Cache / origin-request / response-headers / continuous-deployment policies.
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    // Functions.
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    // Public keys, key groups and key-value stores.
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    // Origin access identities and monitoring subscriptions.
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    // Streaming distributions.
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    // Field-level encryption configs and profiles.
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    // Realtime log configs, VPC origins, anycast IP lists and trust stores.
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    // Resource policies.
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    // Connection groups and domain management.
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    // Distribution tenants.
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    // Connection functions.
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
195
/// CloudFront service handler.
///
/// Holds the shared per-account state plus the delay applied before the
/// scheduled `InProgress` -> `Deployed` status flips run.
pub struct CloudFrontService {
    /// Shared, lock-protected account state; handlers take its read or write
    /// lock, and the spawned propagation tasks hold their own clone of it.
    pub(crate) state: SharedCloudFrontState,
    /// How long the propagation tick sleeps before flipping a freshly
    /// created or updated Distribution / DistributionTenant /
    /// ConnectionGroup / StreamingDistribution from `InProgress` to
    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
    /// keeps SDK-driven integration tests fast while still simulating the
    /// async transition. Tests can shrink it via
    /// [`CloudFrontService::with_propagation_delay`], and operators can
    /// override the default via the
    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
    pub(crate) propagation_delay: std::time::Duration,
}
209
/// Resolve the default `InProgress` -> `Deployed` delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
/// second.
///
/// The value (surrounding whitespace ignored) is read as a non-negative
/// integer number of seconds or, failing that, as a floating-point number of
/// seconds; `0` yields a zero delay. Anything else — an unset or non-Unicode
/// variable, unparsable text, a negative number, `NaN`, an infinity, or a
/// float too large to fit in a `Duration` — falls back to 1 second, so a
/// typo in the env var can neither hang nor crash the process.
fn default_propagation_delay() -> std::time::Duration {
    const FALLBACK: std::time::Duration = std::time::Duration::from_secs(1);
    std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC")
        .ok()
        .and_then(|raw| parse_propagation_delay(&raw))
        .unwrap_or(FALLBACK)
}

/// Parse a delay in seconds from `raw`, returning `None` for any value that
/// cannot be represented as a `Duration`.
fn parse_propagation_delay(raw: &str) -> Option<std::time::Duration> {
    let trimmed = raw.trim();
    // Integers are tried first so whole-second values keep full u64 precision.
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Some(std::time::Duration::from_secs(secs));
    }
    let secs = trimmed.parse::<f64>().ok()?;
    // `try_from_secs_f64` rejects negative, NaN, infinite and overflowing
    // inputs instead of panicking the way `from_secs_f64` does on e.g.
    // `1e300`.
    std::time::Duration::try_from_secs_f64(secs).ok()
}
231
232impl CloudFrontService {
233    pub fn new(state: SharedCloudFrontState) -> Self {
234        Self {
235            state,
236            propagation_delay: default_propagation_delay(),
237        }
238    }
239
240    pub fn shared_state(&self) -> SharedCloudFrontState {
241        Arc::clone(&self.state)
242    }
243
244    /// Override the propagation delay used by `CreateDistribution`,
245    /// `UpdateDistribution`, and the equivalent DistributionTenant /
246    /// ConnectionGroup ops. Tests pass a small duration so they don't
247    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
248    /// transition.
249    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
250        self.propagation_delay = delay;
251        self
252    }
253
254    /// Flip a stored Distribution's status synchronously. Returns `false`
255    /// if no distribution matches `id` across any account. The admin
256    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
257    /// calls this so tests can force `InProgress` <-> `Deployed` without
258    /// waiting on the propagation tick.
259    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
260        let mut state = self.state.write();
261        for account in state.accounts.values_mut() {
262            if let Some(dist) = account.distributions.get_mut(id) {
263                dist.status = status.to_string();
264                return true;
265            }
266        }
267        false
268    }
269}
270
271impl Default for CloudFrontService {
272    fn default() -> Self {
273        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
274    }
275}
276
/// Wires `CloudFrontService` into the generic service interface: requests
/// are resolved to an action name by `route` and dispatched to the matching
/// handler method.
#[async_trait]
impl AwsService for CloudFrontService {
    /// Name under which this service is registered.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// Action names this service advertises (see `SUPPORTED_ACTIONS`).
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Route the request by method, path and query, then dispatch on the
    /// resolved action name.
    ///
    /// # Errors
    ///
    /// - `404 InvalidArgument` when `route` cannot resolve the request.
    /// - `501 InvalidAction` when the router yields an action that has no
    ///   arm in the dispatch below.
    /// - Whatever error the selected handler returns.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Handlers receive the raw request, the resolved route, or both,
        // depending on whether they need the body/headers or only path ids.
        match resolved.action {
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All filtered listings share one handler; the action name is
            // forwarded as the third argument (presumably so the handler can
            // pick the filter — see `list_distributions_by`).
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => {
                self.list_distributions_by(&req, &resolved, resolved.action)
            }
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router resolved an action this dispatch has no arm for.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        }
    }
}
509
510// ─── Distribution handlers ────────────────────────────────────────────
511
512impl CloudFrontService {
513    fn create_distribution(
514        &self,
515        req: &AwsRequest,
516        with_tags: bool,
517    ) -> Result<AwsResponse, AwsServiceError> {
518        let (config, tags) = if with_tags {
519            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
520                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
521            let tags = parsed
522                .tags
523                .items
524                .map(|i| {
525                    i.tag
526                        .into_iter()
527                        .map(|t| Tag {
528                            key: t.key,
529                            value: t.value,
530                        })
531                        .collect()
532                })
533                .unwrap_or_default();
534            (parsed.distribution_config, tags)
535        } else {
536            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
537                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
538            (parsed, Vec::new())
539        };
540
541        validate_caller_reference(&config.caller_reference)?;
542        validate_origins(&config)?;
543
544        let mut state = self.state.write();
545        let account = state.entry(account_id(req));
546
547        if let Some(existing) = account
548            .distributions
549            .values()
550            .find(|d| d.config.caller_reference == config.caller_reference)
551        {
552            return Err(aws_error(
553                StatusCode::CONFLICT,
554                "DistributionAlreadyExists",
555                format!(
556                    "Distribution with the same CallerReference exists: {}",
557                    existing.id
558                ),
559            ));
560        }
561
562        let id = generate_distribution_id();
563        let now = Utc::now();
564        let etag = generate_etag();
565        let domain = format!("{}.cloudfront.net", id.to_lowercase());
566        let arn = format!(
567            "arn:aws:cloudfront::{}:distribution/{}",
568            account_id(req),
569            id
570        );
571
572        let stored = StoredDistribution {
573            id: id.clone(),
574            arn: arn.clone(),
575            // Real CloudFront returns InProgress immediately and flips to
576            // Deployed ~15 minutes later once the edge propagation completes.
577            // The spawn below mirrors that lifecycle on a configurable delay.
578            status: "InProgress".to_string(),
579            last_modified_time: now,
580            domain_name: domain,
581            in_progress_invalidation_batches: 0,
582            etag: etag.clone(),
583            config,
584        };
585        account.distributions.insert(id.clone(), stored.clone());
586        if !tags.is_empty() {
587            account.tags.insert(arn.clone(), tags);
588        }
589        drop(state);
590
591        self.schedule_distribution_deploy(id.clone());
592
593        let body = build_distribution_xml(&stored);
594        let mut headers = HeaderMap::new();
595        set_header(&mut headers, ETAG, &etag);
596        set_header(&mut headers, LOCATION, &stored.arn);
597        Ok(xml_response(StatusCode::CREATED, body, headers))
598    }
599
600    /// Spawn a tokio task that flips the named distribution from
601    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
602    /// `CreateDistribution` and `UpdateDistribution` to model the real
603    /// CloudFront edge propagation lifecycle.
604    fn schedule_distribution_deploy(&self, id: String) {
605        let state = Arc::clone(&self.state);
606        let delay = self.propagation_delay;
607        tokio::spawn(async move {
608            tokio::time::sleep(delay).await;
609            let mut s = state.write();
610            for account in s.accounts.values_mut() {
611                if let Some(d) = account.distributions.get_mut(&id) {
612                    if d.status == "InProgress" {
613                        d.status = "Deployed".to_string();
614                    }
615                    return;
616                }
617            }
618        });
619    }
620
621    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
622    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
623        let state = Arc::clone(&self.state);
624        let delay = self.propagation_delay;
625        tokio::spawn(async move {
626            tokio::time::sleep(delay).await;
627            let mut s = state.write();
628            for account in s.accounts.values_mut() {
629                if let Some(t) = account.distribution_tenants.get_mut(&id) {
630                    if t.status == "InProgress" {
631                        t.status = "Deployed".to_string();
632                    }
633                    return;
634                }
635            }
636        });
637    }
638
639    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
640    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
641        let state = Arc::clone(&self.state);
642        let delay = self.propagation_delay;
643        tokio::spawn(async move {
644            tokio::time::sleep(delay).await;
645            let mut s = state.write();
646            for account in s.accounts.values_mut() {
647                if let Some(g) = account.connection_groups.get_mut(&id) {
648                    if g.status == "InProgress" {
649                        g.status = "Deployed".to_string();
650                    }
651                    return;
652                }
653            }
654        });
655    }
656
657    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
658    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
659    /// RTMP streaming distributions, even though the resource is largely
660    /// deprecated.
661    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
662        let state = Arc::clone(&self.state);
663        let delay = self.propagation_delay;
664        tokio::spawn(async move {
665            tokio::time::sleep(delay).await;
666            let mut s = state.write();
667            for account in s.accounts.values_mut() {
668                if let Some(d) = account.streaming_distributions.get_mut(&id) {
669                    if d.status == "InProgress" {
670                        d.status = "Deployed".to_string();
671                    }
672                    return;
673                }
674            }
675        });
676    }
677
678    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
679        let id = route
680            .id
681            .as_deref()
682            .ok_or_else(|| invalid_argument("missing distribution id"))?;
683        let state = self.state.read();
684        let account = state
685            .accounts
686            .get(DEFAULT_ACCOUNT)
687            .ok_or_else(|| no_such_distribution(id))?;
688        let dist = account
689            .distributions
690            .get(id)
691            .ok_or_else(|| no_such_distribution(id))?
692            .clone();
693        drop(state);
694        let body = build_distribution_xml(&dist);
695        let mut headers = HeaderMap::new();
696        set_header(&mut headers, ETAG, &dist.etag);
697        Ok(xml_response(StatusCode::OK, body, headers))
698    }
699
700    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
701        let id = route
702            .id
703            .as_deref()
704            .ok_or_else(|| invalid_argument("missing distribution id"))?;
705        let state = self.state.read();
706        let account = state
707            .accounts
708            .get(DEFAULT_ACCOUNT)
709            .ok_or_else(|| no_such_distribution(id))?;
710        let dist = account
711            .distributions
712            .get(id)
713            .ok_or_else(|| no_such_distribution(id))?
714            .clone();
715        drop(state);
716        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
717            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
718        let mut headers = HeaderMap::new();
719        set_header(&mut headers, ETAG, &dist.etag);
720        Ok(xml_response(StatusCode::OK, body, headers))
721    }
722
723    fn update_distribution(
724        &self,
725        req: &AwsRequest,
726        route: &Route,
727    ) -> Result<AwsResponse, AwsServiceError> {
728        let id = route
729            .id
730            .as_deref()
731            .ok_or_else(|| invalid_argument("missing distribution id"))?;
732        let if_match = req
733            .headers
734            .get(IF_MATCH)
735            .and_then(|v| v.to_str().ok())
736            .ok_or_else(|| {
737                aws_error(
738                    StatusCode::BAD_REQUEST,
739                    "InvalidIfMatchVersion",
740                    "Missing If-Match header for UpdateDistribution",
741                )
742            })?
743            .to_string();
744        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
745            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
746        validate_caller_reference(&new_config.caller_reference)?;
747        validate_origins(&new_config)?;
748
749        let mut state = self.state.write();
750        let account = state
751            .accounts
752            .get_mut(DEFAULT_ACCOUNT)
753            .ok_or_else(|| no_such_distribution(id))?;
754        let dist = account
755            .distributions
756            .get_mut(id)
757            .ok_or_else(|| no_such_distribution(id))?;
758        if dist.etag != if_match {
759            return Err(aws_error(
760                StatusCode::PRECONDITION_FAILED,
761                "PreconditionFailed",
762                "If-Match header does not match the current ETag",
763            ));
764        }
765        // ETag stability: only bump the ETag and flip status back to
766        // InProgress when the new config actually differs from what we
767        // have on disk. A no-op UpdateDistribution (PUT the same config
768        // back) leaves the ETag intact, matching AWS behavior.
769        let config_changed = !configs_equal(&dist.config, &new_config);
770        if config_changed {
771            dist.config = new_config;
772            dist.etag = generate_etag();
773            dist.last_modified_time = Utc::now();
774            // UpdateDistribution kicks off a fresh edge propagation; AWS
775            // flips the status back to InProgress until the new config
776            // lands.
777            dist.status = "InProgress".to_string();
778        }
779        let snapshot = dist.clone();
780        drop(state);
781
782        if config_changed {
783            self.schedule_distribution_deploy(id.to_string());
784        }
785
786        let body = build_distribution_xml(&snapshot);
787        let mut headers = HeaderMap::new();
788        set_header(&mut headers, ETAG, &snapshot.etag);
789        Ok(xml_response(StatusCode::OK, body, headers))
790    }
791
792    fn delete_distribution(
793        &self,
794        req: &AwsRequest,
795        route: &Route,
796    ) -> Result<AwsResponse, AwsServiceError> {
797        let id = route
798            .id
799            .as_deref()
800            .ok_or_else(|| invalid_argument("missing distribution id"))?;
801        let if_match = req
802            .headers
803            .get(IF_MATCH)
804            .and_then(|v| v.to_str().ok())
805            .ok_or_else(|| {
806                aws_error(
807                    StatusCode::BAD_REQUEST,
808                    "InvalidIfMatchVersion",
809                    "Missing If-Match header for DeleteDistribution",
810                )
811            })?
812            .to_string();
813        let mut state = self.state.write();
814        let account = state
815            .accounts
816            .get_mut(DEFAULT_ACCOUNT)
817            .ok_or_else(|| no_such_distribution(id))?;
818        {
819            let dist = account
820                .distributions
821                .get(id)
822                .ok_or_else(|| no_such_distribution(id))?;
823            if dist.etag != if_match {
824                return Err(aws_error(
825                    StatusCode::PRECONDITION_FAILED,
826                    "PreconditionFailed",
827                    "If-Match header does not match the current ETag",
828                ));
829            }
830            if dist.config.enabled {
831                return Err(aws_error(
832                    StatusCode::PRECONDITION_FAILED,
833                    "DistributionNotDisabled",
834                    "Distribution must be disabled before delete",
835                ));
836            }
837        }
838        let removed = account.distributions.remove(id).unwrap();
839        account.tags.remove(&removed.arn);
840        Ok(empty_response(StatusCode::NO_CONTENT))
841    }
842
843    fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
844        let state = self.state.read();
845        let mut dists: Vec<StoredDistribution> = state
846            .accounts
847            .values()
848            .flat_map(|a| a.distributions.values().cloned())
849            .collect();
850        dists.sort_by_key(|a| a.last_modified_time);
851        drop(state);
852        let body = build_distribution_list_xml(&dists, "DistributionList");
853        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
854    }
855
856    fn list_distributions_by(
857        &self,
858        req: &AwsRequest,
859        route: &Route,
860        action: &str,
861    ) -> Result<AwsResponse, AwsServiceError> {
862        // Each "by-X" listing has a Smithy-required identifier (path or query)
863        // plus, for ConnectionMode, an enum-constrained discriminator. The
864        // synthetic probe sends `negative_omit_*` variants without these, so
865        // every handler that can short-circuit on a missing identifier must
866        // do so up-front. Otherwise the empty-list 200 looks like an honest
867        // pass to the probe and a 100% conformance number is unreachable.
868        match action {
869            // Path-id ops: route.id is the URL placeholder. If the probe
870            // omitted the field, the substitution left the literal
871            // `{Member}` braces in place — easy to detect.
872            "ListDistributionsByCachePolicyId"
873            | "ListDistributionsByOriginRequestPolicyId"
874            | "ListDistributionsByResponseHeadersPolicyId"
875            | "ListDistributionsByKeyGroup"
876            | "ListDistributionsByWebACLId"
877            | "ListDistributionsByVpcOriginId"
878            | "ListDistributionsByAnycastIpListId"
879            | "ListDistributionsByOwnedResource" => {
880                let id = route.id.as_deref().unwrap_or("");
881                if is_placeholder_label(id) {
882                    return Err(invalid_argument(format!(
883                        "Required URL identifier for {action} is missing or invalid"
884                    )));
885                }
886            }
887            "ListDistributionsByConnectionMode" => {
888                let id = route.id.as_deref().unwrap_or("");
889                if is_placeholder_label(id) {
890                    return Err(invalid_argument(
891                        "ConnectionMode is required for ListDistributionsByConnectionMode",
892                    ));
893                }
894                if id != "direct" && id != "tenant-only" {
895                    return Err(invalid_argument(format!(
896                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
897                    )));
898                }
899            }
900            "ListDistributionsByConnectionFunction"
901                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
902            {
903                return Err(invalid_argument(
904                    "ConnectionFunctionIdentifier query parameter is required",
905                ));
906            }
907            "ListDistributionsByTrustStore"
908                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
909            {
910                return Err(invalid_argument(
911                    "TrustStoreIdentifier query parameter is required",
912                ));
913            }
914            _ => {}
915        }
916
917        // The "by-X" listings each have a distinct response root element.
918        // We never index distributions by the predicate (that would require
919        // shipping each policy/key-group/etc service first), so each
920        // response is empty until those services land.
921        let root = match action {
922            "ListDistributionsByCachePolicyId"
923            | "ListDistributionsByOriginRequestPolicyId"
924            | "ListDistributionsByResponseHeadersPolicyId"
925            | "ListDistributionsByKeyGroup"
926            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
927            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
928            _ => "DistributionList",
929        };
930        let body = build_empty_distribution_id_list(root);
931        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
932    }
933
934    fn copy_distribution(
935        &self,
936        req: &AwsRequest,
937        route: &Route,
938    ) -> Result<AwsResponse, AwsServiceError> {
939        let primary_id = route
940            .id
941            .as_deref()
942            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
943        let if_match = req
944            .headers
945            .get(IF_MATCH)
946            .and_then(|v| v.to_str().ok())
947            .ok_or_else(|| {
948                aws_error(
949                    StatusCode::BAD_REQUEST,
950                    "InvalidIfMatchVersion",
951                    "Missing If-Match header for CopyDistribution",
952                )
953            })?
954            .to_string();
955        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
956            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
957        validate_caller_reference(&parsed.caller_reference)?;
958        let mut state = self.state.write();
959        let account = state
960            .accounts
961            .get_mut(DEFAULT_ACCOUNT)
962            .ok_or_else(|| no_such_distribution(primary_id))?;
963        let primary = account
964            .distributions
965            .get(primary_id)
966            .ok_or_else(|| no_such_distribution(primary_id))?
967            .clone();
968        if primary.etag != if_match {
969            return Err(aws_error(
970                StatusCode::PRECONDITION_FAILED,
971                "PreconditionFailed",
972                "If-Match header does not match the current ETag",
973            ));
974        }
975        if account
976            .distributions
977            .values()
978            .any(|d| d.config.caller_reference == parsed.caller_reference)
979        {
980            return Err(aws_error(
981                StatusCode::CONFLICT,
982                "DistributionAlreadyExists",
983                "Distribution with the same CallerReference exists",
984            ));
985        }
986        let new_id = generate_distribution_id();
987        let mut config = primary.config.clone();
988        config.caller_reference = parsed.caller_reference;
989        config.enabled = parsed.enabled.unwrap_or(false);
990        config.staging = parsed.staging;
991        let now = Utc::now();
992        let etag = generate_etag();
993        let arn = format!(
994            "arn:aws:cloudfront::{}:distribution/{}",
995            account_id(req),
996            new_id
997        );
998        let stored = StoredDistribution {
999            id: new_id.clone(),
1000            arn: arn.clone(),
1001            status: "InProgress".to_string(),
1002            last_modified_time: now,
1003            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1004            in_progress_invalidation_batches: 0,
1005            etag: etag.clone(),
1006            config,
1007        };
1008        account.distributions.insert(new_id.clone(), stored.clone());
1009        drop(state);
1010        self.schedule_distribution_deploy(new_id);
1011        let body = build_distribution_xml(&stored);
1012        let mut headers = HeaderMap::new();
1013        set_header(&mut headers, ETAG, &etag);
1014        set_header(&mut headers, LOCATION, &stored.arn);
1015        Ok(xml_response(StatusCode::CREATED, body, headers))
1016    }
1017}
1018
/// Request body of `CopyDistribution`, deserialized from XML by
/// `copy_distribution`. Element names are the PascalCase form of the field
/// names (`CallerReference`, `Enabled`, `Staging`).
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller-chosen reference for the copy; there is no serde default, so
    /// the element must be present for deserialization to succeed.
    caller_reference: String,
    /// Optional enabled flag for the copy; `None` when the element is absent.
    #[serde(default)]
    enabled: Option<bool>,
    /// Optional staging flag; `None` when the element is absent.
    #[serde(default)]
    staging: Option<bool>,
}
1028
1029// ─── Invalidations ────────────────────────────────────────────────────
1030
1031impl CloudFrontService {
1032    fn create_invalidation(
1033        &self,
1034        req: &AwsRequest,
1035        route: &Route,
1036    ) -> Result<AwsResponse, AwsServiceError> {
1037        let dist_id = route
1038            .id
1039            .as_deref()
1040            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1041        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1042            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1043        if batch.caller_reference.is_empty() {
1044            return Err(invalid_argument("CallerReference is required"));
1045        }
1046        if batch.paths.quantity < 1 {
1047            return Err(invalid_argument(
1048                "InvalidationBatch.Paths must be non-empty",
1049            ));
1050        }
1051        let mut state = self.state.write();
1052        let account = state.entry(DEFAULT_ACCOUNT);
1053        if !account.distributions.contains_key(dist_id) {
1054            return Err(no_such_distribution(dist_id));
1055        }
1056        let id = generate_invalidation_id();
1057        let stored = StoredInvalidation {
1058            id: id.clone(),
1059            distribution_id: dist_id.to_string(),
1060            status: "Completed".to_string(),
1061            create_time: Utc::now(),
1062            batch: batch.clone(),
1063        };
1064        account.invalidations.insert(id.clone(), stored.clone());
1065        drop(state);
1066        let body = build_invalidation_xml(&stored);
1067        let mut headers = HeaderMap::new();
1068        set_header(
1069            &mut headers,
1070            LOCATION,
1071            &format!(
1072                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1073                stored.id
1074            ),
1075        );
1076        Ok(xml_response(StatusCode::CREATED, body, headers))
1077    }
1078
1079    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1080        let dist_id = route
1081            .id
1082            .as_deref()
1083            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1084        let inv_id = route
1085            .second_id
1086            .as_deref()
1087            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1088        let state = self.state.read();
1089        let account = state
1090            .accounts
1091            .get(DEFAULT_ACCOUNT)
1092            .ok_or_else(|| no_such_invalidation(inv_id))?;
1093        if !account.distributions.contains_key(dist_id) {
1094            return Err(no_such_distribution(dist_id));
1095        }
1096        let inv = account
1097            .invalidations
1098            .get(inv_id)
1099            .filter(|i| i.distribution_id == dist_id)
1100            .ok_or_else(|| no_such_invalidation(inv_id))?
1101            .clone();
1102        drop(state);
1103        let body = build_invalidation_xml(&inv);
1104        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1105    }
1106
1107    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1108        let dist_id = route
1109            .id
1110            .as_deref()
1111            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1112        let state = self.state.read();
1113        let account = state
1114            .accounts
1115            .get(DEFAULT_ACCOUNT)
1116            .ok_or_else(|| no_such_distribution(dist_id))?;
1117        if !account.distributions.contains_key(dist_id) {
1118            return Err(no_such_distribution(dist_id));
1119        }
1120        let mut items: Vec<&StoredInvalidation> = account
1121            .invalidations
1122            .values()
1123            .filter(|i| i.distribution_id == dist_id)
1124            .collect();
1125        items.sort_by_key(|a| a.create_time);
1126        let body = build_invalidation_list_xml(&items);
1127        drop(state);
1128        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1129    }
1130}
1131
1132// ─── Tags ─────────────────────────────────────────────────────────────
1133
1134impl CloudFrontService {
1135    fn parse_arn_query(query: &str) -> Option<String> {
1136        for pair in query.split('&').filter(|p| !p.is_empty()) {
1137            if let Some(rest) = pair.strip_prefix("Resource=") {
1138                return Some(percent_decode(rest));
1139            }
1140        }
1141        None
1142    }
1143
1144    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1145        let arn = Self::parse_arn_query(&req.raw_query)
1146            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1147        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1148            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1149        let new_tags: Vec<Tag> = parsed
1150            .items
1151            .map(|i| {
1152                i.tag
1153                    .into_iter()
1154                    .map(|t| Tag {
1155                        key: t.key,
1156                        value: t.value,
1157                    })
1158                    .collect()
1159            })
1160            .unwrap_or_default();
1161        let mut state = self.state.write();
1162        let account = state.entry(DEFAULT_ACCOUNT);
1163        let entry = account.tags.entry(arn).or_default();
1164        for tag in new_tags {
1165            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1166                existing.value = tag.value;
1167            } else {
1168                entry.push(tag);
1169            }
1170        }
1171        Ok(empty_response(StatusCode::NO_CONTENT))
1172    }
1173
1174    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1175        let arn = Self::parse_arn_query(&req.raw_query)
1176            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1177        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1178            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1179        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1180        let mut state = self.state.write();
1181        let account = state.entry(DEFAULT_ACCOUNT);
1182        if let Some(existing) = account.tags.get_mut(&arn) {
1183            existing.retain(|t| !keys.contains(&t.key));
1184        }
1185        Ok(empty_response(StatusCode::NO_CONTENT))
1186    }
1187
1188    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1189        let arn = Self::parse_arn_query(&req.raw_query)
1190            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1191        let state = self.state.read();
1192        let tags = state
1193            .accounts
1194            .get(DEFAULT_ACCOUNT)
1195            .and_then(|a| a.tags.get(&arn))
1196            .cloned()
1197            .unwrap_or_default();
1198        drop(state);
1199        let body = build_tags_xml(&tags);
1200        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1201    }
1202}
1203
1204// ─── Aliases / WebACL ─────────────────────────────────────────────────
1205
1206impl CloudFrontService {
1207    fn associate_alias(
1208        &self,
1209        req: &AwsRequest,
1210        route: &Route,
1211    ) -> Result<AwsResponse, AwsServiceError> {
1212        let id = route
1213            .id
1214            .as_deref()
1215            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1216        let alias = parse_query_value(&req.raw_query, "Alias")
1217            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1218        let mut state = self.state.write();
1219        let account = state
1220            .accounts
1221            .get_mut(DEFAULT_ACCOUNT)
1222            .ok_or_else(|| no_such_distribution(id))?;
1223        // Reject if the alias is already attached to a different distribution.
1224        if let Some(other) = account.distributions.values().find(|d| {
1225            d.id != id
1226                && d.config
1227                    .aliases
1228                    .as_ref()
1229                    .and_then(|a| a.items.as_ref())
1230                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1231        }) {
1232            return Err(aws_error(
1233                StatusCode::CONFLICT,
1234                "CNAMEAlreadyExists",
1235                format!(
1236                    "Alias {alias} is already associated with distribution {}",
1237                    other.id
1238                ),
1239            ));
1240        }
1241        let dist = account
1242            .distributions
1243            .get_mut(id)
1244            .ok_or_else(|| no_such_distribution(id))?;
1245        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1246        let items = aliases
1247            .items
1248            .get_or_insert_with(crate::model::AliasItems::default);
1249        if !items.cname.iter().any(|c| c == &alias) {
1250            items.cname.push(alias.clone());
1251            aliases.quantity = items.cname.len() as i32;
1252        }
1253        dist.etag = generate_etag();
1254        dist.last_modified_time = Utc::now();
1255        Ok(empty_response(StatusCode::OK))
1256    }
1257
1258    fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1259        let alias = parse_query_value(&req.raw_query, "Alias")
1260            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1261        let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1262            .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1263        // aliasString max 253, distributionIdString max 25 per the Smithy
1264        // model. Reject probe-generated boundary variants that overrun the
1265        // documented length so they produce the declared InvalidArgument
1266        // instead of an empty 200.
1267        if alias.len() > 253 {
1268            return Err(invalid_argument(format!(
1269                "Alias length {} exceeds maximum 253",
1270                alias.len()
1271            )));
1272        }
1273        if dist_id.len() > 25 {
1274            return Err(invalid_argument(format!(
1275                "DistributionId length {} exceeds maximum 25",
1276                dist_id.len()
1277            )));
1278        }
1279        if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1280            let n: i64 = max_items.parse().map_err(|_| {
1281                invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1282            })?;
1283            if n > 100 {
1284                return Err(invalid_argument(format!(
1285                    "MaxItems {n} exceeds maximum 100"
1286                )));
1287            }
1288        }
1289        // We never produce conflicts because every alias is owned by one
1290        // distribution at most. Return an empty list with the proper shape.
1291        let body = format!(
1292            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1293            NS = crate::NAMESPACE,
1294            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1295        );
1296        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1297    }
1298
1299    fn associate_web_acl(
1300        &self,
1301        req: &AwsRequest,
1302        route: &Route,
1303    ) -> Result<AwsResponse, AwsServiceError> {
1304        let id = route
1305            .id
1306            .as_deref()
1307            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1308        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1309            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1310        let mut state = self.state.write();
1311        let account = state
1312            .accounts
1313            .get_mut(DEFAULT_ACCOUNT)
1314            .ok_or_else(|| no_such_distribution(id))?;
1315        let dist = account
1316            .distributions
1317            .get_mut(id)
1318            .ok_or_else(|| no_such_distribution(id))?;
1319        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1320        dist.etag = generate_etag();
1321        dist.last_modified_time = Utc::now();
1322        let body = format!(
1323            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1324            esc(id), esc(&parsed.web_acl_arn),
1325            NS = crate::NAMESPACE,
1326            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1327        );
1328        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1329    }
1330
1331    fn disassociate_web_acl(
1332        &self,
1333        _req: &AwsRequest,
1334        route: &Route,
1335    ) -> Result<AwsResponse, AwsServiceError> {
1336        let id = route
1337            .id
1338            .as_deref()
1339            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1340        // DisassociateDistributionWebACL's Smithy model declares EntityNotFound
1341        // (not NoSuchDistribution) for unknown distribution IDs.
1342        let entity_not_found = || {
1343            aws_error(
1344                StatusCode::NOT_FOUND,
1345                "EntityNotFound",
1346                format!("The specified distribution does not exist: {id}"),
1347            )
1348        };
1349        let mut state = self.state.write();
1350        let account = state
1351            .accounts
1352            .get_mut(DEFAULT_ACCOUNT)
1353            .ok_or_else(entity_not_found)?;
1354        let dist = account
1355            .distributions
1356            .get_mut(id)
1357            .ok_or_else(entity_not_found)?;
1358        dist.config.web_acl_id = None;
1359        dist.etag = generate_etag();
1360        dist.last_modified_time = Utc::now();
1361        let body = format!(
1362            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1363            esc(id),
1364            NS = crate::NAMESPACE,
1365            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1366        );
1367        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1368    }
1369}
1370
/// XML request body carrying a `WebACLArn` element. Despite the name, this
/// is the payload parsed by `associate_web_acl` (the
/// `AssociateDistributionWebACL` handler), not by `associate_alias`.
#[derive(serde::Deserialize, Default, Debug)]
#[serde(rename_all = "PascalCase")]
struct AssociateAliasRequest {
    /// ARN of the web ACL to attach. Because of `default`, a missing
    /// `WebACLArn` element deserializes to an empty string rather than
    /// failing.
    #[serde(rename = "WebACLArn", default)]
    web_acl_arn: String,
}
1377
1378// ─── XML body builders ────────────────────────────────────────────────
1379
/// Escapes a user-provided string for inclusion in the hand-rolled XML
/// response bodies built in this file.
///
/// The five XML metacharacters (`&`, `<`, `>`, `"`, `'`) are replaced by
/// their named entities; every other character is copied through
/// unchanged. Keep the table in sync with `quick_xml`'s entity table: its
/// own primitive would need a `Writer` per call, whereas this serializes
/// straight into a `String`.
pub(crate) fn esc(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            let entity = match ch {
                '&' => Some("&amp;"),
                '<' => Some("&lt;"),
                '>' => Some("&gt;"),
                '"' => Some("&quot;"),
                '\'' => Some("&apos;"),
                _ => None,
            };
            match entity {
                Some(text) => escaped.push_str(text),
                None => escaped.push(ch),
            }
            escaped
        })
}
1399
1400pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1401    let mut out = String::with_capacity(2048);
1402    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1403    out.push_str(&format!(
1404        "<Distribution xmlns=\"{ns}\">",
1405        ns = crate::NAMESPACE
1406    ));
1407    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1408    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1409    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1410    out.push_str(&format!(
1411        "<LastModifiedTime>{}</LastModifiedTime>",
1412        rfc3339(&dist.last_modified_time)
1413    ));
1414    out.push_str(&format!(
1415        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1416        dist.in_progress_invalidation_batches
1417    ));
1418    out.push_str(&format!(
1419        "<DomainName>{}</DomainName>",
1420        esc(&dist.domain_name)
1421    ));
1422    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1423    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1424    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1425        .unwrap_or_else(|_| String::new());
1426    out.push_str(&inner);
1427    out.push_str("</Distribution>");
1428    out
1429}
1430
1431fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1432    let mut out = String::with_capacity(2048);
1433    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1434    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1435    out.push_str("<Marker></Marker>");
1436    out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1437    out.push_str("<IsTruncated>false</IsTruncated>");
1438    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1439    if dists.is_empty() {
1440        out.push_str(&format!("</{root}>"));
1441        return out;
1442    }
1443    out.push_str("<Items>");
1444    for d in dists {
1445        out.push_str("<DistributionSummary>");
1446        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1447        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1448        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1449        out.push_str(&format!(
1450            "<LastModifiedTime>{}</LastModifiedTime>",
1451            rfc3339(&d.last_modified_time)
1452        ));
1453        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1454        let aliases = d.config.aliases.clone().unwrap_or_default();
1455        out.push_str(&render_inline("Aliases", &aliases));
1456        let origins = d.config.origins.clone();
1457        out.push_str(&render_inline("Origins", &origins));
1458        let dcb = d.config.default_cache_behavior.clone();
1459        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1460        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1461        out.push_str(&render_inline("CacheBehaviors", &cb));
1462        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1463        out.push_str(&render_inline("CustomErrorResponses", &cer));
1464        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1465        out.push_str(&format!(
1466            "<PriceClass>{}</PriceClass>",
1467            esc(&d
1468                .config
1469                .price_class
1470                .clone()
1471                .unwrap_or_else(|| "PriceClass_All".to_string()))
1472        ));
1473        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1474        out.push_str(&render_inline(
1475            "ViewerCertificate",
1476            &d.config.viewer_certificate.clone().unwrap_or_default(),
1477        ));
1478        out.push_str(&render_inline(
1479            "Restrictions",
1480            &d.config.restrictions.clone().unwrap_or_default(),
1481        ));
1482        out.push_str(&format!(
1483            "<WebACLId>{}</WebACLId>",
1484            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1485        ));
1486        out.push_str(&format!(
1487            "<HttpVersion>{}</HttpVersion>",
1488            esc(&d
1489                .config
1490                .http_version
1491                .clone()
1492                .unwrap_or_else(|| "http2".to_string()))
1493        ));
1494        out.push_str(&format!(
1495            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1496            d.config.is_ipv6_enabled.unwrap_or(true)
1497        ));
1498        out.push_str("<Staging>false</Staging>");
1499        out.push_str("</DistributionSummary>");
1500    }
1501    out.push_str("</Items>");
1502    out.push_str(&format!("</{root}>"));
1503    out
1504}
1505
1506fn build_empty_distribution_id_list(root: &str) -> String {
1507    let mut out = String::with_capacity(256);
1508    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1509    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1510    out.push_str("<Marker></Marker>");
1511    out.push_str("<MaxItems>100</MaxItems>");
1512    out.push_str("<IsTruncated>false</IsTruncated>");
1513    out.push_str("<Quantity>0</Quantity>");
1514    out.push_str(&format!("</{root}>"));
1515    out
1516}
1517
1518fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1519    let mut out = String::with_capacity(512);
1520    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1521    out.push_str(&format!(
1522        "<Invalidation xmlns=\"{ns}\">",
1523        ns = crate::NAMESPACE
1524    ));
1525    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1526    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1527    out.push_str(&format!(
1528        "<CreateTime>{}</CreateTime>",
1529        rfc3339(&inv.create_time)
1530    ));
1531    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1532    out.push_str("</Invalidation>");
1533    out
1534}
1535
1536fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1537    let mut out = String::with_capacity(1024);
1538    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1539    out.push_str(&format!(
1540        "<InvalidationList xmlns=\"{ns}\">",
1541        ns = crate::NAMESPACE
1542    ));
1543    out.push_str("<Marker></Marker>");
1544    out.push_str("<MaxItems>100</MaxItems>");
1545    out.push_str("<IsTruncated>false</IsTruncated>");
1546    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1547    if !items.is_empty() {
1548        out.push_str("<Items>");
1549        for inv in items {
1550            out.push_str("<InvalidationSummary>");
1551            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1552            out.push_str(&format!(
1553                "<CreateTime>{}</CreateTime>",
1554                rfc3339(&inv.create_time)
1555            ));
1556            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1557            out.push_str("</InvalidationSummary>");
1558        }
1559        out.push_str("</Items>");
1560    }
1561    out.push_str("</InvalidationList>");
1562    out
1563}
1564
1565fn build_tags_xml(tags: &[Tag]) -> String {
1566    let mut out = String::with_capacity(256);
1567    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1568    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1569    out.push_str("<Items>");
1570    for t in tags {
1571        out.push_str("<Tag>");
1572        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1573        if let Some(v) = &t.value {
1574            out.push_str(&format!("<Value>{}</Value>", esc(v)));
1575        }
1576        out.push_str("</Tag>");
1577    }
1578    out.push_str("</Items>");
1579    out.push_str("</Tags>");
1580    out
1581}
1582
1583fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1584    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1585}
1586
1587// ─── Helpers ──────────────────────────────────────────────────────────
1588
1589fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1590    if s.is_empty() {
1591        return Err(invalid_argument("CallerReference is required"));
1592    }
1593    Ok(())
1594}
1595
1596fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1597    if config.origins.quantity < 1 {
1598        return Err(invalid_argument(
1599            "DistributionConfig.Origins must contain at least one origin",
1600        ));
1601    }
1602    Ok(())
1603}
1604
1605/// Compare two `DistributionConfig`s by serializing them to canonical XML
1606/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
1607/// so the ETag stays stable when the caller PUTs the same config back.
1608/// Falls back to "not equal" if either serialization fails so we still
1609/// honor the request rather than silently swallow a write.
1610fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1611    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1612        return false;
1613    };
1614    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1615        return false;
1616    };
1617    a == b
1618}
1619
/// Resolve the account ID to use for the given request.
///
/// The request is currently ignored: this always returns `DEFAULT_ACCOUNT`.
fn account_id(_req: &AwsRequest) -> &'static str {
    // Multi-account is wired through AwsRequest.account_id elsewhere; the
    // CloudFront control plane only uses the resolved id for the ARN
    // suffix. Until that field stabilizes for REST-XML we use the default
    // account ID consistently with the rest of the registered services.
    DEFAULT_ACCOUNT
}
1627
1628fn generate_distribution_id() -> String {
1629    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
1630    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1631    format!("E{}", &raw[..13])
1632}
1633
1634fn generate_invalidation_id() -> String {
1635    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1636    format!("I{}", &raw[..13])
1637}
1638
1639pub(crate) fn generate_etag() -> String {
1640    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1641    format!("E{}", &raw[..13])
1642}
1643
1644/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
1645/// Used by Batch 2 policy resources (cache, origin request, response
1646/// headers, continuous deployment, OAC) so each gets a recognizable
1647/// alphabetic prefix in addition to the random suffix.
1648pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1649    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1650    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1651    format!("{prefix}{}", &raw[..suffix_len])
1652}
1653
1654fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1655    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1656}
1657
1658pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1659    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1660}
1661
1662fn no_such_distribution(id: &str) -> AwsServiceError {
1663    aws_error(
1664        StatusCode::NOT_FOUND,
1665        "NoSuchDistribution",
1666        format!("The specified distribution does not exist: {id}"),
1667    )
1668}
1669
1670fn no_such_invalidation(id: &str) -> AwsServiceError {
1671    aws_error(
1672        StatusCode::NOT_FOUND,
1673        "NoSuchInvalidation",
1674        format!("The specified invalidation does not exist: {id}"),
1675    )
1676}
1677
1678fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1679    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1680}
1681
1682pub(crate) fn aws_error(
1683    status: StatusCode,
1684    code: impl Into<String>,
1685    msg: impl Into<String>,
1686) -> AwsServiceError {
1687    AwsServiceError::aws_error(status, code.into(), msg)
1688}
1689
1690fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1691    if let Ok(v) = HeaderValue::from_str(value) {
1692        headers.insert(name, v);
1693    }
1694}
1695
1696pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1697    AwsResponse {
1698        status,
1699        content_type: "text/xml".to_string(),
1700        body: ResponseBody::Bytes(Bytes::from(body)),
1701        headers,
1702    }
1703}
1704
1705fn empty_response(status: StatusCode) -> AwsResponse {
1706    AwsResponse {
1707        status,
1708        content_type: "text/xml".to_string(),
1709        body: ResponseBody::Bytes(Bytes::new()),
1710        headers: HeaderMap::new(),
1711    }
1712}
1713
/// Decode a form-style percent-encoded string.
///
/// * `%XX` (two hex digits, either case) becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally.
///
/// Decoded bytes are collected first and interpreted as UTF-8 once at the
/// end, so multi-byte sequences such as `%C3%A9` (and non-ASCII characters
/// already present in `input`) come out as the intended character instead of
/// one Latin-1 character per byte. Byte sequences that are not valid UTF-8
/// are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed or truncated escape: keep the `%` as-is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Return the numeric value (0–15) of an ASCII hex digit, or `None` if `b`
/// is not one of `0-9`, `a-f`, `A-F`.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1745
/// Report whether a URI label segment is missing or still an unsubstituted
/// Smithy placeholder.
///
/// Returns `true` for the empty string, for anything starting with `{`
/// (e.g. `{Identifier}`), and for anything starting with the
/// percent-encoded brace `%7B` in either case (e.g. `%7BIdentifier%7D`).
/// Conformance probes that omit a required `@httpLabel` input leave the
/// literal placeholder in the URL; handlers use this to return the declared
/// validation error instead of a fabricated 200.
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    matches!(
        value.as_bytes(),
        [] | [b'{', ..] | [b'%', b'7', b'b' | b'B', ..]
    )
}
1759
1760/// Best-effort field extractor for request bodies the probe sends as JSON
1761/// even when the service is REST-XML. Walks the body trying JSON first,
1762/// then a naive XML element scan. Returns the value for the first occurrence
1763/// of `key`. Used by handlers that need to validate optional/required body
1764/// members up-front (enum bounds, length, presence) without committing to a
1765/// full strongly-typed parse — the actual handler still uses the typed XML
1766/// parser for the structured fields it consumes.
1767pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
1768    if let Ok(s) = std::str::from_utf8(body) {
1769        let trimmed = s.trim_start();
1770        if trimmed.starts_with('{') {
1771            if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1772                if let Some(field) = v.get(key) {
1773                    return match field {
1774                        serde_json::Value::String(s) => Some(s.clone()),
1775                        serde_json::Value::Number(n) => Some(n.to_string()),
1776                        serde_json::Value::Bool(b) => Some(b.to_string()),
1777                        _ => None,
1778                    };
1779                }
1780                return None;
1781            }
1782        }
1783        // Fall back to a naive XML extraction for genuine XML bodies.
1784        let open = format!("<{key}>");
1785        let close = format!("</{key}>");
1786        if let Some(start) = s.find(&open) {
1787            let after = start + open.len();
1788            if let Some(end_rel) = s[after..].find(&close) {
1789                return Some(s[after..after + end_rel].to_string());
1790            }
1791        }
1792    }
1793    None
1794}
1795
1796fn parse_query_value(query: &str, key: &str) -> Option<String> {
1797    let prefix = format!("{key}=");
1798    for pair in query.split('&').filter(|p| !p.is_empty()) {
1799        if let Some(rest) = pair.strip_prefix(&prefix) {
1800            return Some(percent_decode(rest));
1801        }
1802    }
1803    None
1804}
1805
1806#[cfg(test)]
1807mod tests {
1808    use super::*;
1809
1810    #[test]
1811    fn placeholder_label_detects_braces_and_percent_encoding() {
1812        assert!(is_placeholder_label(""));
1813        assert!(is_placeholder_label("{Identifier}"));
1814        assert!(is_placeholder_label("%7BIdentifier%7D"));
1815        assert!(is_placeholder_label("%7bidentifier%7d"));
1816        assert!(!is_placeholder_label("E1234567890ABC"));
1817        assert!(!is_placeholder_label(
1818            "arn:aws:cloudfront::000:distribution/E1"
1819        ));
1820    }
1821
1822    #[test]
1823    fn extract_body_field_handles_json_and_xml() {
1824        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
1825        assert_eq!(
1826            extract_body_field(json, "Stage"),
1827            Some("BROKEN".to_string())
1828        );
1829        assert_eq!(extract_body_field(json, "MaxItems"), None);
1830
1831        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
1832        assert_eq!(
1833            extract_body_field(xml, "Domain"),
1834            Some("example.com".to_string())
1835        );
1836        assert_eq!(extract_body_field(xml, "Missing"), None);
1837
1838        assert_eq!(extract_body_field(b"", "x"), None);
1839    }
1840
1841    fn make_state() -> SharedCloudFrontState {
1842        Arc::new(RwLock::new(CloudFrontAccounts::new()))
1843    }
1844
1845    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
1846        AwsRequest {
1847            service: "cloudfront".into(),
1848            action: String::new(),
1849            region: "us-east-1".into(),
1850            account_id: DEFAULT_ACCOUNT.into(),
1851            request_id: Uuid::new_v4().to_string(),
1852            headers: HeaderMap::new(),
1853            query_params: std::collections::HashMap::new(),
1854            body_stream: parking_lot::Mutex::new(None),
1855            body: Bytes::from(body.to_string()),
1856            path_segments: path
1857                .split('/')
1858                .filter(|s| !s.is_empty())
1859                .map(String::from)
1860                .collect(),
1861            raw_path: path.into(),
1862            raw_query: query.into(),
1863            method,
1864            is_query_protocol: false,
1865            access_key_id: None,
1866            principal: None,
1867        }
1868    }
1869
    /// Return a small `DistributionConfig` XML document with the given
    /// caller reference: one origin (`primary` → `example.com`), a default
    /// cache behavior targeting it, an empty comment and `Enabled` = true.
    ///
    /// Tests derive variants by string-replacing individual elements of this
    /// template, so the element text must stay exactly as written.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }
1893
    /// End-to-end happy path: create a distribution, fetch it, disable it
    /// via an update, then delete it. Each mutating call after the create
    /// carries the ETag returned by the previous response in `If-Match`.
    #[tokio::test]
    async fn create_then_get_then_delete_distribution() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("ref-1");
        // Create.
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        assert_eq!(create.status, StatusCode::CREATED);
        // ETag from the create response is needed for the update below.
        let etag = create
            .headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string();
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        // Take the text of the first <Id> element in the response.
        let id = xml
            .split("<Id>")
            .nth(1)
            .unwrap()
            .split("</Id>")
            .next()
            .unwrap()
            .to_string();

        // Get.
        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        // Disable then delete (CloudFront requires Disabled before delete).
        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &disable_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        // The delete is sent with the ETag returned by the update.
        let new_etag = updated
            .headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string();

        let mut del_req = make_request(
            http::Method::DELETE,
            &format!("/2020-05-31/distribution/{id}"),
            "",
            "",
        );
        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
        let del = svc.handle(del_req).await.unwrap();
        assert_eq!(del.status, StatusCode::NO_CONTENT);
    }
1965
1966    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
1967        let body = minimal_dist_config_xml(caller_ref);
1968        let create = svc
1969            .handle(make_request(
1970                http::Method::POST,
1971                "/2020-05-31/distribution",
1972                "",
1973                &body,
1974            ))
1975            .await
1976            .unwrap();
1977        let xml = std::str::from_utf8(create.body.expect_bytes())
1978            .unwrap()
1979            .to_string();
1980        xml.split("<Id>")
1981            .nth(1)
1982            .unwrap()
1983            .split("</Id>")
1984            .next()
1985            .unwrap()
1986            .to_string()
1987    }
1988
1989    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
1990        let state = svc.state.read();
1991        state
1992            .accounts
1993            .get(DEFAULT_ACCOUNT)
1994            .and_then(|a| a.distributions.get(id))
1995            .map(|d| d.status.clone())
1996            .unwrap_or_default()
1997    }
1998
1999    #[tokio::test]
2000    async fn create_distribution_starts_in_progress() {
2001        // Use a long delay so the auto-tick can't race the assertion.
2002        let svc = CloudFrontService::new(make_state())
2003            .with_propagation_delay(std::time::Duration::from_secs(60));
2004        let body = minimal_dist_config_xml("status-ref");
2005        let create = svc
2006            .handle(make_request(
2007                http::Method::POST,
2008                "/2020-05-31/distribution",
2009                "",
2010                &body,
2011            ))
2012            .await
2013            .unwrap();
2014        let xml = std::str::from_utf8(create.body.expect_bytes())
2015            .unwrap()
2016            .to_string();
2017        assert!(
2018            xml.contains("<Status>InProgress</Status>"),
2019            "expected initial status InProgress, got: {xml}"
2020        );
2021    }
2022
2023    #[tokio::test]
2024    async fn auto_transition_after_tick_marks_deployed() {
2025        let svc = CloudFrontService::new(make_state())
2026            .with_propagation_delay(std::time::Duration::from_millis(50));
2027        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2028        assert_eq!(distribution_status(&svc, &id), "InProgress");
2029        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2030        assert_eq!(distribution_status(&svc, &id), "Deployed");
2031    }
2032
2033    #[tokio::test]
2034    async fn set_distribution_status_via_admin_flips_synchronously() {
2035        let svc = CloudFrontService::new(make_state())
2036            .with_propagation_delay(std::time::Duration::from_secs(60));
2037        let id = create_distribution_returning_id(&svc, "admin-ref").await;
2038        assert_eq!(distribution_status(&svc, &id), "InProgress");
2039        assert!(svc.set_distribution_status(&id, "Deployed"));
2040        assert_eq!(distribution_status(&svc, &id), "Deployed");
2041        assert!(svc.set_distribution_status(&id, "InProgress"));
2042        assert_eq!(distribution_status(&svc, &id), "InProgress");
2043    }
2044
2045    #[tokio::test]
2046    async fn set_distribution_status_unknown_id_returns_false() {
2047        let svc = CloudFrontService::new(make_state());
2048        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2049    }
2050
2051    #[tokio::test]
2052    async fn update_distribution_resets_to_in_progress() {
2053        let svc = CloudFrontService::new(make_state())
2054            .with_propagation_delay(std::time::Duration::from_secs(60));
2055        let body = minimal_dist_config_xml("update-reset-ref");
2056        let create = svc
2057            .handle(make_request(
2058                http::Method::POST,
2059                "/2020-05-31/distribution",
2060                "",
2061                &body,
2062            ))
2063            .await
2064            .unwrap();
2065        let etag = create
2066            .headers
2067            .get(ETAG)
2068            .unwrap()
2069            .to_str()
2070            .unwrap()
2071            .to_string();
2072        let xml = std::str::from_utf8(create.body.expect_bytes())
2073            .unwrap()
2074            .to_string();
2075        let id = xml
2076            .split("<Id>")
2077            .nth(1)
2078            .unwrap()
2079            .split("</Id>")
2080            .next()
2081            .unwrap()
2082            .to_string();
2083        // Force the distribution to Deployed via the admin mutator so we can
2084        // observe the UpdateDistribution flip back to InProgress.
2085        assert!(svc.set_distribution_status(&id, "Deployed"));
2086        assert_eq!(distribution_status(&svc, &id), "Deployed");
2087
2088        let updated_body = body.replace(
2089            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2090            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2091        );
2092        let mut update_req = make_request(
2093            http::Method::PUT,
2094            &format!("/2020-05-31/distribution/{id}/config"),
2095            "",
2096            &updated_body,
2097        );
2098        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2099        let updated = svc.handle(update_req).await.unwrap();
2100        assert_eq!(updated.status, StatusCode::OK);
2101        assert_eq!(distribution_status(&svc, &id), "InProgress");
2102    }
2103
2104    #[tokio::test]
2105    async fn duplicate_caller_reference_is_rejected() {
2106        let svc = CloudFrontService::new(make_state());
2107        let body = minimal_dist_config_xml("dup-ref");
2108        svc.handle(make_request(
2109            http::Method::POST,
2110            "/2020-05-31/distribution",
2111            "",
2112            &body,
2113        ))
2114        .await
2115        .unwrap();
2116        let result = svc
2117            .handle(make_request(
2118                http::Method::POST,
2119                "/2020-05-31/distribution",
2120                "",
2121                &body,
2122            ))
2123            .await;
2124        let err = match result {
2125            Ok(_) => panic!("expected duplicate caller-reference to fail"),
2126            Err(e) => e,
2127        };
2128        assert_eq!(err.code(), "DistributionAlreadyExists");
2129        assert_eq!(err.status(), StatusCode::CONFLICT);
2130    }
2131
    /// Create a distribution, create one invalidation on it, fetch that
    /// invalidation by ID, and check that the invalidation list reports a
    /// quantity of one.
    #[tokio::test]
    async fn invalidation_lifecycle() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("inv-ref");
        // Parent distribution.
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        // Text of the first <Id> element in the create response.
        let dist_id = xml
            .split("<Id>")
            .nth(1)
            .unwrap()
            .split("</Id>")
            .next()
            .unwrap();
        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
  <CallerReference>inv-1</CallerReference>
</InvalidationBatch>"#;
        // Create the invalidation.
        let inv_resp = svc
            .handle(make_request(
                http::Method::POST,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                inv_body,
            ))
            .await
            .unwrap();
        assert_eq!(inv_resp.status, StatusCode::CREATED);
        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
        // Text of the first <Id> element in the invalidation response.
        let inv_id = inv_xml
            .split("<Id>")
            .nth(1)
            .unwrap()
            .split("</Id>")
            .next()
            .unwrap();
        // Fetch it back by ID.
        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);
        // List all invalidations for the distribution.
        let list = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                "",
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
        assert!(xml.contains("<Quantity>1</Quantity>"));
    }
2198
2199    #[tokio::test]
2200    async fn tags_roundtrip() {
2201        let svc = CloudFrontService::new(make_state());
2202        let body = minimal_dist_config_xml("tag-ref");
2203        let create = svc
2204            .handle(make_request(
2205                http::Method::POST,
2206                "/2020-05-31/distribution",
2207                "",
2208                &body,
2209            ))
2210            .await
2211            .unwrap();
2212        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2213        let arn = xml
2214            .split("<ARN>")
2215            .nth(1)
2216            .unwrap()
2217            .split("</ARN>")
2218            .next()
2219            .unwrap();
2220        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2221<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2222  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2223</Tags>"#;
2224        let arn_q = format!("Operation=Tag&Resource={}", arn);
2225        let resp = svc
2226            .handle(make_request(
2227                http::Method::POST,
2228                "/2020-05-31/tagging",
2229                &arn_q,
2230                tag_body,
2231            ))
2232            .await
2233            .unwrap();
2234        assert_eq!(resp.status, StatusCode::NO_CONTENT);
2235        let list = svc
2236            .handle(make_request(
2237                http::Method::GET,
2238                "/2020-05-31/tagging",
2239                &format!("Resource={}", arn),
2240                "",
2241            ))
2242            .await
2243            .unwrap();
2244        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2245        assert!(xml.contains("<Key>env</Key>"));
2246        assert!(xml.contains("<Value>prod</Value>"));
2247    }
2248
2249    #[tokio::test]
2250    async fn xml_metacharacters_in_user_input_are_escaped() {
2251        let svc = CloudFrontService::new(make_state());
2252        let body = minimal_dist_config_xml("escape-ref").replace(
2253            "<Comment></Comment>",
2254            "<Comment><![CDATA[a&b<c>d]]></Comment>",
2255        );
2256        let create = svc
2257            .handle(make_request(
2258                http::Method::POST,
2259                "/2020-05-31/distribution",
2260                "",
2261                &body,
2262            ))
2263            .await
2264            .unwrap();
2265        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2266        let dist_id = xml
2267            .split("<Id>")
2268            .nth(1)
2269            .unwrap()
2270            .split("</Id>")
2271            .next()
2272            .unwrap();
2273        let arn = xml
2274            .split("<ARN>")
2275            .nth(1)
2276            .unwrap()
2277            .split("</ARN>")
2278            .next()
2279            .unwrap();
2280
2281        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2282<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2283  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
2284</Tags>"#;
2285        let arn_q = format!("Operation=Tag&Resource={}", arn);
2286        svc.handle(make_request(
2287            http::Method::POST,
2288            "/2020-05-31/tagging",
2289            &arn_q,
2290            tag_body,
2291        ))
2292        .await
2293        .unwrap();
2294
2295        let list = svc
2296            .handle(make_request(
2297                http::Method::GET,
2298                "/2020-05-31/tagging",
2299                &format!("Resource={}", arn),
2300                "",
2301            ))
2302            .await
2303            .unwrap();
2304        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2305        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
2306        assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2307
2308        // Force the distribution into the list-rendering path so the
2309        // unescaped Comment field would surface there.
2310        let list_resp = svc
2311            .handle(make_request(
2312                http::Method::GET,
2313                "/2020-05-31/distribution",
2314                "",
2315                "",
2316            ))
2317            .await
2318            .unwrap();
2319        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2320        // Ensure raw `<` from the user-supplied comment never lands inside
2321        // a Comment element on the wire.
2322        assert!(!xml.contains("<Comment>a&b<c>d"));
2323        assert!(xml.contains(dist_id));
2324    }
2325}