1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18 DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22 CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23 StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Reports whether `action` names an operation that changes service state.
///
/// The decision is made purely on the action's verb prefix (case-sensitive);
/// `handle` uses the answer to decide whether a successful response should be
/// followed by a snapshot write.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in MUTATING_VERBS.iter() {
        if action.starts_with(verb) {
            return true;
        }
    }
    false
}
45
/// Account key under which the distribution read/update handlers in this file
/// (`get_distribution`, `get_distribution_config`, `update_distribution`) look
/// up stored state.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Action names returned by `AwsService::supported_actions` for this service.
///
/// Each entry is expected to have a matching arm in the `handle` dispatch
/// `match`; an action that resolves from the router but has no arm is answered
/// with a 501 `InvalidAction` error.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Distributions, invalidations, tagging and aliases.
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    // Filtered distribution listings (all served by `list_distributions_by`).
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    // Origin access controls.
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    // Cache, origin-request, response-headers and continuous-deployment policies.
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    // Functions.
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    // Public keys, key groups and key-value stores.
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    // Origin access identities.
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    // Monitoring subscriptions.
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    // Streaming distributions.
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    // Field-level encryption configs and profiles.
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    // Realtime log configs.
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    // VPC origins, anycast IP lists and trust stores.
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    // Resource policies.
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    // Connection groups and domain management.
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    // Distribution tenants.
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    // Connection functions.
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service front-end: owns the shared state handle plus the knobs
/// that control simulated deployment timing and snapshot persistence.
pub struct CloudFrontService {
    /// Shared, lock-protected account state read and mutated by the handlers
    /// and by the background deploy tasks.
    pub(crate) state: SharedCloudFrontState,
    /// How long the `schedule_*_deploy` tasks sleep before flipping a resource
    /// from `"InProgress"` to `"Deployed"`.
    pub(crate) propagation_delay: std::time::Duration,
    /// Optional persistence backend; when `None`, snapshot saves are no-ops.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save, so saves are
    /// serialized.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
    /// Set by the first `start_dataplane` call; later calls see it and return
    /// without spawning again.
    dataplane_started: Arc<std::sync::atomic::AtomicBool>,
}
236
/// Resolves the default status-propagation delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` environment variable.
///
/// The (whitespace-trimmed) value is accepted either as a whole number of
/// seconds or as a fractional number of seconds. When the variable is unset,
/// not valid Unicode, unparsable, negative, non-finite, or too large to be
/// represented as a `Duration`, the delay falls back to one second.
fn default_propagation_delay() -> std::time::Duration {
    use std::time::Duration;

    const FALLBACK: Duration = Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Duration::from_secs(secs);
    }
    // `Duration::from_secs_f64` panics on values that are finite and
    // non-negative but overflow `Duration` (e.g. "1e300"). The fallible
    // constructor rejects negative, NaN, infinite and overflowing inputs
    // alike, so a bad environment value can never crash service construction.
    trimmed
        .parse::<f64>()
        .ok()
        .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
