Skip to main content

fakecloud_cloudfront/
service.rs

1//! CloudFront REST-XML service implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18    DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22    CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23    StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
27/// CloudFront mutating actions share these prefixes; everything else
28/// (`Get*`/`List*`/`Describe*`/`Test*`/`Verify*`) is read-only. Used to decide
29/// when a handled request must trigger a persistence snapshot.
30fn is_mutating_action(action: &str) -> bool {
31    const PREFIXES: &[&str] = &[
32        "Create",
33        "Update",
34        "Delete",
35        "Copy",
36        "Associate",
37        "Disassociate",
38        "Tag",
39        "Untag",
40        "Publish",
41        "Put",
42    ];
43    PREFIXES.iter().any(|p| action.starts_with(p))
44}
45
46pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
48const SUPPORTED_ACTIONS: &[&str] = &[
49    "CreateDistribution",
50    "CreateDistributionWithTags",
51    "GetDistribution",
52    "GetDistributionConfig",
53    "UpdateDistribution",
54    "DeleteDistribution",
55    "ListDistributions",
56    "CopyDistribution",
57    "CreateInvalidation",
58    "GetInvalidation",
59    "ListInvalidations",
60    "TagResource",
61    "UntagResource",
62    "ListTagsForResource",
63    "AssociateAlias",
64    "ListConflictingAliases",
65    "ListDistributionsByCachePolicyId",
66    "ListDistributionsByOriginRequestPolicyId",
67    "ListDistributionsByResponseHeadersPolicyId",
68    "ListDistributionsByKeyGroup",
69    "ListDistributionsByWebACLId",
70    "ListDistributionsByVpcOriginId",
71    "ListDistributionsByAnycastIpListId",
72    "ListDistributionsByConnectionMode",
73    "ListDistributionsByConnectionFunction",
74    "ListDistributionsByOwnedResource",
75    "ListDistributionsByTrustStore",
76    "ListDistributionsByRealtimeLogConfig",
77    "AssociateDistributionWebACL",
78    "DisassociateDistributionWebACL",
79    "CreateOriginAccessControl",
80    "GetOriginAccessControl",
81    "GetOriginAccessControlConfig",
82    "UpdateOriginAccessControl",
83    "DeleteOriginAccessControl",
84    "ListOriginAccessControls",
85    "CreateCachePolicy",
86    "GetCachePolicy",
87    "GetCachePolicyConfig",
88    "UpdateCachePolicy",
89    "DeleteCachePolicy",
90    "ListCachePolicies",
91    "CreateOriginRequestPolicy",
92    "GetOriginRequestPolicy",
93    "GetOriginRequestPolicyConfig",
94    "UpdateOriginRequestPolicy",
95    "DeleteOriginRequestPolicy",
96    "ListOriginRequestPolicies",
97    "CreateResponseHeadersPolicy",
98    "GetResponseHeadersPolicy",
99    "GetResponseHeadersPolicyConfig",
100    "UpdateResponseHeadersPolicy",
101    "DeleteResponseHeadersPolicy",
102    "ListResponseHeadersPolicies",
103    "CreateContinuousDeploymentPolicy",
104    "GetContinuousDeploymentPolicy",
105    "GetContinuousDeploymentPolicyConfig",
106    "UpdateContinuousDeploymentPolicy",
107    "DeleteContinuousDeploymentPolicy",
108    "ListContinuousDeploymentPolicies",
109    "CreateFunction",
110    "DescribeFunction",
111    "GetFunction",
112    "UpdateFunction",
113    "DeleteFunction",
114    "ListFunctions",
115    "PublishFunction",
116    "TestFunction",
117    "CreatePublicKey",
118    "GetPublicKey",
119    "GetPublicKeyConfig",
120    "UpdatePublicKey",
121    "DeletePublicKey",
122    "ListPublicKeys",
123    "CreateKeyGroup",
124    "GetKeyGroup",
125    "GetKeyGroupConfig",
126    "UpdateKeyGroup",
127    "DeleteKeyGroup",
128    "ListKeyGroups",
129    "CreateKeyValueStore",
130    "DescribeKeyValueStore",
131    "UpdateKeyValueStore",
132    "DeleteKeyValueStore",
133    "ListKeyValueStores",
134    "CreateCloudFrontOriginAccessIdentity",
135    "GetCloudFrontOriginAccessIdentity",
136    "GetCloudFrontOriginAccessIdentityConfig",
137    "UpdateCloudFrontOriginAccessIdentity",
138    "DeleteCloudFrontOriginAccessIdentity",
139    "ListCloudFrontOriginAccessIdentities",
140    "CreateMonitoringSubscription",
141    "GetMonitoringSubscription",
142    "DeleteMonitoringSubscription",
143    "CreateStreamingDistribution",
144    "CreateStreamingDistributionWithTags",
145    "GetStreamingDistribution",
146    "GetStreamingDistributionConfig",
147    "UpdateStreamingDistribution",
148    "DeleteStreamingDistribution",
149    "ListStreamingDistributions",
150    "CreateFieldLevelEncryptionConfig",
151    "GetFieldLevelEncryption",
152    "GetFieldLevelEncryptionConfig",
153    "UpdateFieldLevelEncryptionConfig",
154    "DeleteFieldLevelEncryptionConfig",
155    "ListFieldLevelEncryptionConfigs",
156    "CreateFieldLevelEncryptionProfile",
157    "GetFieldLevelEncryptionProfile",
158    "GetFieldLevelEncryptionProfileConfig",
159    "UpdateFieldLevelEncryptionProfile",
160    "DeleteFieldLevelEncryptionProfile",
161    "ListFieldLevelEncryptionProfiles",
162    "CreateRealtimeLogConfig",
163    "GetRealtimeLogConfig",
164    "UpdateRealtimeLogConfig",
165    "DeleteRealtimeLogConfig",
166    "ListRealtimeLogConfigs",
167    "CreateVpcOrigin",
168    "GetVpcOrigin",
169    "UpdateVpcOrigin",
170    "DeleteVpcOrigin",
171    "ListVpcOrigins",
172    "CreateAnycastIpList",
173    "GetAnycastIpList",
174    "UpdateAnycastIpList",
175    "DeleteAnycastIpList",
176    "ListAnycastIpLists",
177    "CreateTrustStore",
178    "GetTrustStore",
179    "UpdateTrustStore",
180    "DeleteTrustStore",
181    "ListTrustStores",
182    "GetResourcePolicy",
183    "PutResourcePolicy",
184    "DeleteResourcePolicy",
185    "CreateConnectionGroup",
186    "GetConnectionGroup",
187    "GetConnectionGroupByRoutingEndpoint",
188    "UpdateConnectionGroup",
189    "DeleteConnectionGroup",
190    "ListConnectionGroups",
191    "ListDomainConflicts",
192    "UpdateDomainAssociation",
193    "VerifyDnsConfiguration",
194    "GetManagedCertificateDetails",
195    "UpdateDistributionWithStagingConfig",
196    "CreateDistributionTenant",
197    "GetDistributionTenant",
198    "GetDistributionTenantByDomain",
199    "UpdateDistributionTenant",
200    "DeleteDistributionTenant",
201    "ListDistributionTenants",
202    "ListDistributionTenantsByCustomization",
203    "AssociateDistributionTenantWebACL",
204    "DisassociateDistributionTenantWebACL",
205    "CreateInvalidationForDistributionTenant",
206    "GetInvalidationForDistributionTenant",
207    "ListInvalidationsForDistributionTenant",
208    "CreateConnectionFunction",
209    "GetConnectionFunction",
210    "DescribeConnectionFunction",
211    "UpdateConnectionFunction",
212    "DeleteConnectionFunction",
213    "ListConnectionFunctions",
214    "PublishConnectionFunction",
215    "TestConnectionFunction",
216];
217
218pub struct CloudFrontService {
219    pub(crate) state: SharedCloudFrontState,
220    /// How long the propagation tick sleeps before flipping a freshly
221    /// created or updated Distribution / DistributionTenant /
222    /// ConnectionGroup / StreamingDistribution from `InProgress` to
223    /// `Deployed`. Real CloudFront takes ~15 minutes; the default of 1s
224    /// keeps SDK-driven integration tests fast while still simulating the
225    /// async transition. Tests can shrink it via
226    /// [`CloudFrontService::with_propagation_delay`], and operators can
227    /// override the default via the
228    /// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var.
229    pub(crate) propagation_delay: std::time::Duration,
230    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
231    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
232    /// Guards [`CloudFrontService::start_dataplane`] so repeated calls don't
233    /// spawn duplicate supervisor loops racing to bind listeners.
234    dataplane_started: Arc<std::sync::atomic::AtomicBool>,
235}
236
237/// Resolve the default `InProgress` -> `Deployed` delay from the
238/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` env var, falling back to 1
239/// second. The value parses as either a non-negative integer (seconds) or
240/// a floating-point number; `0` keeps the transition synchronous, which is
241/// useful for fast unit tests. Invalid input falls back to 1 second so a
242/// typo in the env var can't accidentally hang the process.
243fn default_propagation_delay() -> std::time::Duration {
244    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
245        return std::time::Duration::from_secs(1);
246    };
247    let trimmed = raw.trim();
248    if let Ok(secs) = trimmed.parse::<u64>() {
249        return std::time::Duration::from_secs(secs);
250    }
251    if let Ok(secs) = trimmed.parse::<f64>() {
252        if secs.is_finite() && secs >= 0.0 {
253            return std::time::Duration::from_secs_f64(secs);
254        }
255    }
256    std::time::Duration::from_secs(1)
257}
258
259impl CloudFrontService {
260    pub fn new(state: SharedCloudFrontState) -> Self {
261        Self {
262            state,
263            propagation_delay: default_propagation_delay(),
264            snapshot_store: None,
265            snapshot_lock: Arc::new(AsyncMutex::new(())),
266            dataplane_started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
267        }
268    }
269
270    /// Start the in-process data-plane supervisor (opt-in). `new` constructs
271    /// the service without a data plane; the server calls this once after
272    /// wiring so that enabled distributions actually serve viewer traffic. A
273    /// no-op when disabled via `FAKECLOUD_CLOUDFRONT_DISABLE_DATAPLANE`. Must be
274    /// called from within a Tokio runtime (it spawns the supervisor task).
275    /// `server_port` is fakecloud's own listen port, used to reach S3-website
276    /// origins served by this process.
277    pub fn start_dataplane(&self, server_port: u16) {
278        use std::sync::atomic::Ordering;
279        // Idempotent: only the first call spawns the supervisor. Repeat calls
280        // would otherwise start racing loops that both bind listeners and
281        // clobber each distribution's `bound_port`.
282        if self
283            .dataplane_started
284            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
285            .is_err()
286        {
287            return;
288        }
289        crate::dataplane::spawn_dataplane(self.state.clone(), server_port);
290    }
291
292    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
293        self.snapshot_store = Some(store);
294        self
295    }
296
297    pub fn shared_state(&self) -> SharedCloudFrontState {
298        Arc::clone(&self.state)
299    }
300
301    /// Persist current state as a snapshot. Held across the
302    /// clone-serialize-write sequence to prevent stale-last writes, with serde
303    /// + file I/O offloaded to the blocking pool.
304    async fn save_snapshot(&self) {
305        save_cloudfront_snapshot(
306            &self.state,
307            self.snapshot_store.clone(),
308            &self.snapshot_lock,
309        )
310        .await;
311    }
312
313    /// Build a hook that persists the current CloudFront state when invoked, or
314    /// `None` in memory mode. The CloudFormation provisioner mutates `state`
315    /// directly and uses this to write a CFN-provisioned resource through to
316    /// disk, the same way a direct mutating API call would.
317    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
318        let store = self.snapshot_store.clone()?;
319        let state = self.state.clone();
320        let lock = self.snapshot_lock.clone();
321        Some(Arc::new(move || {
322            let state = state.clone();
323            let store = store.clone();
324            let lock = lock.clone();
325            Box::pin(async move {
326                save_cloudfront_snapshot(&state, Some(store), &lock).await;
327            })
328        }))
329    }
330
331    /// Re-arm the propagation tick for any Distribution / DistributionTenant /
332    /// ConnectionGroup / StreamingDistribution that was still `InProgress` when
333    /// the previous process exited, so they still transition to `Deployed`
334    /// after a restart. Called by the server after loading a snapshot.
335    pub fn rearm_in_progress(&self) {
336        let (dists, tenants, groups, streaming) = {
337            let s = self.state.read();
338            let mut dists = Vec::new();
339            let mut tenants = Vec::new();
340            let mut groups = Vec::new();
341            let mut streaming = Vec::new();
342            for account in s.accounts.values() {
343                for (id, d) in &account.distributions {
344                    if d.status == "InProgress" {
345                        dists.push(id.clone());
346                    }
347                }
348                for (id, t) in &account.distribution_tenants {
349                    if t.status == "InProgress" {
350                        tenants.push(id.clone());
351                    }
352                }
353                for (id, g) in &account.connection_groups {
354                    if g.status == "InProgress" {
355                        groups.push(id.clone());
356                    }
357                }
358                for (id, d) in &account.streaming_distributions {
359                    if d.status == "InProgress" {
360                        streaming.push(id.clone());
361                    }
362                }
363            }
364            (dists, tenants, groups, streaming)
365        };
366        for id in dists {
367            self.schedule_distribution_deploy(id);
368        }
369        for id in tenants {
370            self.schedule_distribution_tenant_deploy(id);
371        }
372        for id in groups {
373            self.schedule_connection_group_deploy(id);
374        }
375        for id in streaming {
376            self.schedule_streaming_distribution_deploy(id);
377        }
378    }
379
380    /// Override the propagation delay used by `CreateDistribution`,
381    /// `UpdateDistribution`, and the equivalent DistributionTenant /
382    /// ConnectionGroup ops. Tests pass a small duration so they don't
383    /// have to wait wall-clock seconds for the `InProgress` -> `Deployed`
384    /// transition.
385    pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
386        self.propagation_delay = delay;
387        self
388    }
389
390    /// Flip a stored Distribution's status synchronously. Returns `false`
391    /// if no distribution matches `id` across any account. The admin
392    /// endpoint `POST /_fakecloud/cloudfront/distributions/{id}/status`
393    /// calls this so tests can force `InProgress` <-> `Deployed` without
394    /// waiting on the propagation tick.
395    pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
396        let mut state = self.state.write();
397        for account in state.accounts.values_mut() {
398            if let Some(dist) = account.distributions.get_mut(id) {
399                dist.status = status.to_string();
400                return true;
401            }
402        }
403        false
404    }
405
406    /// Admin-endpoint flavour of [`set_distribution_status`](Self::set_distribution_status)
407    /// that persists the forced status so it survives a restart.
408    pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
409        let changed = self.set_distribution_status(id, status);
410        if changed {
411            self.save_snapshot().await;
412        }
413        changed
414    }
415}
416
417/// Persist the current CloudFront state as a snapshot. Offloads the serde +
418/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
419/// (memory mode). Shared by `CloudFrontService::save_snapshot`, the propagation
420/// ticks, and the CloudFormation provisioner persist hook so all route through
421/// the same serialize-and-write path.
422pub async fn save_cloudfront_snapshot(
423    state: &SharedCloudFrontState,
424    store: Option<Arc<dyn SnapshotStore>>,
425    lock: &AsyncMutex<()>,
426) {
427    let Some(store) = store else {
428        return;
429    };
430    let _guard = lock.lock().await;
431    let snapshot = CloudFrontSnapshot {
432        schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
433        accounts: Some(state.read().clone()),
434    };
435    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
436        let bytes = serde_json::to_vec(&snapshot)
437            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
438        store.save(&bytes)
439    })
440    .await;
441    match join {
442        Ok(Ok(())) => {}
443        Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
444        Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
445    }
446}
447
448impl Default for CloudFrontService {
449    fn default() -> Self {
450        Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
451    }
452}
453
454#[async_trait]
455impl AwsService for CloudFrontService {
456    fn service_name(&self) -> &str {
457        "cloudfront"
458    }
459
460    fn supported_actions(&self) -> &[&str] {
461        SUPPORTED_ACTIONS
462    }
463
464    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
465        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
466            Some(r) => r,
467            None => {
468                return Err(aws_error(
469                    StatusCode::NOT_FOUND,
470                    "InvalidArgument",
471                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
472                ));
473            }
474        };
475
476        let mutates = is_mutating_action(resolved.action);
477        let result = match resolved.action {
478            "CreateDistribution" => self.create_distribution(&req, false),
479            "CreateDistributionWithTags" => self.create_distribution(&req, true),
480            "GetDistribution" => self.get_distribution(&resolved),
481            "GetDistributionConfig" => self.get_distribution_config(&resolved),
482            "UpdateDistribution" => self.update_distribution(&req, &resolved),
483            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
484            "ListDistributions" => self.list_distributions(&req),
485            "CopyDistribution" => self.copy_distribution(&req, &resolved),
486            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
487            "GetInvalidation" => self.get_invalidation(&resolved),
488            "ListInvalidations" => self.list_invalidations(&resolved),
489            "TagResource" => self.tag_resource(&req),
490            "UntagResource" => self.untag_resource(&req),
491            "ListTagsForResource" => self.list_tags_for_resource(&req),
492            "AssociateAlias" => self.associate_alias(&req, &resolved),
493            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
494            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
495            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
496            "ListDistributionsByCachePolicyId"
497            | "ListDistributionsByOriginRequestPolicyId"
498            | "ListDistributionsByResponseHeadersPolicyId"
499            | "ListDistributionsByKeyGroup"
500            | "ListDistributionsByWebACLId"
501            | "ListDistributionsByVpcOriginId"
502            | "ListDistributionsByAnycastIpListId"
503            | "ListDistributionsByConnectionMode"
504            | "ListDistributionsByConnectionFunction"
505            | "ListDistributionsByOwnedResource"
506            | "ListDistributionsByTrustStore"
507            | "ListDistributionsByRealtimeLogConfig" => {
508                self.list_distributions_by(&req, &resolved, resolved.action)
509            }
510            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
511            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
512            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
513            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
514            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
515            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
516            "CreateCachePolicy" => self.create_cache_policy(&req),
517            "GetCachePolicy" => self.get_cache_policy(&resolved),
518            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
519            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
520            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
521            "ListCachePolicies" => self.list_cache_policies(&req),
522            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
523            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
524            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
525            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
526            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
527            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
528            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
529            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
530            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
531            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
532            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
533            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
534            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
535            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
536            "GetContinuousDeploymentPolicyConfig" => {
537                self.get_continuous_deployment_policy_config(&resolved)
538            }
539            "UpdateContinuousDeploymentPolicy" => {
540                self.update_continuous_deployment_policy(&req, &resolved)
541            }
542            "DeleteContinuousDeploymentPolicy" => {
543                self.delete_continuous_deployment_policy(&req, &resolved)
544            }
545            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
546            "CreateFunction" => self.create_function(&req),
547            "DescribeFunction" => self.describe_function(&req, &resolved),
548            "GetFunction" => self.get_function(&req, &resolved),
549            "UpdateFunction" => self.update_function(&req, &resolved),
550            "DeleteFunction" => self.delete_function(&req, &resolved),
551            "ListFunctions" => self.list_functions(&req),
552            "PublishFunction" => self.publish_function(&req, &resolved),
553            "TestFunction" => self.test_function(&req, &resolved),
554            "CreatePublicKey" => self.create_public_key(&req),
555            "GetPublicKey" => self.get_public_key(&resolved),
556            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
557            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
558            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
559            "ListPublicKeys" => self.list_public_keys(&req),
560            "CreateKeyGroup" => self.create_key_group(&req),
561            "GetKeyGroup" => self.get_key_group(&resolved),
562            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
563            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
564            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
565            "ListKeyGroups" => self.list_key_groups(&req),
566            "CreateKeyValueStore" => self.create_key_value_store(&req),
567            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
568            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
569            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
570            "ListKeyValueStores" => self.list_key_value_stores(&req),
571            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
572            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
573            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
574            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
575            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
576            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
577            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
578            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
579            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
580            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
581            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
582            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
583            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
584            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
585            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
586            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
587            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
588            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
589            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
590            "UpdateFieldLevelEncryptionConfig" => {
591                self.update_field_level_encryption_config(&req, &resolved)
592            }
593            "DeleteFieldLevelEncryptionConfig" => {
594                self.delete_field_level_encryption_config(&req, &resolved)
595            }
596            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
597            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
598            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
599            "GetFieldLevelEncryptionProfileConfig" => {
600                self.get_field_level_encryption_profile_config(&resolved)
601            }
602            "UpdateFieldLevelEncryptionProfile" => {
603                self.update_field_level_encryption_profile(&req, &resolved)
604            }
605            "DeleteFieldLevelEncryptionProfile" => {
606                self.delete_field_level_encryption_profile(&req, &resolved)
607            }
608            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
609            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
610            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
611            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
612            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
613            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
614            "CreateVpcOrigin" => self.create_vpc_origin(&req),
615            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
616            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
617            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
618            "ListVpcOrigins" => self.list_vpc_origins(&req),
619            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
620            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
621            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
622            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
623            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
624            "CreateTrustStore" => self.create_trust_store(&req),
625            "GetTrustStore" => self.get_trust_store(&resolved),
626            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
627            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
628            "ListTrustStores" => self.list_trust_stores(&req),
629            "GetResourcePolicy" => self.get_resource_policy(&req),
630            "PutResourcePolicy" => self.put_resource_policy(&req),
631            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
632            "CreateConnectionGroup" => self.create_connection_group(&req),
633            "GetConnectionGroup" => self.get_connection_group(&resolved),
634            "GetConnectionGroupByRoutingEndpoint" => {
635                self.get_connection_group_by_routing_endpoint(&req)
636            }
637            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
638            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
639            "ListConnectionGroups" => self.list_connection_groups(&req),
640            "ListDomainConflicts" => self.list_domain_conflicts(&req),
641            "UpdateDomainAssociation" => self.update_domain_association(&req),
642            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
643            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
644            "UpdateDistributionWithStagingConfig" => {
645                self.update_distribution_with_staging_config(&req, &resolved)
646            }
647            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
648            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
649            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
650            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
651            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
652            "ListDistributionTenants" => self.list_distribution_tenants(&req),
653            "ListDistributionTenantsByCustomization" => {
654                self.list_distribution_tenants_by_customization(&req)
655            }
656            "AssociateDistributionTenantWebACL" => {
657                self.associate_distribution_tenant_web_acl(&req, &resolved)
658            }
659            "DisassociateDistributionTenantWebACL" => {
660                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
661            }
662            "CreateInvalidationForDistributionTenant" => {
663                self.create_invalidation_for_distribution_tenant(&req, &resolved)
664            }
665            "GetInvalidationForDistributionTenant" => {
666                self.get_invalidation_for_distribution_tenant(&resolved)
667            }
668            "ListInvalidationsForDistributionTenant" => {
669                self.list_invalidations_for_distribution_tenant(&resolved)
670            }
671            "CreateConnectionFunction" => self.create_connection_function(&req),
672            "GetConnectionFunction" => self.get_connection_function(&resolved),
673            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
674            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
675            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
676            "ListConnectionFunctions" => self.list_connection_functions(&req),
677            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
678            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
679            other => Err(aws_error(
680                StatusCode::NOT_IMPLEMENTED,
681                "InvalidAction",
682                format!("CloudFront action {other} is not implemented yet"),
683            )),
684        };
685        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
686            self.save_snapshot().await;
687        }
688        result
689    }
690}
691
692// ─── Distribution handlers ────────────────────────────────────────────
693
694impl CloudFrontService {
695    fn create_distribution(
696        &self,
697        req: &AwsRequest,
698        with_tags: bool,
699    ) -> Result<AwsResponse, AwsServiceError> {
700        let (config, tags) = if with_tags {
701            let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
702                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
703            let tags = parsed
704                .tags
705                .items
706                .map(|i| {
707                    i.tag
708                        .into_iter()
709                        .map(|t| Tag {
710                            key: t.key,
711                            value: t.value,
712                        })
713                        .collect()
714                })
715                .unwrap_or_default();
716            (parsed.distribution_config, tags)
717        } else {
718            let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
719                .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
720            (parsed, Vec::new())
721        };
722
723        validate_caller_reference(&config.caller_reference)?;
724        validate_origins(&config)?;
725
726        let mut state = self.state.write();
727        let account = state.entry(account_id(req));
728
729        if let Some(existing) = account
730            .distributions
731            .values()
732            .find(|d| d.config.caller_reference == config.caller_reference)
733        {
734            return Err(aws_error(
735                StatusCode::CONFLICT,
736                "DistributionAlreadyExists",
737                format!(
738                    "Distribution with the same CallerReference exists: {}",
739                    existing.id
740                ),
741            ));
742        }
743
744        let id = generate_distribution_id();
745        let now = Utc::now();
746        let etag = generate_etag();
747        let domain = format!("{}.cloudfront.net", id.to_lowercase());
748        let arn = format!(
749            "arn:aws:cloudfront::{}:distribution/{}",
750            account_id(req),
751            id
752        );
753
754        let stored = StoredDistribution {
755            id: id.clone(),
756            arn: arn.clone(),
757            // Real CloudFront returns InProgress immediately and flips to
758            // Deployed ~15 minutes later once the edge propagation completes.
759            // The spawn below mirrors that lifecycle on a configurable delay.
760            status: "InProgress".to_string(),
761            last_modified_time: now,
762            domain_name: domain,
763            in_progress_invalidation_batches: 0,
764            etag: etag.clone(),
765            config,
766            bound_port: None,
767        };
768        account.distributions.insert(id.clone(), stored.clone());
769        if !tags.is_empty() {
770            account.tags.insert(arn.clone(), tags);
771        }
772        drop(state);
773
774        self.schedule_distribution_deploy(id.clone());
775
776        let body = build_distribution_xml(&stored);
777        let mut headers = HeaderMap::new();
778        set_header(&mut headers, ETAG, &etag);
779        set_header(&mut headers, LOCATION, &stored.arn);
780        Ok(xml_response(StatusCode::CREATED, body, headers))
781    }
782
783    /// Spawn a tokio task that flips the named distribution from
784    /// `InProgress` to `Deployed` after `propagation_delay`. Used by
785    /// `CreateDistribution` and `UpdateDistribution` to model the real
786    /// CloudFront edge propagation lifecycle.
787    fn schedule_distribution_deploy(&self, id: String) {
788        let state = Arc::clone(&self.state);
789        let delay = self.propagation_delay;
790        let store = self.snapshot_store.clone();
791        let lock = self.snapshot_lock.clone();
792        tokio::spawn(async move {
793            tokio::time::sleep(delay).await;
794            let flipped = {
795                let mut s = state.write();
796                let mut flipped = false;
797                for account in s.accounts.values_mut() {
798                    if let Some(d) = account.distributions.get_mut(&id) {
799                        if d.status == "InProgress" {
800                            d.status = "Deployed".to_string();
801                            flipped = true;
802                        }
803                        break;
804                    }
805                }
806                flipped
807            };
808            if flipped {
809                save_cloudfront_snapshot(&state, store, &lock).await;
810            }
811        });
812    }
813
814    /// Same as [`schedule_distribution_deploy`] but for DistributionTenants.
815    pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
816        let state = Arc::clone(&self.state);
817        let delay = self.propagation_delay;
818        let store = self.snapshot_store.clone();
819        let lock = self.snapshot_lock.clone();
820        tokio::spawn(async move {
821            tokio::time::sleep(delay).await;
822            let flipped = {
823                let mut s = state.write();
824                let mut flipped = false;
825                for account in s.accounts.values_mut() {
826                    if let Some(t) = account.distribution_tenants.get_mut(&id) {
827                        if t.status == "InProgress" {
828                            t.status = "Deployed".to_string();
829                            flipped = true;
830                        }
831                        break;
832                    }
833                }
834                flipped
835            };
836            if flipped {
837                save_cloudfront_snapshot(&state, store, &lock).await;
838            }
839        });
840    }
841
842    /// Same as [`schedule_distribution_deploy`] but for ConnectionGroups.
843    pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
844        let state = Arc::clone(&self.state);
845        let delay = self.propagation_delay;
846        let store = self.snapshot_store.clone();
847        let lock = self.snapshot_lock.clone();
848        tokio::spawn(async move {
849            tokio::time::sleep(delay).await;
850            let flipped = {
851                let mut s = state.write();
852                let mut flipped = false;
853                for account in s.accounts.values_mut() {
854                    if let Some(g) = account.connection_groups.get_mut(&id) {
855                        if g.status == "InProgress" {
856                            g.status = "Deployed".to_string();
857                            flipped = true;
858                        }
859                        break;
860                    }
861                }
862                flipped
863            };
864            if flipped {
865                save_cloudfront_snapshot(&state, store, &lock).await;
866            }
867        });
868    }
869
870    /// Same as [`schedule_distribution_deploy`] but for StreamingDistributions.
871    /// Real CloudFront mirrors the `InProgress` -> `Deployed` lifecycle for
872    /// RTMP streaming distributions, even though the resource is largely
873    /// deprecated.
874    pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
875        let state = Arc::clone(&self.state);
876        let delay = self.propagation_delay;
877        let store = self.snapshot_store.clone();
878        let lock = self.snapshot_lock.clone();
879        tokio::spawn(async move {
880            tokio::time::sleep(delay).await;
881            let flipped = {
882                let mut s = state.write();
883                let mut flipped = false;
884                for account in s.accounts.values_mut() {
885                    if let Some(d) = account.streaming_distributions.get_mut(&id) {
886                        if d.status == "InProgress" {
887                            d.status = "Deployed".to_string();
888                            flipped = true;
889                        }
890                        break;
891                    }
892                }
893                flipped
894            };
895            if flipped {
896                save_cloudfront_snapshot(&state, store, &lock).await;
897            }
898        });
899    }
900
901    fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
902        let id = route
903            .id
904            .as_deref()
905            .ok_or_else(|| invalid_argument("missing distribution id"))?;
906        let state = self.state.read();
907        let account = state
908            .accounts
909            .get(DEFAULT_ACCOUNT)
910            .ok_or_else(|| no_such_distribution(id))?;
911        let dist = account
912            .distributions
913            .get(id)
914            .ok_or_else(|| no_such_distribution(id))?
915            .clone();
916        drop(state);
917        let body = build_distribution_xml(&dist);
918        let mut headers = HeaderMap::new();
919        set_header(&mut headers, ETAG, &dist.etag);
920        Ok(xml_response(StatusCode::OK, body, headers))
921    }
922
923    fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
924        let id = route
925            .id
926            .as_deref()
927            .ok_or_else(|| invalid_argument("missing distribution id"))?;
928        let state = self.state.read();
929        let account = state
930            .accounts
931            .get(DEFAULT_ACCOUNT)
932            .ok_or_else(|| no_such_distribution(id))?;
933        let dist = account
934            .distributions
935            .get(id)
936            .ok_or_else(|| no_such_distribution(id))?
937            .clone();
938        drop(state);
939        let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
940            .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
941        let mut headers = HeaderMap::new();
942        set_header(&mut headers, ETAG, &dist.etag);
943        Ok(xml_response(StatusCode::OK, body, headers))
944    }
945
946    fn update_distribution(
947        &self,
948        req: &AwsRequest,
949        route: &Route,
950    ) -> Result<AwsResponse, AwsServiceError> {
951        let id = route
952            .id
953            .as_deref()
954            .ok_or_else(|| invalid_argument("missing distribution id"))?;
955        let if_match = req
956            .headers
957            .get(IF_MATCH)
958            .and_then(|v| v.to_str().ok())
959            .ok_or_else(|| {
960                aws_error(
961                    StatusCode::BAD_REQUEST,
962                    "InvalidIfMatchVersion",
963                    "Missing If-Match header for UpdateDistribution",
964                )
965            })?
966            .to_string();
967        let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
968            .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
969        validate_caller_reference(&new_config.caller_reference)?;
970        validate_origins(&new_config)?;
971
972        let mut state = self.state.write();
973        let account = state
974            .accounts
975            .get_mut(DEFAULT_ACCOUNT)
976            .ok_or_else(|| no_such_distribution(id))?;
977        let dist = account
978            .distributions
979            .get_mut(id)
980            .ok_or_else(|| no_such_distribution(id))?;
981        if dist.etag != if_match {
982            return Err(aws_error(
983                StatusCode::PRECONDITION_FAILED,
984                "PreconditionFailed",
985                "If-Match header does not match the current ETag",
986            ));
987        }
988        // ETag stability: only bump the ETag and flip status back to
989        // InProgress when the new config actually differs from what we
990        // have on disk. A no-op UpdateDistribution (PUT the same config
991        // back) leaves the ETag intact, matching AWS behavior.
992        let config_changed = !configs_equal(&dist.config, &new_config);
993        if config_changed {
994            dist.config = new_config;
995            dist.etag = generate_etag();
996            dist.last_modified_time = Utc::now();
997            // UpdateDistribution kicks off a fresh edge propagation; AWS
998            // flips the status back to InProgress until the new config
999            // lands.
1000            dist.status = "InProgress".to_string();
1001        }
1002        let snapshot = dist.clone();
1003        drop(state);
1004
1005        if config_changed {
1006            self.schedule_distribution_deploy(id.to_string());
1007        }
1008
1009        let body = build_distribution_xml(&snapshot);
1010        let mut headers = HeaderMap::new();
1011        set_header(&mut headers, ETAG, &snapshot.etag);
1012        Ok(xml_response(StatusCode::OK, body, headers))
1013    }
1014
1015    fn delete_distribution(
1016        &self,
1017        req: &AwsRequest,
1018        route: &Route,
1019    ) -> Result<AwsResponse, AwsServiceError> {
1020        let id = route
1021            .id
1022            .as_deref()
1023            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1024        let if_match = req
1025            .headers
1026            .get(IF_MATCH)
1027            .and_then(|v| v.to_str().ok())
1028            .ok_or_else(|| {
1029                aws_error(
1030                    StatusCode::BAD_REQUEST,
1031                    "InvalidIfMatchVersion",
1032                    "Missing If-Match header for DeleteDistribution",
1033                )
1034            })?
1035            .to_string();
1036        let mut state = self.state.write();
1037        let account = state
1038            .accounts
1039            .get_mut(DEFAULT_ACCOUNT)
1040            .ok_or_else(|| no_such_distribution(id))?;
1041        {
1042            let dist = account
1043                .distributions
1044                .get(id)
1045                .ok_or_else(|| no_such_distribution(id))?;
1046            if dist.etag != if_match {
1047                return Err(aws_error(
1048                    StatusCode::PRECONDITION_FAILED,
1049                    "PreconditionFailed",
1050                    "If-Match header does not match the current ETag",
1051                ));
1052            }
1053            if dist.config.enabled {
1054                return Err(aws_error(
1055                    StatusCode::PRECONDITION_FAILED,
1056                    "DistributionNotDisabled",
1057                    "Distribution must be disabled before delete",
1058                ));
1059            }
1060        }
1061        let removed = account.distributions.remove(id).unwrap();
1062        account.tags.remove(&removed.arn);
1063        Ok(empty_response(StatusCode::NO_CONTENT))
1064    }
1065
1066    fn list_distributions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1067        let state = self.state.read();
1068        let mut dists: Vec<StoredDistribution> = state
1069            .accounts
1070            .values()
1071            .flat_map(|a| a.distributions.values().cloned())
1072            .collect();
1073        drop(state);
1074        dists.sort_by_key(|a| a.last_modified_time);
1075
1076        // CloudFront paginates with an exclusive Marker (start after this id) +
1077        // MaxItems; shared with the ListDistributionsBy* handlers.
1078        let (marker, max_items, page, next_marker) = paginate_distributions(req, dists);
1079
1080        let body = build_distribution_list_xml(
1081            &page,
1082            "DistributionList",
1083            &marker,
1084            max_items,
1085            next_marker.as_deref(),
1086        );
1087        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1088    }
1089
1090    fn list_distributions_by(
1091        &self,
1092        req: &AwsRequest,
1093        route: &Route,
1094        action: &str,
1095    ) -> Result<AwsResponse, AwsServiceError> {
1096        // Each "by-X" listing has a Smithy-required identifier (path or query)
1097        // plus, for ConnectionMode, an enum-constrained discriminator. The
1098        // synthetic probe sends `negative_omit_*` variants without these, so
1099        // every handler that can short-circuit on a missing identifier must
1100        // do so up-front. Otherwise the empty-list 200 looks like an honest
1101        // pass to the probe and a 100% conformance number is unreachable.
1102        match action {
1103            // Path-id ops: route.id is the URL placeholder. If the probe
1104            // omitted the field, the substitution left the literal
1105            // `{Member}` braces in place — easy to detect.
1106            "ListDistributionsByCachePolicyId"
1107            | "ListDistributionsByOriginRequestPolicyId"
1108            | "ListDistributionsByResponseHeadersPolicyId"
1109            | "ListDistributionsByKeyGroup"
1110            | "ListDistributionsByWebACLId"
1111            | "ListDistributionsByVpcOriginId"
1112            | "ListDistributionsByAnycastIpListId"
1113            | "ListDistributionsByOwnedResource" => {
1114                let id = route.id.as_deref().unwrap_or("");
1115                if is_placeholder_label(id) {
1116                    return Err(invalid_argument(format!(
1117                        "Required URL identifier for {action} is missing or invalid"
1118                    )));
1119                }
1120            }
1121            "ListDistributionsByConnectionMode" => {
1122                let id = route.id.as_deref().unwrap_or("");
1123                if is_placeholder_label(id) {
1124                    return Err(invalid_argument(
1125                        "ConnectionMode is required for ListDistributionsByConnectionMode",
1126                    ));
1127                }
1128                if id != "direct" && id != "tenant-only" {
1129                    return Err(invalid_argument(format!(
1130                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1131                    )));
1132                }
1133            }
1134            "ListDistributionsByConnectionFunction"
1135                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1136            {
1137                return Err(invalid_argument(
1138                    "ConnectionFunctionIdentifier query parameter is required",
1139                ));
1140            }
1141            "ListDistributionsByTrustStore"
1142                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1143            {
1144                return Err(invalid_argument(
1145                    "TrustStoreIdentifier query parameter is required",
1146                ));
1147            }
1148            _ => {}
1149        }
1150
1151        // The "by-X" listings each have a distinct response root element.
1152        // For the seven predicate ops whose match field is persisted on the
1153        // distribution config we filter the live state; the remaining ops
1154        // (owned-resource, connection-mode/function, trust-store,
1155        // realtime-log-config) are not yet indexed and stay empty.
1156        let root = match action {
1157            "ListDistributionsByCachePolicyId"
1158            | "ListDistributionsByOriginRequestPolicyId"
1159            | "ListDistributionsByResponseHeadersPolicyId"
1160            | "ListDistributionsByKeyGroup"
1161            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1162            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1163            _ => "DistributionList",
1164        };
1165
1166        // Collect all distributions once, sorted for stable ordering.
1167        let mut all: Vec<StoredDistribution> = {
1168            let state = self.state.read();
1169            state
1170                .accounts
1171                .values()
1172                .flat_map(|a| a.distributions.values().cloned())
1173                .collect()
1174        };
1175        all.sort_by_key(|d| d.last_modified_time);
1176
1177        let path_id = route.id.as_deref().unwrap_or("");
1178        let matched: Vec<StoredDistribution> = match action {
1179            "ListDistributionsByCachePolicyId" => all
1180                .into_iter()
1181                .filter(|d| distribution_uses_cache_policy(d, path_id))
1182                .collect(),
1183            "ListDistributionsByOriginRequestPolicyId" => all
1184                .into_iter()
1185                .filter(|d| distribution_uses_origin_request_policy(d, path_id))
1186                .collect(),
1187            "ListDistributionsByResponseHeadersPolicyId" => all
1188                .into_iter()
1189                .filter(|d| distribution_uses_response_headers_policy(d, path_id))
1190                .collect(),
1191            "ListDistributionsByKeyGroup" => all
1192                .into_iter()
1193                .filter(|d| distribution_uses_key_group(d, path_id))
1194                .collect(),
1195            "ListDistributionsByVpcOriginId" => all
1196                .into_iter()
1197                .filter(|d| distribution_uses_vpc_origin(d, path_id))
1198                .collect(),
1199            // CloudFront treats the literal WebACLId "null" as "list the
1200            // distributions that aren't associated with any web ACL".
1201            "ListDistributionsByWebACLId" if path_id == "null" => all
1202                .into_iter()
1203                .filter(|d| {
1204                    d.config
1205                        .web_acl_id
1206                        .as_deref()
1207                        .map(|w| w.is_empty())
1208                        .unwrap_or(true)
1209                })
1210                .collect(),
1211            "ListDistributionsByWebACLId" => all
1212                .into_iter()
1213                .filter(|d| d.config.web_acl_id.as_deref() == Some(path_id))
1214                .collect(),
1215            "ListDistributionsByAnycastIpListId" => all
1216                .into_iter()
1217                .filter(|d| d.config.anycast_ip_list_id.as_deref() == Some(path_id))
1218                .collect(),
1219            _ => Vec::new(),
1220        };
1221
1222        // Mirror the Marker/MaxItems pagination the plain ListDistributions
1223        // handler applies so large predicate result sets page correctly.
1224        let (marker, max_items, page, next_marker) = paginate_distributions(req, matched);
1225
1226        let body = if root == "DistributionIdList" {
1227            let ids: Vec<&str> = page.iter().map(|d| d.id.as_str()).collect();
1228            build_distribution_id_list_xml(&ids, &marker, max_items, next_marker.as_deref())
1229        } else if root == "DistributionList"
1230            && matches!(
1231                action,
1232                "ListDistributionsByWebACLId" | "ListDistributionsByAnycastIpListId"
1233            )
1234        {
1235            build_distribution_list_xml(
1236                &page,
1237                "DistributionList",
1238                &marker,
1239                max_items,
1240                next_marker.as_deref(),
1241            )
1242        } else {
1243            build_empty_distribution_id_list(root)
1244        };
1245        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1246    }
1247
1248    fn copy_distribution(
1249        &self,
1250        req: &AwsRequest,
1251        route: &Route,
1252    ) -> Result<AwsResponse, AwsServiceError> {
1253        let primary_id = route
1254            .id
1255            .as_deref()
1256            .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1257        let if_match = req
1258            .headers
1259            .get(IF_MATCH)
1260            .and_then(|v| v.to_str().ok())
1261            .ok_or_else(|| {
1262                aws_error(
1263                    StatusCode::BAD_REQUEST,
1264                    "InvalidIfMatchVersion",
1265                    "Missing If-Match header for CopyDistribution",
1266                )
1267            })?
1268            .to_string();
1269        let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1270            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1271        validate_caller_reference(&parsed.caller_reference)?;
1272        let mut state = self.state.write();
1273        let account = state
1274            .accounts
1275            .get_mut(DEFAULT_ACCOUNT)
1276            .ok_or_else(|| no_such_distribution(primary_id))?;
1277        let primary = account
1278            .distributions
1279            .get(primary_id)
1280            .ok_or_else(|| no_such_distribution(primary_id))?
1281            .clone();
1282        if primary.etag != if_match {
1283            return Err(aws_error(
1284                StatusCode::PRECONDITION_FAILED,
1285                "PreconditionFailed",
1286                "If-Match header does not match the current ETag",
1287            ));
1288        }
1289        if account
1290            .distributions
1291            .values()
1292            .any(|d| d.config.caller_reference == parsed.caller_reference)
1293        {
1294            return Err(aws_error(
1295                StatusCode::CONFLICT,
1296                "DistributionAlreadyExists",
1297                "Distribution with the same CallerReference exists",
1298            ));
1299        }
1300        let new_id = generate_distribution_id();
1301        let mut config = primary.config.clone();
1302        config.caller_reference = parsed.caller_reference;
1303        config.enabled = parsed.enabled.unwrap_or(false);
1304        config.staging = parsed.staging;
1305        let now = Utc::now();
1306        let etag = generate_etag();
1307        let arn = format!(
1308            "arn:aws:cloudfront::{}:distribution/{}",
1309            account_id(req),
1310            new_id
1311        );
1312        let stored = StoredDistribution {
1313            id: new_id.clone(),
1314            arn: arn.clone(),
1315            status: "InProgress".to_string(),
1316            last_modified_time: now,
1317            domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1318            in_progress_invalidation_batches: 0,
1319            etag: etag.clone(),
1320            config,
1321            bound_port: None,
1322        };
1323        account.distributions.insert(new_id.clone(), stored.clone());
1324        drop(state);
1325        self.schedule_distribution_deploy(new_id);
1326        let body = build_distribution_xml(&stored);
1327        let mut headers = HeaderMap::new();
1328        set_header(&mut headers, ETAG, &etag);
1329        set_header(&mut headers, LOCATION, &stored.arn);
1330        Ok(xml_response(StatusCode::CREATED, body, headers))
1331    }
1332}
1333
1334#[derive(Debug, serde::Deserialize, Default)]
1335#[serde(rename_all = "PascalCase")]
1336struct CopyDistributionRequest {
1337    caller_reference: String,
1338    #[serde(default)]
1339    enabled: Option<bool>,
1340    #[serde(default)]
1341    staging: Option<bool>,
1342}
1343
1344// ─── Invalidations ────────────────────────────────────────────────────
1345
1346impl CloudFrontService {
1347    fn create_invalidation(
1348        &self,
1349        req: &AwsRequest,
1350        route: &Route,
1351    ) -> Result<AwsResponse, AwsServiceError> {
1352        let dist_id = route
1353            .id
1354            .as_deref()
1355            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1356        let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1357            .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1358        if batch.caller_reference.is_empty() {
1359            return Err(invalid_argument("CallerReference is required"));
1360        }
1361        if batch.paths.quantity < 1 {
1362            return Err(invalid_argument(
1363                "InvalidationBatch.Paths must be non-empty",
1364            ));
1365        }
1366        let mut state = self.state.write();
1367        let account = state.entry(DEFAULT_ACCOUNT);
1368        if !account.distributions.contains_key(dist_id) {
1369            return Err(no_such_distribution(dist_id));
1370        }
1371        let id = generate_invalidation_id();
1372        let stored = StoredInvalidation {
1373            id: id.clone(),
1374            distribution_id: dist_id.to_string(),
1375            status: "Completed".to_string(),
1376            create_time: Utc::now(),
1377            batch: batch.clone(),
1378        };
1379        account.invalidations.insert(id.clone(), stored.clone());
1380        drop(state);
1381        let body = build_invalidation_xml(&stored);
1382        let mut headers = HeaderMap::new();
1383        set_header(
1384            &mut headers,
1385            LOCATION,
1386            &format!(
1387                "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1388                stored.id
1389            ),
1390        );
1391        Ok(xml_response(StatusCode::CREATED, body, headers))
1392    }
1393
1394    fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1395        let dist_id = route
1396            .id
1397            .as_deref()
1398            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1399        let inv_id = route
1400            .second_id
1401            .as_deref()
1402            .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1403        let state = self.state.read();
1404        let account = state
1405            .accounts
1406            .get(DEFAULT_ACCOUNT)
1407            .ok_or_else(|| no_such_invalidation(inv_id))?;
1408        if !account.distributions.contains_key(dist_id) {
1409            return Err(no_such_distribution(dist_id));
1410        }
1411        let inv = account
1412            .invalidations
1413            .get(inv_id)
1414            .filter(|i| i.distribution_id == dist_id)
1415            .ok_or_else(|| no_such_invalidation(inv_id))?
1416            .clone();
1417        drop(state);
1418        let body = build_invalidation_xml(&inv);
1419        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1420    }
1421
1422    fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1423        let dist_id = route
1424            .id
1425            .as_deref()
1426            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1427        let state = self.state.read();
1428        let account = state
1429            .accounts
1430            .get(DEFAULT_ACCOUNT)
1431            .ok_or_else(|| no_such_distribution(dist_id))?;
1432        if !account.distributions.contains_key(dist_id) {
1433            return Err(no_such_distribution(dist_id));
1434        }
1435        let mut items: Vec<&StoredInvalidation> = account
1436            .invalidations
1437            .values()
1438            .filter(|i| i.distribution_id == dist_id)
1439            .collect();
1440        items.sort_by_key(|a| a.create_time);
1441        let body = build_invalidation_list_xml(&items);
1442        drop(state);
1443        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1444    }
1445}
1446
1447// ─── Tags ─────────────────────────────────────────────────────────────
1448
1449impl CloudFrontService {
1450    fn parse_arn_query(query: &str) -> Option<String> {
1451        for pair in query.split('&').filter(|p| !p.is_empty()) {
1452            if let Some(rest) = pair.strip_prefix("Resource=") {
1453                return Some(percent_decode(rest));
1454            }
1455        }
1456        None
1457    }
1458
1459    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1460        let arn = Self::parse_arn_query(&req.raw_query)
1461            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1462        let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1463            .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1464        let new_tags: Vec<Tag> = parsed
1465            .items
1466            .map(|i| {
1467                i.tag
1468                    .into_iter()
1469                    .map(|t| Tag {
1470                        key: t.key,
1471                        value: t.value,
1472                    })
1473                    .collect()
1474            })
1475            .unwrap_or_default();
1476        let mut state = self.state.write();
1477        let account = state.entry(DEFAULT_ACCOUNT);
1478        let entry = account.tags.entry(arn).or_default();
1479        for tag in new_tags {
1480            if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1481                existing.value = tag.value;
1482            } else {
1483                entry.push(tag);
1484            }
1485        }
1486        Ok(empty_response(StatusCode::NO_CONTENT))
1487    }
1488
1489    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1490        let arn = Self::parse_arn_query(&req.raw_query)
1491            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1492        let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1493            .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1494        let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1495        let mut state = self.state.write();
1496        let account = state.entry(DEFAULT_ACCOUNT);
1497        if let Some(existing) = account.tags.get_mut(&arn) {
1498            existing.retain(|t| !keys.contains(&t.key));
1499        }
1500        Ok(empty_response(StatusCode::NO_CONTENT))
1501    }
1502
1503    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504        let arn = Self::parse_arn_query(&req.raw_query)
1505            .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1506        let state = self.state.read();
1507        let tags = state
1508            .accounts
1509            .get(DEFAULT_ACCOUNT)
1510            .and_then(|a| a.tags.get(&arn))
1511            .cloned()
1512            .unwrap_or_default();
1513        drop(state);
1514        let body = build_tags_xml(&tags);
1515        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1516    }
1517}
1518
1519// ─── Aliases / WebACL ─────────────────────────────────────────────────
1520
1521impl CloudFrontService {
1522    fn associate_alias(
1523        &self,
1524        req: &AwsRequest,
1525        route: &Route,
1526    ) -> Result<AwsResponse, AwsServiceError> {
1527        let id = route
1528            .id
1529            .as_deref()
1530            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1531        let alias = parse_query_value(&req.raw_query, "Alias")
1532            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1533        let mut state = self.state.write();
1534        let account = state
1535            .accounts
1536            .get_mut(DEFAULT_ACCOUNT)
1537            .ok_or_else(|| no_such_distribution(id))?;
1538        // Reject if the alias is already attached to a different distribution.
1539        if let Some(other) = account.distributions.values().find(|d| {
1540            d.id != id
1541                && d.config
1542                    .aliases
1543                    .as_ref()
1544                    .and_then(|a| a.items.as_ref())
1545                    .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1546        }) {
1547            return Err(aws_error(
1548                StatusCode::CONFLICT,
1549                "CNAMEAlreadyExists",
1550                format!(
1551                    "Alias {alias} is already associated with distribution {}",
1552                    other.id
1553                ),
1554            ));
1555        }
1556        let dist = account
1557            .distributions
1558            .get_mut(id)
1559            .ok_or_else(|| no_such_distribution(id))?;
1560        let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1561        let items = aliases
1562            .items
1563            .get_or_insert_with(crate::model::AliasItems::default);
1564        if !items.cname.iter().any(|c| c == &alias) {
1565            items.cname.push(alias.clone());
1566            aliases.quantity = items.cname.len() as i32;
1567        }
1568        dist.etag = generate_etag();
1569        dist.last_modified_time = Utc::now();
1570        Ok(empty_response(StatusCode::OK))
1571    }
1572
1573    fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1574        let alias = parse_query_value(&req.raw_query, "Alias")
1575            .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1576        let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1577            .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1578        // aliasString max 253, distributionIdString max 25 per the Smithy
1579        // model. Reject probe-generated boundary variants that overrun the
1580        // documented length so they produce the declared InvalidArgument
1581        // instead of an empty 200.
1582        if alias.len() > 253 {
1583            return Err(invalid_argument(format!(
1584                "Alias length {} exceeds maximum 253",
1585                alias.len()
1586            )));
1587        }
1588        if dist_id.len() > 25 {
1589            return Err(invalid_argument(format!(
1590                "DistributionId length {} exceeds maximum 25",
1591                dist_id.len()
1592            )));
1593        }
1594        if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1595            let n: i64 = max_items.parse().map_err(|_| {
1596                invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1597            })?;
1598            if n > 100 {
1599                return Err(invalid_argument(format!(
1600                    "MaxItems {n} exceeds maximum 100"
1601                )));
1602            }
1603        }
1604        // We never produce conflicts because every alias is owned by one
1605        // distribution at most. Return an empty list with the proper shape.
1606        let body = format!(
1607            "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1608            NS = crate::NAMESPACE,
1609            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1610        );
1611        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1612    }
1613
1614    fn associate_web_acl(
1615        &self,
1616        req: &AwsRequest,
1617        route: &Route,
1618    ) -> Result<AwsResponse, AwsServiceError> {
1619        let id = route
1620            .id
1621            .as_deref()
1622            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1623        let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1624            .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1625        let mut state = self.state.write();
1626        let account = state
1627            .accounts
1628            .get_mut(DEFAULT_ACCOUNT)
1629            .ok_or_else(|| no_such_distribution(id))?;
1630        let dist = account
1631            .distributions
1632            .get_mut(id)
1633            .ok_or_else(|| no_such_distribution(id))?;
1634        dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1635        dist.etag = generate_etag();
1636        dist.last_modified_time = Utc::now();
1637        let body = format!(
1638            "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1639            esc(id), esc(&parsed.web_acl_arn),
1640            NS = crate::NAMESPACE,
1641            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1642        );
1643        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1644    }
1645
1646    fn disassociate_web_acl(
1647        &self,
1648        _req: &AwsRequest,
1649        route: &Route,
1650    ) -> Result<AwsResponse, AwsServiceError> {
1651        let id = route
1652            .id
1653            .as_deref()
1654            .ok_or_else(|| invalid_argument("missing distribution id"))?;
1655        // DisassociateDistributionWebACL's Smithy model declares EntityNotFound
1656        // (not NoSuchDistribution) for unknown distribution IDs.
1657        let entity_not_found = || {
1658            aws_error(
1659                StatusCode::NOT_FOUND,
1660                "EntityNotFound",
1661                format!("The specified distribution does not exist: {id}"),
1662            )
1663        };
1664        let mut state = self.state.write();
1665        let account = state
1666            .accounts
1667            .get_mut(DEFAULT_ACCOUNT)
1668            .ok_or_else(entity_not_found)?;
1669        let dist = account
1670            .distributions
1671            .get_mut(id)
1672            .ok_or_else(entity_not_found)?;
1673        dist.config.web_acl_id = None;
1674        dist.etag = generate_etag();
1675        dist.last_modified_time = Utc::now();
1676        let body = format!(
1677            "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1678            esc(id),
1679            NS = crate::NAMESPACE,
1680            XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1681        );
1682        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1683    }
1684}
1685
1686#[derive(serde::Deserialize, Default, Debug)]
1687#[serde(rename_all = "PascalCase")]
1688struct AssociateAliasRequest {
1689    #[serde(rename = "WebACLArn", default)]
1690    web_acl_arn: String,
1691}
1692
1693// ─── XML body builders ────────────────────────────────────────────────
1694
1695/// XML-escape a user-provided string before injecting it into hand-rolled
1696/// XML response bodies. The 5 standard XML metacharacters are escaped;
1697/// everything else passes through unchanged. Keep this in sync with
1698/// `quick_xml`'s entity table — using their own primitive would mean a
1699/// `Writer` per call and we serialize directly into a `String`.
1700pub(crate) fn esc(s: &str) -> String {
1701    let mut out = String::with_capacity(s.len());
1702    for c in s.chars() {
1703        match c {
1704            '&' => out.push_str("&amp;"),
1705            '<' => out.push_str("&lt;"),
1706            '>' => out.push_str("&gt;"),
1707            '"' => out.push_str("&quot;"),
1708            '\'' => out.push_str("&apos;"),
1709            _ => out.push(c),
1710        }
1711    }
1712    out
1713}
1714
1715pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1716    let mut out = String::with_capacity(2048);
1717    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1718    out.push_str(&format!(
1719        "<Distribution xmlns=\"{ns}\">",
1720        ns = crate::NAMESPACE
1721    ));
1722    out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1723    out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1724    out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1725    out.push_str(&format!(
1726        "<LastModifiedTime>{}</LastModifiedTime>",
1727        rfc3339(&dist.last_modified_time)
1728    ));
1729    out.push_str(&format!(
1730        "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1731        dist.in_progress_invalidation_batches
1732    ));
1733    out.push_str(&format!(
1734        "<DomainName>{}</DomainName>",
1735        esc(&dist.domain_name)
1736    ));
1737    out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1738    out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1739    let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1740        .unwrap_or_else(|_| String::new());
1741    out.push_str(&inner);
1742    out.push_str("</Distribution>");
1743    out
1744}
1745
1746fn build_distribution_list_xml(
1747    dists: &[StoredDistribution],
1748    root: &str,
1749    marker: &str,
1750    max_items: usize,
1751    next_marker: Option<&str>,
1752) -> String {
1753    let mut out = String::with_capacity(2048);
1754    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1755    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1756    out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1757    if let Some(nm) = next_marker {
1758        out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1759    }
1760    out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1761    out.push_str(&format!(
1762        "<IsTruncated>{}</IsTruncated>",
1763        next_marker.is_some()
1764    ));
1765    out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1766    if dists.is_empty() {
1767        out.push_str(&format!("</{root}>"));
1768        return out;
1769    }
1770    out.push_str("<Items>");
1771    for d in dists {
1772        out.push_str("<DistributionSummary>");
1773        out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1774        out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1775        out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1776        out.push_str(&format!(
1777            "<LastModifiedTime>{}</LastModifiedTime>",
1778            rfc3339(&d.last_modified_time)
1779        ));
1780        out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1781        let aliases = d.config.aliases.clone().unwrap_or_default();
1782        out.push_str(&render_inline("Aliases", &aliases));
1783        let origins = d.config.origins.clone();
1784        out.push_str(&render_inline("Origins", &origins));
1785        let dcb = d.config.default_cache_behavior.clone();
1786        out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1787        let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1788        out.push_str(&render_inline("CacheBehaviors", &cb));
1789        let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1790        out.push_str(&render_inline("CustomErrorResponses", &cer));
1791        out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1792        out.push_str(&format!(
1793            "<PriceClass>{}</PriceClass>",
1794            esc(&d
1795                .config
1796                .price_class
1797                .clone()
1798                .unwrap_or_else(|| "PriceClass_All".to_string()))
1799        ));
1800        out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1801        out.push_str(&render_inline(
1802            "ViewerCertificate",
1803            &d.config.viewer_certificate.clone().unwrap_or_default(),
1804        ));
1805        out.push_str(&render_inline(
1806            "Restrictions",
1807            &d.config.restrictions.clone().unwrap_or_default(),
1808        ));
1809        out.push_str(&format!(
1810            "<WebACLId>{}</WebACLId>",
1811            esc(&d.config.web_acl_id.clone().unwrap_or_default())
1812        ));
1813        out.push_str(&format!(
1814            "<HttpVersion>{}</HttpVersion>",
1815            esc(&d
1816                .config
1817                .http_version
1818                .clone()
1819                .unwrap_or_else(|| "http2".to_string()))
1820        ));
1821        out.push_str(&format!(
1822            "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1823            d.config.is_ipv6_enabled.unwrap_or(true)
1824        ));
1825        out.push_str(&format!(
1826            "<Staging>{}</Staging>",
1827            d.config.staging.unwrap_or(false)
1828        ));
1829        out.push_str("</DistributionSummary>");
1830    }
1831    out.push_str("</Items>");
1832    out.push_str(&format!("</{root}>"));
1833    out
1834}
1835
1836fn build_empty_distribution_id_list(root: &str) -> String {
1837    let mut out = String::with_capacity(256);
1838    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1839    out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1840    out.push_str("<Marker></Marker>");
1841    out.push_str("<MaxItems>100</MaxItems>");
1842    out.push_str("<IsTruncated>false</IsTruncated>");
1843    out.push_str("<Quantity>0</Quantity>");
1844    out.push_str(&format!("</{root}>"));
1845    out
1846}
1847
1848/// Render a `DistributionIdList` response body listing the given ids, echoing
1849/// the Marker/MaxItems cursor and NextMarker/IsTruncated pagination metadata.
1850fn build_distribution_id_list_xml(
1851    ids: &[&str],
1852    marker: &str,
1853    max_items: usize,
1854    next_marker: Option<&str>,
1855) -> String {
1856    let mut out = String::with_capacity(256);
1857    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1858    out.push_str(&format!(
1859        "<DistributionIdList xmlns=\"{ns}\">",
1860        ns = crate::NAMESPACE
1861    ));
1862    out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1863    if let Some(nm) = next_marker {
1864        out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1865    }
1866    out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1867    out.push_str(&format!(
1868        "<IsTruncated>{}</IsTruncated>",
1869        next_marker.is_some()
1870    ));
1871    out.push_str(&format!("<Quantity>{}</Quantity>", ids.len()));
1872    if !ids.is_empty() {
1873        out.push_str("<Items>");
1874        for id in ids {
1875            out.push_str(&format!("<DistributionId>{}</DistributionId>", esc(id)));
1876        }
1877        out.push_str("</Items>");
1878    }
1879    out.push_str("</DistributionIdList>");
1880    out
1881}
1882
1883/// Apply the CloudFront Marker/MaxItems pagination scheme (the same one the
1884/// plain `ListDistributions` handler uses) to an already-filtered, sorted set
1885/// of distributions. Returns the echoed marker, the effective MaxItems, the
1886/// page slice, and the NextMarker cursor when the result is truncated.
1887fn paginate_distributions(
1888    req: &AwsRequest,
1889    items: Vec<StoredDistribution>,
1890) -> (String, usize, Vec<StoredDistribution>, Option<String>) {
1891    let max_items = req
1892        .query_params
1893        .get("MaxItems")
1894        .or_else(|| req.query_params.get("maxitems"))
1895        .and_then(|v| v.parse::<usize>().ok())
1896        .filter(|n| *n > 0)
1897        .unwrap_or(100);
1898    let marker = req
1899        .query_params
1900        .get("Marker")
1901        .or_else(|| req.query_params.get("marker"))
1902        .cloned()
1903        .unwrap_or_default();
1904    // CloudFront markers are EXCLUSIVE: a supplied Marker means "start after
1905    // this id". If the marker id isn't found, the page is empty (start past
1906    // the end), matching AWS.
1907    let start_idx = if marker.is_empty() {
1908        0
1909    } else {
1910        match items.iter().position(|d| d.id == marker) {
1911            Some(pos) => pos + 1,
1912            None => items.len(),
1913        }
1914    };
1915    let page: Vec<StoredDistribution> = items
1916        .iter()
1917        .skip(start_idx)
1918        .take(max_items)
1919        .cloned()
1920        .collect();
1921    // Truncated when unread items remain after this page. NextMarker is the id
1922    // of the LAST item on the current page (where listing left off), so the
1923    // caller passes it back as the next Marker to resume strictly after it.
1924    let has_more = start_idx + page.len() < items.len();
1925    let next_marker = if has_more {
1926        page.last().map(|d| d.id.clone())
1927    } else {
1928        None
1929    };
1930    (marker, max_items, page, next_marker)
1931}
1932
1933/// Iterate the default cache behavior followed by every additional cache
1934/// behavior of a distribution.
1935fn distribution_cache_behaviors(
1936    d: &StoredDistribution,
1937) -> impl Iterator<Item = &crate::model::CacheBehavior> {
1938    d.config
1939        .cache_behaviors
1940        .as_ref()
1941        .and_then(|cb| cb.items.as_ref())
1942        .into_iter()
1943        .flat_map(|items| items.cache_behavior.iter())
1944}
1945
1946fn distribution_uses_cache_policy(d: &StoredDistribution, id: &str) -> bool {
1947    d.config.default_cache_behavior.cache_policy_id.as_deref() == Some(id)
1948        || distribution_cache_behaviors(d).any(|b| b.cache_policy_id.as_deref() == Some(id))
1949}
1950
1951fn distribution_uses_origin_request_policy(d: &StoredDistribution, id: &str) -> bool {
1952    d.config
1953        .default_cache_behavior
1954        .origin_request_policy_id
1955        .as_deref()
1956        == Some(id)
1957        || distribution_cache_behaviors(d)
1958            .any(|b| b.origin_request_policy_id.as_deref() == Some(id))
1959}
1960
1961fn distribution_uses_response_headers_policy(d: &StoredDistribution, id: &str) -> bool {
1962    d.config
1963        .default_cache_behavior
1964        .response_headers_policy_id
1965        .as_deref()
1966        == Some(id)
1967        || distribution_cache_behaviors(d)
1968            .any(|b| b.response_headers_policy_id.as_deref() == Some(id))
1969}
1970
1971fn trusted_key_groups_contains(tkg: Option<&crate::model::TrustedKeyGroups>, id: &str) -> bool {
1972    tkg.and_then(|g| g.items.as_ref())
1973        .map(|items| items.key_group.iter().any(|k| k == id))
1974        .unwrap_or(false)
1975}
1976
1977fn distribution_uses_key_group(d: &StoredDistribution, id: &str) -> bool {
1978    trusted_key_groups_contains(
1979        d.config.default_cache_behavior.trusted_key_groups.as_ref(),
1980        id,
1981    ) || distribution_cache_behaviors(d)
1982        .any(|b| trusted_key_groups_contains(b.trusted_key_groups.as_ref(), id))
1983}
1984
1985fn distribution_uses_vpc_origin(d: &StoredDistribution, id: &str) -> bool {
1986    d.config
1987        .origins
1988        .items
1989        .as_ref()
1990        .map(|items| {
1991            items.origin.iter().any(|o| {
1992                o.vpc_origin_config
1993                    .as_ref()
1994                    .map(|v| v.vpc_origin_id == id)
1995                    .unwrap_or(false)
1996            })
1997        })
1998        .unwrap_or(false)
1999}
2000
2001fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
2002    let mut out = String::with_capacity(512);
2003    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2004    out.push_str(&format!(
2005        "<Invalidation xmlns=\"{ns}\">",
2006        ns = crate::NAMESPACE
2007    ));
2008    out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
2009    out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
2010    out.push_str(&format!(
2011        "<CreateTime>{}</CreateTime>",
2012        rfc3339(&inv.create_time)
2013    ));
2014    out.push_str(&render_inline("InvalidationBatch", &inv.batch));
2015    out.push_str("</Invalidation>");
2016    out
2017}
2018
2019fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
2020    let mut out = String::with_capacity(1024);
2021    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2022    out.push_str(&format!(
2023        "<InvalidationList xmlns=\"{ns}\">",
2024        ns = crate::NAMESPACE
2025    ));
2026    out.push_str("<Marker></Marker>");
2027    out.push_str("<MaxItems>100</MaxItems>");
2028    out.push_str("<IsTruncated>false</IsTruncated>");
2029    out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
2030    if !items.is_empty() {
2031        out.push_str("<Items>");
2032        for inv in items {
2033            out.push_str("<InvalidationSummary>");
2034            out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
2035            out.push_str(&format!(
2036                "<CreateTime>{}</CreateTime>",
2037                rfc3339(&inv.create_time)
2038            ));
2039            out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
2040            out.push_str("</InvalidationSummary>");
2041        }
2042        out.push_str("</Items>");
2043    }
2044    out.push_str("</InvalidationList>");
2045    out
2046}
2047
2048fn build_tags_xml(tags: &[Tag]) -> String {
2049    let mut out = String::with_capacity(256);
2050    out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2051    out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
2052    out.push_str("<Items>");
2053    for t in tags {
2054        out.push_str("<Tag>");
2055        out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
2056        if let Some(v) = &t.value {
2057            out.push_str(&format!("<Value>{}</Value>", esc(v)));
2058        }
2059        out.push_str("</Tag>");
2060    }
2061    out.push_str("</Items>");
2062    out.push_str("</Tags>");
2063    out
2064}
2065
2066fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
2067    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
2068}
2069
2070// ─── Helpers ──────────────────────────────────────────────────────────
2071
2072fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
2073    if s.is_empty() {
2074        return Err(invalid_argument("CallerReference is required"));
2075    }
2076    Ok(())
2077}
2078
2079fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
2080    if config.origins.quantity < 1 {
2081        return Err(invalid_argument(
2082            "DistributionConfig.Origins must contain at least one origin",
2083        ));
2084    }
2085    Ok(())
2086}
2087
2088/// Compare two `DistributionConfig`s by serializing them to canonical XML
2089/// and comparing bytes. Used by `UpdateDistribution` to detect no-op writes
2090/// so the ETag stays stable when the caller PUTs the same config back.
2091/// Falls back to "not equal" if either serialization fails so we still
2092/// honor the request rather than silently swallow a write.
2093fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
2094    let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
2095        return false;
2096    };
2097    let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
2098        return false;
2099    };
2100    a == b
2101}
2102
2103fn account_id(_req: &AwsRequest) -> &'static str {
2104    // Multi-account is wired through AwsRequest.account_id elsewhere; the
2105    // CloudFront control plane only uses the resolved id for the ARN
2106    // suffix. Until that field stabilizes for REST-XML we use the default
2107    // account ID consistently with the rest of the registered services.
2108    DEFAULT_ACCOUNT
2109}
2110
2111fn generate_distribution_id() -> String {
2112    // CloudFront IDs are 14-char base32-ish uppercase strings starting with E.
2113    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2114    format!("E{}", &raw[..13])
2115}
2116
2117fn generate_invalidation_id() -> String {
2118    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2119    format!("I{}", &raw[..13])
2120}
2121
2122pub(crate) fn generate_etag() -> String {
2123    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2124    format!("E{}", &raw[..13])
2125}
2126
2127/// Generate an AWS-shaped CloudFront resource ID with the given prefix.
2128/// Used by Batch 2 policy resources (cache, origin request, response
2129/// headers, continuous deployment, OAC) so each gets a recognizable
2130/// alphabetic prefix in addition to the random suffix.
2131pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
2132    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2133    let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
2134    format!("{prefix}{}", &raw[..suffix_len])
2135}
2136
2137fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
2138    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
2139}
2140
2141pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
2142    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
2143}
2144
2145fn no_such_distribution(id: &str) -> AwsServiceError {
2146    aws_error(
2147        StatusCode::NOT_FOUND,
2148        "NoSuchDistribution",
2149        format!("The specified distribution does not exist: {id}"),
2150    )
2151}
2152
2153fn no_such_invalidation(id: &str) -> AwsServiceError {
2154    aws_error(
2155        StatusCode::NOT_FOUND,
2156        "NoSuchInvalidation",
2157        format!("The specified invalidation does not exist: {id}"),
2158    )
2159}
2160
2161fn internal_error(msg: impl Into<String>) -> AwsServiceError {
2162    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
2163}
2164
2165pub(crate) fn aws_error(
2166    status: StatusCode,
2167    code: impl Into<String>,
2168    msg: impl Into<String>,
2169) -> AwsServiceError {
2170    AwsServiceError::aws_error(status, code.into(), msg)
2171}
2172
2173fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
2174    if let Ok(v) = HeaderValue::from_str(value) {
2175        headers.insert(name, v);
2176    }
2177}
2178
2179pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
2180    AwsResponse {
2181        status,
2182        content_type: "text/xml".to_string(),
2183        body: ResponseBody::Bytes(Bytes::from(body)),
2184        headers,
2185    }
2186}
2187
2188fn empty_response(status: StatusCode) -> AwsResponse {
2189    AwsResponse {
2190        status,
2191        content_type: "text/xml".to_string(),
2192        body: ResponseBody::Bytes(Bytes::new()),
2193        headers: HeaderMap::new(),
2194    }
2195}
2196
2197fn percent_decode(input: &str) -> String {
2198    let mut out = String::with_capacity(input.len());
2199    let bytes = input.as_bytes();
2200    let mut i = 0;
2201    while i < bytes.len() {
2202        let b = bytes[i];
2203        if b == b'%' && i + 2 < bytes.len() {
2204            if let (Some(a), Some(c)) = (hex_digit(bytes[i + 1]), hex_digit(bytes[i + 2])) {
2205                out.push(((a << 4) | c) as char);
2206                i += 3;
2207                continue;
2208            }
2209        }
2210        if b == b'+' {
2211            out.push(' ');
2212        } else {
2213            out.push(b as char);
2214        }
2215        i += 1;
2216    }
2217    out
2218}
2219
2220fn hex_digit(b: u8) -> Option<u8> {
2221    match b {
2222        b'0'..=b'9' => Some(b - b'0'),
2223        b'a'..=b'f' => Some(b - b'a' + 10),
2224        b'A'..=b'F' => Some(b - b'A' + 10),
2225        _ => None,
2226    }
2227}
2228
2229/// True when a URL-decoded label segment looks like an unsubstituted Smithy
2230/// URI placeholder (e.g. `{Identifier}` or its percent-encoded form
2231/// `%7BIdentifier%7D`). Conformance probes that omit a required `@httpLabel`
2232/// input leave the literal placeholder in the URL; handlers that can
2233/// short-circuit on it return the declared validation error instead of a
2234/// fabricated 200.
2235pub(crate) fn is_placeholder_label(value: &str) -> bool {
2236    if value.is_empty() {
2237        return true;
2238    }
2239    let lower = value.to_ascii_lowercase();
2240    value.starts_with('{') || lower.starts_with("%7b")
2241}
2242
2243/// Best-effort field extractor for request bodies the probe sends as JSON
2244/// even when the service is REST-XML. Walks the body trying JSON first,
2245/// then a naive XML element scan. Returns the value for the first occurrence
2246/// of `key`. Used by handlers that need to validate optional/required body
2247/// members up-front (enum bounds, length, presence) without committing to a
2248/// full strongly-typed parse — the actual handler still uses the typed XML
2249/// parser for the structured fields it consumes.
2250pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
2251    if let Ok(s) = std::str::from_utf8(body) {
2252        let trimmed = s.trim_start();
2253        if trimmed.starts_with('{') {
2254            if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
2255                if let Some(field) = v.get(key) {
2256                    return match field {
2257                        serde_json::Value::String(s) => Some(s.clone()),
2258                        serde_json::Value::Number(n) => Some(n.to_string()),
2259                        serde_json::Value::Bool(b) => Some(b.to_string()),
2260                        _ => None,
2261                    };
2262                }
2263                return None;
2264            }
2265        }
2266        // Fall back to a naive XML extraction for genuine XML bodies.
2267        let open = format!("<{key}>");
2268        let close = format!("</{key}>");
2269        if let Some(start) = s.find(&open) {
2270            let after = start + open.len();
2271            if let Some(end_rel) = s[after..].find(&close) {
2272                return Some(s[after..after + end_rel].to_string());
2273            }
2274        }
2275    }
2276    None
2277}
2278
2279fn parse_query_value(query: &str, key: &str) -> Option<String> {
2280    let prefix = format!("{key}=");
2281    for pair in query.split('&').filter(|p| !p.is_empty()) {
2282        if let Some(rest) = pair.strip_prefix(&prefix) {
2283            return Some(percent_decode(rest));
2284        }
2285    }
2286    None
2287}
2288
2289#[cfg(test)]
2290mod tests {
2291    use super::*;
2292
2293    #[test]
2294    fn distribution_list_xml_renders_pagination_fields() {
2295        // Truncated page: echoes the request marker, the requested MaxItems,
2296        // IsTruncated=true, and the NextMarker cursor.
2297        let xml =
2298            build_distribution_list_xml(&[], "DistributionList", "EDFDVBD6", 2, Some("E2NEXT"));
2299        assert!(xml.contains("<Marker>EDFDVBD6</Marker>"));
2300        assert!(xml.contains("<MaxItems>2</MaxItems>"));
2301        assert!(xml.contains("<IsTruncated>true</IsTruncated>"));
2302        assert!(xml.contains("<NextMarker>E2NEXT</NextMarker>"));
2303        // Final page: no NextMarker, IsTruncated=false.
2304        let xml = build_distribution_list_xml(&[], "DistributionList", "", 100, None);
2305        assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
2306        assert!(!xml.contains("<NextMarker>"));
2307    }
2308
2309    #[test]
2310    fn placeholder_label_detects_braces_and_percent_encoding() {
2311        assert!(is_placeholder_label(""));
2312        assert!(is_placeholder_label("{Identifier}"));
2313        assert!(is_placeholder_label("%7BIdentifier%7D"));
2314        assert!(is_placeholder_label("%7bidentifier%7d"));
2315        assert!(!is_placeholder_label("E1234567890ABC"));
2316        assert!(!is_placeholder_label(
2317            "arn:aws:cloudfront::000:distribution/E1"
2318        ));
2319    }
2320
2321    #[test]
2322    fn extract_body_field_handles_json_and_xml() {
2323        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2324        assert_eq!(
2325            extract_body_field(json, "Stage"),
2326            Some("BROKEN".to_string())
2327        );
2328        assert_eq!(extract_body_field(json, "MaxItems"), None);
2329
2330        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2331        assert_eq!(
2332            extract_body_field(xml, "Domain"),
2333            Some("example.com".to_string())
2334        );
2335        assert_eq!(extract_body_field(xml, "Missing"), None);
2336
2337        assert_eq!(extract_body_field(b"", "x"), None);
2338    }
2339
2340    fn make_state() -> SharedCloudFrontState {
2341        Arc::new(RwLock::new(CloudFrontAccounts::new()))
2342    }
2343
2344    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2345        AwsRequest {
2346            service: "cloudfront".into(),
2347            action: String::new(),
2348            region: "us-east-1".into(),
2349            account_id: DEFAULT_ACCOUNT.into(),
2350            request_id: Uuid::new_v4().to_string(),
2351            headers: HeaderMap::new(),
2352            query_params: std::collections::HashMap::new(),
2353            body_stream: parking_lot::Mutex::new(None),
2354            body: Bytes::from(body.to_string()),
2355            path_segments: path
2356                .split('/')
2357                .filter(|s| !s.is_empty())
2358                .map(String::from)
2359                .collect(),
2360            raw_path: path.into(),
2361            raw_query: query.into(),
2362            method,
2363            is_query_protocol: false,
2364            access_key_id: None,
2365            principal: None,
2366        }
2367    }
2368
2369    fn minimal_dist_config_xml(caller_ref: &str) -> String {
2370        format!(
2371            r#"<?xml version="1.0" encoding="UTF-8"?>
2372<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2373  <CallerReference>{caller_ref}</CallerReference>
2374  <Origins>
2375    <Quantity>1</Quantity>
2376    <Items>
2377      <Origin>
2378        <Id>primary</Id>
2379        <DomainName>example.com</DomainName>
2380      </Origin>
2381    </Items>
2382  </Origins>
2383  <DefaultCacheBehavior>
2384    <TargetOriginId>primary</TargetOriginId>
2385    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2386  </DefaultCacheBehavior>
2387  <Comment></Comment>
2388  <Enabled>true</Enabled>
2389</DistributionConfig>"#
2390        )
2391    }
2392
2393    #[tokio::test]
2394    async fn create_then_get_then_delete_distribution() {
2395        let svc = CloudFrontService::new(make_state());
2396        let body = minimal_dist_config_xml("ref-1");
2397        let create = svc
2398            .handle(make_request(
2399                http::Method::POST,
2400                "/2020-05-31/distribution",
2401                "",
2402                &body,
2403            ))
2404            .await
2405            .unwrap();
2406        assert_eq!(create.status, StatusCode::CREATED);
2407        let etag = create
2408            .headers
2409            .get(ETAG)
2410            .unwrap()
2411            .to_str()
2412            .unwrap()
2413            .to_string();
2414        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2415        let id = xml
2416            .split("<Id>")
2417            .nth(1)
2418            .unwrap()
2419            .split("</Id>")
2420            .next()
2421            .unwrap()
2422            .to_string();
2423
2424        let get = svc
2425            .handle(make_request(
2426                http::Method::GET,
2427                &format!("/2020-05-31/distribution/{id}"),
2428                "",
2429                "",
2430            ))
2431            .await
2432            .unwrap();
2433        assert_eq!(get.status, StatusCode::OK);
2434
2435        // Disable then delete (CloudFront requires Disabled before delete).
2436        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2437        let mut update_req = make_request(
2438            http::Method::PUT,
2439            &format!("/2020-05-31/distribution/{id}/config"),
2440            "",
2441            &disable_body,
2442        );
2443        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2444        let updated = svc.handle(update_req).await.unwrap();
2445        assert_eq!(updated.status, StatusCode::OK);
2446        let new_etag = updated
2447            .headers
2448            .get(ETAG)
2449            .unwrap()
2450            .to_str()
2451            .unwrap()
2452            .to_string();
2453
2454        let mut del_req = make_request(
2455            http::Method::DELETE,
2456            &format!("/2020-05-31/distribution/{id}"),
2457            "",
2458            "",
2459        );
2460        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2461        let del = svc.handle(del_req).await.unwrap();
2462        assert_eq!(del.status, StatusCode::NO_CONTENT);
2463    }
2464
2465    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2466        let body = minimal_dist_config_xml(caller_ref);
2467        let create = svc
2468            .handle(make_request(
2469                http::Method::POST,
2470                "/2020-05-31/distribution",
2471                "",
2472                &body,
2473            ))
2474            .await
2475            .unwrap();
2476        let xml = std::str::from_utf8(create.body.expect_bytes())
2477            .unwrap()
2478            .to_string();
2479        xml.split("<Id>")
2480            .nth(1)
2481            .unwrap()
2482            .split("</Id>")
2483            .next()
2484            .unwrap()
2485            .to_string()
2486    }
2487
2488    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2489        let state = svc.state.read();
2490        state
2491            .accounts
2492            .get(DEFAULT_ACCOUNT)
2493            .and_then(|a| a.distributions.get(id))
2494            .map(|d| d.status.clone())
2495            .unwrap_or_default()
2496    }
2497
2498    #[tokio::test]
2499    async fn create_distribution_starts_in_progress() {
2500        // Use a long delay so the auto-tick can't race the assertion.
2501        let svc = CloudFrontService::new(make_state())
2502            .with_propagation_delay(std::time::Duration::from_secs(60));
2503        let body = minimal_dist_config_xml("status-ref");
2504        let create = svc
2505            .handle(make_request(
2506                http::Method::POST,
2507                "/2020-05-31/distribution",
2508                "",
2509                &body,
2510            ))
2511            .await
2512            .unwrap();
2513        let xml = std::str::from_utf8(create.body.expect_bytes())
2514            .unwrap()
2515            .to_string();
2516        assert!(
2517            xml.contains("<Status>InProgress</Status>"),
2518            "expected initial status InProgress, got: {xml}"
2519        );
2520    }
2521
2522    #[tokio::test]
2523    async fn auto_transition_after_tick_marks_deployed() {
2524        let svc = CloudFrontService::new(make_state())
2525            .with_propagation_delay(std::time::Duration::from_millis(50));
2526        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2527        assert_eq!(distribution_status(&svc, &id), "InProgress");
2528        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2529        assert_eq!(distribution_status(&svc, &id), "Deployed");
2530    }
2531
2532    #[tokio::test]
2533    async fn set_distribution_status_via_admin_flips_synchronously() {
2534        let svc = CloudFrontService::new(make_state())
2535            .with_propagation_delay(std::time::Duration::from_secs(60));
2536        let id = create_distribution_returning_id(&svc, "admin-ref").await;
2537        assert_eq!(distribution_status(&svc, &id), "InProgress");
2538        assert!(svc.set_distribution_status(&id, "Deployed"));
2539        assert_eq!(distribution_status(&svc, &id), "Deployed");
2540        assert!(svc.set_distribution_status(&id, "InProgress"));
2541        assert_eq!(distribution_status(&svc, &id), "InProgress");
2542    }
2543
2544    #[tokio::test]
2545    async fn set_distribution_status_unknown_id_returns_false() {
2546        let svc = CloudFrontService::new(make_state());
2547        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2548    }
2549
2550    #[tokio::test]
2551    async fn update_distribution_resets_to_in_progress() {
2552        let svc = CloudFrontService::new(make_state())
2553            .with_propagation_delay(std::time::Duration::from_secs(60));
2554        let body = minimal_dist_config_xml("update-reset-ref");
2555        let create = svc
2556            .handle(make_request(
2557                http::Method::POST,
2558                "/2020-05-31/distribution",
2559                "",
2560                &body,
2561            ))
2562            .await
2563            .unwrap();
2564        let etag = create
2565            .headers
2566            .get(ETAG)
2567            .unwrap()
2568            .to_str()
2569            .unwrap()
2570            .to_string();
2571        let xml = std::str::from_utf8(create.body.expect_bytes())
2572            .unwrap()
2573            .to_string();
2574        let id = xml
2575            .split("<Id>")
2576            .nth(1)
2577            .unwrap()
2578            .split("</Id>")
2579            .next()
2580            .unwrap()
2581            .to_string();
2582        // Force the distribution to Deployed via the admin mutator so we can
2583        // observe the UpdateDistribution flip back to InProgress.
2584        assert!(svc.set_distribution_status(&id, "Deployed"));
2585        assert_eq!(distribution_status(&svc, &id), "Deployed");
2586
2587        let updated_body = body.replace(
2588            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2589            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2590        );
2591        let mut update_req = make_request(
2592            http::Method::PUT,
2593            &format!("/2020-05-31/distribution/{id}/config"),
2594            "",
2595            &updated_body,
2596        );
2597        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2598        let updated = svc.handle(update_req).await.unwrap();
2599        assert_eq!(updated.status, StatusCode::OK);
2600        assert_eq!(distribution_status(&svc, &id), "InProgress");
2601    }
2602
2603    #[tokio::test]
2604    async fn duplicate_caller_reference_is_rejected() {
2605        let svc = CloudFrontService::new(make_state());
2606        let body = minimal_dist_config_xml("dup-ref");
2607        svc.handle(make_request(
2608            http::Method::POST,
2609            "/2020-05-31/distribution",
2610            "",
2611            &body,
2612        ))
2613        .await
2614        .unwrap();
2615        let result = svc
2616            .handle(make_request(
2617                http::Method::POST,
2618                "/2020-05-31/distribution",
2619                "",
2620                &body,
2621            ))
2622            .await;
2623        let err = match result {
2624            Ok(_) => panic!("expected duplicate caller-reference to fail"),
2625            Err(e) => e,
2626        };
2627        assert_eq!(err.code(), "DistributionAlreadyExists");
2628        assert_eq!(err.status(), StatusCode::CONFLICT);
2629    }
2630
2631    #[tokio::test]
2632    async fn invalidation_lifecycle() {
2633        let svc = CloudFrontService::new(make_state());
2634        let body = minimal_dist_config_xml("inv-ref");
2635        let create = svc
2636            .handle(make_request(
2637                http::Method::POST,
2638                "/2020-05-31/distribution",
2639                "",
2640                &body,
2641            ))
2642            .await
2643            .unwrap();
2644        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2645        let dist_id = xml
2646            .split("<Id>")
2647            .nth(1)
2648            .unwrap()
2649            .split("</Id>")
2650            .next()
2651            .unwrap();
2652        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2653<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2654  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2655  <CallerReference>inv-1</CallerReference>
2656</InvalidationBatch>"#;
2657        let inv_resp = svc
2658            .handle(make_request(
2659                http::Method::POST,
2660                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2661                "",
2662                inv_body,
2663            ))
2664            .await
2665            .unwrap();
2666        assert_eq!(inv_resp.status, StatusCode::CREATED);
2667        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2668        let inv_id = inv_xml
2669            .split("<Id>")
2670            .nth(1)
2671            .unwrap()
2672            .split("</Id>")
2673            .next()
2674            .unwrap();
2675        let get = svc
2676            .handle(make_request(
2677                http::Method::GET,
2678                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2679                "",
2680                "",
2681            ))
2682            .await
2683            .unwrap();
2684        assert_eq!(get.status, StatusCode::OK);
2685        let list = svc
2686            .handle(make_request(
2687                http::Method::GET,
2688                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2689                "",
2690                "",
2691            ))
2692            .await
2693            .unwrap();
2694        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2695        assert!(xml.contains("<Quantity>1</Quantity>"));
2696    }
2697
2698    #[tokio::test]
2699    async fn tags_roundtrip() {
2700        let svc = CloudFrontService::new(make_state());
2701        let body = minimal_dist_config_xml("tag-ref");
2702        let create = svc
2703            .handle(make_request(
2704                http::Method::POST,
2705                "/2020-05-31/distribution",
2706                "",
2707                &body,
2708            ))
2709            .await
2710            .unwrap();
2711        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2712        let arn = xml
2713            .split("<ARN>")
2714            .nth(1)
2715            .unwrap()
2716            .split("</ARN>")
2717            .next()
2718            .unwrap();
2719        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2720<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2721  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2722</Tags>"#;
2723        let arn_q = format!("Operation=Tag&Resource={}", arn);
2724        let resp = svc
2725            .handle(make_request(
2726                http::Method::POST,
2727                "/2020-05-31/tagging",
2728                &arn_q,
2729                tag_body,
2730            ))
2731            .await
2732            .unwrap();
2733        assert_eq!(resp.status, StatusCode::NO_CONTENT);
2734        let list = svc
2735            .handle(make_request(
2736                http::Method::GET,
2737                "/2020-05-31/tagging",
2738                &format!("Resource={}", arn),
2739                "",
2740            ))
2741            .await
2742            .unwrap();
2743        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2744        assert!(xml.contains("<Key>env</Key>"));
2745        assert!(xml.contains("<Value>prod</Value>"));
2746    }
2747
2748    #[tokio::test]
2749    async fn xml_metacharacters_in_user_input_are_escaped() {
2750        let svc = CloudFrontService::new(make_state());
2751        let body = minimal_dist_config_xml("escape-ref").replace(
2752            "<Comment></Comment>",
2753            "<Comment><![CDATA[a&b<c>d]]></Comment>",
2754        );
2755        let create = svc
2756            .handle(make_request(
2757                http::Method::POST,
2758                "/2020-05-31/distribution",
2759                "",
2760                &body,
2761            ))
2762            .await
2763            .unwrap();
2764        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2765        let dist_id = xml
2766            .split("<Id>")
2767            .nth(1)
2768            .unwrap()
2769            .split("</Id>")
2770            .next()
2771            .unwrap();
2772        let arn = xml
2773            .split("<ARN>")
2774            .nth(1)
2775            .unwrap()
2776            .split("</ARN>")
2777            .next()
2778            .unwrap();
2779
2780        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2781<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2782  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
2783</Tags>"#;
2784        let arn_q = format!("Operation=Tag&Resource={}", arn);
2785        svc.handle(make_request(
2786            http::Method::POST,
2787            "/2020-05-31/tagging",
2788            &arn_q,
2789            tag_body,
2790        ))
2791        .await
2792        .unwrap();
2793
2794        let list = svc
2795            .handle(make_request(
2796                http::Method::GET,
2797                "/2020-05-31/tagging",
2798                &format!("Resource={}", arn),
2799                "",
2800            ))
2801            .await
2802            .unwrap();
2803        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2804        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
2805        assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2806
2807        // Force the distribution into the list-rendering path so the
2808        // unescaped Comment field would surface there.
2809        let list_resp = svc
2810            .handle(make_request(
2811                http::Method::GET,
2812                "/2020-05-31/distribution",
2813                "",
2814                "",
2815            ))
2816            .await
2817            .unwrap();
2818        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2819        // Ensure raw `<` from the user-supplied comment never lands inside
2820        // a Comment element on the wire.
2821        assert!(!xml.contains("<Comment>a&b<c>d"));
2822        assert!(xml.contains(dist_id));
2823    }
2824
2825    fn predicate_dist_config_xml(caller_ref: &str) -> String {
2826        format!(
2827            r#"<?xml version="1.0" encoding="UTF-8"?>
2828<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2829  <CallerReference>{caller_ref}</CallerReference>
2830  <Origins>
2831    <Quantity>1</Quantity>
2832    <Items>
2833      <Origin>
2834        <Id>primary</Id>
2835        <DomainName>example.com</DomainName>
2836        <VpcOriginConfig><VpcOriginId>vo-123</VpcOriginId></VpcOriginConfig>
2837      </Origin>
2838    </Items>
2839  </Origins>
2840  <DefaultCacheBehavior>
2841    <TargetOriginId>primary</TargetOriginId>
2842    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2843    <CachePolicyId>cp-123</CachePolicyId>
2844    <OriginRequestPolicyId>orp-123</OriginRequestPolicyId>
2845    <ResponseHeadersPolicyId>rhp-123</ResponseHeadersPolicyId>
2846    <TrustedKeyGroups>
2847      <Enabled>true</Enabled>
2848      <Quantity>1</Quantity>
2849      <Items><KeyGroup>kg-123</KeyGroup></Items>
2850    </TrustedKeyGroups>
2851  </DefaultCacheBehavior>
2852  <Comment></Comment>
2853  <WebACLId>waf-abc</WebACLId>
2854  <AnycastIpListId>ail-123</AnycastIpListId>
2855  <Enabled>true</Enabled>
2856</DistributionConfig>"#
2857        )
2858    }
2859
2860    async fn list_by(svc: &CloudFrontService, path: &str) -> String {
2861        let resp = svc
2862            .handle(make_request(http::Method::GET, path, "", ""))
2863            .await
2864            .unwrap();
2865        assert_eq!(resp.status, StatusCode::OK);
2866        std::str::from_utf8(resp.body.expect_bytes())
2867            .unwrap()
2868            .to_string()
2869    }
2870
2871    #[tokio::test]
2872    async fn list_distributions_by_predicate_fields_filters_state() {
2873        let svc = CloudFrontService::new(make_state());
2874        let create = svc
2875            .handle(make_request(
2876                http::Method::POST,
2877                "/2020-05-31/distribution",
2878                "",
2879                &predicate_dist_config_xml("pred-ref"),
2880            ))
2881            .await
2882            .unwrap();
2883        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2884        let id = xml
2885            .split("<Id>")
2886            .nth(1)
2887            .unwrap()
2888            .split("</Id>")
2889            .next()
2890            .unwrap()
2891            .to_string();
2892
2893        // DistributionIdList-shaped ops: match id must appear, mismatch empty.
2894        for (path, root) in [
2895            ("/2020-05-31/distributionsByCachePolicyId/cp-123", "cp-123"),
2896            (
2897                "/2020-05-31/distributionsByOriginRequestPolicyId/orp-123",
2898                "orp-123",
2899            ),
2900            (
2901                "/2020-05-31/distributionsByResponseHeadersPolicyId/rhp-123",
2902                "rhp-123",
2903            ),
2904            ("/2020-05-31/distributionsByKeyGroupId/kg-123", "kg-123"),
2905            ("/2020-05-31/distributionsByVpcOriginId/vo-123", "vo-123"),
2906        ] {
2907            let body = list_by(&svc, path).await;
2908            assert!(body.contains("<DistributionIdList"), "{root}: {body}");
2909            assert!(
2910                body.contains(&format!("<DistributionId>{id}</DistributionId>")),
2911                "{root} did not list distribution: {body}"
2912            );
2913            assert!(body.contains("<Quantity>1</Quantity>"), "{root}: {body}");
2914        }
2915
2916        // Mismatched id -> empty.
2917        let empty = list_by(&svc, "/2020-05-31/distributionsByCachePolicyId/other").await;
2918        assert!(empty.contains("<Quantity>0</Quantity>"), "{empty}");
2919        assert!(!empty.contains("<DistributionId>"), "{empty}");
2920
2921        // DistributionList-shaped ops: WebACLId + AnycastIpListId.
2922        let web = list_by(&svc, "/2020-05-31/distributionsByWebACLId/waf-abc").await;
2923        assert!(web.contains("<DistributionList"), "{web}");
2924        assert!(web.contains(&format!("<Id>{id}</Id>")), "{web}");
2925
2926        let anycast = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/ail-123").await;
2927        assert!(anycast.contains("<DistributionList"), "{anycast}");
2928        assert!(anycast.contains(&format!("<Id>{id}</Id>")), "{anycast}");
2929
2930        let anycast_miss = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/nope").await;
2931        assert!(
2932            anycast_miss.contains("<Quantity>0</Quantity>"),
2933            "{anycast_miss}"
2934        );
2935    }
2936
2937    fn dist_config_with_cache_policy(caller_ref: &str, cache_policy: &str) -> String {
2938        format!(
2939            r#"<?xml version="1.0" encoding="UTF-8"?>
2940<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2941  <CallerReference>{caller_ref}</CallerReference>
2942  <Origins>
2943    <Quantity>1</Quantity>
2944    <Items><Origin><Id>primary</Id><DomainName>example.com</DomainName></Origin></Items>
2945  </Origins>
2946  <DefaultCacheBehavior>
2947    <TargetOriginId>primary</TargetOriginId>
2948    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2949    <CachePolicyId>{cache_policy}</CachePolicyId>
2950  </DefaultCacheBehavior>
2951  <Comment></Comment>
2952  <Enabled>true</Enabled>
2953</DistributionConfig>"#
2954        )
2955    }
2956
2957    async fn create_dist_returning_id(svc: &CloudFrontService, config_xml: &str) -> String {
2958        let create = svc
2959            .handle(make_request(
2960                http::Method::POST,
2961                "/2020-05-31/distribution",
2962                "",
2963                config_xml,
2964            ))
2965            .await
2966            .unwrap();
2967        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2968        xml.split("<Id>")
2969            .nth(1)
2970            .unwrap()
2971            .split("</Id>")
2972            .next()
2973            .unwrap()
2974            .to_string()
2975    }
2976
2977    fn make_request_with_params(path: &str, params: &[(&str, &str)]) -> AwsRequest {
2978        let mut r = make_request(http::Method::GET, path, "", "");
2979        r.query_params = params
2980            .iter()
2981            .map(|(k, v)| (k.to_string(), v.to_string()))
2982            .collect();
2983        r
2984    }
2985
2986    #[tokio::test]
2987    async fn list_distributions_by_web_acl_id_null_lists_unassociated() {
2988        // Finding #2: WebACLId "null" lists distributions with no web ACL.
2989        let svc = CloudFrontService::new(make_state());
2990        let with_acl = create_dist_returning_id(
2991            &svc,
2992            &predicate_dist_config_xml("null-with-acl"), // has WebACLId waf-abc
2993        )
2994        .await;
2995        let without_acl =
2996            create_dist_returning_id(&svc, &dist_config_with_cache_policy("null-no-acl", "cp-x"))
2997                .await;
2998
2999        let body = list_by(&svc, "/2020-05-31/distributionsByWebACLId/null").await;
3000        assert!(body.contains("<DistributionList"), "{body}");
3001        assert!(
3002            body.contains(&format!("<Id>{without_acl}</Id>")),
3003            "unassociated distribution missing from null listing: {body}"
3004        );
3005        assert!(
3006            !body.contains(&format!("<Id>{with_acl}</Id>")),
3007            "distribution WITH a web ACL leaked into the null listing: {body}"
3008        );
3009    }
3010
3011    fn only_distribution_id(xml: &str) -> String {
3012        assert_eq!(
3013            xml.matches("<DistributionId>").count(),
3014            1,
3015            "expected exactly one DistributionId: {xml}"
3016        );
3017        xml.split("<DistributionId>")
3018            .nth(1)
3019            .unwrap()
3020            .split("</DistributionId>")
3021            .next()
3022            .unwrap()
3023            .to_string()
3024    }
3025
3026    #[tokio::test]
3027    async fn list_distributions_by_predicate_paginates() {
3028        // Finding #3: Marker/MaxItems pagination applies to the filtered set,
3029        // with EXCLUSIVE markers (NextMarker = last id on the page; the next
3030        // request resumes strictly after it).
3031        let svc = CloudFrontService::new(make_state());
3032        // Two distributions sharing the same cache policy.
3033        let d1 =
3034            create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-1", "cp-pg")).await;
3035        let d2 =
3036            create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-2", "cp-pg")).await;
3037
3038        // First page: MaxItems=1 -> one id, truncated, NextMarker == that id.
3039        let page1 = svc
3040            .handle(make_request_with_params(
3041                "/2020-05-31/distributionsByCachePolicyId/cp-pg",
3042                &[("MaxItems", "1")],
3043            ))
3044            .await
3045            .unwrap();
3046        let xml1 = std::str::from_utf8(page1.body.expect_bytes()).unwrap();
3047        assert!(xml1.contains("<MaxItems>1</MaxItems>"), "{xml1}");
3048        assert!(xml1.contains("<IsTruncated>true</IsTruncated>"), "{xml1}");
3049        let first_id = only_distribution_id(xml1);
3050        let next = xml1
3051            .split("<NextMarker>")
3052            .nth(1)
3053            .expect("NextMarker present")
3054            .split("</NextMarker>")
3055            .next()
3056            .unwrap()
3057            .to_string();
3058        // Exclusive marker: NextMarker is the LAST id on the current page.
3059        assert_eq!(next, first_id, "NextMarker must be the last id on the page");
3060
3061        // Second page via the cursor: resumes AFTER the marker -> the other id,
3062        // never the marker item again, and not truncated.
3063        let page2 = svc
3064            .handle(make_request_with_params(
3065                "/2020-05-31/distributionsByCachePolicyId/cp-pg",
3066                &[("MaxItems", "1"), ("Marker", &next)],
3067            ))
3068            .await
3069            .unwrap();
3070        let xml2 = std::str::from_utf8(page2.body.expect_bytes()).unwrap();
3071        assert!(xml2.contains(&format!("<Marker>{next}</Marker>")), "{xml2}");
3072        assert!(xml2.contains("<IsTruncated>false</IsTruncated>"), "{xml2}");
3073        let second_id = only_distribution_id(xml2);
3074        assert_ne!(
3075            second_id, first_id,
3076            "exclusive marker must not return the marker item again"
3077        );
3078        // The two pages together cover both distributions exactly once.
3079        let mut seen = [first_id, second_id];
3080        seen.sort();
3081        let mut expected = [d1, d2];
3082        expected.sort();
3083        assert_eq!(seen, expected, "pages did not cover both distributions");
3084    }
3085}