1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18 DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22 CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23 StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
27fn is_mutating_action(action: &str) -> bool {
31 const PREFIXES: &[&str] = &[
32 "Create",
33 "Update",
34 "Delete",
35 "Copy",
36 "Associate",
37 "Disassociate",
38 "Tag",
39 "Untag",
40 "Publish",
41 "Put",
42 ];
43 PREFIXES.iter().any(|p| action.starts_with(p))
44}
45
46pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
48const SUPPORTED_ACTIONS: &[&str] = &[
49 "CreateDistribution",
50 "CreateDistributionWithTags",
51 "GetDistribution",
52 "GetDistributionConfig",
53 "UpdateDistribution",
54 "DeleteDistribution",
55 "ListDistributions",
56 "CopyDistribution",
57 "CreateInvalidation",
58 "GetInvalidation",
59 "ListInvalidations",
60 "TagResource",
61 "UntagResource",
62 "ListTagsForResource",
63 "AssociateAlias",
64 "ListConflictingAliases",
65 "ListDistributionsByCachePolicyId",
66 "ListDistributionsByOriginRequestPolicyId",
67 "ListDistributionsByResponseHeadersPolicyId",
68 "ListDistributionsByKeyGroup",
69 "ListDistributionsByWebACLId",
70 "ListDistributionsByVpcOriginId",
71 "ListDistributionsByAnycastIpListId",
72 "ListDistributionsByConnectionMode",
73 "ListDistributionsByConnectionFunction",
74 "ListDistributionsByOwnedResource",
75 "ListDistributionsByTrustStore",
76 "ListDistributionsByRealtimeLogConfig",
77 "AssociateDistributionWebACL",
78 "DisassociateDistributionWebACL",
79 "CreateOriginAccessControl",
80 "GetOriginAccessControl",
81 "GetOriginAccessControlConfig",
82 "UpdateOriginAccessControl",
83 "DeleteOriginAccessControl",
84 "ListOriginAccessControls",
85 "CreateCachePolicy",
86 "GetCachePolicy",
87 "GetCachePolicyConfig",
88 "UpdateCachePolicy",
89 "DeleteCachePolicy",
90 "ListCachePolicies",
91 "CreateOriginRequestPolicy",
92 "GetOriginRequestPolicy",
93 "GetOriginRequestPolicyConfig",
94 "UpdateOriginRequestPolicy",
95 "DeleteOriginRequestPolicy",
96 "ListOriginRequestPolicies",
97 "CreateResponseHeadersPolicy",
98 "GetResponseHeadersPolicy",
99 "GetResponseHeadersPolicyConfig",
100 "UpdateResponseHeadersPolicy",
101 "DeleteResponseHeadersPolicy",
102 "ListResponseHeadersPolicies",
103 "CreateContinuousDeploymentPolicy",
104 "GetContinuousDeploymentPolicy",
105 "GetContinuousDeploymentPolicyConfig",
106 "UpdateContinuousDeploymentPolicy",
107 "DeleteContinuousDeploymentPolicy",
108 "ListContinuousDeploymentPolicies",
109 "CreateFunction",
110 "DescribeFunction",
111 "GetFunction",
112 "UpdateFunction",
113 "DeleteFunction",
114 "ListFunctions",
115 "PublishFunction",
116 "TestFunction",
117 "CreatePublicKey",
118 "GetPublicKey",
119 "GetPublicKeyConfig",
120 "UpdatePublicKey",
121 "DeletePublicKey",
122 "ListPublicKeys",
123 "CreateKeyGroup",
124 "GetKeyGroup",
125 "GetKeyGroupConfig",
126 "UpdateKeyGroup",
127 "DeleteKeyGroup",
128 "ListKeyGroups",
129 "CreateKeyValueStore",
130 "DescribeKeyValueStore",
131 "UpdateKeyValueStore",
132 "DeleteKeyValueStore",
133 "ListKeyValueStores",
134 "CreateCloudFrontOriginAccessIdentity",
135 "GetCloudFrontOriginAccessIdentity",
136 "GetCloudFrontOriginAccessIdentityConfig",
137 "UpdateCloudFrontOriginAccessIdentity",
138 "DeleteCloudFrontOriginAccessIdentity",
139 "ListCloudFrontOriginAccessIdentities",
140 "CreateMonitoringSubscription",
141 "GetMonitoringSubscription",
142 "DeleteMonitoringSubscription",
143 "CreateStreamingDistribution",
144 "CreateStreamingDistributionWithTags",
145 "GetStreamingDistribution",
146 "GetStreamingDistributionConfig",
147 "UpdateStreamingDistribution",
148 "DeleteStreamingDistribution",
149 "ListStreamingDistributions",
150 "CreateFieldLevelEncryptionConfig",
151 "GetFieldLevelEncryption",
152 "GetFieldLevelEncryptionConfig",
153 "UpdateFieldLevelEncryptionConfig",
154 "DeleteFieldLevelEncryptionConfig",
155 "ListFieldLevelEncryptionConfigs",
156 "CreateFieldLevelEncryptionProfile",
157 "GetFieldLevelEncryptionProfile",
158 "GetFieldLevelEncryptionProfileConfig",
159 "UpdateFieldLevelEncryptionProfile",
160 "DeleteFieldLevelEncryptionProfile",
161 "ListFieldLevelEncryptionProfiles",
162 "CreateRealtimeLogConfig",
163 "GetRealtimeLogConfig",
164 "UpdateRealtimeLogConfig",
165 "DeleteRealtimeLogConfig",
166 "ListRealtimeLogConfigs",
167 "CreateVpcOrigin",
168 "GetVpcOrigin",
169 "UpdateVpcOrigin",
170 "DeleteVpcOrigin",
171 "ListVpcOrigins",
172 "CreateAnycastIpList",
173 "GetAnycastIpList",
174 "UpdateAnycastIpList",
175 "DeleteAnycastIpList",
176 "ListAnycastIpLists",
177 "CreateTrustStore",
178 "GetTrustStore",
179 "UpdateTrustStore",
180 "DeleteTrustStore",
181 "ListTrustStores",
182 "GetResourcePolicy",
183 "PutResourcePolicy",
184 "DeleteResourcePolicy",
185 "CreateConnectionGroup",
186 "GetConnectionGroup",
187 "GetConnectionGroupByRoutingEndpoint",
188 "UpdateConnectionGroup",
189 "DeleteConnectionGroup",
190 "ListConnectionGroups",
191 "ListDomainConflicts",
192 "UpdateDomainAssociation",
193 "VerifyDnsConfiguration",
194 "GetManagedCertificateDetails",
195 "UpdateDistributionWithStagingConfig",
196 "CreateDistributionTenant",
197 "GetDistributionTenant",
198 "GetDistributionTenantByDomain",
199 "UpdateDistributionTenant",
200 "DeleteDistributionTenant",
201 "ListDistributionTenants",
202 "ListDistributionTenantsByCustomization",
203 "AssociateDistributionTenantWebACL",
204 "DisassociateDistributionTenantWebACL",
205 "CreateInvalidationForDistributionTenant",
206 "GetInvalidationForDistributionTenant",
207 "ListInvalidationsForDistributionTenant",
208 "CreateConnectionFunction",
209 "GetConnectionFunction",
210 "DescribeConnectionFunction",
211 "UpdateConnectionFunction",
212 "DeleteConnectionFunction",
213 "ListConnectionFunctions",
214 "PublishConnectionFunction",
215 "TestConnectionFunction",
216];
217
218pub struct CloudFrontService {
219 pub(crate) state: SharedCloudFrontState,
220 pub(crate) propagation_delay: std::time::Duration,
230 pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
231 pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
232}
233
234fn default_propagation_delay() -> std::time::Duration {
241 let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
242 return std::time::Duration::from_secs(1);
243 };
244 let trimmed = raw.trim();
245 if let Ok(secs) = trimmed.parse::<u64>() {
246 return std::time::Duration::from_secs(secs);
247 }
248 if let Ok(secs) = trimmed.parse::<f64>() {
249 if secs.is_finite() && secs >= 0.0 {
250 return std::time::Duration::from_secs_f64(secs);
251 }
252 }
253 std::time::Duration::from_secs(1)
254}
255
256impl CloudFrontService {
257 pub fn new(state: SharedCloudFrontState) -> Self {
258 Self {
259 state,
260 propagation_delay: default_propagation_delay(),
261 snapshot_store: None,
262 snapshot_lock: Arc::new(AsyncMutex::new(())),
263 }
264 }
265
266 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267 self.snapshot_store = Some(store);
268 self
269 }
270
271 pub fn shared_state(&self) -> SharedCloudFrontState {
272 Arc::clone(&self.state)
273 }
274
275 async fn save_snapshot(&self) {
279 save_cloudfront_snapshot(
280 &self.state,
281 self.snapshot_store.clone(),
282 &self.snapshot_lock,
283 )
284 .await;
285 }
286
287 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292 let store = self.snapshot_store.clone()?;
293 let state = self.state.clone();
294 let lock = self.snapshot_lock.clone();
295 Some(Arc::new(move || {
296 let state = state.clone();
297 let store = store.clone();
298 let lock = lock.clone();
299 Box::pin(async move {
300 save_cloudfront_snapshot(&state, Some(store), &lock).await;
301 })
302 }))
303 }
304
305 pub fn rearm_in_progress(&self) {
310 let (dists, tenants, groups, streaming) = {
311 let s = self.state.read();
312 let mut dists = Vec::new();
313 let mut tenants = Vec::new();
314 let mut groups = Vec::new();
315 let mut streaming = Vec::new();
316 for account in s.accounts.values() {
317 for (id, d) in &account.distributions {
318 if d.status == "InProgress" {
319 dists.push(id.clone());
320 }
321 }
322 for (id, t) in &account.distribution_tenants {
323 if t.status == "InProgress" {
324 tenants.push(id.clone());
325 }
326 }
327 for (id, g) in &account.connection_groups {
328 if g.status == "InProgress" {
329 groups.push(id.clone());
330 }
331 }
332 for (id, d) in &account.streaming_distributions {
333 if d.status == "InProgress" {
334 streaming.push(id.clone());
335 }
336 }
337 }
338 (dists, tenants, groups, streaming)
339 };
340 for id in dists {
341 self.schedule_distribution_deploy(id);
342 }
343 for id in tenants {
344 self.schedule_distribution_tenant_deploy(id);
345 }
346 for id in groups {
347 self.schedule_connection_group_deploy(id);
348 }
349 for id in streaming {
350 self.schedule_streaming_distribution_deploy(id);
351 }
352 }
353
354 pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360 self.propagation_delay = delay;
361 self
362 }
363
364 pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370 let mut state = self.state.write();
371 for account in state.accounts.values_mut() {
372 if let Some(dist) = account.distributions.get_mut(id) {
373 dist.status = status.to_string();
374 return true;
375 }
376 }
377 false
378 }
379
380 pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383 let changed = self.set_distribution_status(id, status);
384 if changed {
385 self.save_snapshot().await;
386 }
387 changed
388 }
389}
390
391pub async fn save_cloudfront_snapshot(
397 state: &SharedCloudFrontState,
398 store: Option<Arc<dyn SnapshotStore>>,
399 lock: &AsyncMutex<()>,
400) {
401 let Some(store) = store else {
402 return;
403 };
404 let _guard = lock.lock().await;
405 let snapshot = CloudFrontSnapshot {
406 schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407 accounts: Some(state.read().clone()),
408 };
409 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410 let bytes = serde_json::to_vec(&snapshot)
411 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412 store.save(&bytes)
413 })
414 .await;
415 match join {
416 Ok(Ok(())) => {}
417 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418 Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419 }
420}
421
422impl Default for CloudFrontService {
423 fn default() -> Self {
424 Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425 }
426}
427
428#[async_trait]
429impl AwsService for CloudFrontService {
430 fn service_name(&self) -> &str {
431 "cloudfront"
432 }
433
434 fn supported_actions(&self) -> &[&str] {
435 SUPPORTED_ACTIONS
436 }
437
438 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439 let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
440 Some(r) => r,
441 None => {
442 return Err(aws_error(
443 StatusCode::NOT_FOUND,
444 "InvalidArgument",
445 format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
446 ));
447 }
448 };
449
450 let mutates = is_mutating_action(resolved.action);
451 let result = match resolved.action {
452 "CreateDistribution" => self.create_distribution(&req, false),
453 "CreateDistributionWithTags" => self.create_distribution(&req, true),
454 "GetDistribution" => self.get_distribution(&resolved),
455 "GetDistributionConfig" => self.get_distribution_config(&resolved),
456 "UpdateDistribution" => self.update_distribution(&req, &resolved),
457 "DeleteDistribution" => self.delete_distribution(&req, &resolved),
458 "ListDistributions" => self.list_distributions(&req),
459 "CopyDistribution" => self.copy_distribution(&req, &resolved),
460 "CreateInvalidation" => self.create_invalidation(&req, &resolved),
461 "GetInvalidation" => self.get_invalidation(&resolved),
462 "ListInvalidations" => self.list_invalidations(&resolved),
463 "TagResource" => self.tag_resource(&req),
464 "UntagResource" => self.untag_resource(&req),
465 "ListTagsForResource" => self.list_tags_for_resource(&req),
466 "AssociateAlias" => self.associate_alias(&req, &resolved),
467 "ListConflictingAliases" => self.list_conflicting_aliases(&req),
468 "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
469 "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
470 "ListDistributionsByCachePolicyId"
471 | "ListDistributionsByOriginRequestPolicyId"
472 | "ListDistributionsByResponseHeadersPolicyId"
473 | "ListDistributionsByKeyGroup"
474 | "ListDistributionsByWebACLId"
475 | "ListDistributionsByVpcOriginId"
476 | "ListDistributionsByAnycastIpListId"
477 | "ListDistributionsByConnectionMode"
478 | "ListDistributionsByConnectionFunction"
479 | "ListDistributionsByOwnedResource"
480 | "ListDistributionsByTrustStore"
481 | "ListDistributionsByRealtimeLogConfig" => {
482 self.list_distributions_by(&req, &resolved, resolved.action)
483 }
484 "CreateOriginAccessControl" => self.create_origin_access_control(&req),
485 "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
486 "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
487 "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
488 "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
489 "ListOriginAccessControls" => self.list_origin_access_controls(&req),
490 "CreateCachePolicy" => self.create_cache_policy(&req),
491 "GetCachePolicy" => self.get_cache_policy(&resolved),
492 "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
493 "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
494 "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
495 "ListCachePolicies" => self.list_cache_policies(&req),
496 "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
497 "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
498 "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
499 "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
500 "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
501 "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
502 "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
503 "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
504 "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
505 "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
506 "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
507 "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
508 "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
509 "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
510 "GetContinuousDeploymentPolicyConfig" => {
511 self.get_continuous_deployment_policy_config(&resolved)
512 }
513 "UpdateContinuousDeploymentPolicy" => {
514 self.update_continuous_deployment_policy(&req, &resolved)
515 }
516 "DeleteContinuousDeploymentPolicy" => {
517 self.delete_continuous_deployment_policy(&req, &resolved)
518 }
519 "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
520 "CreateFunction" => self.create_function(&req),
521 "DescribeFunction" => self.describe_function(&req, &resolved),
522 "GetFunction" => self.get_function(&req, &resolved),
523 "UpdateFunction" => self.update_function(&req, &resolved),
524 "DeleteFunction" => self.delete_function(&req, &resolved),
525 "ListFunctions" => self.list_functions(&req),
526 "PublishFunction" => self.publish_function(&req, &resolved),
527 "TestFunction" => self.test_function(&req, &resolved),
528 "CreatePublicKey" => self.create_public_key(&req),
529 "GetPublicKey" => self.get_public_key(&resolved),
530 "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
531 "UpdatePublicKey" => self.update_public_key(&req, &resolved),
532 "DeletePublicKey" => self.delete_public_key(&req, &resolved),
533 "ListPublicKeys" => self.list_public_keys(&req),
534 "CreateKeyGroup" => self.create_key_group(&req),
535 "GetKeyGroup" => self.get_key_group(&resolved),
536 "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
537 "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
538 "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
539 "ListKeyGroups" => self.list_key_groups(&req),
540 "CreateKeyValueStore" => self.create_key_value_store(&req),
541 "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
542 "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
543 "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
544 "ListKeyValueStores" => self.list_key_value_stores(&req),
545 "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
546 "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
547 "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
548 "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
549 "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
550 "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
551 "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
552 "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
553 "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
554 "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
555 "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
556 "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
557 "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
558 "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
559 "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
560 "ListStreamingDistributions" => self.list_streaming_distributions(&req),
561 "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
562 "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
563 "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
564 "UpdateFieldLevelEncryptionConfig" => {
565 self.update_field_level_encryption_config(&req, &resolved)
566 }
567 "DeleteFieldLevelEncryptionConfig" => {
568 self.delete_field_level_encryption_config(&req, &resolved)
569 }
570 "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
571 "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
572 "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
573 "GetFieldLevelEncryptionProfileConfig" => {
574 self.get_field_level_encryption_profile_config(&resolved)
575 }
576 "UpdateFieldLevelEncryptionProfile" => {
577 self.update_field_level_encryption_profile(&req, &resolved)
578 }
579 "DeleteFieldLevelEncryptionProfile" => {
580 self.delete_field_level_encryption_profile(&req, &resolved)
581 }
582 "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
583 "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
584 "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
585 "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
586 "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
587 "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
588 "CreateVpcOrigin" => self.create_vpc_origin(&req),
589 "GetVpcOrigin" => self.get_vpc_origin(&resolved),
590 "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
591 "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
592 "ListVpcOrigins" => self.list_vpc_origins(&req),
593 "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
594 "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
595 "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
596 "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
597 "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
598 "CreateTrustStore" => self.create_trust_store(&req),
599 "GetTrustStore" => self.get_trust_store(&resolved),
600 "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
601 "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
602 "ListTrustStores" => self.list_trust_stores(&req),
603 "GetResourcePolicy" => self.get_resource_policy(&req),
604 "PutResourcePolicy" => self.put_resource_policy(&req),
605 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
606 "CreateConnectionGroup" => self.create_connection_group(&req),
607 "GetConnectionGroup" => self.get_connection_group(&resolved),
608 "GetConnectionGroupByRoutingEndpoint" => {
609 self.get_connection_group_by_routing_endpoint(&req)
610 }
611 "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
612 "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
613 "ListConnectionGroups" => self.list_connection_groups(&req),
614 "ListDomainConflicts" => self.list_domain_conflicts(&req),
615 "UpdateDomainAssociation" => self.update_domain_association(&req),
616 "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
617 "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
618 "UpdateDistributionWithStagingConfig" => {
619 self.update_distribution_with_staging_config(&req, &resolved)
620 }
621 "CreateDistributionTenant" => self.create_distribution_tenant(&req),
622 "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
623 "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
624 "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
625 "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
626 "ListDistributionTenants" => self.list_distribution_tenants(&req),
627 "ListDistributionTenantsByCustomization" => {
628 self.list_distribution_tenants_by_customization(&req)
629 }
630 "AssociateDistributionTenantWebACL" => {
631 self.associate_distribution_tenant_web_acl(&req, &resolved)
632 }
633 "DisassociateDistributionTenantWebACL" => {
634 self.disassociate_distribution_tenant_web_acl(&req, &resolved)
635 }
636 "CreateInvalidationForDistributionTenant" => {
637 self.create_invalidation_for_distribution_tenant(&req, &resolved)
638 }
639 "GetInvalidationForDistributionTenant" => {
640 self.get_invalidation_for_distribution_tenant(&resolved)
641 }
642 "ListInvalidationsForDistributionTenant" => {
643 self.list_invalidations_for_distribution_tenant(&resolved)
644 }
645 "CreateConnectionFunction" => self.create_connection_function(&req),
646 "GetConnectionFunction" => self.get_connection_function(&resolved),
647 "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
648 "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
649 "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
650 "ListConnectionFunctions" => self.list_connection_functions(&req),
651 "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
652 "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
653 other => Err(aws_error(
654 StatusCode::NOT_IMPLEMENTED,
655 "InvalidAction",
656 format!("CloudFront action {other} is not implemented yet"),
657 )),
658 };
659 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
660 self.save_snapshot().await;
661 }
662 result
663 }
664}
665
666impl CloudFrontService {
669 fn create_distribution(
670 &self,
671 req: &AwsRequest,
672 with_tags: bool,
673 ) -> Result<AwsResponse, AwsServiceError> {
674 let (config, tags) = if with_tags {
675 let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677 let tags = parsed
678 .tags
679 .items
680 .map(|i| {
681 i.tag
682 .into_iter()
683 .map(|t| Tag {
684 key: t.key,
685 value: t.value,
686 })
687 .collect()
688 })
689 .unwrap_or_default();
690 (parsed.distribution_config, tags)
691 } else {
692 let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694 (parsed, Vec::new())
695 };
696
697 validate_caller_reference(&config.caller_reference)?;
698 validate_origins(&config)?;
699
700 let mut state = self.state.write();
701 let account = state.entry(account_id(req));
702
703 if let Some(existing) = account
704 .distributions
705 .values()
706 .find(|d| d.config.caller_reference == config.caller_reference)
707 {
708 return Err(aws_error(
709 StatusCode::CONFLICT,
710 "DistributionAlreadyExists",
711 format!(
712 "Distribution with the same CallerReference exists: {}",
713 existing.id
714 ),
715 ));
716 }
717
718 let id = generate_distribution_id();
719 let now = Utc::now();
720 let etag = generate_etag();
721 let domain = format!("{}.cloudfront.net", id.to_lowercase());
722 let arn = format!(
723 "arn:aws:cloudfront::{}:distribution/{}",
724 account_id(req),
725 id
726 );
727
728 let stored = StoredDistribution {
729 id: id.clone(),
730 arn: arn.clone(),
731 status: "InProgress".to_string(),
735 last_modified_time: now,
736 domain_name: domain,
737 in_progress_invalidation_batches: 0,
738 etag: etag.clone(),
739 config,
740 };
741 account.distributions.insert(id.clone(), stored.clone());
742 if !tags.is_empty() {
743 account.tags.insert(arn.clone(), tags);
744 }
745 drop(state);
746
747 self.schedule_distribution_deploy(id.clone());
748
749 let body = build_distribution_xml(&stored);
750 let mut headers = HeaderMap::new();
751 set_header(&mut headers, ETAG, &etag);
752 set_header(&mut headers, LOCATION, &stored.arn);
753 Ok(xml_response(StatusCode::CREATED, body, headers))
754 }
755
756 fn schedule_distribution_deploy(&self, id: String) {
761 let state = Arc::clone(&self.state);
762 let delay = self.propagation_delay;
763 let store = self.snapshot_store.clone();
764 let lock = self.snapshot_lock.clone();
765 tokio::spawn(async move {
766 tokio::time::sleep(delay).await;
767 let flipped = {
768 let mut s = state.write();
769 let mut flipped = false;
770 for account in s.accounts.values_mut() {
771 if let Some(d) = account.distributions.get_mut(&id) {
772 if d.status == "InProgress" {
773 d.status = "Deployed".to_string();
774 flipped = true;
775 }
776 break;
777 }
778 }
779 flipped
780 };
781 if flipped {
782 save_cloudfront_snapshot(&state, store, &lock).await;
783 }
784 });
785 }
786
787 pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789 let state = Arc::clone(&self.state);
790 let delay = self.propagation_delay;
791 let store = self.snapshot_store.clone();
792 let lock = self.snapshot_lock.clone();
793 tokio::spawn(async move {
794 tokio::time::sleep(delay).await;
795 let flipped = {
796 let mut s = state.write();
797 let mut flipped = false;
798 for account in s.accounts.values_mut() {
799 if let Some(t) = account.distribution_tenants.get_mut(&id) {
800 if t.status == "InProgress" {
801 t.status = "Deployed".to_string();
802 flipped = true;
803 }
804 break;
805 }
806 }
807 flipped
808 };
809 if flipped {
810 save_cloudfront_snapshot(&state, store, &lock).await;
811 }
812 });
813 }
814
815 pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817 let state = Arc::clone(&self.state);
818 let delay = self.propagation_delay;
819 let store = self.snapshot_store.clone();
820 let lock = self.snapshot_lock.clone();
821 tokio::spawn(async move {
822 tokio::time::sleep(delay).await;
823 let flipped = {
824 let mut s = state.write();
825 let mut flipped = false;
826 for account in s.accounts.values_mut() {
827 if let Some(g) = account.connection_groups.get_mut(&id) {
828 if g.status == "InProgress" {
829 g.status = "Deployed".to_string();
830 flipped = true;
831 }
832 break;
833 }
834 }
835 flipped
836 };
837 if flipped {
838 save_cloudfront_snapshot(&state, store, &lock).await;
839 }
840 });
841 }
842
843 pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848 let state = Arc::clone(&self.state);
849 let delay = self.propagation_delay;
850 let store = self.snapshot_store.clone();
851 let lock = self.snapshot_lock.clone();
852 tokio::spawn(async move {
853 tokio::time::sleep(delay).await;
854 let flipped = {
855 let mut s = state.write();
856 let mut flipped = false;
857 for account in s.accounts.values_mut() {
858 if let Some(d) = account.streaming_distributions.get_mut(&id) {
859 if d.status == "InProgress" {
860 d.status = "Deployed".to_string();
861 flipped = true;
862 }
863 break;
864 }
865 }
866 flipped
867 };
868 if flipped {
869 save_cloudfront_snapshot(&state, store, &lock).await;
870 }
871 });
872 }
873
874 fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875 let id = route
876 .id
877 .as_deref()
878 .ok_or_else(|| invalid_argument("missing distribution id"))?;
879 let state = self.state.read();
880 let account = state
881 .accounts
882 .get(DEFAULT_ACCOUNT)
883 .ok_or_else(|| no_such_distribution(id))?;
884 let dist = account
885 .distributions
886 .get(id)
887 .ok_or_else(|| no_such_distribution(id))?
888 .clone();
889 drop(state);
890 let body = build_distribution_xml(&dist);
891 let mut headers = HeaderMap::new();
892 set_header(&mut headers, ETAG, &dist.etag);
893 Ok(xml_response(StatusCode::OK, body, headers))
894 }
895
896 fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897 let id = route
898 .id
899 .as_deref()
900 .ok_or_else(|| invalid_argument("missing distribution id"))?;
901 let state = self.state.read();
902 let account = state
903 .accounts
904 .get(DEFAULT_ACCOUNT)
905 .ok_or_else(|| no_such_distribution(id))?;
906 let dist = account
907 .distributions
908 .get(id)
909 .ok_or_else(|| no_such_distribution(id))?
910 .clone();
911 drop(state);
912 let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913 .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914 let mut headers = HeaderMap::new();
915 set_header(&mut headers, ETAG, &dist.etag);
916 Ok(xml_response(StatusCode::OK, body, headers))
917 }
918
919 fn update_distribution(
920 &self,
921 req: &AwsRequest,
922 route: &Route,
923 ) -> Result<AwsResponse, AwsServiceError> {
924 let id = route
925 .id
926 .as_deref()
927 .ok_or_else(|| invalid_argument("missing distribution id"))?;
928 let if_match = req
929 .headers
930 .get(IF_MATCH)
931 .and_then(|v| v.to_str().ok())
932 .ok_or_else(|| {
933 aws_error(
934 StatusCode::BAD_REQUEST,
935 "InvalidIfMatchVersion",
936 "Missing If-Match header for UpdateDistribution",
937 )
938 })?
939 .to_string();
940 let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941 .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942 validate_caller_reference(&new_config.caller_reference)?;
943 validate_origins(&new_config)?;
944
945 let mut state = self.state.write();
946 let account = state
947 .accounts
948 .get_mut(DEFAULT_ACCOUNT)
949 .ok_or_else(|| no_such_distribution(id))?;
950 let dist = account
951 .distributions
952 .get_mut(id)
953 .ok_or_else(|| no_such_distribution(id))?;
954 if dist.etag != if_match {
955 return Err(aws_error(
956 StatusCode::PRECONDITION_FAILED,
957 "PreconditionFailed",
958 "If-Match header does not match the current ETag",
959 ));
960 }
961 let config_changed = !configs_equal(&dist.config, &new_config);
966 if config_changed {
967 dist.config = new_config;
968 dist.etag = generate_etag();
969 dist.last_modified_time = Utc::now();
970 dist.status = "InProgress".to_string();
974 }
975 let snapshot = dist.clone();
976 drop(state);
977
978 if config_changed {
979 self.schedule_distribution_deploy(id.to_string());
980 }
981
982 let body = build_distribution_xml(&snapshot);
983 let mut headers = HeaderMap::new();
984 set_header(&mut headers, ETAG, &snapshot.etag);
985 Ok(xml_response(StatusCode::OK, body, headers))
986 }
987
988 fn delete_distribution(
989 &self,
990 req: &AwsRequest,
991 route: &Route,
992 ) -> Result<AwsResponse, AwsServiceError> {
993 let id = route
994 .id
995 .as_deref()
996 .ok_or_else(|| invalid_argument("missing distribution id"))?;
997 let if_match = req
998 .headers
999 .get(IF_MATCH)
1000 .and_then(|v| v.to_str().ok())
1001 .ok_or_else(|| {
1002 aws_error(
1003 StatusCode::BAD_REQUEST,
1004 "InvalidIfMatchVersion",
1005 "Missing If-Match header for DeleteDistribution",
1006 )
1007 })?
1008 .to_string();
1009 let mut state = self.state.write();
1010 let account = state
1011 .accounts
1012 .get_mut(DEFAULT_ACCOUNT)
1013 .ok_or_else(|| no_such_distribution(id))?;
1014 {
1015 let dist = account
1016 .distributions
1017 .get(id)
1018 .ok_or_else(|| no_such_distribution(id))?;
1019 if dist.etag != if_match {
1020 return Err(aws_error(
1021 StatusCode::PRECONDITION_FAILED,
1022 "PreconditionFailed",
1023 "If-Match header does not match the current ETag",
1024 ));
1025 }
1026 if dist.config.enabled {
1027 return Err(aws_error(
1028 StatusCode::PRECONDITION_FAILED,
1029 "DistributionNotDisabled",
1030 "Distribution must be disabled before delete",
1031 ));
1032 }
1033 }
1034 let removed = account.distributions.remove(id).unwrap();
1035 account.tags.remove(&removed.arn);
1036 Ok(empty_response(StatusCode::NO_CONTENT))
1037 }
1038
1039 fn list_distributions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040 let state = self.state.read();
1041 let mut dists: Vec<StoredDistribution> = state
1042 .accounts
1043 .values()
1044 .flat_map(|a| a.distributions.values().cloned())
1045 .collect();
1046 drop(state);
1047 dists.sort_by_key(|a| a.last_modified_time);
1048
1049 let (marker, max_items, page, next_marker) = paginate_distributions(req, dists);
1052
1053 let body = build_distribution_list_xml(
1054 &page,
1055 "DistributionList",
1056 &marker,
1057 max_items,
1058 next_marker.as_deref(),
1059 );
1060 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1061 }
1062
1063 fn list_distributions_by(
1064 &self,
1065 req: &AwsRequest,
1066 route: &Route,
1067 action: &str,
1068 ) -> Result<AwsResponse, AwsServiceError> {
1069 match action {
1076 "ListDistributionsByCachePolicyId"
1080 | "ListDistributionsByOriginRequestPolicyId"
1081 | "ListDistributionsByResponseHeadersPolicyId"
1082 | "ListDistributionsByKeyGroup"
1083 | "ListDistributionsByWebACLId"
1084 | "ListDistributionsByVpcOriginId"
1085 | "ListDistributionsByAnycastIpListId"
1086 | "ListDistributionsByOwnedResource" => {
1087 let id = route.id.as_deref().unwrap_or("");
1088 if is_placeholder_label(id) {
1089 return Err(invalid_argument(format!(
1090 "Required URL identifier for {action} is missing or invalid"
1091 )));
1092 }
1093 }
1094 "ListDistributionsByConnectionMode" => {
1095 let id = route.id.as_deref().unwrap_or("");
1096 if is_placeholder_label(id) {
1097 return Err(invalid_argument(
1098 "ConnectionMode is required for ListDistributionsByConnectionMode",
1099 ));
1100 }
1101 if id != "direct" && id != "tenant-only" {
1102 return Err(invalid_argument(format!(
1103 "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1104 )));
1105 }
1106 }
1107 "ListDistributionsByConnectionFunction"
1108 if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1109 {
1110 return Err(invalid_argument(
1111 "ConnectionFunctionIdentifier query parameter is required",
1112 ));
1113 }
1114 "ListDistributionsByTrustStore"
1115 if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1116 {
1117 return Err(invalid_argument(
1118 "TrustStoreIdentifier query parameter is required",
1119 ));
1120 }
1121 _ => {}
1122 }
1123
1124 let root = match action {
1130 "ListDistributionsByCachePolicyId"
1131 | "ListDistributionsByOriginRequestPolicyId"
1132 | "ListDistributionsByResponseHeadersPolicyId"
1133 | "ListDistributionsByKeyGroup"
1134 | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1135 "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1136 _ => "DistributionList",
1137 };
1138
1139 let mut all: Vec<StoredDistribution> = {
1141 let state = self.state.read();
1142 state
1143 .accounts
1144 .values()
1145 .flat_map(|a| a.distributions.values().cloned())
1146 .collect()
1147 };
1148 all.sort_by_key(|d| d.last_modified_time);
1149
1150 let path_id = route.id.as_deref().unwrap_or("");
1151 let matched: Vec<StoredDistribution> = match action {
1152 "ListDistributionsByCachePolicyId" => all
1153 .into_iter()
1154 .filter(|d| distribution_uses_cache_policy(d, path_id))
1155 .collect(),
1156 "ListDistributionsByOriginRequestPolicyId" => all
1157 .into_iter()
1158 .filter(|d| distribution_uses_origin_request_policy(d, path_id))
1159 .collect(),
1160 "ListDistributionsByResponseHeadersPolicyId" => all
1161 .into_iter()
1162 .filter(|d| distribution_uses_response_headers_policy(d, path_id))
1163 .collect(),
1164 "ListDistributionsByKeyGroup" => all
1165 .into_iter()
1166 .filter(|d| distribution_uses_key_group(d, path_id))
1167 .collect(),
1168 "ListDistributionsByVpcOriginId" => all
1169 .into_iter()
1170 .filter(|d| distribution_uses_vpc_origin(d, path_id))
1171 .collect(),
1172 "ListDistributionsByWebACLId" if path_id == "null" => all
1175 .into_iter()
1176 .filter(|d| {
1177 d.config
1178 .web_acl_id
1179 .as_deref()
1180 .map(|w| w.is_empty())
1181 .unwrap_or(true)
1182 })
1183 .collect(),
1184 "ListDistributionsByWebACLId" => all
1185 .into_iter()
1186 .filter(|d| d.config.web_acl_id.as_deref() == Some(path_id))
1187 .collect(),
1188 "ListDistributionsByAnycastIpListId" => all
1189 .into_iter()
1190 .filter(|d| d.config.anycast_ip_list_id.as_deref() == Some(path_id))
1191 .collect(),
1192 _ => Vec::new(),
1193 };
1194
1195 let (marker, max_items, page, next_marker) = paginate_distributions(req, matched);
1198
1199 let body = if root == "DistributionIdList" {
1200 let ids: Vec<&str> = page.iter().map(|d| d.id.as_str()).collect();
1201 build_distribution_id_list_xml(&ids, &marker, max_items, next_marker.as_deref())
1202 } else if root == "DistributionList"
1203 && matches!(
1204 action,
1205 "ListDistributionsByWebACLId" | "ListDistributionsByAnycastIpListId"
1206 )
1207 {
1208 build_distribution_list_xml(
1209 &page,
1210 "DistributionList",
1211 &marker,
1212 max_items,
1213 next_marker.as_deref(),
1214 )
1215 } else {
1216 build_empty_distribution_id_list(root)
1217 };
1218 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1219 }
1220
1221 fn copy_distribution(
1222 &self,
1223 req: &AwsRequest,
1224 route: &Route,
1225 ) -> Result<AwsResponse, AwsServiceError> {
1226 let primary_id = route
1227 .id
1228 .as_deref()
1229 .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1230 let if_match = req
1231 .headers
1232 .get(IF_MATCH)
1233 .and_then(|v| v.to_str().ok())
1234 .ok_or_else(|| {
1235 aws_error(
1236 StatusCode::BAD_REQUEST,
1237 "InvalidIfMatchVersion",
1238 "Missing If-Match header for CopyDistribution",
1239 )
1240 })?
1241 .to_string();
1242 let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1243 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1244 validate_caller_reference(&parsed.caller_reference)?;
1245 let mut state = self.state.write();
1246 let account = state
1247 .accounts
1248 .get_mut(DEFAULT_ACCOUNT)
1249 .ok_or_else(|| no_such_distribution(primary_id))?;
1250 let primary = account
1251 .distributions
1252 .get(primary_id)
1253 .ok_or_else(|| no_such_distribution(primary_id))?
1254 .clone();
1255 if primary.etag != if_match {
1256 return Err(aws_error(
1257 StatusCode::PRECONDITION_FAILED,
1258 "PreconditionFailed",
1259 "If-Match header does not match the current ETag",
1260 ));
1261 }
1262 if account
1263 .distributions
1264 .values()
1265 .any(|d| d.config.caller_reference == parsed.caller_reference)
1266 {
1267 return Err(aws_error(
1268 StatusCode::CONFLICT,
1269 "DistributionAlreadyExists",
1270 "Distribution with the same CallerReference exists",
1271 ));
1272 }
1273 let new_id = generate_distribution_id();
1274 let mut config = primary.config.clone();
1275 config.caller_reference = parsed.caller_reference;
1276 config.enabled = parsed.enabled.unwrap_or(false);
1277 config.staging = parsed.staging;
1278 let now = Utc::now();
1279 let etag = generate_etag();
1280 let arn = format!(
1281 "arn:aws:cloudfront::{}:distribution/{}",
1282 account_id(req),
1283 new_id
1284 );
1285 let stored = StoredDistribution {
1286 id: new_id.clone(),
1287 arn: arn.clone(),
1288 status: "InProgress".to_string(),
1289 last_modified_time: now,
1290 domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1291 in_progress_invalidation_batches: 0,
1292 etag: etag.clone(),
1293 config,
1294 };
1295 account.distributions.insert(new_id.clone(), stored.clone());
1296 drop(state);
1297 self.schedule_distribution_deploy(new_id);
1298 let body = build_distribution_xml(&stored);
1299 let mut headers = HeaderMap::new();
1300 set_header(&mut headers, ETAG, &etag);
1301 set_header(&mut headers, LOCATION, &stored.arn);
1302 Ok(xml_response(StatusCode::CREATED, body, headers))
1303 }
1304}
1305
1306#[derive(Debug, serde::Deserialize, Default)]
1307#[serde(rename_all = "PascalCase")]
1308struct CopyDistributionRequest {
1309 caller_reference: String,
1310 #[serde(default)]
1311 enabled: Option<bool>,
1312 #[serde(default)]
1313 staging: Option<bool>,
1314}
1315
1316impl CloudFrontService {
1319 fn create_invalidation(
1320 &self,
1321 req: &AwsRequest,
1322 route: &Route,
1323 ) -> Result<AwsResponse, AwsServiceError> {
1324 let dist_id = route
1325 .id
1326 .as_deref()
1327 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1328 let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1329 .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1330 if batch.caller_reference.is_empty() {
1331 return Err(invalid_argument("CallerReference is required"));
1332 }
1333 if batch.paths.quantity < 1 {
1334 return Err(invalid_argument(
1335 "InvalidationBatch.Paths must be non-empty",
1336 ));
1337 }
1338 let mut state = self.state.write();
1339 let account = state.entry(DEFAULT_ACCOUNT);
1340 if !account.distributions.contains_key(dist_id) {
1341 return Err(no_such_distribution(dist_id));
1342 }
1343 let id = generate_invalidation_id();
1344 let stored = StoredInvalidation {
1345 id: id.clone(),
1346 distribution_id: dist_id.to_string(),
1347 status: "Completed".to_string(),
1348 create_time: Utc::now(),
1349 batch: batch.clone(),
1350 };
1351 account.invalidations.insert(id.clone(), stored.clone());
1352 drop(state);
1353 let body = build_invalidation_xml(&stored);
1354 let mut headers = HeaderMap::new();
1355 set_header(
1356 &mut headers,
1357 LOCATION,
1358 &format!(
1359 "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1360 stored.id
1361 ),
1362 );
1363 Ok(xml_response(StatusCode::CREATED, body, headers))
1364 }
1365
1366 fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1367 let dist_id = route
1368 .id
1369 .as_deref()
1370 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1371 let inv_id = route
1372 .second_id
1373 .as_deref()
1374 .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1375 let state = self.state.read();
1376 let account = state
1377 .accounts
1378 .get(DEFAULT_ACCOUNT)
1379 .ok_or_else(|| no_such_invalidation(inv_id))?;
1380 if !account.distributions.contains_key(dist_id) {
1381 return Err(no_such_distribution(dist_id));
1382 }
1383 let inv = account
1384 .invalidations
1385 .get(inv_id)
1386 .filter(|i| i.distribution_id == dist_id)
1387 .ok_or_else(|| no_such_invalidation(inv_id))?
1388 .clone();
1389 drop(state);
1390 let body = build_invalidation_xml(&inv);
1391 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1392 }
1393
1394 fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1395 let dist_id = route
1396 .id
1397 .as_deref()
1398 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1399 let state = self.state.read();
1400 let account = state
1401 .accounts
1402 .get(DEFAULT_ACCOUNT)
1403 .ok_or_else(|| no_such_distribution(dist_id))?;
1404 if !account.distributions.contains_key(dist_id) {
1405 return Err(no_such_distribution(dist_id));
1406 }
1407 let mut items: Vec<&StoredInvalidation> = account
1408 .invalidations
1409 .values()
1410 .filter(|i| i.distribution_id == dist_id)
1411 .collect();
1412 items.sort_by_key(|a| a.create_time);
1413 let body = build_invalidation_list_xml(&items);
1414 drop(state);
1415 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1416 }
1417}
1418
1419impl CloudFrontService {
1422 fn parse_arn_query(query: &str) -> Option<String> {
1423 for pair in query.split('&').filter(|p| !p.is_empty()) {
1424 if let Some(rest) = pair.strip_prefix("Resource=") {
1425 return Some(percent_decode(rest));
1426 }
1427 }
1428 None
1429 }
1430
1431 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1432 let arn = Self::parse_arn_query(&req.raw_query)
1433 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1434 let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1435 .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1436 let new_tags: Vec<Tag> = parsed
1437 .items
1438 .map(|i| {
1439 i.tag
1440 .into_iter()
1441 .map(|t| Tag {
1442 key: t.key,
1443 value: t.value,
1444 })
1445 .collect()
1446 })
1447 .unwrap_or_default();
1448 let mut state = self.state.write();
1449 let account = state.entry(DEFAULT_ACCOUNT);
1450 let entry = account.tags.entry(arn).or_default();
1451 for tag in new_tags {
1452 if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1453 existing.value = tag.value;
1454 } else {
1455 entry.push(tag);
1456 }
1457 }
1458 Ok(empty_response(StatusCode::NO_CONTENT))
1459 }
1460
1461 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1462 let arn = Self::parse_arn_query(&req.raw_query)
1463 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1464 let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1465 .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1466 let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1467 let mut state = self.state.write();
1468 let account = state.entry(DEFAULT_ACCOUNT);
1469 if let Some(existing) = account.tags.get_mut(&arn) {
1470 existing.retain(|t| !keys.contains(&t.key));
1471 }
1472 Ok(empty_response(StatusCode::NO_CONTENT))
1473 }
1474
1475 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1476 let arn = Self::parse_arn_query(&req.raw_query)
1477 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1478 let state = self.state.read();
1479 let tags = state
1480 .accounts
1481 .get(DEFAULT_ACCOUNT)
1482 .and_then(|a| a.tags.get(&arn))
1483 .cloned()
1484 .unwrap_or_default();
1485 drop(state);
1486 let body = build_tags_xml(&tags);
1487 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1488 }
1489}
1490
1491impl CloudFrontService {
1494 fn associate_alias(
1495 &self,
1496 req: &AwsRequest,
1497 route: &Route,
1498 ) -> Result<AwsResponse, AwsServiceError> {
1499 let id = route
1500 .id
1501 .as_deref()
1502 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1503 let alias = parse_query_value(&req.raw_query, "Alias")
1504 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1505 let mut state = self.state.write();
1506 let account = state
1507 .accounts
1508 .get_mut(DEFAULT_ACCOUNT)
1509 .ok_or_else(|| no_such_distribution(id))?;
1510 if let Some(other) = account.distributions.values().find(|d| {
1512 d.id != id
1513 && d.config
1514 .aliases
1515 .as_ref()
1516 .and_then(|a| a.items.as_ref())
1517 .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1518 }) {
1519 return Err(aws_error(
1520 StatusCode::CONFLICT,
1521 "CNAMEAlreadyExists",
1522 format!(
1523 "Alias {alias} is already associated with distribution {}",
1524 other.id
1525 ),
1526 ));
1527 }
1528 let dist = account
1529 .distributions
1530 .get_mut(id)
1531 .ok_or_else(|| no_such_distribution(id))?;
1532 let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1533 let items = aliases
1534 .items
1535 .get_or_insert_with(crate::model::AliasItems::default);
1536 if !items.cname.iter().any(|c| c == &alias) {
1537 items.cname.push(alias.clone());
1538 aliases.quantity = items.cname.len() as i32;
1539 }
1540 dist.etag = generate_etag();
1541 dist.last_modified_time = Utc::now();
1542 Ok(empty_response(StatusCode::OK))
1543 }
1544
1545 fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1546 let alias = parse_query_value(&req.raw_query, "Alias")
1547 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1548 let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1549 .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1550 if alias.len() > 253 {
1555 return Err(invalid_argument(format!(
1556 "Alias length {} exceeds maximum 253",
1557 alias.len()
1558 )));
1559 }
1560 if dist_id.len() > 25 {
1561 return Err(invalid_argument(format!(
1562 "DistributionId length {} exceeds maximum 25",
1563 dist_id.len()
1564 )));
1565 }
1566 if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1567 let n: i64 = max_items.parse().map_err(|_| {
1568 invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1569 })?;
1570 if n > 100 {
1571 return Err(invalid_argument(format!(
1572 "MaxItems {n} exceeds maximum 100"
1573 )));
1574 }
1575 }
1576 let body = format!(
1579 "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1580 NS = crate::NAMESPACE,
1581 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1582 );
1583 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1584 }
1585
1586 fn associate_web_acl(
1587 &self,
1588 req: &AwsRequest,
1589 route: &Route,
1590 ) -> Result<AwsResponse, AwsServiceError> {
1591 let id = route
1592 .id
1593 .as_deref()
1594 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1595 let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1596 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1597 let mut state = self.state.write();
1598 let account = state
1599 .accounts
1600 .get_mut(DEFAULT_ACCOUNT)
1601 .ok_or_else(|| no_such_distribution(id))?;
1602 let dist = account
1603 .distributions
1604 .get_mut(id)
1605 .ok_or_else(|| no_such_distribution(id))?;
1606 dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1607 dist.etag = generate_etag();
1608 dist.last_modified_time = Utc::now();
1609 let body = format!(
1610 "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1611 esc(id), esc(&parsed.web_acl_arn),
1612 NS = crate::NAMESPACE,
1613 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1614 );
1615 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1616 }
1617
1618 fn disassociate_web_acl(
1619 &self,
1620 _req: &AwsRequest,
1621 route: &Route,
1622 ) -> Result<AwsResponse, AwsServiceError> {
1623 let id = route
1624 .id
1625 .as_deref()
1626 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1627 let entity_not_found = || {
1630 aws_error(
1631 StatusCode::NOT_FOUND,
1632 "EntityNotFound",
1633 format!("The specified distribution does not exist: {id}"),
1634 )
1635 };
1636 let mut state = self.state.write();
1637 let account = state
1638 .accounts
1639 .get_mut(DEFAULT_ACCOUNT)
1640 .ok_or_else(entity_not_found)?;
1641 let dist = account
1642 .distributions
1643 .get_mut(id)
1644 .ok_or_else(entity_not_found)?;
1645 dist.config.web_acl_id = None;
1646 dist.etag = generate_etag();
1647 dist.last_modified_time = Utc::now();
1648 let body = format!(
1649 "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1650 esc(id),
1651 NS = crate::NAMESPACE,
1652 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1653 );
1654 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1655 }
1656}
1657
1658#[derive(serde::Deserialize, Default, Debug)]
1659#[serde(rename_all = "PascalCase")]
1660struct AssociateAliasRequest {
1661 #[serde(rename = "WebACLArn", default)]
1662 web_acl_arn: String,
1663}
1664
1665pub(crate) fn esc(s: &str) -> String {
1673 let mut out = String::with_capacity(s.len());
1674 for c in s.chars() {
1675 match c {
1676 '&' => out.push_str("&"),
1677 '<' => out.push_str("<"),
1678 '>' => out.push_str(">"),
1679 '"' => out.push_str("""),
1680 '\'' => out.push_str("'"),
1681 _ => out.push(c),
1682 }
1683 }
1684 out
1685}
1686
1687pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1688 let mut out = String::with_capacity(2048);
1689 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1690 out.push_str(&format!(
1691 "<Distribution xmlns=\"{ns}\">",
1692 ns = crate::NAMESPACE
1693 ));
1694 out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1695 out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1696 out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1697 out.push_str(&format!(
1698 "<LastModifiedTime>{}</LastModifiedTime>",
1699 rfc3339(&dist.last_modified_time)
1700 ));
1701 out.push_str(&format!(
1702 "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1703 dist.in_progress_invalidation_batches
1704 ));
1705 out.push_str(&format!(
1706 "<DomainName>{}</DomainName>",
1707 esc(&dist.domain_name)
1708 ));
1709 out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1710 out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1711 let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1712 .unwrap_or_else(|_| String::new());
1713 out.push_str(&inner);
1714 out.push_str("</Distribution>");
1715 out
1716}
1717
1718fn build_distribution_list_xml(
1719 dists: &[StoredDistribution],
1720 root: &str,
1721 marker: &str,
1722 max_items: usize,
1723 next_marker: Option<&str>,
1724) -> String {
1725 let mut out = String::with_capacity(2048);
1726 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1727 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1728 out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1729 if let Some(nm) = next_marker {
1730 out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1731 }
1732 out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1733 out.push_str(&format!(
1734 "<IsTruncated>{}</IsTruncated>",
1735 next_marker.is_some()
1736 ));
1737 out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1738 if dists.is_empty() {
1739 out.push_str(&format!("</{root}>"));
1740 return out;
1741 }
1742 out.push_str("<Items>");
1743 for d in dists {
1744 out.push_str("<DistributionSummary>");
1745 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1746 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1747 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1748 out.push_str(&format!(
1749 "<LastModifiedTime>{}</LastModifiedTime>",
1750 rfc3339(&d.last_modified_time)
1751 ));
1752 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1753 let aliases = d.config.aliases.clone().unwrap_or_default();
1754 out.push_str(&render_inline("Aliases", &aliases));
1755 let origins = d.config.origins.clone();
1756 out.push_str(&render_inline("Origins", &origins));
1757 let dcb = d.config.default_cache_behavior.clone();
1758 out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1759 let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1760 out.push_str(&render_inline("CacheBehaviors", &cb));
1761 let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1762 out.push_str(&render_inline("CustomErrorResponses", &cer));
1763 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1764 out.push_str(&format!(
1765 "<PriceClass>{}</PriceClass>",
1766 esc(&d
1767 .config
1768 .price_class
1769 .clone()
1770 .unwrap_or_else(|| "PriceClass_All".to_string()))
1771 ));
1772 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1773 out.push_str(&render_inline(
1774 "ViewerCertificate",
1775 &d.config.viewer_certificate.clone().unwrap_or_default(),
1776 ));
1777 out.push_str(&render_inline(
1778 "Restrictions",
1779 &d.config.restrictions.clone().unwrap_or_default(),
1780 ));
1781 out.push_str(&format!(
1782 "<WebACLId>{}</WebACLId>",
1783 esc(&d.config.web_acl_id.clone().unwrap_or_default())
1784 ));
1785 out.push_str(&format!(
1786 "<HttpVersion>{}</HttpVersion>",
1787 esc(&d
1788 .config
1789 .http_version
1790 .clone()
1791 .unwrap_or_else(|| "http2".to_string()))
1792 ));
1793 out.push_str(&format!(
1794 "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1795 d.config.is_ipv6_enabled.unwrap_or(true)
1796 ));
1797 out.push_str(&format!(
1798 "<Staging>{}</Staging>",
1799 d.config.staging.unwrap_or(false)
1800 ));
1801 out.push_str("</DistributionSummary>");
1802 }
1803 out.push_str("</Items>");
1804 out.push_str(&format!("</{root}>"));
1805 out
1806}
1807
1808fn build_empty_distribution_id_list(root: &str) -> String {
1809 let mut out = String::with_capacity(256);
1810 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1811 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1812 out.push_str("<Marker></Marker>");
1813 out.push_str("<MaxItems>100</MaxItems>");
1814 out.push_str("<IsTruncated>false</IsTruncated>");
1815 out.push_str("<Quantity>0</Quantity>");
1816 out.push_str(&format!("</{root}>"));
1817 out
1818}
1819
1820fn build_distribution_id_list_xml(
1823 ids: &[&str],
1824 marker: &str,
1825 max_items: usize,
1826 next_marker: Option<&str>,
1827) -> String {
1828 let mut out = String::with_capacity(256);
1829 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1830 out.push_str(&format!(
1831 "<DistributionIdList xmlns=\"{ns}\">",
1832 ns = crate::NAMESPACE
1833 ));
1834 out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1835 if let Some(nm) = next_marker {
1836 out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1837 }
1838 out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1839 out.push_str(&format!(
1840 "<IsTruncated>{}</IsTruncated>",
1841 next_marker.is_some()
1842 ));
1843 out.push_str(&format!("<Quantity>{}</Quantity>", ids.len()));
1844 if !ids.is_empty() {
1845 out.push_str("<Items>");
1846 for id in ids {
1847 out.push_str(&format!("<DistributionId>{}</DistributionId>", esc(id)));
1848 }
1849 out.push_str("</Items>");
1850 }
1851 out.push_str("</DistributionIdList>");
1852 out
1853}
1854
1855fn paginate_distributions(
1860 req: &AwsRequest,
1861 items: Vec<StoredDistribution>,
1862) -> (String, usize, Vec<StoredDistribution>, Option<String>) {
1863 let max_items = req
1864 .query_params
1865 .get("MaxItems")
1866 .or_else(|| req.query_params.get("maxitems"))
1867 .and_then(|v| v.parse::<usize>().ok())
1868 .filter(|n| *n > 0)
1869 .unwrap_or(100);
1870 let marker = req
1871 .query_params
1872 .get("Marker")
1873 .or_else(|| req.query_params.get("marker"))
1874 .cloned()
1875 .unwrap_or_default();
1876 let start_idx = if marker.is_empty() {
1880 0
1881 } else {
1882 match items.iter().position(|d| d.id == marker) {
1883 Some(pos) => pos + 1,
1884 None => items.len(),
1885 }
1886 };
1887 let page: Vec<StoredDistribution> = items
1888 .iter()
1889 .skip(start_idx)
1890 .take(max_items)
1891 .cloned()
1892 .collect();
1893 let has_more = start_idx + page.len() < items.len();
1897 let next_marker = if has_more {
1898 page.last().map(|d| d.id.clone())
1899 } else {
1900 None
1901 };
1902 (marker, max_items, page, next_marker)
1903}
1904
1905fn distribution_cache_behaviors(
1908 d: &StoredDistribution,
1909) -> impl Iterator<Item = &crate::model::CacheBehavior> {
1910 d.config
1911 .cache_behaviors
1912 .as_ref()
1913 .and_then(|cb| cb.items.as_ref())
1914 .into_iter()
1915 .flat_map(|items| items.cache_behavior.iter())
1916}
1917
1918fn distribution_uses_cache_policy(d: &StoredDistribution, id: &str) -> bool {
1919 d.config.default_cache_behavior.cache_policy_id.as_deref() == Some(id)
1920 || distribution_cache_behaviors(d).any(|b| b.cache_policy_id.as_deref() == Some(id))
1921}
1922
1923fn distribution_uses_origin_request_policy(d: &StoredDistribution, id: &str) -> bool {
1924 d.config
1925 .default_cache_behavior
1926 .origin_request_policy_id
1927 .as_deref()
1928 == Some(id)
1929 || distribution_cache_behaviors(d)
1930 .any(|b| b.origin_request_policy_id.as_deref() == Some(id))
1931}
1932
1933fn distribution_uses_response_headers_policy(d: &StoredDistribution, id: &str) -> bool {
1934 d.config
1935 .default_cache_behavior
1936 .response_headers_policy_id
1937 .as_deref()
1938 == Some(id)
1939 || distribution_cache_behaviors(d)
1940 .any(|b| b.response_headers_policy_id.as_deref() == Some(id))
1941}
1942
1943fn trusted_key_groups_contains(tkg: Option<&crate::model::TrustedKeyGroups>, id: &str) -> bool {
1944 tkg.and_then(|g| g.items.as_ref())
1945 .map(|items| items.key_group.iter().any(|k| k == id))
1946 .unwrap_or(false)
1947}
1948
1949fn distribution_uses_key_group(d: &StoredDistribution, id: &str) -> bool {
1950 trusted_key_groups_contains(
1951 d.config.default_cache_behavior.trusted_key_groups.as_ref(),
1952 id,
1953 ) || distribution_cache_behaviors(d)
1954 .any(|b| trusted_key_groups_contains(b.trusted_key_groups.as_ref(), id))
1955}
1956
1957fn distribution_uses_vpc_origin(d: &StoredDistribution, id: &str) -> bool {
1958 d.config
1959 .origins
1960 .items
1961 .as_ref()
1962 .map(|items| {
1963 items.origin.iter().any(|o| {
1964 o.vpc_origin_config
1965 .as_ref()
1966 .map(|v| v.vpc_origin_id == id)
1967 .unwrap_or(false)
1968 })
1969 })
1970 .unwrap_or(false)
1971}
1972
1973fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1974 let mut out = String::with_capacity(512);
1975 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1976 out.push_str(&format!(
1977 "<Invalidation xmlns=\"{ns}\">",
1978 ns = crate::NAMESPACE
1979 ));
1980 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1981 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1982 out.push_str(&format!(
1983 "<CreateTime>{}</CreateTime>",
1984 rfc3339(&inv.create_time)
1985 ));
1986 out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1987 out.push_str("</Invalidation>");
1988 out
1989}
1990
1991fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1992 let mut out = String::with_capacity(1024);
1993 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1994 out.push_str(&format!(
1995 "<InvalidationList xmlns=\"{ns}\">",
1996 ns = crate::NAMESPACE
1997 ));
1998 out.push_str("<Marker></Marker>");
1999 out.push_str("<MaxItems>100</MaxItems>");
2000 out.push_str("<IsTruncated>false</IsTruncated>");
2001 out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
2002 if !items.is_empty() {
2003 out.push_str("<Items>");
2004 for inv in items {
2005 out.push_str("<InvalidationSummary>");
2006 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
2007 out.push_str(&format!(
2008 "<CreateTime>{}</CreateTime>",
2009 rfc3339(&inv.create_time)
2010 ));
2011 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
2012 out.push_str("</InvalidationSummary>");
2013 }
2014 out.push_str("</Items>");
2015 }
2016 out.push_str("</InvalidationList>");
2017 out
2018}
2019
2020fn build_tags_xml(tags: &[Tag]) -> String {
2021 let mut out = String::with_capacity(256);
2022 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
2023 out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
2024 out.push_str("<Items>");
2025 for t in tags {
2026 out.push_str("<Tag>");
2027 out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
2028 if let Some(v) = &t.value {
2029 out.push_str(&format!("<Value>{}</Value>", esc(v)));
2030 }
2031 out.push_str("</Tag>");
2032 }
2033 out.push_str("</Items>");
2034 out.push_str("</Tags>");
2035 out
2036}
2037
2038fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
2039 quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
2040}
2041
2042fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
2045 if s.is_empty() {
2046 return Err(invalid_argument("CallerReference is required"));
2047 }
2048 Ok(())
2049}
2050
2051fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
2052 if config.origins.quantity < 1 {
2053 return Err(invalid_argument(
2054 "DistributionConfig.Origins must contain at least one origin",
2055 ));
2056 }
2057 Ok(())
2058}
2059
2060fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
2066 let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
2067 return false;
2068 };
2069 let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
2070 return false;
2071 };
2072 a == b
2073}
2074
2075fn account_id(_req: &AwsRequest) -> &'static str {
2076 DEFAULT_ACCOUNT
2081}
2082
2083fn generate_distribution_id() -> String {
2084 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2086 format!("E{}", &raw[..13])
2087}
2088
2089fn generate_invalidation_id() -> String {
2090 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2091 format!("I{}", &raw[..13])
2092}
2093
2094pub(crate) fn generate_etag() -> String {
2095 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2096 format!("E{}", &raw[..13])
2097}
2098
2099pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
2104 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
2105 let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
2106 format!("{prefix}{}", &raw[..suffix_len])
2107}
2108
2109fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
2110 t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
2111}
2112
2113pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
2114 aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
2115}
2116
2117fn no_such_distribution(id: &str) -> AwsServiceError {
2118 aws_error(
2119 StatusCode::NOT_FOUND,
2120 "NoSuchDistribution",
2121 format!("The specified distribution does not exist: {id}"),
2122 )
2123}
2124
2125fn no_such_invalidation(id: &str) -> AwsServiceError {
2126 aws_error(
2127 StatusCode::NOT_FOUND,
2128 "NoSuchInvalidation",
2129 format!("The specified invalidation does not exist: {id}"),
2130 )
2131}
2132
2133fn internal_error(msg: impl Into<String>) -> AwsServiceError {
2134 aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
2135}
2136
2137pub(crate) fn aws_error(
2138 status: StatusCode,
2139 code: impl Into<String>,
2140 msg: impl Into<String>,
2141) -> AwsServiceError {
2142 AwsServiceError::aws_error(status, code.into(), msg)
2143}
2144
2145fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
2146 if let Ok(v) = HeaderValue::from_str(value) {
2147 headers.insert(name, v);
2148 }
2149}
2150
2151pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
2152 AwsResponse {
2153 status,
2154 content_type: "text/xml".to_string(),
2155 body: ResponseBody::Bytes(Bytes::from(body)),
2156 headers,
2157 }
2158}
2159
2160fn empty_response(status: StatusCode) -> AwsResponse {
2161 AwsResponse {
2162 status,
2163 content_type: "text/xml".to_string(),
2164 body: ResponseBody::Bytes(Bytes::new()),
2165 headers: HeaderMap::new(),
2166 }
2167}
2168
2169fn percent_decode(input: &str) -> String {
2170 let mut out = String::with_capacity(input.len());
2171 let bytes = input.as_bytes();
2172 let mut i = 0;
2173 while i < bytes.len() {
2174 let b = bytes[i];
2175 if b == b'%' && i + 2 < bytes.len() {
2176 if let (Some(a), Some(c)) = (hex_digit(bytes[i + 1]), hex_digit(bytes[i + 2])) {
2177 out.push(((a << 4) | c) as char);
2178 i += 3;
2179 continue;
2180 }
2181 }
2182 if b == b'+' {
2183 out.push(' ');
2184 } else {
2185 out.push(b as char);
2186 }
2187 i += 1;
2188 }
2189 out
2190}
2191
2192fn hex_digit(b: u8) -> Option<u8> {
2193 match b {
2194 b'0'..=b'9' => Some(b - b'0'),
2195 b'a'..=b'f' => Some(b - b'a' + 10),
2196 b'A'..=b'F' => Some(b - b'A' + 10),
2197 _ => None,
2198 }
2199}
2200
2201pub(crate) fn is_placeholder_label(value: &str) -> bool {
2208 if value.is_empty() {
2209 return true;
2210 }
2211 let lower = value.to_ascii_lowercase();
2212 value.starts_with('{') || lower.starts_with("%7b")
2213}
2214
2215pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
2223 if let Ok(s) = std::str::from_utf8(body) {
2224 let trimmed = s.trim_start();
2225 if trimmed.starts_with('{') {
2226 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
2227 if let Some(field) = v.get(key) {
2228 return match field {
2229 serde_json::Value::String(s) => Some(s.clone()),
2230 serde_json::Value::Number(n) => Some(n.to_string()),
2231 serde_json::Value::Bool(b) => Some(b.to_string()),
2232 _ => None,
2233 };
2234 }
2235 return None;
2236 }
2237 }
2238 let open = format!("<{key}>");
2240 let close = format!("</{key}>");
2241 if let Some(start) = s.find(&open) {
2242 let after = start + open.len();
2243 if let Some(end_rel) = s[after..].find(&close) {
2244 return Some(s[after..after + end_rel].to_string());
2245 }
2246 }
2247 }
2248 None
2249}
2250
2251fn parse_query_value(query: &str, key: &str) -> Option<String> {
2252 let prefix = format!("{key}=");
2253 for pair in query.split('&').filter(|p| !p.is_empty()) {
2254 if let Some(rest) = pair.strip_prefix(&prefix) {
2255 return Some(percent_decode(rest));
2256 }
2257 }
2258 None
2259}
2260
2261#[cfg(test)]
2262mod tests {
2263 use super::*;
2264
2265 #[test]
2266 fn distribution_list_xml_renders_pagination_fields() {
2267 let xml =
2270 build_distribution_list_xml(&[], "DistributionList", "EDFDVBD6", 2, Some("E2NEXT"));
2271 assert!(xml.contains("<Marker>EDFDVBD6</Marker>"));
2272 assert!(xml.contains("<MaxItems>2</MaxItems>"));
2273 assert!(xml.contains("<IsTruncated>true</IsTruncated>"));
2274 assert!(xml.contains("<NextMarker>E2NEXT</NextMarker>"));
2275 let xml = build_distribution_list_xml(&[], "DistributionList", "", 100, None);
2277 assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
2278 assert!(!xml.contains("<NextMarker>"));
2279 }
2280
2281 #[test]
2282 fn placeholder_label_detects_braces_and_percent_encoding() {
2283 assert!(is_placeholder_label(""));
2284 assert!(is_placeholder_label("{Identifier}"));
2285 assert!(is_placeholder_label("%7BIdentifier%7D"));
2286 assert!(is_placeholder_label("%7bidentifier%7d"));
2287 assert!(!is_placeholder_label("E1234567890ABC"));
2288 assert!(!is_placeholder_label(
2289 "arn:aws:cloudfront::000:distribution/E1"
2290 ));
2291 }
2292
2293 #[test]
2294 fn extract_body_field_handles_json_and_xml() {
2295 let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2296 assert_eq!(
2297 extract_body_field(json, "Stage"),
2298 Some("BROKEN".to_string())
2299 );
2300 assert_eq!(extract_body_field(json, "MaxItems"), None);
2301
2302 let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2303 assert_eq!(
2304 extract_body_field(xml, "Domain"),
2305 Some("example.com".to_string())
2306 );
2307 assert_eq!(extract_body_field(xml, "Missing"), None);
2308
2309 assert_eq!(extract_body_field(b"", "x"), None);
2310 }
2311
2312 fn make_state() -> SharedCloudFrontState {
2313 Arc::new(RwLock::new(CloudFrontAccounts::new()))
2314 }
2315
2316 fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2317 AwsRequest {
2318 service: "cloudfront".into(),
2319 action: String::new(),
2320 region: "us-east-1".into(),
2321 account_id: DEFAULT_ACCOUNT.into(),
2322 request_id: Uuid::new_v4().to_string(),
2323 headers: HeaderMap::new(),
2324 query_params: std::collections::HashMap::new(),
2325 body_stream: parking_lot::Mutex::new(None),
2326 body: Bytes::from(body.to_string()),
2327 path_segments: path
2328 .split('/')
2329 .filter(|s| !s.is_empty())
2330 .map(String::from)
2331 .collect(),
2332 raw_path: path.into(),
2333 raw_query: query.into(),
2334 method,
2335 is_query_protocol: false,
2336 access_key_id: None,
2337 principal: None,
2338 }
2339 }
2340
2341 fn minimal_dist_config_xml(caller_ref: &str) -> String {
2342 format!(
2343 r#"<?xml version="1.0" encoding="UTF-8"?>
2344<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2345 <CallerReference>{caller_ref}</CallerReference>
2346 <Origins>
2347 <Quantity>1</Quantity>
2348 <Items>
2349 <Origin>
2350 <Id>primary</Id>
2351 <DomainName>example.com</DomainName>
2352 </Origin>
2353 </Items>
2354 </Origins>
2355 <DefaultCacheBehavior>
2356 <TargetOriginId>primary</TargetOriginId>
2357 <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2358 </DefaultCacheBehavior>
2359 <Comment></Comment>
2360 <Enabled>true</Enabled>
2361</DistributionConfig>"#
2362 )
2363 }
2364
2365 #[tokio::test]
2366 async fn create_then_get_then_delete_distribution() {
2367 let svc = CloudFrontService::new(make_state());
2368 let body = minimal_dist_config_xml("ref-1");
2369 let create = svc
2370 .handle(make_request(
2371 http::Method::POST,
2372 "/2020-05-31/distribution",
2373 "",
2374 &body,
2375 ))
2376 .await
2377 .unwrap();
2378 assert_eq!(create.status, StatusCode::CREATED);
2379 let etag = create
2380 .headers
2381 .get(ETAG)
2382 .unwrap()
2383 .to_str()
2384 .unwrap()
2385 .to_string();
2386 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2387 let id = xml
2388 .split("<Id>")
2389 .nth(1)
2390 .unwrap()
2391 .split("</Id>")
2392 .next()
2393 .unwrap()
2394 .to_string();
2395
2396 let get = svc
2397 .handle(make_request(
2398 http::Method::GET,
2399 &format!("/2020-05-31/distribution/{id}"),
2400 "",
2401 "",
2402 ))
2403 .await
2404 .unwrap();
2405 assert_eq!(get.status, StatusCode::OK);
2406
2407 let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2409 let mut update_req = make_request(
2410 http::Method::PUT,
2411 &format!("/2020-05-31/distribution/{id}/config"),
2412 "",
2413 &disable_body,
2414 );
2415 update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2416 let updated = svc.handle(update_req).await.unwrap();
2417 assert_eq!(updated.status, StatusCode::OK);
2418 let new_etag = updated
2419 .headers
2420 .get(ETAG)
2421 .unwrap()
2422 .to_str()
2423 .unwrap()
2424 .to_string();
2425
2426 let mut del_req = make_request(
2427 http::Method::DELETE,
2428 &format!("/2020-05-31/distribution/{id}"),
2429 "",
2430 "",
2431 );
2432 del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2433 let del = svc.handle(del_req).await.unwrap();
2434 assert_eq!(del.status, StatusCode::NO_CONTENT);
2435 }
2436
2437 async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2438 let body = minimal_dist_config_xml(caller_ref);
2439 let create = svc
2440 .handle(make_request(
2441 http::Method::POST,
2442 "/2020-05-31/distribution",
2443 "",
2444 &body,
2445 ))
2446 .await
2447 .unwrap();
2448 let xml = std::str::from_utf8(create.body.expect_bytes())
2449 .unwrap()
2450 .to_string();
2451 xml.split("<Id>")
2452 .nth(1)
2453 .unwrap()
2454 .split("</Id>")
2455 .next()
2456 .unwrap()
2457 .to_string()
2458 }
2459
2460 fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2461 let state = svc.state.read();
2462 state
2463 .accounts
2464 .get(DEFAULT_ACCOUNT)
2465 .and_then(|a| a.distributions.get(id))
2466 .map(|d| d.status.clone())
2467 .unwrap_or_default()
2468 }
2469
2470 #[tokio::test]
2471 async fn create_distribution_starts_in_progress() {
2472 let svc = CloudFrontService::new(make_state())
2474 .with_propagation_delay(std::time::Duration::from_secs(60));
2475 let body = minimal_dist_config_xml("status-ref");
2476 let create = svc
2477 .handle(make_request(
2478 http::Method::POST,
2479 "/2020-05-31/distribution",
2480 "",
2481 &body,
2482 ))
2483 .await
2484 .unwrap();
2485 let xml = std::str::from_utf8(create.body.expect_bytes())
2486 .unwrap()
2487 .to_string();
2488 assert!(
2489 xml.contains("<Status>InProgress</Status>"),
2490 "expected initial status InProgress, got: {xml}"
2491 );
2492 }
2493
2494 #[tokio::test]
2495 async fn auto_transition_after_tick_marks_deployed() {
2496 let svc = CloudFrontService::new(make_state())
2497 .with_propagation_delay(std::time::Duration::from_millis(50));
2498 let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2499 assert_eq!(distribution_status(&svc, &id), "InProgress");
2500 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2501 assert_eq!(distribution_status(&svc, &id), "Deployed");
2502 }
2503
2504 #[tokio::test]
2505 async fn set_distribution_status_via_admin_flips_synchronously() {
2506 let svc = CloudFrontService::new(make_state())
2507 .with_propagation_delay(std::time::Duration::from_secs(60));
2508 let id = create_distribution_returning_id(&svc, "admin-ref").await;
2509 assert_eq!(distribution_status(&svc, &id), "InProgress");
2510 assert!(svc.set_distribution_status(&id, "Deployed"));
2511 assert_eq!(distribution_status(&svc, &id), "Deployed");
2512 assert!(svc.set_distribution_status(&id, "InProgress"));
2513 assert_eq!(distribution_status(&svc, &id), "InProgress");
2514 }
2515
2516 #[tokio::test]
2517 async fn set_distribution_status_unknown_id_returns_false() {
2518 let svc = CloudFrontService::new(make_state());
2519 assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2520 }
2521
2522 #[tokio::test]
2523 async fn update_distribution_resets_to_in_progress() {
2524 let svc = CloudFrontService::new(make_state())
2525 .with_propagation_delay(std::time::Duration::from_secs(60));
2526 let body = minimal_dist_config_xml("update-reset-ref");
2527 let create = svc
2528 .handle(make_request(
2529 http::Method::POST,
2530 "/2020-05-31/distribution",
2531 "",
2532 &body,
2533 ))
2534 .await
2535 .unwrap();
2536 let etag = create
2537 .headers
2538 .get(ETAG)
2539 .unwrap()
2540 .to_str()
2541 .unwrap()
2542 .to_string();
2543 let xml = std::str::from_utf8(create.body.expect_bytes())
2544 .unwrap()
2545 .to_string();
2546 let id = xml
2547 .split("<Id>")
2548 .nth(1)
2549 .unwrap()
2550 .split("</Id>")
2551 .next()
2552 .unwrap()
2553 .to_string();
2554 assert!(svc.set_distribution_status(&id, "Deployed"));
2557 assert_eq!(distribution_status(&svc, &id), "Deployed");
2558
2559 let updated_body = body.replace(
2560 "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2561 "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2562 );
2563 let mut update_req = make_request(
2564 http::Method::PUT,
2565 &format!("/2020-05-31/distribution/{id}/config"),
2566 "",
2567 &updated_body,
2568 );
2569 update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2570 let updated = svc.handle(update_req).await.unwrap();
2571 assert_eq!(updated.status, StatusCode::OK);
2572 assert_eq!(distribution_status(&svc, &id), "InProgress");
2573 }
2574
2575 #[tokio::test]
2576 async fn duplicate_caller_reference_is_rejected() {
2577 let svc = CloudFrontService::new(make_state());
2578 let body = minimal_dist_config_xml("dup-ref");
2579 svc.handle(make_request(
2580 http::Method::POST,
2581 "/2020-05-31/distribution",
2582 "",
2583 &body,
2584 ))
2585 .await
2586 .unwrap();
2587 let result = svc
2588 .handle(make_request(
2589 http::Method::POST,
2590 "/2020-05-31/distribution",
2591 "",
2592 &body,
2593 ))
2594 .await;
2595 let err = match result {
2596 Ok(_) => panic!("expected duplicate caller-reference to fail"),
2597 Err(e) => e,
2598 };
2599 assert_eq!(err.code(), "DistributionAlreadyExists");
2600 assert_eq!(err.status(), StatusCode::CONFLICT);
2601 }
2602
2603 #[tokio::test]
2604 async fn invalidation_lifecycle() {
2605 let svc = CloudFrontService::new(make_state());
2606 let body = minimal_dist_config_xml("inv-ref");
2607 let create = svc
2608 .handle(make_request(
2609 http::Method::POST,
2610 "/2020-05-31/distribution",
2611 "",
2612 &body,
2613 ))
2614 .await
2615 .unwrap();
2616 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2617 let dist_id = xml
2618 .split("<Id>")
2619 .nth(1)
2620 .unwrap()
2621 .split("</Id>")
2622 .next()
2623 .unwrap();
2624 let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2625<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2626 <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2627 <CallerReference>inv-1</CallerReference>
2628</InvalidationBatch>"#;
2629 let inv_resp = svc
2630 .handle(make_request(
2631 http::Method::POST,
2632 &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2633 "",
2634 inv_body,
2635 ))
2636 .await
2637 .unwrap();
2638 assert_eq!(inv_resp.status, StatusCode::CREATED);
2639 let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2640 let inv_id = inv_xml
2641 .split("<Id>")
2642 .nth(1)
2643 .unwrap()
2644 .split("</Id>")
2645 .next()
2646 .unwrap();
2647 let get = svc
2648 .handle(make_request(
2649 http::Method::GET,
2650 &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2651 "",
2652 "",
2653 ))
2654 .await
2655 .unwrap();
2656 assert_eq!(get.status, StatusCode::OK);
2657 let list = svc
2658 .handle(make_request(
2659 http::Method::GET,
2660 &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2661 "",
2662 "",
2663 ))
2664 .await
2665 .unwrap();
2666 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2667 assert!(xml.contains("<Quantity>1</Quantity>"));
2668 }
2669
2670 #[tokio::test]
2671 async fn tags_roundtrip() {
2672 let svc = CloudFrontService::new(make_state());
2673 let body = minimal_dist_config_xml("tag-ref");
2674 let create = svc
2675 .handle(make_request(
2676 http::Method::POST,
2677 "/2020-05-31/distribution",
2678 "",
2679 &body,
2680 ))
2681 .await
2682 .unwrap();
2683 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2684 let arn = xml
2685 .split("<ARN>")
2686 .nth(1)
2687 .unwrap()
2688 .split("</ARN>")
2689 .next()
2690 .unwrap();
2691 let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2692<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2693 <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2694</Tags>"#;
2695 let arn_q = format!("Operation=Tag&Resource={}", arn);
2696 let resp = svc
2697 .handle(make_request(
2698 http::Method::POST,
2699 "/2020-05-31/tagging",
2700 &arn_q,
2701 tag_body,
2702 ))
2703 .await
2704 .unwrap();
2705 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2706 let list = svc
2707 .handle(make_request(
2708 http::Method::GET,
2709 "/2020-05-31/tagging",
2710 &format!("Resource={}", arn),
2711 "",
2712 ))
2713 .await
2714 .unwrap();
2715 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2716 assert!(xml.contains("<Key>env</Key>"));
2717 assert!(xml.contains("<Value>prod</Value>"));
2718 }
2719
2720 #[tokio::test]
2721 async fn xml_metacharacters_in_user_input_are_escaped() {
2722 let svc = CloudFrontService::new(make_state());
2723 let body = minimal_dist_config_xml("escape-ref").replace(
2724 "<Comment></Comment>",
2725 "<Comment><![CDATA[a&b<c>d]]></Comment>",
2726 );
2727 let create = svc
2728 .handle(make_request(
2729 http::Method::POST,
2730 "/2020-05-31/distribution",
2731 "",
2732 &body,
2733 ))
2734 .await
2735 .unwrap();
2736 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2737 let dist_id = xml
2738 .split("<Id>")
2739 .nth(1)
2740 .unwrap()
2741 .split("</Id>")
2742 .next()
2743 .unwrap();
2744 let arn = xml
2745 .split("<ARN>")
2746 .nth(1)
2747 .unwrap()
2748 .split("</ARN>")
2749 .next()
2750 .unwrap();
2751
2752 let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2753<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2754 <Items><Tag><Key>env</Key><Value>a&b<c>d</Value></Tag></Items>
2755</Tags>"#;
2756 let arn_q = format!("Operation=Tag&Resource={}", arn);
2757 svc.handle(make_request(
2758 http::Method::POST,
2759 "/2020-05-31/tagging",
2760 &arn_q,
2761 tag_body,
2762 ))
2763 .await
2764 .unwrap();
2765
2766 let list = svc
2767 .handle(make_request(
2768 http::Method::GET,
2769 "/2020-05-31/tagging",
2770 &format!("Resource={}", arn),
2771 "",
2772 ))
2773 .await
2774 .unwrap();
2775 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2776 assert!(xml.contains("<Value>a&b<c>d</Value>"));
2777 assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2778
2779 let list_resp = svc
2782 .handle(make_request(
2783 http::Method::GET,
2784 "/2020-05-31/distribution",
2785 "",
2786 "",
2787 ))
2788 .await
2789 .unwrap();
2790 let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2791 assert!(!xml.contains("<Comment>a&b<c>d"));
2794 assert!(xml.contains(dist_id));
2795 }
2796
2797 fn predicate_dist_config_xml(caller_ref: &str) -> String {
2798 format!(
2799 r#"<?xml version="1.0" encoding="UTF-8"?>
2800<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2801 <CallerReference>{caller_ref}</CallerReference>
2802 <Origins>
2803 <Quantity>1</Quantity>
2804 <Items>
2805 <Origin>
2806 <Id>primary</Id>
2807 <DomainName>example.com</DomainName>
2808 <VpcOriginConfig><VpcOriginId>vo-123</VpcOriginId></VpcOriginConfig>
2809 </Origin>
2810 </Items>
2811 </Origins>
2812 <DefaultCacheBehavior>
2813 <TargetOriginId>primary</TargetOriginId>
2814 <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2815 <CachePolicyId>cp-123</CachePolicyId>
2816 <OriginRequestPolicyId>orp-123</OriginRequestPolicyId>
2817 <ResponseHeadersPolicyId>rhp-123</ResponseHeadersPolicyId>
2818 <TrustedKeyGroups>
2819 <Enabled>true</Enabled>
2820 <Quantity>1</Quantity>
2821 <Items><KeyGroup>kg-123</KeyGroup></Items>
2822 </TrustedKeyGroups>
2823 </DefaultCacheBehavior>
2824 <Comment></Comment>
2825 <WebACLId>waf-abc</WebACLId>
2826 <AnycastIpListId>ail-123</AnycastIpListId>
2827 <Enabled>true</Enabled>
2828</DistributionConfig>"#
2829 )
2830 }
2831
2832 async fn list_by(svc: &CloudFrontService, path: &str) -> String {
2833 let resp = svc
2834 .handle(make_request(http::Method::GET, path, "", ""))
2835 .await
2836 .unwrap();
2837 assert_eq!(resp.status, StatusCode::OK);
2838 std::str::from_utf8(resp.body.expect_bytes())
2839 .unwrap()
2840 .to_string()
2841 }
2842
2843 #[tokio::test]
2844 async fn list_distributions_by_predicate_fields_filters_state() {
2845 let svc = CloudFrontService::new(make_state());
2846 let create = svc
2847 .handle(make_request(
2848 http::Method::POST,
2849 "/2020-05-31/distribution",
2850 "",
2851 &predicate_dist_config_xml("pred-ref"),
2852 ))
2853 .await
2854 .unwrap();
2855 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2856 let id = xml
2857 .split("<Id>")
2858 .nth(1)
2859 .unwrap()
2860 .split("</Id>")
2861 .next()
2862 .unwrap()
2863 .to_string();
2864
2865 for (path, root) in [
2867 ("/2020-05-31/distributionsByCachePolicyId/cp-123", "cp-123"),
2868 (
2869 "/2020-05-31/distributionsByOriginRequestPolicyId/orp-123",
2870 "orp-123",
2871 ),
2872 (
2873 "/2020-05-31/distributionsByResponseHeadersPolicyId/rhp-123",
2874 "rhp-123",
2875 ),
2876 ("/2020-05-31/distributionsByKeyGroupId/kg-123", "kg-123"),
2877 ("/2020-05-31/distributionsByVpcOriginId/vo-123", "vo-123"),
2878 ] {
2879 let body = list_by(&svc, path).await;
2880 assert!(body.contains("<DistributionIdList"), "{root}: {body}");
2881 assert!(
2882 body.contains(&format!("<DistributionId>{id}</DistributionId>")),
2883 "{root} did not list distribution: {body}"
2884 );
2885 assert!(body.contains("<Quantity>1</Quantity>"), "{root}: {body}");
2886 }
2887
2888 let empty = list_by(&svc, "/2020-05-31/distributionsByCachePolicyId/other").await;
2890 assert!(empty.contains("<Quantity>0</Quantity>"), "{empty}");
2891 assert!(!empty.contains("<DistributionId>"), "{empty}");
2892
2893 let web = list_by(&svc, "/2020-05-31/distributionsByWebACLId/waf-abc").await;
2895 assert!(web.contains("<DistributionList"), "{web}");
2896 assert!(web.contains(&format!("<Id>{id}</Id>")), "{web}");
2897
2898 let anycast = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/ail-123").await;
2899 assert!(anycast.contains("<DistributionList"), "{anycast}");
2900 assert!(anycast.contains(&format!("<Id>{id}</Id>")), "{anycast}");
2901
2902 let anycast_miss = list_by(&svc, "/2020-05-31/distributionsByAnycastIpListId/nope").await;
2903 assert!(
2904 anycast_miss.contains("<Quantity>0</Quantity>"),
2905 "{anycast_miss}"
2906 );
2907 }
2908
2909 fn dist_config_with_cache_policy(caller_ref: &str, cache_policy: &str) -> String {
2910 format!(
2911 r#"<?xml version="1.0" encoding="UTF-8"?>
2912<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2913 <CallerReference>{caller_ref}</CallerReference>
2914 <Origins>
2915 <Quantity>1</Quantity>
2916 <Items><Origin><Id>primary</Id><DomainName>example.com</DomainName></Origin></Items>
2917 </Origins>
2918 <DefaultCacheBehavior>
2919 <TargetOriginId>primary</TargetOriginId>
2920 <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
2921 <CachePolicyId>{cache_policy}</CachePolicyId>
2922 </DefaultCacheBehavior>
2923 <Comment></Comment>
2924 <Enabled>true</Enabled>
2925</DistributionConfig>"#
2926 )
2927 }
2928
2929 async fn create_dist_returning_id(svc: &CloudFrontService, config_xml: &str) -> String {
2930 let create = svc
2931 .handle(make_request(
2932 http::Method::POST,
2933 "/2020-05-31/distribution",
2934 "",
2935 config_xml,
2936 ))
2937 .await
2938 .unwrap();
2939 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2940 xml.split("<Id>")
2941 .nth(1)
2942 .unwrap()
2943 .split("</Id>")
2944 .next()
2945 .unwrap()
2946 .to_string()
2947 }
2948
2949 fn make_request_with_params(path: &str, params: &[(&str, &str)]) -> AwsRequest {
2950 let mut r = make_request(http::Method::GET, path, "", "");
2951 r.query_params = params
2952 .iter()
2953 .map(|(k, v)| (k.to_string(), v.to_string()))
2954 .collect();
2955 r
2956 }
2957
2958 #[tokio::test]
2959 async fn list_distributions_by_web_acl_id_null_lists_unassociated() {
2960 let svc = CloudFrontService::new(make_state());
2962 let with_acl = create_dist_returning_id(
2963 &svc,
2964 &predicate_dist_config_xml("null-with-acl"), )
2966 .await;
2967 let without_acl =
2968 create_dist_returning_id(&svc, &dist_config_with_cache_policy("null-no-acl", "cp-x"))
2969 .await;
2970
2971 let body = list_by(&svc, "/2020-05-31/distributionsByWebACLId/null").await;
2972 assert!(body.contains("<DistributionList"), "{body}");
2973 assert!(
2974 body.contains(&format!("<Id>{without_acl}</Id>")),
2975 "unassociated distribution missing from null listing: {body}"
2976 );
2977 assert!(
2978 !body.contains(&format!("<Id>{with_acl}</Id>")),
2979 "distribution WITH a web ACL leaked into the null listing: {body}"
2980 );
2981 }
2982
2983 fn only_distribution_id(xml: &str) -> String {
2984 assert_eq!(
2985 xml.matches("<DistributionId>").count(),
2986 1,
2987 "expected exactly one DistributionId: {xml}"
2988 );
2989 xml.split("<DistributionId>")
2990 .nth(1)
2991 .unwrap()
2992 .split("</DistributionId>")
2993 .next()
2994 .unwrap()
2995 .to_string()
2996 }
2997
2998 #[tokio::test]
2999 async fn list_distributions_by_predicate_paginates() {
3000 let svc = CloudFrontService::new(make_state());
3004 let d1 =
3006 create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-1", "cp-pg")).await;
3007 let d2 =
3008 create_dist_returning_id(&svc, &dist_config_with_cache_policy("pg-2", "cp-pg")).await;
3009
3010 let page1 = svc
3012 .handle(make_request_with_params(
3013 "/2020-05-31/distributionsByCachePolicyId/cp-pg",
3014 &[("MaxItems", "1")],
3015 ))
3016 .await
3017 .unwrap();
3018 let xml1 = std::str::from_utf8(page1.body.expect_bytes()).unwrap();
3019 assert!(xml1.contains("<MaxItems>1</MaxItems>"), "{xml1}");
3020 assert!(xml1.contains("<IsTruncated>true</IsTruncated>"), "{xml1}");
3021 let first_id = only_distribution_id(xml1);
3022 let next = xml1
3023 .split("<NextMarker>")
3024 .nth(1)
3025 .expect("NextMarker present")
3026 .split("</NextMarker>")
3027 .next()
3028 .unwrap()
3029 .to_string();
3030 assert_eq!(next, first_id, "NextMarker must be the last id on the page");
3032
3033 let page2 = svc
3036 .handle(make_request_with_params(
3037 "/2020-05-31/distributionsByCachePolicyId/cp-pg",
3038 &[("MaxItems", "1"), ("Marker", &next)],
3039 ))
3040 .await
3041 .unwrap();
3042 let xml2 = std::str::from_utf8(page2.body.expect_bytes()).unwrap();
3043 assert!(xml2.contains(&format!("<Marker>{next}</Marker>")), "{xml2}");
3044 assert!(xml2.contains("<IsTruncated>false</IsTruncated>"), "{xml2}");
3045 let second_id = only_distribution_id(xml2);
3046 assert_ne!(
3047 second_id, first_id,
3048 "exclusive marker must not return the marker item again"
3049 );
3050 let mut seen = [first_id, second_id];
3052 seen.sort();
3053 let mut expected = [d1, d2];
3054 expected.sort();
3055 assert_eq!(seen, expected, "pages did not cover both distributions");
3056 }
3057}