258
259impl CloudFrontService {
260 pub fn new(state: SharedCloudFrontState) -> Self {
261 Self {
262 state,
263 propagation_delay: default_propagation_delay(),
264 snapshot_store: None,
265 snapshot_lock: Arc::new(AsyncMutex::new(())),
266 dataplane_started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
267 }
268 }
269
270 pub fn start_dataplane(&self, server_port: u16) {
278 use std::sync::atomic::Ordering;
279 if self
283 .dataplane_started
284 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
285 .is_err()
286 {
287 return;
288 }
289 crate::dataplane::spawn_dataplane(self.state.clone(), server_port);
290 }
291
292 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
293 self.snapshot_store = Some(store);
294 self
295 }
296
297 pub fn shared_state(&self) -> SharedCloudFrontState {
298 Arc::clone(&self.state)
299 }
300
301 async fn save_snapshot(&self) {
305 save_cloudfront_snapshot(
306 &self.state,
307 self.snapshot_store.clone(),
308 &self.snapshot_lock,
309 )
310 .await;
311 }
312
313 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
318 let store = self.snapshot_store.clone()?;
319 let state = self.state.clone();
320 let lock = self.snapshot_lock.clone();
321 Some(Arc::new(move || {
322 let state = state.clone();
323 let store = store.clone();
324 let lock = lock.clone();
325 Box::pin(async move {
326 save_cloudfront_snapshot(&state, Some(store), &lock).await;
327 })
328 }))
329 }
330
331 pub fn rearm_in_progress(&self) {
336 let (dists, tenants, groups, streaming) = {
337 let s = self.state.read();
338 let mut dists = Vec::new();
339 let mut tenants = Vec::new();
340 let mut groups = Vec::new();
341 let mut streaming = Vec::new();
342 for account in s.accounts.values() {
343 for (id, d) in &account.distributions {
344 if d.status == "InProgress" {
345 dists.push(id.clone());
346 }
347 }
348 for (id, t) in &account.distribution_tenants {
349 if t.status == "InProgress" {
350 tenants.push(id.clone());
351 }
352 }
353 for (id, g) in &account.connection_groups {
354 if g.status == "InProgress" {
355 groups.push(id.clone());
356 }
357 }
358 for (id, d) in &account.streaming_distributions {
359 if d.status == "InProgress" {
360 streaming.push(id.clone());
361 }
362 }
363 }
364 (dists, tenants, groups, streaming)
365 };
366 for id in dists {
367 self.schedule_distribution_deploy(id);
368 }
369 for id in tenants {
370 self.schedule_distribution_tenant_deploy(id);
371 }
372 for id in groups {
373 self.schedule_connection_group_deploy(id);
374 }
375 for id in streaming {
376 self.schedule_streaming_distribution_deploy(id);
377 }
378 }
379
380 pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
386 self.propagation_delay = delay;
387 self
388 }
389
390 pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
396 let mut state = self.state.write();
397 for account in state.accounts.values_mut() {
398 if let Some(dist) = account.distributions.get_mut(id) {
399 dist.status = status.to_string();
400 return true;
401 }
402 }
403 false
404 }
405
406 pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
409 let changed = self.set_distribution_status(id, status);
410 if changed {
411 self.save_snapshot().await;
412 }
413 changed
414 }
415}
416
417pub async fn save_cloudfront_snapshot(
423 state: &SharedCloudFrontState,
424 store: Option<Arc<dyn SnapshotStore>>,
425 lock: &AsyncMutex<()>,
426) {
427 let Some(store) = store else {
428 return;
429 };
430 let _guard = lock.lock().await;
431 let snapshot = CloudFrontSnapshot {
432 schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
433 accounts: Some(state.read().clone()),
434 };
435 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
436 let bytes = serde_json::to_vec(&snapshot)
437 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
438 store.save(&bytes)
439 })
440 .await;
441 match join {
442 Ok(Ok(())) => {}
443 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
444 Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
445 }
446}
447
448impl Default for CloudFrontService {
449 fn default() -> Self {
450 Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
451 }
452}
453
#[async_trait]
impl AwsService for CloudFrontService {
    /// Service identifier: always `"cloudfront"`.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// Returns the static `SUPPORTED_ACTIONS` table.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Resolves the request to an action and dispatches it to its handler.
    ///
    /// * A request the router cannot resolve yields a 404 `InvalidArgument`
    ///   error.
    /// * A resolved action with no matching arm yields a 501 `InvalidAction`
    ///   error.
    /// * When the action is mutating (per `is_mutating_action`) and the handler
    ///   returned a response with a success status, a snapshot is saved before
    ///   the response is returned.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Decide up front whether a successful result needs to be persisted.
        let mutates = is_mutating_action(resolved.action);
        let result = match resolved.action {
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All filtered listings share one handler, which receives the
            // action name to tell them apart.
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => {
                self.list_distributions_by(&req, &resolved, resolved.action)
            }
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router produced an action name that has no handler here.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        };
        // Persist only after a mutating action actually succeeded; failed
        // requests skip the snapshot write.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }
}
691
692impl CloudFrontService {
695 fn create_distribution(
696 &self,
697 req: &AwsRequest,
698 with_tags: bool,
699 ) -> Result<AwsResponse, AwsServiceError> {
700 let (config, tags) = if with_tags {
701 let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
702 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
703 let tags = parsed
704 .tags
705 .items
706 .map(|i| {
707 i.tag
708 .into_iter()
709 .map(|t| Tag {
710 key: t.key,
711 value: t.value,
712 })
713 .collect()
714 })
715 .unwrap_or_default();
716 (parsed.distribution_config, tags)
717 } else {
718 let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
719 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
720 (parsed, Vec::new())
721 };
722
723 validate_caller_reference(&config.caller_reference)?;
724 validate_origins(&config)?;
725
726 let mut state = self.state.write();
727 let account = state.entry(account_id(req));
728
729 if let Some(existing) = account
730 .distributions
731 .values()
732 .find(|d| d.config.caller_reference == config.caller_reference)
733 {
734 return Err(aws_error(
735 StatusCode::CONFLICT,
736 "DistributionAlreadyExists",
737 format!(
738 "Distribution with the same CallerReference exists: {}",
739 existing.id
740 ),
741 ));
742 }
743
744 let id = generate_distribution_id();
745 let now = Utc::now();
746 let etag = generate_etag();
747 let domain = format!("{}.cloudfront.net", id.to_lowercase());
748 let arn = format!(
749 "arn:aws:cloudfront::{}:distribution/{}",
750 account_id(req),
751 id
752 );
753
754 let stored = StoredDistribution {
755 id: id.clone(),
756 arn: arn.clone(),
757 status: "InProgress".to_string(),
761 last_modified_time: now,
762 domain_name: domain,
763 in_progress_invalidation_batches: 0,
764 etag: etag.clone(),
765 config,
766 bound_port: None,
767 };
768 account.distributions.insert(id.clone(), stored.clone());
769 if !tags.is_empty() {
770 account.tags.insert(arn.clone(), tags);
771 }
772 drop(state);
773
774 self.schedule_distribution_deploy(id.clone());
775
776 let body = build_distribution_xml(&stored);
777 let mut headers = HeaderMap::new();
778 set_header(&mut headers, ETAG, &etag);
779 set_header(&mut headers, LOCATION, &stored.arn);
780 Ok(xml_response(StatusCode::CREATED, body, headers))
781 }
782
783 fn schedule_distribution_deploy(&self, id: String) {
788 let state = Arc::clone(&self.state);
789 let delay = self.propagation_delay;
790 let store = self.snapshot_store.clone();
791 let lock = self.snapshot_lock.clone();
792 tokio::spawn(async move {
793 tokio::time::sleep(delay).await;
794 let flipped = {
795 let mut s = state.write();
796 let mut flipped = false;
797 for account in s.accounts.values_mut() {
798 if let Some(d) = account.distributions.get_mut(&id) {
799 if d.status == "InProgress" {
800 d.status = "Deployed".to_string();
801 flipped = true;
802 }
803 break;
804 }
805 }
806 flipped
807 };
808 if flipped {
809 save_cloudfront_snapshot(&state, store, &lock).await;
810 }
811 });
812 }
813
814 pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
816 let state = Arc::clone(&self.state);
817 let delay = self.propagation_delay;
818 let store = self.snapshot_store.clone();
819 let lock = self.snapshot_lock.clone();
820 tokio::spawn(async move {
821 tokio::time::sleep(delay).await;
822 let flipped = {
823 let mut s = state.write();
824 let mut flipped = false;
825 for account in s.accounts.values_mut() {
826 if let Some(t) = account.distribution_tenants.get_mut(&id) {
827 if t.status == "InProgress" {
828 t.status = "Deployed".to_string();
829 flipped = true;
830 }
831 break;
832 }
833 }
834 flipped
835 };
836 if flipped {
837 save_cloudfront_snapshot(&state, store, &lock).await;
838 }
839 });
840 }
841
842 pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
844 let state = Arc::clone(&self.state);
845 let delay = self.propagation_delay;
846 let store = self.snapshot_store.clone();
847 let lock = self.snapshot_lock.clone();
848 tokio::spawn(async move {
849 tokio::time::sleep(delay).await;
850 let flipped = {
851 let mut s = state.write();
852 let mut flipped = false;
853 for account in s.accounts.values_mut() {
854 if let Some(g) = account.connection_groups.get_mut(&id) {
855 if g.status == "InProgress" {
856 g.status = "Deployed".to_string();
857 flipped = true;
858 }
859 break;
860 }
861 }
862 flipped
863 };
864 if flipped {
865 save_cloudfront_snapshot(&state, store, &lock).await;
866 }
867 });
868 }
869
870 pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
875 let state = Arc::clone(&self.state);
876 let delay = self.propagation_delay;
877 let store = self.snapshot_store.clone();
878 let lock = self.snapshot_lock.clone();
879 tokio::spawn(async move {
880 tokio::time::sleep(delay).await;
881 let flipped = {
882 let mut s = state.write();
883 let mut flipped = false;
884 for account in s.accounts.values_mut() {
885 if let Some(d) = account.streaming_distributions.get_mut(&id) {
886 if d.status == "InProgress" {
887 d.status = "Deployed".to_string();
888 flipped = true;
889 }
890 break;
891 }
892 }
893 flipped
894 };
895 if flipped {
896 save_cloudfront_snapshot(&state, store, &lock).await;
897 }
898 });
899 }
900
901 fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
902 let id = route
903 .id
904 .as_deref()
905 .ok_or_else(|| invalid_argument("missing distribution id"))?;
906 let state = self.state.read();
907 let account = state
908 .accounts
909 .get(DEFAULT_ACCOUNT)
910 .ok_or_else(|| no_such_distribution(id))?;
911 let dist = account
912 .distributions
913 .get(id)
914 .ok_or_else(|| no_such_distribution(id))?
915 .clone();
916 drop(state);
917 let body = build_distribution_xml(&dist);
918 let mut headers = HeaderMap::new();
919 set_header(&mut headers, ETAG, &dist.etag);
920 Ok(xml_response(StatusCode::OK, body, headers))
921 }
922
923 fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
924 let id = route
925 .id
926 .as_deref()
927 .ok_or_else(|| invalid_argument("missing distribution id"))?;
928 let state = self.state.read();
929 let account = state
930 .accounts
931 .get(DEFAULT_ACCOUNT)
932 .ok_or_else(|| no_such_distribution(id))?;
933 let dist = account
934 .distributions
935 .get(id)
936 .ok_or_else(|| no_such_distribution(id))?
937 .clone();
938 drop(state);
939 let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
940 .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
941 let mut headers = HeaderMap::new();
942 set_header(&mut headers, ETAG, &dist.etag);
943 Ok(xml_response(StatusCode::OK, body, headers))
944 }
945
946 fn update_distribution(
947 &self,
948 req: &AwsRequest,
949 route: &Route,
950 ) -> Result<AwsResponse, AwsServiceError> {
951 let id = route
952 .id
953 .as_deref()
954 .ok_or_else(|| invalid_argument("missing distribution id"))?;
955 let if_match = req
956 .headers
957 .get(IF_MATCH)
958 .and_then(|v| v.to_str().ok())
959 .ok_or_else(|| {
960 aws_error(
961 StatusCode::BAD_REQUEST,
962 "InvalidIfMatchVersion",
963 "Missing If-Match header for UpdateDistribution",
964 )
965 })?
966 .to_string();
967 let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
968 .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
969 validate_caller_reference(&new_config.caller_reference)?;
970 validate_origins(&new_config)?;
971
972 let mut state = self.state.write();
973 let account = state
974 .accounts
975 .get_mut(DEFAULT_ACCOUNT)
976 .ok_or_else(|| no_such_distribution(id))?;
977 let dist = account
978 .distributions
979 .get_mut(id)
980 .ok_or_else(|| no_such_distribution(id))?;
981 if dist.etag != if_match {
982 return Err(aws_error(
983 StatusCode::PRECONDITION_FAILED,
984 "PreconditionFailed",
985 "If-Match header does not match the current ETag",
986 ));
987 }
988 let config_changed = !configs_equal(&dist.config, &new_config);
993 if config_changed {
994 dist.config = new_config;
995 dist.etag = generate_etag();
996 dist.last_modified_time = Utc::now();
997 dist.status = "InProgress".to_string();
1001 }
1002 let snapshot = dist.clone();
1003 drop(state);
1004
1005 if config_changed {
1006 self.schedule_distribution_deploy(id.to_string());
1007 }
1008
1009 let body = build_distribution_xml(&snapshot);
1010 let mut headers = HeaderMap::new();
1011 set_header(&mut headers, ETAG, &snapshot.etag);
1012 Ok(xml_response(StatusCode::OK, body, headers))
1013 }
1014
1015 fn delete_distribution(
1016 &self,
1017 req: &AwsRequest,
1018 route: &Route,
1019 ) -> Result<AwsResponse, AwsServiceError> {
1020 let id = route
1021 .id
1022 .as_deref()
1023 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1024 let if_match = req
1025 .headers
1026 .get(IF_MATCH)
1027 .and_then(|v| v.to_str().ok())
1028 .ok_or_else(|| {
1029 aws_error(
1030 StatusCode::BAD_REQUEST,
1031 "InvalidIfMatchVersion",
1032 "Missing If-Match header for DeleteDistribution",
1033 )
1034 })?
1035 .to_string();
1036 let mut state = self.state.write();
1037 let account = state
1038 .accounts
1039 .get_mut(DEFAULT_ACCOUNT)
1040 .ok_or_else(|| no_such_distribution(id))?;
1041 {
1042 let dist = account
1043 .distributions
1044 .get(id)
1045 .ok_or_else(|| no_such_distribution(id))?;
1046 if dist.etag != if_match {
1047 return Err(aws_error(
1048 StatusCode::PRECONDITION_FAILED,
1049 "PreconditionFailed",
1050 "If-Match header does not match the current ETag",
1051 ));
1052 }
1053 if dist.config.enabled {
1054 return Err(aws_error(
1055 StatusCode::PRECONDITION_FAILED,
1056 "DistributionNotDisabled",
1057 "Distribution must be disabled before delete",
1058 ));
1059 }
1060 }
1061 let removed = account.distributions.remove(id).unwrap();
1062 account.tags.remove(&removed.arn);
1063 Ok(empty_response(StatusCode::NO_CONTENT))
1064 }
1065
1066 fn list_distributions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1067 let state = self.state.read();
1068 let mut dists: Vec<StoredDistribution> = state
1069 .accounts
1070 .values()
1071 .flat_map(|a| a.distributions.values().cloned())
1072 .collect();
1073 drop(state);
1074 dists.sort_by_key(|a| a.last_modified_time);
1075
1076 let (marker, max_items, page, next_marker) = paginate_distributions(req, dists);
1079
1080 let body = build_distribution_list_xml(
1081 &page,
1082 "DistributionList",
1083 &marker,
1084 max_items,
1085 next_marker.as_deref(),
1086 );
1087 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1088 }
1089
    /// Shared handler for the `ListDistributionsBy*` family of actions.
    ///
    /// Works in four steps:
    /// 1. validate the identifier each action requires (route id or query
    ///    parameter),
    /// 2. choose the XML root element for the action,
    /// 3. filter all stored distributions (across every account, ordered by
    ///    `last_modified_time`) with the action-specific predicate,
    /// 4. paginate and render.
    ///
    /// Actions with no filter arm in step 3 always yield an empty list.
    ///
    /// # Errors
    /// `invalid_argument` when the required identifier is missing, is a
    /// placeholder label, or (for ConnectionMode) is not one of the two
    /// accepted values.
    fn list_distributions_by(
        &self,
        req: &AwsRequest,
        route: &Route,
        action: &str,
    ) -> Result<AwsResponse, AwsServiceError> {
        // Step 1: per-action input validation.
        match action {
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByOwnedResource" => {
                // A missing route id is treated like an empty string and left
                // to `is_placeholder_label` to accept or reject.
                let id = route.id.as_deref().unwrap_or("");
                if is_placeholder_label(id) {
                    return Err(invalid_argument(format!(
                        "Required URL identifier for {action} is missing or invalid"
                    )));
                }
            }
            "ListDistributionsByConnectionMode" => {
                let id = route.id.as_deref().unwrap_or("");
                if is_placeholder_label(id) {
                    return Err(invalid_argument(
                        "ConnectionMode is required for ListDistributionsByConnectionMode",
                    ));
                }
                if id != "direct" && id != "tenant-only" {
                    return Err(invalid_argument(format!(
                        "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
                    )));
                }
            }
            // The two guarded arms below only match when the query parameter
            // is absent; otherwise the action falls through to `_ => {}`.
            "ListDistributionsByConnectionFunction"
                if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
            {
                return Err(invalid_argument(
                    "ConnectionFunctionIdentifier query parameter is required",
                ));
            }
            "ListDistributionsByTrustStore"
                if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
            {
                return Err(invalid_argument(
                    "TrustStoreIdentifier query parameter is required",
                ));
            }
            _ => {}
        }

        // Step 2: root element of the response document.
        let root = match action {
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByVpcOriginId" => "DistributionIdList",
            "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
            _ => "DistributionList",
        };

        // Step 3: snapshot every distribution (the read guard is dropped at
        // the end of this block) and order it by modification time.
        let mut all: Vec<StoredDistribution> = {
            let state = self.state.read();
            state
                .accounts
                .values()
                .flat_map(|a| a.distributions.values().cloned())
                .collect()
        };
        all.sort_by_key(|d| d.last_modified_time);

        let path_id = route.id.as_deref().unwrap_or("");
        let matched: Vec<StoredDistribution> = match action {
            "ListDistributionsByCachePolicyId" => all
                .into_iter()
                .filter(|d| distribution_uses_cache_policy(d, path_id))
                .collect(),
            "ListDistributionsByOriginRequestPolicyId" => all
                .into_iter()
                .filter(|d| distribution_uses_origin_request_policy(d, path_id))
                .collect(),
            "ListDistributionsByResponseHeadersPolicyId" => all
                .into_iter()
                .filter(|d| distribution_uses_response_headers_policy(d, path_id))
                .collect(),
            "ListDistributionsByKeyGroup" => all
                .into_iter()
                .filter(|d| distribution_uses_key_group(d, path_id))
                .collect(),
            "ListDistributionsByVpcOriginId" => all
                .into_iter()
                .filter(|d| distribution_uses_vpc_origin(d, path_id))
                .collect(),
            // The literal path id "null" selects distributions whose web ACL
            // id is unset or empty. This guarded arm must stay ahead of the
            // unguarded WebACLId arm below.
            "ListDistributionsByWebACLId" if path_id == "null" => all
                .into_iter()
                .filter(|d| {
                    d.config
                        .web_acl_id
                        .as_deref()
                        .map(|w| w.is_empty())
                        .unwrap_or(true)
                })
                .collect(),
            "ListDistributionsByWebACLId" => all
                .into_iter()
                .filter(|d| d.config.web_acl_id.as_deref() == Some(path_id))
                .collect(),
            "ListDistributionsByAnycastIpListId" => all
                .into_iter()
                .filter(|d| d.config.anycast_ip_list_id.as_deref() == Some(path_id))
                .collect(),
            // Remaining actions have no filter here and match nothing.
            _ => Vec::new(),
        };

        // Step 4: paginate and render according to the chosen root.
        let (marker, max_items, page, next_marker) = paginate_distributions(req, matched);

        let body = if root == "DistributionIdList" {
            // Id-only listing.
            let ids: Vec<&str> = page.iter().map(|d| d.id.as_str()).collect();
            build_distribution_id_list_xml(&ids, &marker, max_items, next_marker.as_deref())
        } else if root == "DistributionList"
            && matches!(
                action,
                "ListDistributionsByWebACLId" | "ListDistributionsByAnycastIpListId"
            )
        {
            // Full summaries, only for the two actions that filter on them.
            build_distribution_list_xml(
                &page,
                "DistributionList",
                &marker,
                max_items,
                next_marker.as_deref(),
            )
        } else {
            // Every other action gets a fixed empty list under `root`.
            build_empty_distribution_id_list(root)
        };
        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
    }
1247
1248 fn copy_distribution(
1249 &self,
1250 req: &AwsRequest,
1251 route: &Route,
1252 ) -> Result<AwsResponse, AwsServiceError> {
1253 let primary_id = route
1254 .id
1255 .as_deref()
1256 .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1257 let if_match = req
1258 .headers
1259 .get(IF_MATCH)
1260 .and_then(|v| v.to_str().ok())
1261 .ok_or_else(|| {
1262 aws_error(
1263 StatusCode::BAD_REQUEST,
1264 "InvalidIfMatchVersion",
1265 "Missing If-Match header for CopyDistribution",
1266 )
1267 })?
1268 .to_string();
1269 let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1270 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1271 validate_caller_reference(&parsed.caller_reference)?;
1272 let mut state = self.state.write();
1273 let account = state
1274 .accounts
1275 .get_mut(DEFAULT_ACCOUNT)
1276 .ok_or_else(|| no_such_distribution(primary_id))?;
1277 let primary = account
1278 .distributions
1279 .get(primary_id)
1280 .ok_or_else(|| no_such_distribution(primary_id))?
1281 .clone();
1282 if primary.etag != if_match {
1283 return Err(aws_error(
1284 StatusCode::PRECONDITION_FAILED,
1285 "PreconditionFailed",
1286 "If-Match header does not match the current ETag",
1287 ));
1288 }
1289 if account
1290 .distributions
1291 .values()
1292 .any(|d| d.config.caller_reference == parsed.caller_reference)
1293 {
1294 return Err(aws_error(
1295 StatusCode::CONFLICT,
1296 "DistributionAlreadyExists",
1297 "Distribution with the same CallerReference exists",
1298 ));
1299 }
1300 let new_id = generate_distribution_id();
1301 let mut config = primary.config.clone();
1302 config.caller_reference = parsed.caller_reference;
1303 config.enabled = parsed.enabled.unwrap_or(false);
1304 config.staging = parsed.staging;
1305 let now = Utc::now();
1306 let etag = generate_etag();
1307 let arn = format!(
1308 "arn:aws:cloudfront::{}:distribution/{}",
1309 account_id(req),
1310 new_id
1311 );
1312 let stored = StoredDistribution {
1313 id: new_id.clone(),
1314 arn: arn.clone(),
1315 status: "InProgress".to_string(),
1316 last_modified_time: now,
1317 domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1318 in_progress_invalidation_batches: 0,
1319 etag: etag.clone(),
1320 config,
1321 bound_port: None,
1322 };
1323 account.distributions.insert(new_id.clone(), stored.clone());
1324 drop(state);
1325 self.schedule_distribution_deploy(new_id);
1326 let body = build_distribution_xml(&stored);
1327 let mut headers = HeaderMap::new();
1328 set_header(&mut headers, ETAG, &etag);
1329 set_header(&mut headers, LOCATION, &stored.arn);
1330 Ok(xml_response(StatusCode::CREATED, body, headers))
1331 }
1332}
1333
/// Request body of `CopyDistribution`, deserialized from XML with
/// PascalCase element names (`CallerReference`, `Enabled`, `Staging`).
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller reference for the new distribution; validated by the handler
    /// and checked for uniqueness within the account.
    caller_reference: String,
    /// Optional enabled flag; the handler treats an absent value as `false`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Optional staging flag, copied verbatim into the new config.
    #[serde(default)]
    staging: Option<bool>,
}
1343
1344impl CloudFrontService {
1347 fn create_invalidation(
1348 &self,
1349 req: &AwsRequest,
1350 route: &Route,
1351 ) -> Result<AwsResponse, AwsServiceError> {
1352 let dist_id = route
1353 .id
1354 .as_deref()
1355 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1356 let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1357 .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1358 if batch.caller_reference.is_empty() {
1359 return Err(invalid_argument("CallerReference is required"));
1360 }
1361 if batch.paths.quantity < 1 {
1362 return Err(invalid_argument(
1363 "InvalidationBatch.Paths must be non-empty",
1364 ));
1365 }
1366 let mut state = self.state.write();
1367 let account = state.entry(DEFAULT_ACCOUNT);
1368 if !account.distributions.contains_key(dist_id) {
1369 return Err(no_such_distribution(dist_id));
1370 }
1371 let id = generate_invalidation_id();
1372 let stored = StoredInvalidation {
1373 id: id.clone(),
1374 distribution_id: dist_id.to_string(),
1375 status: "Completed".to_string(),
1376 create_time: Utc::now(),
1377 batch: batch.clone(),
1378 };
1379 account.invalidations.insert(id.clone(), stored.clone());
1380 drop(state);
1381 let body = build_invalidation_xml(&stored);
1382 let mut headers = HeaderMap::new();
1383 set_header(
1384 &mut headers,
1385 LOCATION,
1386 &format!(
1387 "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1388 stored.id
1389 ),
1390 );
1391 Ok(xml_response(StatusCode::CREATED, body, headers))
1392 }
1393
1394 fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1395 let dist_id = route
1396 .id
1397 .as_deref()
1398 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1399 let inv_id = route
1400 .second_id
1401 .as_deref()
1402 .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1403 let state = self.state.read();
1404 let account = state
1405 .accounts
1406 .get(DEFAULT_ACCOUNT)
1407 .ok_or_else(|| no_such_invalidation(inv_id))?;
1408 if !account.distributions.contains_key(dist_id) {
1409 return Err(no_such_distribution(dist_id));
1410 }
1411 let inv = account
1412 .invalidations
1413 .get(inv_id)
1414 .filter(|i| i.distribution_id == dist_id)
1415 .ok_or_else(|| no_such_invalidation(inv_id))?
1416 .clone();
1417 drop(state);
1418 let body = build_invalidation_xml(&inv);
1419 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1420 }
1421
1422 fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1423 let dist_id = route
1424 .id
1425 .as_deref()
1426 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1427 let state = self.state.read();
1428 let account = state
1429 .accounts
1430 .get(DEFAULT_ACCOUNT)
1431 .ok_or_else(|| no_such_distribution(dist_id))?;
1432 if !account.distributions.contains_key(dist_id) {
1433 return Err(no_such_distribution(dist_id));
1434 }
1435 let mut items: Vec<&StoredInvalidation> = account
1436 .invalidations
1437 .values()
1438 .filter(|i| i.distribution_id == dist_id)
1439 .collect();
1440 items.sort_by_key(|a| a.create_time);
1441 let body = build_invalidation_list_xml(&items);
1442 drop(state);
1443 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1444 }
1445}
1446
1447impl CloudFrontService {
1450 fn parse_arn_query(query: &str) -> Option<String> {
1451 for pair in query.split('&').filter(|p| !p.is_empty()) {
1452 if let Some(rest) = pair.strip_prefix("Resource=") {
1453 return Some(percent_decode(rest));
1454 }
1455 }
1456 None
1457 }
1458
1459 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1460 let arn = Self::parse_arn_query(&req.raw_query)
1461 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1462 let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1463 .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1464 let new_tags: Vec<Tag> = parsed
1465 .items
1466 .map(|i| {
1467 i.tag
1468 .into_iter()
1469 .map(|t| Tag {
1470 key: t.key,
1471 value: t.value,
1472 })
1473 .collect()
1474 })
1475 .unwrap_or_default();
1476 let mut state = self.state.write();
1477 let account = state.entry(DEFAULT_ACCOUNT);
1478 let entry = account.tags.entry(arn).or_default();
1479 for tag in new_tags {
1480 if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1481 existing.value = tag.value;
1482 } else {
1483 entry.push(tag);
1484 }
1485 }
1486 Ok(empty_response(StatusCode::NO_CONTENT))
1487 }
1488
1489 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1490 let arn = Self::parse_arn_query(&req.raw_query)
1491 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1492 let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1493 .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1494 let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1495 let mut state = self.state.write();
1496 let account = state.entry(DEFAULT_ACCOUNT);
1497 if let Some(existing) = account.tags.get_mut(&arn) {
1498 existing.retain(|t| !keys.contains(&t.key));
1499 }
1500 Ok(empty_response(StatusCode::NO_CONTENT))
1501 }
1502
1503 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504 let arn = Self::parse_arn_query(&req.raw_query)
1505 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1506 let state = self.state.read();
1507 let tags = state
1508 .accounts
1509 .get(DEFAULT_ACCOUNT)
1510 .and_then(|a| a.tags.get(&arn))
1511 .cloned()
1512 .unwrap_or_default();
1513 drop(state);
1514 let body = build_tags_xml(&tags);
1515 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1516 }
1517}
1518
1519impl CloudFrontService {
1522 fn associate_alias(
1523 &self,
1524 req: &AwsRequest,
1525 route: &Route,
1526 ) -> Result<AwsResponse, AwsServiceError> {
1527 let id = route
1528 .id
1529 .as_deref()
1530 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1531 let alias = parse_query_value(&req.raw_query, "Alias")
1532 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1533 let mut state = self.state.write();
1534 let account = state
1535 .accounts
1536 .get_mut(DEFAULT_ACCOUNT)
1537 .ok_or_else(|| no_such_distribution(id))?;
1538 if let Some(other) = account.distributions.values().find(|d| {
1540 d.id != id
1541 && d.config
1542 .aliases
1543 .as_ref()
1544 .and_then(|a| a.items.as_ref())
1545 .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1546 }) {
1547 return Err(aws_error(
1548 StatusCode::CONFLICT,
1549 "CNAMEAlreadyExists",
1550 format!(
1551 "Alias {alias} is already associated with distribution {}",
1552 other.id
1553 ),
1554 ));
1555 }
1556 let dist = account
1557 .distributions
1558 .get_mut(id)
1559 .ok_or_else(|| no_such_distribution(id))?;
1560 let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1561 let items = aliases
1562 .items
1563 .get_or_insert_with(crate::model::AliasItems::default);
1564 if !items.cname.iter().any(|c| c == &alias) {
1565 items.cname.push(alias.clone());
1566 aliases.quantity = items.cname.len() as i32;
1567 }
1568 dist.etag = generate_etag();
1569 dist.last_modified_time = Utc::now();
1570 Ok(empty_response(StatusCode::OK))
1571 }
1572
1573 fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1574 let alias = parse_query_value(&req.raw_query, "Alias")
1575 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1576 let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1577 .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1578 if alias.len() > 253 {
1583 return Err(invalid_argument(format!(
1584 "Alias length {} exceeds maximum 253",
1585 alias.len()
1586 )));
1587 }
1588 if dist_id.len() > 25 {
1589 return Err(invalid_argument(format!(
1590 "DistributionId length {} exceeds maximum 25",
1591 dist_id.len()
1592 )));
1593 }
1594 if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1595 let n: i64 = max_items.parse().map_err(|_| {
1596 invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1597 })?;
1598 if n > 100 {
1599 return Err(invalid_argument(format!(
1600 "MaxItems {n} exceeds maximum 100"
1601 )));
1602 }
1603 }
1604 let body = format!(
1607 "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1608 NS = crate::NAMESPACE,
1609 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1610 );
1611 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1612 }
1613
1614 fn associate_web_acl(
1615 &self,
1616 req: &AwsRequest,
1617 route: &Route,
1618 ) -> Result<AwsResponse, AwsServiceError> {
1619 let id = route
1620 .id
1621 .as_deref()
1622 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1623 let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1624 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1625 let mut state = self.state.write();
1626 let account = state
1627 .accounts
1628 .get_mut(DEFAULT_ACCOUNT)
1629 .ok_or_else(|| no_such_distribution(id))?;
1630 let dist = account
1631 .distributions
1632 .get_mut(id)
1633 .ok_or_else(|| no_such_distribution(id))?;
1634 dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1635 dist.etag = generate_etag();
1636 dist.last_modified_time = Utc::now();
1637 let body = format!(
1638 "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1639 esc(id), esc(&parsed.web_acl_arn),
1640 NS = crate::NAMESPACE,
1641 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1642 );
1643 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1644 }
1645
1646 fn disassociate_web_acl(
1647 &self,
1648 _req: &AwsRequest,
1649 route: &Route,
1650 ) -> Result<AwsResponse, AwsServiceError> {
1651 let id = route
1652 .id
1653 .as_deref()
1654 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1655 let entity_not_found = || {
1658 aws_error(
1659 StatusCode::NOT_FOUND,
1660 "EntityNotFound",
1661 format!("The specified distribution does not exist: {id}"),
1662 )
1663 };
1664 let mut state = self.state.write();
1665 let account = state
1666 .accounts
1667 .get_mut(DEFAULT_ACCOUNT)
1668 .ok_or_else(entity_not_found)?;
1669 let dist = account
1670 .distributions
1671 .get_mut(id)
1672 .ok_or_else(entity_not_found)?;
1673 dist.config.web_acl_id = None;
1674 dist.etag = generate_etag();
1675 dist.last_modified_time = Utc::now();
1676 let body = format!(
1677 "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1678 esc(id),
1679 NS = crate::NAMESPACE,
1680 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1681 );
1682 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1683 }
1684}
1685
/// Request body parsed by `associate_web_acl`.
///
/// NOTE(review): despite the name, this struct carries a web ACL ARN, not an
/// alias — consider renaming it together with its single use site.
#[derive(serde::Deserialize, Default, Debug)]
#[serde(rename_all = "PascalCase")]
struct AssociateAliasRequest {
    /// Value of the `WebACLArn` element; an empty string when the element is
    /// absent (because of `default`).
    #[serde(rename = "WebACLArn", default)]
    web_acl_arn: String,
}
1692
/// Escapes text for inclusion in XML element content or attribute values.
///
/// The five characters with predefined XML entities are replaced
/// (`&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`, `"` → `&quot;`,
/// `'` → `&apos;`); every other character is copied through unchanged.
pub(crate) fn esc(s: &str) -> String {
    // The output is at least as long as the input; escapes only grow it.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            _ => out.push(c),
        }
    }
    out
}
1714
1715pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1716 let mut out = String::with_capacity(2048);
1717 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1718 out.push_str(&format!(
1719 "<Distribution xmlns=\"{ns}\">",
1720 ns = crate::NAMESPACE
1721 ));
1722 out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1723 out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1724 out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1725 out.push_str(&format!(
1726 "<LastModifiedTime>{}</LastModifiedTime>",
1727 rfc3339(&dist.last_modified_time)
1728 ));
1729 out.push_str(&format!(
1730 "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1731 dist.in_progress_invalidation_batches
1732 ));
1733 out.push_str(&format!(
1734 "<DomainName>{}</DomainName>",
1735 esc(&dist.domain_name)
1736 ));
1737 out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1738 out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1739 let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1740 .unwrap_or_else(|_| String::new());
1741 out.push_str(&inner);
1742 out.push_str("</Distribution>");
1743 out
1744}
1745
1746fn build_distribution_list_xml(
1747 dists: &[StoredDistribution],
1748 root: &str,
1749 marker: &str,
1750 max_items: usize,
1751 next_marker: Option<&str>,
1752) -> String {
1753 let mut out = String::with_capacity(2048);
1754 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1755 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1756 out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1757 if let Some(nm) = next_marker {
1758 out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1759 }
1760 out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1761 out.push_str(&format!(
1762 "<IsTruncated>{}</IsTruncated>",
1763 next_marker.is_some()
1764 ));
1765 out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1766 if dists.is_empty() {
1767 out.push_str(&format!("</{root}>"));
1768 return out;
1769 }
1770 out.push_str("<Items>");
1771 for d in dists {
1772 out.push_str("<DistributionSummary>");
1773 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1774 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1775 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1776 out.push_str(&format!(
1777 "<LastModifiedTime>{}</LastModifiedTime>",
1778 rfc3339(&d.last_modified_time)
1779 ));
1780 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1781 let aliases = d.config.aliases.clone().unwrap_or_default();
1782 out.push_str(&render_inline("Aliases", &aliases));
1783 let origins = d.config.origins.clone();
1784 out.push_str(&render_inline("Origins", &origins));
1785 let dcb = d.config.default_cache_behavior.clone();
1786 out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1787 let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1788 out.push_str(&render_inline("CacheBehaviors", &cb));
1789 let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1790 out.push_str(&render_inline("CustomErrorResponses", &cer));
1791 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1792 out.push_str(&format!(
1793 "<PriceClass>{}</PriceClass>",
1794 esc(&d
1795 .config
1796 .price_class
1797 .clone()
1798 .unwrap_or_else(|| "PriceClass_All".to_string()))
1799 ));
1800 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1801 out.push_str(&render_inline(
1802 "ViewerCertificate",
1803 &d.config.viewer_certificate.clone().unwrap_or_default(),
1804 ));
1805 out.push_str(&render_inline(
1806 "Restrictions",
1807 &d.config.restrictions.clone().unwrap_or_default(),
1808 ));
1809 out.push_str(&format!(
1810 "<WebACLId>{}</WebACLId>",
1811 esc(&d.config.web_acl_id.clone().unwrap_or_default())
1812 ));
1813 out.push_str(&format!(
1814 "<HttpVersion>{}</HttpVersion>",
1815 esc(&d
1816 .config
1817 .http_version
1818 .clone()
1819 .unwrap_or_else(|| "http2".to_string()))
1820 ));
1821 out.push_str(&format!(
1822 "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1823 d.config.is_ipv6_enabled.unwrap_or(true)
1824 ));
1825 out.push_str(&format!(
1826 "<Staging>{}</Staging>",
1827 d.config.staging.unwrap_or(false)
1828 ));
1829 out.push_str("</DistributionSummary>");
1830 }
1831 out.push_str("</Items>");
1832 out.push_str(&format!("</{root}>"));
1833 out
1834}
1835
1836fn build_empty_distribution_id_list(root: &str) -> String {
1837 let mut out = String::with_capacity(256);
1838 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1839 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1840 out.push_str("<Marker></Marker>");
1841 out.push_str("<MaxItems>100</MaxItems>");
1842 out.push_str("<IsTruncated>false</IsTruncated>");
1843 out.push_str("<Quantity>0</Quantity>");
1844 out.push_str(&format!("</{root}>"));
1845 out
1846}
1847
1848fn build_distribution_id_list_xml(
1851 ids: &[&str],
1852 marker: &str,
1853 max_items: usize,
1854 next_marker: Option<&str>,
1855) -> String {
1856 let mut out = String::with_capacity(256);
1857 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1858 out.push_str(&format!(
1859 "<DistributionIdList xmlns=\"{ns}\">",
1860 ns = crate::NAMESPACE
1861 ));
1862 out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1863 if let Some(nm) = next_marker {
1864 out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1865 }
1866 out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1867 out.push_str(&format!(
1868 "<IsTruncated>{}</IsTruncated>",
1869 next_marker.is_some()
1870 ));
1871 out.push_str(&format!("<Quantity>{}</Quantity>", ids.len()));
1872 if !ids.is_empty() {
1873 out.push_str("<Items>");
1874 for id in ids {
1875 out.push_str(&format!("<DistributionId>{}</DistributionId>", esc(id)));
1876 }
1877 out.push_str("</Items>");
1878 }
1879 out.push_str("</DistributionIdList>");
1880 out
1881}
1882
1883fn paginate_distributions(
1888 req: &AwsRequest,
1889 items: Vec<StoredDistribution>,
1890) -> (String, usize, Vec<StoredDistribution>, Option<String>) {
1891 let max_items = req
1892 .query_params
1893 .get("MaxItems")
1894 .or_else(|| req.query_params.get("maxitems"))
1895 .and_then(|v| v.parse::<usize>().ok())
1896 .filter(|n| *n > 0)
1897 .unwrap_or(100);
1898 let marker = req
1899 .query_params
1900 .get("Marker")
1901 .or_else(|| req.query_params.get("marker"))
1902 .cloned()
1903 .unwrap_or_default();
1904 let start_idx = if marker.is_empty() {
1908 0
1909 } else {
1910 match items.iter().position(|d| d.id == marker) {
1911 Some(pos) => pos + 1,
1912 None => items.len(),
1913 }
1914 };
1915 let page: Vec<StoredDistribution> = items
1916 .iter()
1917 .skip(start_idx)
1918 .take(max_items)
1919 .cloned()
1920 .collect();
1921 let has_more = start_idx + page.len() < items.len();
1925 let next_marker = if has_more {
1926 page.last().map(|d| d.id.clone())
1927 } else {
1928 None
1929 };
1930 (marker, max_items, page, next_marker)
1931}
1932
1933fn distribution_cache_behaviors(
1936 d: &StoredDistribution,
1937) -> impl Iterator<Item = &crate::model::CacheBehavior> {
1938 d.config
1939 .cache_behaviors
1940 .as_ref()
1941 .and_then(|cb| cb.items.as_ref())
1942 .into_iter()
1943 .flat_map(|items| items.cache_behavior.iter())
1944}
1945
1946fn distribution_uses_cache_policy(d: &StoredDistribution, id: &str) -> bool {
1947 d.config.default_cache_behavior.cache_policy_id.as_deref() == Some(id)
1948 || distribution_cache_behaviors(d).any(|b| b.cache_policy_id.as_deref() == Some(id))
1949}
1950
1951fn distribution_uses_origin_request_policy(d: &StoredDistribution, id: &str) -> bool {
1952 d.config
1953 .default_cache_behavior
1954 .origin_request_policy_id
1955 .as_deref()
1956 == Some(id)
1957 || distribution_cache_behaviors(d)
1958 .any(|b| b.origin_request_policy_id.as_deref() == Some(id))
1959}
1960
1961fn distribution_uses_response_headers_policy(d: &StoredDistribution, id: &str) -> bool {
1962 d.config
1963 .default_cache_behavior
1964 .response_headers_policy_id
1965 .as_deref()
1966 == Some(id)
1967 || distribution_cache_behaviors(d)
1968 .any(|b| b.response_headers_policy_id.as_deref() == Some(id))
1969}
1970
1971fn trusted_key_groups_contains(tkg: Option<&crate::model::TrustedKeyGroups>, id: &str) -> bool {
1972 tkg.and_then(|g| g.items.as_ref())
1973 .map(|items| items.key_group.iter().any(|k| k == id))
1974 .unwrap_or(false)
1975}
1976
1977fn distribution_uses_key_group(d: &StoredDistribution, id: &str) -> bool {
1978 trusted_key_groups_contains(
1979 d.config.default_cache_behavior.trusted_key_groups.as_ref(),
1980 id,
1981 ) || distribution_cache_behaviors(d)
1982 .any(|b| trusted_key_groups_contains(b.trusted_key_groups.as_ref(), id))
1983}
1984
1985fn distribution_uses_vpc_origin(d: &StoredDistribution, id: &str) -> bool {
1986 d.config
1987 .origins
1988 .items
1989 .as_ref()
1990 .map(|items| {
1991 items.origin.iter().any(|o| {
1992 o.vpc_origin_config
1993 .as_ref()
1994 .map(|v| v.vpc_origin_id == id)
1995 .unwrap_or(false)
1996 })
1997 })
1998 .unwrap_or(false)
1999}
2000
2001fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
2002 let mut out = String::with_capacity(512);
2003 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2004 out.push_str(&format!(
2005 "<Invalidation xmlns=\"{ns}\">",
2006 ns = crate::NAMESPACE
2007 ));
2008 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
2009 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
2010 out.push_str(&format!(
2011 "<CreateTime>{}</CreateTime>",
2012 rfc3339(&inv.create_time)
2013 ));
2014 out.push_str(&render_inline("InvalidationBatch", &inv.batch));
2015 out.push_str("</Invalidation>");
2016 out
2017}
2018
2019fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
2020 let mut out = String::with_capacity(1024);
2021 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2022 out.push_str(&format!(
2023 "<InvalidationList xmlns=\"{ns}\">",
2024 ns = crate::NAMESPACE
2025 ));
2026 out.push_str("<Marker></Marker>");
2027 out.push_str("<MaxItems>100</MaxItems>");
2028 out.push_str("<IsTruncated>false</IsTruncated>");
2029 out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
2030 if !items.is_empty() {
2031 out.push_str("<Items>");
2032 for inv in items {
2033 out.push_str("<InvalidationSummary>");
2034 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
2035 out.push_str(&format!(
2036 "<CreateTime>{}</CreateTime>",
2037 rfc3339(&inv.create_time)
2038 ));
2039 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
2040 out.push_str("</InvalidationSummary>");
2041 }
2042 out.push_str("</Items>");
2043 }
2044 out.push_str("</InvalidationList>");
2045 out
2046}
2047
2048fn build_tags_xml(tags: &[Tag]) -> String {
2049 let mut out = String::with_capacity(256);
2050 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2051 out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
2052 out.push_str("<Items>");
2053 for t in tags {
2054 out.push_str("<Tag>");
2055 out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
2056 if let Some(v) = &t.value {
2057 out.push_str(&format!("<Value>{}</Value>", esc(v)));
2058 }
2059 out.push_str("</Tag>");
2060 }
2061 out.push_str("</Items>");
2062 out.push_str("</Tags>");
2063 out
2064}
2065
/// Serializes `value` as an XML fragment rooted at the element `root`.
///
/// Serialization errors are not surfaced: `unwrap_or_default` turns a failure
/// into an empty string.
fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
    quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
}
2069
2070fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
2073 if s.is_empty() {
2074 return Err(invalid_argument("CallerReference is required"));
2075 }
2076 Ok(())
2077}
2078
2079fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
2080 if config.origins.quantity < 1 {
2081 return Err(invalid_argument(
2082 "DistributionConfig.Origins must contain at least one origin",
2083 ));
2084 }
2085 Ok(())
2086}
2087
2088fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
2094 let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
2095 return false;
2096 };
2097 let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
2098 return false;
2099 };
2100 a == b
2101}
2102
/// Returns the account id to use for a request.
///
/// The request itself is ignored (`_req` is unused): every call yields
/// `DEFAULT_ACCOUNT`.
fn account_id(_req: &AwsRequest) -> &'static str {
    DEFAULT_ACCOUNT
}
2110
2111fn generate_distribution_id() -> String {
2112 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2114 format!("E{}", &raw[..13])
2115}
2116
2117fn generate_invalidation_id() -> String {
2118 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2119 format!("I{}", &raw[..13])
2120}
2121
2122pub(crate) fn generate_etag() -> String {
2123 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2124 format!("E{}", &raw[..13])
2125}
2126
2127pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
2132 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2133 let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
2134 format!("{prefix}{}", &raw[..suffix_len])
2135}
2136
/// Formats `t` as an RFC 3339 timestamp with millisecond precision.
///
/// The `true` argument is chrono's `use_z` flag, which renders the UTC offset
/// as `Z`.
fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
    t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
}
2140
/// Builds a `400 Bad Request` service error with code `InvalidArgument` and
/// the given message.
pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
    aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
}
2144
2145fn no_such_distribution(id: &str) -> AwsServiceError {
2146 aws_error(
2147 StatusCode::NOT_FOUND,
2148 "NoSuchDistribution",
2149 format!("The specified distribution does not exist: {id}"),
2150 )
2151}
2152
2153fn no_such_invalidation(id: &str) -> AwsServiceError {
2154 aws_error(
2155 StatusCode::NOT_FOUND,
2156 "NoSuchInvalidation",
2157 format!("The specified invalidation does not exist: {id}"),
2158 )
2159}
2160
/// Builds a `500 Internal Server Error` service error with code
/// `InternalError` and the given message.
fn internal_error(msg: impl Into<String>) -> AwsServiceError {
    aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
}
2164
/// Convenience wrapper around `AwsServiceError::aws_error`.
///
/// `code` is converted to a `String` here; `msg` is forwarded unchanged.
pub(crate) fn aws_error(
    status: StatusCode,
    code: impl Into<String>,
    msg: impl Into<String>,
) -> AwsServiceError {
    AwsServiceError::aws_error(status, code.into(), msg)
}
2172
2173fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
2174 if let Ok(v) = HeaderValue::from_str(value) {
2175 headers.insert(name, v);
2176 }
2177}
2178
2179pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
2180 AwsResponse {
2181 status,
2182 content_type: "text/xml".to_string(),
2183 body: ResponseBody::Bytes(Bytes::from(body)),
2184 headers,
2185 }
2186}
2187
2188fn empty_response(status: StatusCode) -> AwsResponse {
2189 AwsResponse {
2190 status,
2191 content_type: "text/xml".to_string(),
2192 body: ResponseBody::Bytes(Bytes::new()),
2193 headers: HeaderMap::new(),
2194 }
2195}
2196
/// Decodes a URL query component.
///
/// * `%XX` (two hex digits, either case) becomes the byte it encodes.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally.
///
/// Decoded bytes are collected first and interpreted as UTF-8 afterwards, so
/// multi-byte sequences such as `%C3%A9` decode to a single character and
/// non-ASCII text already present in `input` is preserved. Byte sequences
/// that are not valid UTF-8 are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        let b = bytes[i];
        // `i + 2 < len` guarantees both hex digit positions exist.
        if b == b'%' && i + 2 < bytes.len() {
            if let (Some(hi), Some(lo)) = (hex_digit(bytes[i + 1]), hex_digit(bytes[i + 2])) {
                decoded.push((hi << 4) | lo);
                i += 3;
                continue;
            }
        }
        decoded.push(if b == b'+' { b' ' } else { b });
        i += 1;
    }
    // Decode once at the end: pushing each byte as a `char` would turn every
    // byte >= 0x80 into a Latin-1 code point and corrupt UTF-8 text.
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Returns the numeric value (0-15) of an ASCII hex digit, or `None` when `b`
/// is not one of `0-9`, `a-f`, `A-F`.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
2228
/// Returns `true` when `value` looks like an unfilled path placeholder: it is
/// empty, starts with `{`, or starts with the percent-encoded brace `%7B`
/// (matched case-insensitively).
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    let raw = value.as_bytes();
    if raw.is_empty() || raw.first() == Some(&b'{') {
        return true;
    }
    raw.get(..3)
        .map_or(false, |head| head.eq_ignore_ascii_case(b"%7b"))
}
2242
2243pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
2251 if let Ok(s) = std::str::from_utf8(body) {
2252 let trimmed = s.trim_start();
2253 if trimmed.starts_with('{') {
2254 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
2255 if let Some(field) = v.get(key) {
2256 return match field {
2257 serde_json::Value::String(s) => Some(s.clone()),
2258 serde_json::Value::Number(n) => Some(n.to_string()),
2259 serde_json::Value::Bool(b) => Some(b.to_string()),
2260 _ => None,
2261 };
2262 }
2263 return None;
2264 }
2265 }
2266 let open = format!("<{key}>");
2268 let close = format!("</{key}>");
2269 if let Some(start) = s.find(&open) {
2270 let after = start + open.len();
2271 if let Some(end_rel) = s[after..].find(&close) {
2272 return Some(s[after..after + end_rel].to_string());
2273 }
2274 }
2275 }
2276 None
2277}
2278
2279fn parse_query_value(query: &str, key: &str) -> Option<String> {
2280 let prefix = format!("{key}=");
2281 for pair in query.split('&').filter(|p| !p.is_empty()) {
2282 if let Some(rest) = pair.strip_prefix(&prefix) {
2283 return Some(percent_decode(rest));
2284 }
2285 }
2286 None
2287}
2288
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // Shared helpers
    // ------------------------------------------------------------------

    /// Fresh, empty shared state for one test.
    fn make_state() -> SharedCloudFrontState {
        Arc::new(RwLock::new(CloudFrontAccounts::new()))
    }

    /// Builds a request for `path`; `path_segments` is derived by splitting
    /// `path` on `/` and dropping empty pieces.
    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: Uuid::new_v4().to_string(),
            headers: HeaderMap::new(),
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: query.into(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// GET request for `path` with `params` placed in `query_params`.
    fn make_request_with_params(path: &str, params: &[(&str, &str)]) -> AwsRequest {
        let mut r = make_request(http::Method::GET, path, "", "");
        r.query_params = params
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        r
    }

    /// Returns the text between the first `open` marker and the following
    /// `close` marker in `xml`. Panics when `open` is absent.
    fn text_between<'a>(xml: &'a str, open: &str, close: &str) -> &'a str {
        xml.split(open)
            .nth(1)
            .unwrap()
            .split(close)
            .next()
            .unwrap()
    }

    /// Copies the response body out as UTF-8 text.
    fn body_text(resp: &AwsResponse) -> String {
        std::str::from_utf8(resp.body.expect_bytes())
            .unwrap()
            .to_string()
    }

    /// Reads the `ETag` response header as an owned string.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string()
    }

    /// POSTs `config_xml` to the create-distribution route and unwraps the
    /// response.
    async fn post_distribution(svc: &CloudFrontService, config_xml: &str) -> AwsResponse {
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/distribution",
            "",
            config_xml,
        ))
        .await
        .unwrap()
    }

    /// Creates a distribution from `config_xml` and returns the first `<Id>`
    /// found in the response body.
    async fn create_dist_returning_id(svc: &CloudFrontService, config_xml: &str) -> String {
        let create = post_distribution(svc, config_xml).await;
        text_between(&body_text(&create), "<Id>", "</Id>").to_string()
    }

    /// Creates a distribution from the minimal config and returns its id.
    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
        create_dist_returning_id(svc, &minimal_dist_config_xml(caller_ref)).await
    }

    /// Reads the stored status of distribution `id` in the default account
    /// directly from service state; empty string when it does not exist.
    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
        let state = svc.state.read();
        state
            .accounts
            .get(DEFAULT_ACCOUNT)
            .and_then(|a| a.distributions.get(id))
            .map(|d| d.status.clone())
            .unwrap_or_default()
    }

    /// GETs `path`, asserts a 200 response, and returns the body text.
    async fn list_by(svc: &CloudFrontService, path: &str) -> String {
        let resp = svc
            .handle(make_request(http::Method::GET, path, "", ""))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        body_text(&resp)
    }

    /// Asserts `xml` holds exactly one `<DistributionId>` and returns it.
    fn only_distribution_id(xml: &str) -> String {
        assert_eq!(
            xml.matches("<DistributionId>").count(),
            1,
            "expected exactly one DistributionId: {xml}"
        );
        text_between(xml, "<DistributionId>", "</DistributionId>").to_string()
    }

    // ------------------------------------------------------------------
    // Request body fixtures
    // ------------------------------------------------------------------

    /// Smallest config used by the tests: one origin, one default behavior.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    /// Config that sets every field the `ListDistributionsBy*` tests filter
    /// on: VPC origin, cache / origin-request / response-headers policies,
    /// trusted key group, web ACL and anycast IP list.
    fn predicate_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
        <VpcOriginConfig><VpcOriginId>vo-123</VpcOriginId></VpcOriginConfig>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
    <CachePolicyId>cp-123</CachePolicyId>
    <OriginRequestPolicyId>orp-123</OriginRequestPolicyId>
    <ResponseHeadersPolicyId>rhp-123</ResponseHeadersPolicyId>
    <TrustedKeyGroups>
      <Enabled>true</Enabled>
      <Quantity>1</Quantity>
      <Items><KeyGroup>kg-123</KeyGroup></Items>
    </TrustedKeyGroups>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <WebACLId>waf-abc</WebACLId>
  <AnycastIpListId>ail-123</AnycastIpListId>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    /// Minimal config whose default behavior uses `cache_policy`; it sets no
    /// web ACL.
    fn dist_config_with_cache_policy(caller_ref: &str, cache_policy: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items><Origin><Id>primary</Id><DomainName>example.com</DomainName></Origin></Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
    <CachePolicyId>{cache_policy}</CachePolicyId>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    // ------------------------------------------------------------------
    // Pure helper tests
    // ------------------------------------------------------------------

    #[test]
    fn distribution_list_xml_renders_pagination_fields() {
        // A next marker is supplied: the page must report itself truncated.
        let xml =
            build_distribution_list_xml(&[], "DistributionList", "EDFDVBD6", 2, Some("E2NEXT"));
        assert!(xml.contains("<Marker>EDFDVBD6</Marker>"));
        assert!(xml.contains("<MaxItems>2</MaxItems>"));
        assert!(xml.contains("<IsTruncated>true</IsTruncated>"));
        assert!(xml.contains("<NextMarker>E2NEXT</NextMarker>"));

        // No next marker: not truncated and no NextMarker element.
        let xml = build_distribution_list_xml(&[], "DistributionList", "", 100, None);
        assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
        assert!(!xml.contains("<NextMarker>"));
    }

    #[test]
    fn placeholder_label_detects_braces_and_percent_encoding() {
        assert!(is_placeholder_label(""));
        assert!(is_placeholder_label("{Identifier}"));
        assert!(is_placeholder_label("%7BIdentifier%7D"));
        assert!(is_placeholder_label("%7bidentifier%7d"));
        assert!(!is_placeholder_label("E1234567890ABC"));
        assert!(!is_placeholder_label(
            "arn:aws:cloudfront::000:distribution/E1"
        ));
    }

    #[test]
    fn extract_body_field_handles_json_and_xml() {
        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
        assert_eq!(
            extract_body_field(json, "Stage"),
            Some("BROKEN".to_string())
        );
        assert_eq!(extract_body_field(json, "MaxItems"), None);

        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
        assert_eq!(
            extract_body_field(xml, "Domain"),
            Some("example.com".to_string())
        );
        assert_eq!(extract_body_field(xml, "Missing"), None);

        assert_eq!(extract_body_field(b"", "x"), None);
    }

    // ------------------------------------------------------------------
    // Distribution lifecycle
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn create_then_get_then_delete_distribution() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("ref-1");
        let create = post_distribution(&svc, &body).await;
        assert_eq!(create.status, StatusCode::CREATED);
        let etag = etag_of(&create);
        let id = text_between(&body_text(&create), "<Id>", "</Id>").to_string();

        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        // Disable the distribution before deleting it, using the create ETag.
        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &disable_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        let new_etag = etag_of(&updated);

        // The delete is sent with the ETag returned by the update.
        let mut del_req = make_request(
            http::Method::DELETE,
            &format!("/2020-05-31/distribution/{id}"),
            "",
            "",
        );
        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
        let del = svc.handle(del_req).await.unwrap();
        assert_eq!(del.status, StatusCode::NO_CONTENT);
    }

    #[tokio::test]
    async fn create_distribution_starts_in_progress() {
        // Long delay so the status cannot flip while the test runs.
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let create = post_distribution(&svc, &minimal_dist_config_xml("status-ref")).await;
        let xml = body_text(&create);
        assert!(
            xml.contains("<Status>InProgress</Status>"),
            "expected initial status InProgress, got: {xml}"
        );
    }

    #[tokio::test]
    async fn auto_transition_after_tick_marks_deployed() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_millis(50));
        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        // Wait well past the 50 ms propagation delay.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        assert_eq!(distribution_status(&svc, &id), "Deployed");
    }

    #[tokio::test]
    async fn set_distribution_status_via_admin_flips_synchronously() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let id = create_distribution_returning_id(&svc, "admin-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");
        assert!(svc.set_distribution_status(&id, "InProgress"));
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn set_distribution_status_unknown_id_returns_false() {
        let svc = CloudFrontService::new(make_state());
        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
    }

    #[tokio::test]
    async fn update_distribution_resets_to_in_progress() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("update-reset-ref");
        let create = post_distribution(&svc, &body).await;
        let etag = etag_of(&create);
        let id = text_between(&body_text(&create), "<Id>", "</Id>").to_string();

        // Force the distribution to Deployed so the reset is observable.
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");

        let updated_body = body.replace(
            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
        );
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &updated_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn duplicate_caller_reference_is_rejected() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("dup-ref");
        let _first = post_distribution(&svc, &body).await;

        let result = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await;
        let err = match result {
            Ok(_) => panic!("expected duplicate caller-reference to fail"),
            Err(e) => e,
        };
        assert_eq!(err.code(), "DistributionAlreadyExists");
        assert_eq!(err.status(), StatusCode::CONFLICT);
    }

    // ------------------------------------------------------------------
    // Invalidations and tags
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn invalidation_lifecycle() {
        let svc = CloudFrontService::new(make_state());
        let dist_id = create_distribution_returning_id(&svc, "inv-ref").await;

        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
  <CallerReference>inv-1</CallerReference>
</InvalidationBatch>"#;
        let inv_resp = svc
            .handle(make_request(
                http::Method::POST,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                inv_body,
            ))
            .await
            .unwrap();
        assert_eq!(inv_resp.status, StatusCode::CREATED);
        let inv_id = text_between(&body_text(&inv_resp), "<Id>", "</Id>").to_string();

        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        let list = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert!(body_text(&list).contains("<Quantity>1</Quantity>"));
    }

    #[tokio::test]
    async fn tags_roundtrip() {
        let svc = CloudFrontService::new(make_state());
        let create = post_distribution(&svc, &minimal_dist_config_xml("tag-ref")).await;
        let arn = text_between(&body_text(&create), "<ARN>", "</ARN>").to_string();

        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
</Tags>"#;
        let resp = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/tagging",
                &format!("Operation=Tag&Resource={arn}"),
                tag_body,
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::NO_CONTENT);

        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={arn}"),
                "",
            ))
            .await
            .unwrap();
        let xml = body_text(&list);
        assert!(xml.contains("<Key>env</Key>"));
        assert!(xml.contains("<Value>prod</Value>"));
    }

    #[tokio::test]
    async fn xml_metacharacters_in_user_input_are_escaped() {
        let svc = CloudFrontService::new(make_state());
        // The comment carries raw metacharacters via CDATA.
        let body = minimal_dist_config_xml("escape-ref").replace(
            "<Comment></Comment>",
            "<Comment><![CDATA[a&b<c>d]]></Comment>",
        );
        let create = post_distribution(&svc, &body).await;
        let created_xml = body_text(&create);
        let dist_id = text_between(&created_xml, "<Id>", "</Id>").to_string();
        let arn = text_between(&created_xml, "<ARN>", "</ARN>").to_string();

        // The tag value is sent entity-encoded so the request is well-formed
        // XML; the decoded value is `a&b<c>d`.
        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
</Tags>"#;
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/tagging",
            &format!("Operation=Tag&Resource={arn}"),
            tag_body,
        ))
        .await
        .unwrap();

        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={arn}"),
                "",
            ))
            .await
            .unwrap();
        let xml = body_text(&list);
        // `&` and `<` must come back as entities; the raw form must not
        // appear. Only the prefix is pinned because escaping `>` is optional
        // in XML text content.
        assert!(xml.contains("<Value>a&amp;b&lt;c"), "{xml}");
        assert!(!xml.contains("<Value>a&b<c>d</Value>"), "{xml}");

        // The raw comment must not leak into the distribution listing either.
        let xml = list_by(&svc, "/2020-05-31/distribution").await;
        assert!(!xml.contains("<Comment>a&b<c>d"));
        assert!(xml.contains(dist_id.as_str()));
    }

    // ------------------------------------------------------------------
    // ListDistributionsBy* filters
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn list_distributions_by_predicate_fields_filters_state() {
        let svc = CloudFrontService::new(make_state());
        let id = create_dist_returning_id(&svc, &predicate_dist_config_xml("pred-ref")).await;

        // These routes answer with a DistributionIdList.
        for (path, wanted) in [
            ("/2020-05-31/distributionsByCachePolicyId/cp-123", "cp-123"),
            (
                "/2020-05-31/distributionsByOriginRequestPolicyId/orp-123",
                "orp-123",
            ),
            (
                "/2020-05-31/distributionsByResponseHeadersPolicyId/rhp-123",
                "rhp-123",
            ),
            ("/2020-05-31/distributionsByKeyGroupId/kg-123", "kg-123"),
            ("/2020-05-31/distributionsByVpcOriginId/vo-123", "vo-123"),
        ] {
            let body = list_by(&svc, path).await;
            assert!(body.contains("<DistributionIdList"), "{wanted}: {body}");
            assert!(
                body.contains(&format!("<DistributionId>{id}</DistributionId>")),
                "{wanted} did not list distribution: {body}"
            );
            assert!(body.contains("<Quantity>1</Quantity>"), "{wanted}: {body}");
        }

        // A policy id nobody uses yields an empty list.
        let empty = list_by(&svc, "/2020-05-31/distributionsByCachePolicyId/other").await;
        assert!(empty.contains("<Quantity>0</Quantity>"), "{empty}");
        assert!(!empty.contains("<DistributionId>"), "{empty}");

        // These routes answer with a full DistributionList.
        let web = list_by(&svc, "/2020-05-31/distributionsByWebACLId/waf-abc").await;
        assert!(web.contains("<DistributionList"), "{web}");
        assert!(web.contains(&format!("<Id>{id}</Id>")), "{web}");

        let anycast = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/ail-123").await;
        assert!(anycast.contains("<DistributionList"), "{anycast}");
        assert!(anycast.contains(&format!("<Id>{id}</Id>")), "{anycast}");

        let anycast_miss = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/nope").await;
        assert!(
            anycast_miss.contains("<Quantity>0</Quantity>"),
            "{anycast_miss}"
        );
    }

    #[tokio::test]
    async fn list_distributions_by_web_acl_id_null_lists_unassociated() {
        let svc = CloudFrontService::new(make_state());
        let with_acl =
            create_dist_returning_id(&svc, &predicate_dist_config_xml("null-with-acl")).await;
        let without_acl =
            create_dist_returning_id(&svc, &dist_config_with_cache_policy("null-no-acl", "cp-x"))
                .await;

        let body = list_by(&svc, "/2020-05-31/distributionsByWebACLId/null").await;
        assert!(body.contains("<DistributionList"), "{body}");
        assert!(
            body.contains(&format!("<Id>{without_acl}</Id>")),
            "unassociated distribution missing from null listing: {body}"
        );
        assert!(
            !body.contains(&format!("<Id>{with_acl}</Id>")),
            "distribution WITH a web ACL leaked into the null listing: {body}"
        );
    }

    #[tokio::test]
    async fn list_distributions_by_predicate_paginates() {
        let svc = CloudFrontService::new(make_state());
        let d1 =
            create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-1", "cp-pg")).await;
        let d2 =
            create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-2", "cp-pg")).await;

        // Page 1: one item, truncated, NextMarker equals that item's id.
        let page1 = svc
            .handle(make_request_with_params(
                "/2020-05-31/distributionsByCachePolicyId/cp-pg",
                &[("MaxItems", "1")],
            ))
            .await
            .unwrap();
        let xml1 = body_text(&page1);
        assert!(xml1.contains("<MaxItems>1</MaxItems>"), "{xml1}");
        assert!(xml1.contains("<IsTruncated>true</IsTruncated>"), "{xml1}");
        let first_id = only_distribution_id(&xml1);
        let next = xml1
            .split("<NextMarker>")
            .nth(1)
            .expect("NextMarker present")
            .split("</NextMarker>")
            .next()
            .unwrap()
            .to_string();
        assert_eq!(next, first_id, "NextMarker must be the last id on the page");

        // Page 2: resumes after the marker and is the final page.
        let page2 = svc
            .handle(make_request_with_params(
                "/2020-05-31/distributionsByCachePolicyId/cp-pg",
                &[("MaxItems", "1"), ("Marker", &next)],
            ))
            .await
            .unwrap();
        let xml2 = body_text(&page2);
        assert!(xml2.contains(&format!("<Marker>{next}</Marker>")), "{xml2}");
        assert!(xml2.contains("<IsTruncated>false</IsTruncated>"), "{xml2}");
        let second_id = only_distribution_id(&xml2);
        assert_ne!(
            second_id, first_id,
            "exclusive marker must not return the marker item again"
        );

        // Together the two pages must cover exactly the two distributions.
        let mut seen = [first_id, second_id];
        seen.sort();
        let mut expected = [d1, d2];
        expected.sort();
        assert_eq!(seen, expected, "pages did not cover both distributions");
    }
}