1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18 DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22 CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23 StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Reports whether `action` names an operation that changes service state.
///
/// The decision is based only on the verb prefix of the action name and is
/// case-sensitive. `handle` uses the answer to decide whether a successful
/// request should be followed by a snapshot save.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in MUTATING_VERBS {
        if action.starts_with(verb) {
            return true;
        }
    }
    false
}
45
/// Account id used as the lookup key by handlers in this file that read
/// per-account state without deriving an account from the request
/// (e.g. `get_distribution`, `update_distribution`).
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Every action name this service advertises through
/// `AwsService::supported_actions`.
///
/// Each entry is expected to have a matching arm in the dispatch `match`
/// inside `handle`; keep the two in sync when adding an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service implementation: shared account state plus the optional
/// wiring used to persist that state as snapshots.
pub struct CloudFrontService {
    /// Shared, lock-protected CloudFront state for all accounts.
    pub(crate) state: SharedCloudFrontState,
    /// How long a scheduled deploy task sleeps before flipping a resource
    /// from `InProgress` to `Deployed`.
    pub(crate) propagation_delay: std::time::Duration,
    /// Destination for serialized snapshots; when `None`, snapshot saves are
    /// skipped.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolves the default `InProgress` -> `Deployed` propagation delay.
///
/// Reads `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC`. The trimmed value is first
/// parsed as a whole number of seconds, then as fractional seconds. Anything
/// unusable falls back to one second: an unset variable, non-numeric text, a
/// negative or non-finite number, or a value too large to represent as a
/// `Duration`.
///
/// This function never panics. Oversized values such as `1e300` are rejected
/// through `Duration::try_from_secs_f64` rather than passed to the panicking
/// `Duration::from_secs_f64`.
fn default_propagation_delay() -> std::time::Duration {
    use std::time::Duration;

    const FALLBACK: Duration = Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Duration::from_secs(secs);
    }
    trimmed
        .parse::<f64>()
        .ok()
        .filter(|secs| secs.is_finite() && *secs >= 0.0)
        // A finite, non-negative f64 can still exceed what `Duration` holds.
        .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
255
256impl CloudFrontService {
257 pub fn new(state: SharedCloudFrontState) -> Self {
258 Self {
259 state,
260 propagation_delay: default_propagation_delay(),
261 snapshot_store: None,
262 snapshot_lock: Arc::new(AsyncMutex::new(())),
263 }
264 }
265
266 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267 self.snapshot_store = Some(store);
268 self
269 }
270
271 pub fn shared_state(&self) -> SharedCloudFrontState {
272 Arc::clone(&self.state)
273 }
274
275 async fn save_snapshot(&self) {
279 save_cloudfront_snapshot(
280 &self.state,
281 self.snapshot_store.clone(),
282 &self.snapshot_lock,
283 )
284 .await;
285 }
286
287 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292 let store = self.snapshot_store.clone()?;
293 let state = self.state.clone();
294 let lock = self.snapshot_lock.clone();
295 Some(Arc::new(move || {
296 let state = state.clone();
297 let store = store.clone();
298 let lock = lock.clone();
299 Box::pin(async move {
300 save_cloudfront_snapshot(&state, Some(store), &lock).await;
301 })
302 }))
303 }
304
305 pub fn rearm_in_progress(&self) {
310 let (dists, tenants, groups, streaming) = {
311 let s = self.state.read();
312 let mut dists = Vec::new();
313 let mut tenants = Vec::new();
314 let mut groups = Vec::new();
315 let mut streaming = Vec::new();
316 for account in s.accounts.values() {
317 for (id, d) in &account.distributions {
318 if d.status == "InProgress" {
319 dists.push(id.clone());
320 }
321 }
322 for (id, t) in &account.distribution_tenants {
323 if t.status == "InProgress" {
324 tenants.push(id.clone());
325 }
326 }
327 for (id, g) in &account.connection_groups {
328 if g.status == "InProgress" {
329 groups.push(id.clone());
330 }
331 }
332 for (id, d) in &account.streaming_distributions {
333 if d.status == "InProgress" {
334 streaming.push(id.clone());
335 }
336 }
337 }
338 (dists, tenants, groups, streaming)
339 };
340 for id in dists {
341 self.schedule_distribution_deploy(id);
342 }
343 for id in tenants {
344 self.schedule_distribution_tenant_deploy(id);
345 }
346 for id in groups {
347 self.schedule_connection_group_deploy(id);
348 }
349 for id in streaming {
350 self.schedule_streaming_distribution_deploy(id);
351 }
352 }
353
354 pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360 self.propagation_delay = delay;
361 self
362 }
363
364 pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370 let mut state = self.state.write();
371 for account in state.accounts.values_mut() {
372 if let Some(dist) = account.distributions.get_mut(id) {
373 dist.status = status.to_string();
374 return true;
375 }
376 }
377 false
378 }
379
380 pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383 let changed = self.set_distribution_status(id, status);
384 if changed {
385 self.save_snapshot().await;
386 }
387 changed
388 }
389}
390
391pub async fn save_cloudfront_snapshot(
397 state: &SharedCloudFrontState,
398 store: Option<Arc<dyn SnapshotStore>>,
399 lock: &AsyncMutex<()>,
400) {
401 let Some(store) = store else {
402 return;
403 };
404 let _guard = lock.lock().await;
405 let snapshot = CloudFrontSnapshot {
406 schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407 accounts: Some(state.read().clone()),
408 };
409 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410 let bytes = serde_json::to_vec(&snapshot)
411 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412 store.save(&bytes)
413 })
414 .await;
415 match join {
416 Ok(Ok(())) => {}
417 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418 Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419 }
420}
421
422impl Default for CloudFrontService {
423 fn default() -> Self {
424 Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425 }
426}
427
428#[async_trait]
429impl AwsService for CloudFrontService {
430 fn service_name(&self) -> &str {
431 "cloudfront"
432 }
433
434 fn supported_actions(&self) -> &[&str] {
435 SUPPORTED_ACTIONS
436 }
437
438 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439 let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
440 Some(r) => r,
441 None => {
442 return Err(aws_error(
443 StatusCode::NOT_FOUND,
444 "InvalidArgument",
445 format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
446 ));
447 }
448 };
449
450 let mutates = is_mutating_action(resolved.action);
451 let result = match resolved.action {
452 "CreateDistribution" => self.create_distribution(&req, false),
453 "CreateDistributionWithTags" => self.create_distribution(&req, true),
454 "GetDistribution" => self.get_distribution(&resolved),
455 "GetDistributionConfig" => self.get_distribution_config(&resolved),
456 "UpdateDistribution" => self.update_distribution(&req, &resolved),
457 "DeleteDistribution" => self.delete_distribution(&req, &resolved),
458 "ListDistributions" => self.list_distributions(&req),
459 "CopyDistribution" => self.copy_distribution(&req, &resolved),
460 "CreateInvalidation" => self.create_invalidation(&req, &resolved),
461 "GetInvalidation" => self.get_invalidation(&resolved),
462 "ListInvalidations" => self.list_invalidations(&resolved),
463 "TagResource" => self.tag_resource(&req),
464 "UntagResource" => self.untag_resource(&req),
465 "ListTagsForResource" => self.list_tags_for_resource(&req),
466 "AssociateAlias" => self.associate_alias(&req, &resolved),
467 "ListConflictingAliases" => self.list_conflicting_aliases(&req),
468 "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
469 "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
470 "ListDistributionsByCachePolicyId"
471 | "ListDistributionsByOriginRequestPolicyId"
472 | "ListDistributionsByResponseHeadersPolicyId"
473 | "ListDistributionsByKeyGroup"
474 | "ListDistributionsByWebACLId"
475 | "ListDistributionsByVpcOriginId"
476 | "ListDistributionsByAnycastIpListId"
477 | "ListDistributionsByConnectionMode"
478 | "ListDistributionsByConnectionFunction"
479 | "ListDistributionsByOwnedResource"
480 | "ListDistributionsByTrustStore"
481 | "ListDistributionsByRealtimeLogConfig" => {
482 self.list_distributions_by(&req, &resolved, resolved.action)
483 }
484 "CreateOriginAccessControl" => self.create_origin_access_control(&req),
485 "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
486 "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
487 "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
488 "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
489 "ListOriginAccessControls" => self.list_origin_access_controls(&req),
490 "CreateCachePolicy" => self.create_cache_policy(&req),
491 "GetCachePolicy" => self.get_cache_policy(&resolved),
492 "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
493 "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
494 "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
495 "ListCachePolicies" => self.list_cache_policies(&req),
496 "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
497 "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
498 "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
499 "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
500 "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
501 "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
502 "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
503 "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
504 "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
505 "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
506 "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
507 "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
508 "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
509 "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
510 "GetContinuousDeploymentPolicyConfig" => {
511 self.get_continuous_deployment_policy_config(&resolved)
512 }
513 "UpdateContinuousDeploymentPolicy" => {
514 self.update_continuous_deployment_policy(&req, &resolved)
515 }
516 "DeleteContinuousDeploymentPolicy" => {
517 self.delete_continuous_deployment_policy(&req, &resolved)
518 }
519 "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
520 "CreateFunction" => self.create_function(&req),
521 "DescribeFunction" => self.describe_function(&req, &resolved),
522 "GetFunction" => self.get_function(&req, &resolved),
523 "UpdateFunction" => self.update_function(&req, &resolved),
524 "DeleteFunction" => self.delete_function(&req, &resolved),
525 "ListFunctions" => self.list_functions(&req),
526 "PublishFunction" => self.publish_function(&req, &resolved),
527 "TestFunction" => self.test_function(&req, &resolved),
528 "CreatePublicKey" => self.create_public_key(&req),
529 "GetPublicKey" => self.get_public_key(&resolved),
530 "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
531 "UpdatePublicKey" => self.update_public_key(&req, &resolved),
532 "DeletePublicKey" => self.delete_public_key(&req, &resolved),
533 "ListPublicKeys" => self.list_public_keys(&req),
534 "CreateKeyGroup" => self.create_key_group(&req),
535 "GetKeyGroup" => self.get_key_group(&resolved),
536 "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
537 "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
538 "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
539 "ListKeyGroups" => self.list_key_groups(&req),
540 "CreateKeyValueStore" => self.create_key_value_store(&req),
541 "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
542 "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
543 "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
544 "ListKeyValueStores" => self.list_key_value_stores(&req),
545 "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
546 "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
547 "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
548 "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
549 "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
550 "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
551 "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
552 "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
553 "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
554 "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
555 "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
556 "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
557 "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
558 "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
559 "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
560 "ListStreamingDistributions" => self.list_streaming_distributions(&req),
561 "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
562 "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
563 "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
564 "UpdateFieldLevelEncryptionConfig" => {
565 self.update_field_level_encryption_config(&req, &resolved)
566 }
567 "DeleteFieldLevelEncryptionConfig" => {
568 self.delete_field_level_encryption_config(&req, &resolved)
569 }
570 "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
571 "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
572 "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
573 "GetFieldLevelEncryptionProfileConfig" => {
574 self.get_field_level_encryption_profile_config(&resolved)
575 }
576 "UpdateFieldLevelEncryptionProfile" => {
577 self.update_field_level_encryption_profile(&req, &resolved)
578 }
579 "DeleteFieldLevelEncryptionProfile" => {
580 self.delete_field_level_encryption_profile(&req, &resolved)
581 }
582 "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
583 "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
584 "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
585 "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
586 "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
587 "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
588 "CreateVpcOrigin" => self.create_vpc_origin(&req),
589 "GetVpcOrigin" => self.get_vpc_origin(&resolved),
590 "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
591 "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
592 "ListVpcOrigins" => self.list_vpc_origins(&req),
593 "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
594 "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
595 "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
596 "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
597 "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
598 "CreateTrustStore" => self.create_trust_store(&req),
599 "GetTrustStore" => self.get_trust_store(&resolved),
600 "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
601 "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
602 "ListTrustStores" => self.list_trust_stores(&req),
603 "GetResourcePolicy" => self.get_resource_policy(&req),
604 "PutResourcePolicy" => self.put_resource_policy(&req),
605 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
606 "CreateConnectionGroup" => self.create_connection_group(&req),
607 "GetConnectionGroup" => self.get_connection_group(&resolved),
608 "GetConnectionGroupByRoutingEndpoint" => {
609 self.get_connection_group_by_routing_endpoint(&req)
610 }
611 "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
612 "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
613 "ListConnectionGroups" => self.list_connection_groups(&req),
614 "ListDomainConflicts" => self.list_domain_conflicts(&req),
615 "UpdateDomainAssociation" => self.update_domain_association(&req),
616 "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
617 "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
618 "UpdateDistributionWithStagingConfig" => {
619 self.update_distribution_with_staging_config(&req, &resolved)
620 }
621 "CreateDistributionTenant" => self.create_distribution_tenant(&req),
622 "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
623 "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
624 "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
625 "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
626 "ListDistributionTenants" => self.list_distribution_tenants(&req),
627 "ListDistributionTenantsByCustomization" => {
628 self.list_distribution_tenants_by_customization(&req)
629 }
630 "AssociateDistributionTenantWebACL" => {
631 self.associate_distribution_tenant_web_acl(&req, &resolved)
632 }
633 "DisassociateDistributionTenantWebACL" => {
634 self.disassociate_distribution_tenant_web_acl(&req, &resolved)
635 }
636 "CreateInvalidationForDistributionTenant" => {
637 self.create_invalidation_for_distribution_tenant(&req, &resolved)
638 }
639 "GetInvalidationForDistributionTenant" => {
640 self.get_invalidation_for_distribution_tenant(&resolved)
641 }
642 "ListInvalidationsForDistributionTenant" => {
643 self.list_invalidations_for_distribution_tenant(&resolved)
644 }
645 "CreateConnectionFunction" => self.create_connection_function(&req),
646 "GetConnectionFunction" => self.get_connection_function(&resolved),
647 "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
648 "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
649 "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
650 "ListConnectionFunctions" => self.list_connection_functions(&req),
651 "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
652 "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
653 other => Err(aws_error(
654 StatusCode::NOT_IMPLEMENTED,
655 "InvalidAction",
656 format!("CloudFront action {other} is not implemented yet"),
657 )),
658 };
659 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
660 self.save_snapshot().await;
661 }
662 result
663 }
664}
665
666impl CloudFrontService {
669 fn create_distribution(
670 &self,
671 req: &AwsRequest,
672 with_tags: bool,
673 ) -> Result<AwsResponse, AwsServiceError> {
674 let (config, tags) = if with_tags {
675 let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677 let tags = parsed
678 .tags
679 .items
680 .map(|i| {
681 i.tag
682 .into_iter()
683 .map(|t| Tag {
684 key: t.key,
685 value: t.value,
686 })
687 .collect()
688 })
689 .unwrap_or_default();
690 (parsed.distribution_config, tags)
691 } else {
692 let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694 (parsed, Vec::new())
695 };
696
697 validate_caller_reference(&config.caller_reference)?;
698 validate_origins(&config)?;
699
700 let mut state = self.state.write();
701 let account = state.entry(account_id(req));
702
703 if let Some(existing) = account
704 .distributions
705 .values()
706 .find(|d| d.config.caller_reference == config.caller_reference)
707 {
708 return Err(aws_error(
709 StatusCode::CONFLICT,
710 "DistributionAlreadyExists",
711 format!(
712 "Distribution with the same CallerReference exists: {}",
713 existing.id
714 ),
715 ));
716 }
717
718 let id = generate_distribution_id();
719 let now = Utc::now();
720 let etag = generate_etag();
721 let domain = format!("{}.cloudfront.net", id.to_lowercase());
722 let arn = format!(
723 "arn:aws:cloudfront::{}:distribution/{}",
724 account_id(req),
725 id
726 );
727
728 let stored = StoredDistribution {
729 id: id.clone(),
730 arn: arn.clone(),
731 status: "InProgress".to_string(),
735 last_modified_time: now,
736 domain_name: domain,
737 in_progress_invalidation_batches: 0,
738 etag: etag.clone(),
739 config,
740 };
741 account.distributions.insert(id.clone(), stored.clone());
742 if !tags.is_empty() {
743 account.tags.insert(arn.clone(), tags);
744 }
745 drop(state);
746
747 self.schedule_distribution_deploy(id.clone());
748
749 let body = build_distribution_xml(&stored);
750 let mut headers = HeaderMap::new();
751 set_header(&mut headers, ETAG, &etag);
752 set_header(&mut headers, LOCATION, &stored.arn);
753 Ok(xml_response(StatusCode::CREATED, body, headers))
754 }
755
756 fn schedule_distribution_deploy(&self, id: String) {
761 let state = Arc::clone(&self.state);
762 let delay = self.propagation_delay;
763 let store = self.snapshot_store.clone();
764 let lock = self.snapshot_lock.clone();
765 tokio::spawn(async move {
766 tokio::time::sleep(delay).await;
767 let flipped = {
768 let mut s = state.write();
769 let mut flipped = false;
770 for account in s.accounts.values_mut() {
771 if let Some(d) = account.distributions.get_mut(&id) {
772 if d.status == "InProgress" {
773 d.status = "Deployed".to_string();
774 flipped = true;
775 }
776 break;
777 }
778 }
779 flipped
780 };
781 if flipped {
782 save_cloudfront_snapshot(&state, store, &lock).await;
783 }
784 });
785 }
786
787 pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789 let state = Arc::clone(&self.state);
790 let delay = self.propagation_delay;
791 let store = self.snapshot_store.clone();
792 let lock = self.snapshot_lock.clone();
793 tokio::spawn(async move {
794 tokio::time::sleep(delay).await;
795 let flipped = {
796 let mut s = state.write();
797 let mut flipped = false;
798 for account in s.accounts.values_mut() {
799 if let Some(t) = account.distribution_tenants.get_mut(&id) {
800 if t.status == "InProgress" {
801 t.status = "Deployed".to_string();
802 flipped = true;
803 }
804 break;
805 }
806 }
807 flipped
808 };
809 if flipped {
810 save_cloudfront_snapshot(&state, store, &lock).await;
811 }
812 });
813 }
814
815 pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817 let state = Arc::clone(&self.state);
818 let delay = self.propagation_delay;
819 let store = self.snapshot_store.clone();
820 let lock = self.snapshot_lock.clone();
821 tokio::spawn(async move {
822 tokio::time::sleep(delay).await;
823 let flipped = {
824 let mut s = state.write();
825 let mut flipped = false;
826 for account in s.accounts.values_mut() {
827 if let Some(g) = account.connection_groups.get_mut(&id) {
828 if g.status == "InProgress" {
829 g.status = "Deployed".to_string();
830 flipped = true;
831 }
832 break;
833 }
834 }
835 flipped
836 };
837 if flipped {
838 save_cloudfront_snapshot(&state, store, &lock).await;
839 }
840 });
841 }
842
843 pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848 let state = Arc::clone(&self.state);
849 let delay = self.propagation_delay;
850 let store = self.snapshot_store.clone();
851 let lock = self.snapshot_lock.clone();
852 tokio::spawn(async move {
853 tokio::time::sleep(delay).await;
854 let flipped = {
855 let mut s = state.write();
856 let mut flipped = false;
857 for account in s.accounts.values_mut() {
858 if let Some(d) = account.streaming_distributions.get_mut(&id) {
859 if d.status == "InProgress" {
860 d.status = "Deployed".to_string();
861 flipped = true;
862 }
863 break;
864 }
865 }
866 flipped
867 };
868 if flipped {
869 save_cloudfront_snapshot(&state, store, &lock).await;
870 }
871 });
872 }
873
874 fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875 let id = route
876 .id
877 .as_deref()
878 .ok_or_else(|| invalid_argument("missing distribution id"))?;
879 let state = self.state.read();
880 let account = state
881 .accounts
882 .get(DEFAULT_ACCOUNT)
883 .ok_or_else(|| no_such_distribution(id))?;
884 let dist = account
885 .distributions
886 .get(id)
887 .ok_or_else(|| no_such_distribution(id))?
888 .clone();
889 drop(state);
890 let body = build_distribution_xml(&dist);
891 let mut headers = HeaderMap::new();
892 set_header(&mut headers, ETAG, &dist.etag);
893 Ok(xml_response(StatusCode::OK, body, headers))
894 }
895
896 fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897 let id = route
898 .id
899 .as_deref()
900 .ok_or_else(|| invalid_argument("missing distribution id"))?;
901 let state = self.state.read();
902 let account = state
903 .accounts
904 .get(DEFAULT_ACCOUNT)
905 .ok_or_else(|| no_such_distribution(id))?;
906 let dist = account
907 .distributions
908 .get(id)
909 .ok_or_else(|| no_such_distribution(id))?
910 .clone();
911 drop(state);
912 let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913 .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914 let mut headers = HeaderMap::new();
915 set_header(&mut headers, ETAG, &dist.etag);
916 Ok(xml_response(StatusCode::OK, body, headers))
917 }
918
919 fn update_distribution(
920 &self,
921 req: &AwsRequest,
922 route: &Route,
923 ) -> Result<AwsResponse, AwsServiceError> {
924 let id = route
925 .id
926 .as_deref()
927 .ok_or_else(|| invalid_argument("missing distribution id"))?;
928 let if_match = req
929 .headers
930 .get(IF_MATCH)
931 .and_then(|v| v.to_str().ok())
932 .ok_or_else(|| {
933 aws_error(
934 StatusCode::BAD_REQUEST,
935 "InvalidIfMatchVersion",
936 "Missing If-Match header for UpdateDistribution",
937 )
938 })?
939 .to_string();
940 let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941 .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942 validate_caller_reference(&new_config.caller_reference)?;
943 validate_origins(&new_config)?;
944
945 let mut state = self.state.write();
946 let account = state
947 .accounts
948 .get_mut(DEFAULT_ACCOUNT)
949 .ok_or_else(|| no_such_distribution(id))?;
950 let dist = account
951 .distributions
952 .get_mut(id)
953 .ok_or_else(|| no_such_distribution(id))?;
954 if dist.etag != if_match {
955 return Err(aws_error(
956 StatusCode::PRECONDITION_FAILED,
957 "PreconditionFailed",
958 "If-Match header does not match the current ETag",
959 ));
960 }
961 let config_changed = !configs_equal(&dist.config, &new_config);
966 if config_changed {
967 dist.config = new_config;
968 dist.etag = generate_etag();
969 dist.last_modified_time = Utc::now();
970 dist.status = "InProgress".to_string();
974 }
975 let snapshot = dist.clone();
976 drop(state);
977
978 if config_changed {
979 self.schedule_distribution_deploy(id.to_string());
980 }
981
982 let body = build_distribution_xml(&snapshot);
983 let mut headers = HeaderMap::new();
984 set_header(&mut headers, ETAG, &snapshot.etag);
985 Ok(xml_response(StatusCode::OK, body, headers))
986 }
987
988 fn delete_distribution(
989 &self,
990 req: &AwsRequest,
991 route: &Route,
992 ) -> Result<AwsResponse, AwsServiceError> {
993 let id = route
994 .id
995 .as_deref()
996 .ok_or_else(|| invalid_argument("missing distribution id"))?;
997 let if_match = req
998 .headers
999 .get(IF_MATCH)
1000 .and_then(|v| v.to_str().ok())
1001 .ok_or_else(|| {
1002 aws_error(
1003 StatusCode::BAD_REQUEST,
1004 "InvalidIfMatchVersion",
1005 "Missing If-Match header for DeleteDistribution",
1006 )
1007 })?
1008 .to_string();
1009 let mut state = self.state.write();
1010 let account = state
1011 .accounts
1012 .get_mut(DEFAULT_ACCOUNT)
1013 .ok_or_else(|| no_such_distribution(id))?;
1014 {
1015 let dist = account
1016 .distributions
1017 .get(id)
1018 .ok_or_else(|| no_such_distribution(id))?;
1019 if dist.etag != if_match {
1020 return Err(aws_error(
1021 StatusCode::PRECONDITION_FAILED,
1022 "PreconditionFailed",
1023 "If-Match header does not match the current ETag",
1024 ));
1025 }
1026 if dist.config.enabled {
1027 return Err(aws_error(
1028 StatusCode::PRECONDITION_FAILED,
1029 "DistributionNotDisabled",
1030 "Distribution must be disabled before delete",
1031 ));
1032 }
1033 }
1034 let removed = account.distributions.remove(id).unwrap();
1035 account.tags.remove(&removed.arn);
1036 Ok(empty_response(StatusCode::NO_CONTENT))
1037 }
1038
1039 fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040 let state = self.state.read();
1041 let mut dists: Vec<StoredDistribution> = state
1042 .accounts
1043 .values()
1044 .flat_map(|a| a.distributions.values().cloned())
1045 .collect();
1046 dists.sort_by_key(|a| a.last_modified_time);
1047 drop(state);
1048 let body = build_distribution_list_xml(&dists, "DistributionList");
1049 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1050 }
1051
1052 fn list_distributions_by(
1053 &self,
1054 req: &AwsRequest,
1055 route: &Route,
1056 action: &str,
1057 ) -> Result<AwsResponse, AwsServiceError> {
1058 match action {
1065 "ListDistributionsByCachePolicyId"
1069 | "ListDistributionsByOriginRequestPolicyId"
1070 | "ListDistributionsByResponseHeadersPolicyId"
1071 | "ListDistributionsByKeyGroup"
1072 | "ListDistributionsByWebACLId"
1073 | "ListDistributionsByVpcOriginId"
1074 | "ListDistributionsByAnycastIpListId"
1075 | "ListDistributionsByOwnedResource" => {
1076 let id = route.id.as_deref().unwrap_or("");
1077 if is_placeholder_label(id) {
1078 return Err(invalid_argument(format!(
1079 "Required URL identifier for {action} is missing or invalid"
1080 )));
1081 }
1082 }
1083 "ListDistributionsByConnectionMode" => {
1084 let id = route.id.as_deref().unwrap_or("");
1085 if is_placeholder_label(id) {
1086 return Err(invalid_argument(
1087 "ConnectionMode is required for ListDistributionsByConnectionMode",
1088 ));
1089 }
1090 if id != "direct" && id != "tenant-only" {
1091 return Err(invalid_argument(format!(
1092 "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1093 )));
1094 }
1095 }
1096 "ListDistributionsByConnectionFunction"
1097 if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1098 {
1099 return Err(invalid_argument(
1100 "ConnectionFunctionIdentifier query parameter is required",
1101 ));
1102 }
1103 "ListDistributionsByTrustStore"
1104 if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1105 {
1106 return Err(invalid_argument(
1107 "TrustStoreIdentifier query parameter is required",
1108 ));
1109 }
1110 _ => {}
1111 }
1112
1113 let root = match action {
1118 "ListDistributionsByCachePolicyId"
1119 | "ListDistributionsByOriginRequestPolicyId"
1120 | "ListDistributionsByResponseHeadersPolicyId"
1121 | "ListDistributionsByKeyGroup"
1122 | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1123 "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1124 _ => "DistributionList",
1125 };
1126 let body = build_empty_distribution_id_list(root);
1127 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1128 }
1129
1130 fn copy_distribution(
1131 &self,
1132 req: &AwsRequest,
1133 route: &Route,
1134 ) -> Result<AwsResponse, AwsServiceError> {
1135 let primary_id = route
1136 .id
1137 .as_deref()
1138 .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1139 let if_match = req
1140 .headers
1141 .get(IF_MATCH)
1142 .and_then(|v| v.to_str().ok())
1143 .ok_or_else(|| {
1144 aws_error(
1145 StatusCode::BAD_REQUEST,
1146 "InvalidIfMatchVersion",
1147 "Missing If-Match header for CopyDistribution",
1148 )
1149 })?
1150 .to_string();
1151 let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1152 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1153 validate_caller_reference(&parsed.caller_reference)?;
1154 let mut state = self.state.write();
1155 let account = state
1156 .accounts
1157 .get_mut(DEFAULT_ACCOUNT)
1158 .ok_or_else(|| no_such_distribution(primary_id))?;
1159 let primary = account
1160 .distributions
1161 .get(primary_id)
1162 .ok_or_else(|| no_such_distribution(primary_id))?
1163 .clone();
1164 if primary.etag != if_match {
1165 return Err(aws_error(
1166 StatusCode::PRECONDITION_FAILED,
1167 "PreconditionFailed",
1168 "If-Match header does not match the current ETag",
1169 ));
1170 }
1171 if account
1172 .distributions
1173 .values()
1174 .any(|d| d.config.caller_reference == parsed.caller_reference)
1175 {
1176 return Err(aws_error(
1177 StatusCode::CONFLICT,
1178 "DistributionAlreadyExists",
1179 "Distribution with the same CallerReference exists",
1180 ));
1181 }
1182 let new_id = generate_distribution_id();
1183 let mut config = primary.config.clone();
1184 config.caller_reference = parsed.caller_reference;
1185 config.enabled = parsed.enabled.unwrap_or(false);
1186 config.staging = parsed.staging;
1187 let now = Utc::now();
1188 let etag = generate_etag();
1189 let arn = format!(
1190 "arn:aws:cloudfront::{}:distribution/{}",
1191 account_id(req),
1192 new_id
1193 );
1194 let stored = StoredDistribution {
1195 id: new_id.clone(),
1196 arn: arn.clone(),
1197 status: "InProgress".to_string(),
1198 last_modified_time: now,
1199 domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1200 in_progress_invalidation_batches: 0,
1201 etag: etag.clone(),
1202 config,
1203 };
1204 account.distributions.insert(new_id.clone(), stored.clone());
1205 drop(state);
1206 self.schedule_distribution_deploy(new_id);
1207 let body = build_distribution_xml(&stored);
1208 let mut headers = HeaderMap::new();
1209 set_header(&mut headers, ETAG, &etag);
1210 set_header(&mut headers, LOCATION, &stored.arn);
1211 Ok(xml_response(StatusCode::CREATED, body, headers))
1212 }
1213}
1214
/// Request body of `CopyDistribution`, deserialized from XML with
/// PascalCase element names (`CallerReference`, `Enabled`, `Staging`).
#[derive(Debug, serde::Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
struct CopyDistributionRequest {
    /// Caller-supplied reference; the handler rejects an empty value and a
    /// value already used by another distribution.
    caller_reference: String,
    /// Requested enabled flag; the handler treats an absent value as `false`.
    #[serde(default)]
    enabled: Option<bool>,
    /// Requested staging flag; copied onto the new config as-is, including `None`.
    #[serde(default)]
    staging: Option<bool>,
}
1224
1225impl CloudFrontService {
1228 fn create_invalidation(
1229 &self,
1230 req: &AwsRequest,
1231 route: &Route,
1232 ) -> Result<AwsResponse, AwsServiceError> {
1233 let dist_id = route
1234 .id
1235 .as_deref()
1236 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1237 let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1238 .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1239 if batch.caller_reference.is_empty() {
1240 return Err(invalid_argument("CallerReference is required"));
1241 }
1242 if batch.paths.quantity < 1 {
1243 return Err(invalid_argument(
1244 "InvalidationBatch.Paths must be non-empty",
1245 ));
1246 }
1247 let mut state = self.state.write();
1248 let account = state.entry(DEFAULT_ACCOUNT);
1249 if !account.distributions.contains_key(dist_id) {
1250 return Err(no_such_distribution(dist_id));
1251 }
1252 let id = generate_invalidation_id();
1253 let stored = StoredInvalidation {
1254 id: id.clone(),
1255 distribution_id: dist_id.to_string(),
1256 status: "Completed".to_string(),
1257 create_time: Utc::now(),
1258 batch: batch.clone(),
1259 };
1260 account.invalidations.insert(id.clone(), stored.clone());
1261 drop(state);
1262 let body = build_invalidation_xml(&stored);
1263 let mut headers = HeaderMap::new();
1264 set_header(
1265 &mut headers,
1266 LOCATION,
1267 &format!(
1268 "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1269 stored.id
1270 ),
1271 );
1272 Ok(xml_response(StatusCode::CREATED, body, headers))
1273 }
1274
1275 fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1276 let dist_id = route
1277 .id
1278 .as_deref()
1279 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1280 let inv_id = route
1281 .second_id
1282 .as_deref()
1283 .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1284 let state = self.state.read();
1285 let account = state
1286 .accounts
1287 .get(DEFAULT_ACCOUNT)
1288 .ok_or_else(|| no_such_invalidation(inv_id))?;
1289 if !account.distributions.contains_key(dist_id) {
1290 return Err(no_such_distribution(dist_id));
1291 }
1292 let inv = account
1293 .invalidations
1294 .get(inv_id)
1295 .filter(|i| i.distribution_id == dist_id)
1296 .ok_or_else(|| no_such_invalidation(inv_id))?
1297 .clone();
1298 drop(state);
1299 let body = build_invalidation_xml(&inv);
1300 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1301 }
1302
1303 fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1304 let dist_id = route
1305 .id
1306 .as_deref()
1307 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1308 let state = self.state.read();
1309 let account = state
1310 .accounts
1311 .get(DEFAULT_ACCOUNT)
1312 .ok_or_else(|| no_such_distribution(dist_id))?;
1313 if !account.distributions.contains_key(dist_id) {
1314 return Err(no_such_distribution(dist_id));
1315 }
1316 let mut items: Vec<&StoredInvalidation> = account
1317 .invalidations
1318 .values()
1319 .filter(|i| i.distribution_id == dist_id)
1320 .collect();
1321 items.sort_by_key(|a| a.create_time);
1322 let body = build_invalidation_list_xml(&items);
1323 drop(state);
1324 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1325 }
1326}
1327
1328impl CloudFrontService {
1331 fn parse_arn_query(query: &str) -> Option<String> {
1332 for pair in query.split('&').filter(|p| !p.is_empty()) {
1333 if let Some(rest) = pair.strip_prefix("Resource=") {
1334 return Some(percent_decode(rest));
1335 }
1336 }
1337 None
1338 }
1339
1340 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1341 let arn = Self::parse_arn_query(&req.raw_query)
1342 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1343 let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1344 .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1345 let new_tags: Vec<Tag> = parsed
1346 .items
1347 .map(|i| {
1348 i.tag
1349 .into_iter()
1350 .map(|t| Tag {
1351 key: t.key,
1352 value: t.value,
1353 })
1354 .collect()
1355 })
1356 .unwrap_or_default();
1357 let mut state = self.state.write();
1358 let account = state.entry(DEFAULT_ACCOUNT);
1359 let entry = account.tags.entry(arn).or_default();
1360 for tag in new_tags {
1361 if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1362 existing.value = tag.value;
1363 } else {
1364 entry.push(tag);
1365 }
1366 }
1367 Ok(empty_response(StatusCode::NO_CONTENT))
1368 }
1369
1370 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371 let arn = Self::parse_arn_query(&req.raw_query)
1372 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1373 let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1374 .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1375 let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1376 let mut state = self.state.write();
1377 let account = state.entry(DEFAULT_ACCOUNT);
1378 if let Some(existing) = account.tags.get_mut(&arn) {
1379 existing.retain(|t| !keys.contains(&t.key));
1380 }
1381 Ok(empty_response(StatusCode::NO_CONTENT))
1382 }
1383
1384 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385 let arn = Self::parse_arn_query(&req.raw_query)
1386 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1387 let state = self.state.read();
1388 let tags = state
1389 .accounts
1390 .get(DEFAULT_ACCOUNT)
1391 .and_then(|a| a.tags.get(&arn))
1392 .cloned()
1393 .unwrap_or_default();
1394 drop(state);
1395 let body = build_tags_xml(&tags);
1396 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1397 }
1398}
1399
1400impl CloudFrontService {
1403 fn associate_alias(
1404 &self,
1405 req: &AwsRequest,
1406 route: &Route,
1407 ) -> Result<AwsResponse, AwsServiceError> {
1408 let id = route
1409 .id
1410 .as_deref()
1411 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1412 let alias = parse_query_value(&req.raw_query, "Alias")
1413 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1414 let mut state = self.state.write();
1415 let account = state
1416 .accounts
1417 .get_mut(DEFAULT_ACCOUNT)
1418 .ok_or_else(|| no_such_distribution(id))?;
1419 if let Some(other) = account.distributions.values().find(|d| {
1421 d.id != id
1422 && d.config
1423 .aliases
1424 .as_ref()
1425 .and_then(|a| a.items.as_ref())
1426 .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1427 }) {
1428 return Err(aws_error(
1429 StatusCode::CONFLICT,
1430 "CNAMEAlreadyExists",
1431 format!(
1432 "Alias {alias} is already associated with distribution {}",
1433 other.id
1434 ),
1435 ));
1436 }
1437 let dist = account
1438 .distributions
1439 .get_mut(id)
1440 .ok_or_else(|| no_such_distribution(id))?;
1441 let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1442 let items = aliases
1443 .items
1444 .get_or_insert_with(crate::model::AliasItems::default);
1445 if !items.cname.iter().any(|c| c == &alias) {
1446 items.cname.push(alias.clone());
1447 aliases.quantity = items.cname.len() as i32;
1448 }
1449 dist.etag = generate_etag();
1450 dist.last_modified_time = Utc::now();
1451 Ok(empty_response(StatusCode::OK))
1452 }
1453
1454 fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1455 let alias = parse_query_value(&req.raw_query, "Alias")
1456 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1457 let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1458 .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1459 if alias.len() > 253 {
1464 return Err(invalid_argument(format!(
1465 "Alias length {} exceeds maximum 253",
1466 alias.len()
1467 )));
1468 }
1469 if dist_id.len() > 25 {
1470 return Err(invalid_argument(format!(
1471 "DistributionId length {} exceeds maximum 25",
1472 dist_id.len()
1473 )));
1474 }
1475 if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1476 let n: i64 = max_items.parse().map_err(|_| {
1477 invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1478 })?;
1479 if n > 100 {
1480 return Err(invalid_argument(format!(
1481 "MaxItems {n} exceeds maximum 100"
1482 )));
1483 }
1484 }
1485 let body = format!(
1488 "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1489 NS = crate::NAMESPACE,
1490 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1491 );
1492 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1493 }
1494
1495 fn associate_web_acl(
1496 &self,
1497 req: &AwsRequest,
1498 route: &Route,
1499 ) -> Result<AwsResponse, AwsServiceError> {
1500 let id = route
1501 .id
1502 .as_deref()
1503 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1504 let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1505 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1506 let mut state = self.state.write();
1507 let account = state
1508 .accounts
1509 .get_mut(DEFAULT_ACCOUNT)
1510 .ok_or_else(|| no_such_distribution(id))?;
1511 let dist = account
1512 .distributions
1513 .get_mut(id)
1514 .ok_or_else(|| no_such_distribution(id))?;
1515 dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1516 dist.etag = generate_etag();
1517 dist.last_modified_time = Utc::now();
1518 let body = format!(
1519 "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1520 esc(id), esc(&parsed.web_acl_arn),
1521 NS = crate::NAMESPACE,
1522 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1523 );
1524 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1525 }
1526
1527 fn disassociate_web_acl(
1528 &self,
1529 _req: &AwsRequest,
1530 route: &Route,
1531 ) -> Result<AwsResponse, AwsServiceError> {
1532 let id = route
1533 .id
1534 .as_deref()
1535 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1536 let entity_not_found = || {
1539 aws_error(
1540 StatusCode::NOT_FOUND,
1541 "EntityNotFound",
1542 format!("The specified distribution does not exist: {id}"),
1543 )
1544 };
1545 let mut state = self.state.write();
1546 let account = state
1547 .accounts
1548 .get_mut(DEFAULT_ACCOUNT)
1549 .ok_or_else(entity_not_found)?;
1550 let dist = account
1551 .distributions
1552 .get_mut(id)
1553 .ok_or_else(entity_not_found)?;
1554 dist.config.web_acl_id = None;
1555 dist.etag = generate_etag();
1556 dist.last_modified_time = Utc::now();
1557 let body = format!(
1558 "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1559 esc(id),
1560 NS = crate::NAMESPACE,
1561 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1562 );
1563 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1564 }
1565}
1566
/// Request body parsed by `associate_web_acl`.
///
/// NOTE(review): despite the name, this struct carries the web ACL ARN and is
/// not used by `associate_alias`, which reads its input from the query string.
#[derive(serde::Deserialize, Default, Debug)]
#[serde(rename_all = "PascalCase")]
struct AssociateAliasRequest {
    /// Web ACL ARN read from the `WebACLArn` element; defaults to an empty
    /// string when the element is absent.
    #[serde(rename = "WebACLArn", default)]
    web_acl_arn: String,
}
1573
/// Escapes `s` for safe inclusion in XML text content or attribute values.
///
/// The five characters with predefined XML entities are replaced:
/// `&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`, `"` → `&quot;` and
/// `'` → `&apos;`. Every other character is copied through unchanged.
pub(crate) fn esc(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            _ => out.push(c),
        }
    }
    out
}
1595
1596pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1597 let mut out = String::with_capacity(2048);
1598 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1599 out.push_str(&format!(
1600 "<Distribution xmlns=\"{ns}\">",
1601 ns = crate::NAMESPACE
1602 ));
1603 out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1604 out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1605 out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1606 out.push_str(&format!(
1607 "<LastModifiedTime>{}</LastModifiedTime>",
1608 rfc3339(&dist.last_modified_time)
1609 ));
1610 out.push_str(&format!(
1611 "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1612 dist.in_progress_invalidation_batches
1613 ));
1614 out.push_str(&format!(
1615 "<DomainName>{}</DomainName>",
1616 esc(&dist.domain_name)
1617 ));
1618 out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1619 out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1620 let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1621 .unwrap_or_else(|_| String::new());
1622 out.push_str(&inner);
1623 out.push_str("</Distribution>");
1624 out
1625}
1626
1627fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1628 let mut out = String::with_capacity(2048);
1629 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1630 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1631 out.push_str("<Marker></Marker>");
1632 out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1633 out.push_str("<IsTruncated>false</IsTruncated>");
1634 out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1635 if dists.is_empty() {
1636 out.push_str(&format!("</{root}>"));
1637 return out;
1638 }
1639 out.push_str("<Items>");
1640 for d in dists {
1641 out.push_str("<DistributionSummary>");
1642 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1643 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1644 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1645 out.push_str(&format!(
1646 "<LastModifiedTime>{}</LastModifiedTime>",
1647 rfc3339(&d.last_modified_time)
1648 ));
1649 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1650 let aliases = d.config.aliases.clone().unwrap_or_default();
1651 out.push_str(&render_inline("Aliases", &aliases));
1652 let origins = d.config.origins.clone();
1653 out.push_str(&render_inline("Origins", &origins));
1654 let dcb = d.config.default_cache_behavior.clone();
1655 out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1656 let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1657 out.push_str(&render_inline("CacheBehaviors", &cb));
1658 let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1659 out.push_str(&render_inline("CustomErrorResponses", &cer));
1660 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1661 out.push_str(&format!(
1662 "<PriceClass>{}</PriceClass>",
1663 esc(&d
1664 .config
1665 .price_class
1666 .clone()
1667 .unwrap_or_else(|| "PriceClass_All".to_string()))
1668 ));
1669 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1670 out.push_str(&render_inline(
1671 "ViewerCertificate",
1672 &d.config.viewer_certificate.clone().unwrap_or_default(),
1673 ));
1674 out.push_str(&render_inline(
1675 "Restrictions",
1676 &d.config.restrictions.clone().unwrap_or_default(),
1677 ));
1678 out.push_str(&format!(
1679 "<WebACLId>{}</WebACLId>",
1680 esc(&d.config.web_acl_id.clone().unwrap_or_default())
1681 ));
1682 out.push_str(&format!(
1683 "<HttpVersion>{}</HttpVersion>",
1684 esc(&d
1685 .config
1686 .http_version
1687 .clone()
1688 .unwrap_or_else(|| "http2".to_string()))
1689 ));
1690 out.push_str(&format!(
1691 "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1692 d.config.is_ipv6_enabled.unwrap_or(true)
1693 ));
1694 out.push_str(&format!(
1695 "<Staging>{}</Staging>",
1696 d.config.staging.unwrap_or(false)
1697 ));
1698 out.push_str("</DistributionSummary>");
1699 }
1700 out.push_str("</Items>");
1701 out.push_str(&format!("</{root}>"));
1702 out
1703}
1704
1705fn build_empty_distribution_id_list(root: &str) -> String {
1706 let mut out = String::with_capacity(256);
1707 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1708 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1709 out.push_str("<Marker></Marker>");
1710 out.push_str("<MaxItems>100</MaxItems>");
1711 out.push_str("<IsTruncated>false</IsTruncated>");
1712 out.push_str("<Quantity>0</Quantity>");
1713 out.push_str(&format!("</{root}>"));
1714 out
1715}
1716
1717fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1718 let mut out = String::with_capacity(512);
1719 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1720 out.push_str(&format!(
1721 "<Invalidation xmlns=\"{ns}\">",
1722 ns = crate::NAMESPACE
1723 ));
1724 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1725 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1726 out.push_str(&format!(
1727 "<CreateTime>{}</CreateTime>",
1728 rfc3339(&inv.create_time)
1729 ));
1730 out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1731 out.push_str("</Invalidation>");
1732 out
1733}
1734
1735fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1736 let mut out = String::with_capacity(1024);
1737 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1738 out.push_str(&format!(
1739 "<InvalidationList xmlns=\"{ns}\">",
1740 ns = crate::NAMESPACE
1741 ));
1742 out.push_str("<Marker></Marker>");
1743 out.push_str("<MaxItems>100</MaxItems>");
1744 out.push_str("<IsTruncated>false</IsTruncated>");
1745 out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1746 if !items.is_empty() {
1747 out.push_str("<Items>");
1748 for inv in items {
1749 out.push_str("<InvalidationSummary>");
1750 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1751 out.push_str(&format!(
1752 "<CreateTime>{}</CreateTime>",
1753 rfc3339(&inv.create_time)
1754 ));
1755 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1756 out.push_str("</InvalidationSummary>");
1757 }
1758 out.push_str("</Items>");
1759 }
1760 out.push_str("</InvalidationList>");
1761 out
1762}
1763
1764fn build_tags_xml(tags: &[Tag]) -> String {
1765 let mut out = String::with_capacity(256);
1766 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1767 out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1768 out.push_str("<Items>");
1769 for t in tags {
1770 out.push_str("<Tag>");
1771 out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1772 if let Some(v) = &t.value {
1773 out.push_str(&format!("<Value>{}</Value>", esc(v)));
1774 }
1775 out.push_str("</Tag>");
1776 }
1777 out.push_str("</Items>");
1778 out.push_str("</Tags>");
1779 out
1780}
1781
1782fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1783 quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1784}
1785
1786fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1789 if s.is_empty() {
1790 return Err(invalid_argument("CallerReference is required"));
1791 }
1792 Ok(())
1793}
1794
1795fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1796 if config.origins.quantity < 1 {
1797 return Err(invalid_argument(
1798 "DistributionConfig.Origins must contain at least one origin",
1799 ));
1800 }
1801 Ok(())
1802}
1803
1804fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1810 let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1811 return false;
1812 };
1813 let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1814 return false;
1815 };
1816 a == b
1817}
1818
/// Returns the account id used when building ARNs.
///
/// The request is currently ignored: every caller gets `DEFAULT_ACCOUNT`.
fn account_id(_req: &AwsRequest) -> &'static str {
    DEFAULT_ACCOUNT
}
1826
1827fn generate_distribution_id() -> String {
1828 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1830 format!("E{}", &raw[..13])
1831}
1832
1833fn generate_invalidation_id() -> String {
1834 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1835 format!("I{}", &raw[..13])
1836}
1837
1838pub(crate) fn generate_etag() -> String {
1839 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1840 format!("E{}", &raw[..13])
1841}
1842
1843pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1848 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1849 let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1850 format!("{prefix}{}", &raw[..suffix_len])
1851}
1852
1853fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1854 t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1855}
1856
1857pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1858 aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1859}
1860
1861fn no_such_distribution(id: &str) -> AwsServiceError {
1862 aws_error(
1863 StatusCode::NOT_FOUND,
1864 "NoSuchDistribution",
1865 format!("The specified distribution does not exist: {id}"),
1866 )
1867}
1868
1869fn no_such_invalidation(id: &str) -> AwsServiceError {
1870 aws_error(
1871 StatusCode::NOT_FOUND,
1872 "NoSuchInvalidation",
1873 format!("The specified invalidation does not exist: {id}"),
1874 )
1875}
1876
1877fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1878 aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1879}
1880
/// Builds an `AwsServiceError` from an HTTP status, an error code and a
/// message.
///
/// Thin wrapper over `AwsServiceError::aws_error` that accepts anything
/// convertible into `String` for `code`; `msg` is forwarded unchanged.
pub(crate) fn aws_error(
    status: StatusCode,
    code: impl Into<String>,
    msg: impl Into<String>,
) -> AwsServiceError {
    AwsServiceError::aws_error(status, code.into(), msg)
}
1888
1889fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1890 if let Ok(v) = HeaderValue::from_str(value) {
1891 headers.insert(name, v);
1892 }
1893}
1894
1895pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1896 AwsResponse {
1897 status,
1898 content_type: "text/xml".to_string(),
1899 body: ResponseBody::Bytes(Bytes::from(body)),
1900 headers,
1901 }
1902}
1903
1904fn empty_response(status: StatusCode) -> AwsResponse {
1905 AwsResponse {
1906 status,
1907 content_type: "text/xml".to_string(),
1908 body: ResponseBody::Bytes(Bytes::new()),
1909 headers: HeaderMap::new(),
1910 }
1911}
1912
/// Decodes a percent-encoded query-string component.
///
/// * `%XX` (two hex digits, either case) becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally.
///
/// Decoded bytes are collected first and then interpreted as UTF-8, so
/// multi-byte sequences such as `%C3%A9` decode to a single character.
/// Byte sequences that are not valid UTF-8 are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the percent sign as-is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Returns the numeric value (0–15) of an ASCII hex digit, or `None` when
/// `b` is not one of `0-9`, `a-f` or `A-F`.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1944
/// Reports whether a path label is empty or still an unsubstituted template
/// placeholder: it starts with `{` or with its percent-encoded form `%7B`
/// (matched case-insensitively).
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    matches!(
        value.as_bytes(),
        [] | [b'{', ..] | [b'%', b'7', b'b' | b'B', ..]
    )
}
1958
1959pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
1967 if let Ok(s) = std::str::from_utf8(body) {
1968 let trimmed = s.trim_start();
1969 if trimmed.starts_with('{') {
1970 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1971 if let Some(field) = v.get(key) {
1972 return match field {
1973 serde_json::Value::String(s) => Some(s.clone()),
1974 serde_json::Value::Number(n) => Some(n.to_string()),
1975 serde_json::Value::Bool(b) => Some(b.to_string()),
1976 _ => None,
1977 };
1978 }
1979 return None;
1980 }
1981 }
1982 let open = format!("<{key}>");
1984 let close = format!("</{key}>");
1985 if let Some(start) = s.find(&open) {
1986 let after = start + open.len();
1987 if let Some(end_rel) = s[after..].find(&close) {
1988 return Some(s[after..after + end_rel].to_string());
1989 }
1990 }
1991 }
1992 None
1993}
1994
1995fn parse_query_value(query: &str, key: &str) -> Option<String> {
1996 let prefix = format!("{key}=");
1997 for pair in query.split('&').filter(|p| !p.is_empty()) {
1998 if let Some(rest) = pair.strip_prefix(&prefix) {
1999 return Some(percent_decode(rest));
2000 }
2001 }
2002 None
2003}
2004
#[cfg(test)]
mod tests {
    //! Unit tests for the CloudFront service handler and its parsing helpers.
    //!
    //! The service-level tests drive `CloudFrontService::handle` with
    //! hand-built `AwsRequest` values and inspect the XML responses with
    //! simple substring extraction.

    use super::*;

    /// Returns the text between the first occurrence of `open` and the next
    /// occurrence of `close` after it.
    ///
    /// Panics when `open` is absent. When `close` is absent the remainder of
    /// the input after `open` is returned.
    fn text_between<'a>(haystack: &'a str, open: &str, close: &str) -> &'a str {
        haystack
            .split(open)
            .nth(1)
            .unwrap()
            .split(close)
            .next()
            .unwrap()
    }

    /// Reads the `ETag` response header as an owned string, panicking when it
    /// is missing or not valid visible ASCII.
    fn etag_of(headers: &HeaderMap) -> String {
        headers.get(ETAG).unwrap().to_str().unwrap().to_string()
    }

    #[test]
    fn placeholder_label_detects_braces_and_percent_encoding() {
        assert!(is_placeholder_label(""));
        assert!(is_placeholder_label("{Identifier}"));
        assert!(is_placeholder_label("%7BIdentifier%7D"));
        assert!(is_placeholder_label("%7bidentifier%7d"));
        assert!(!is_placeholder_label("E1234567890ABC"));
        assert!(!is_placeholder_label(
            "arn:aws:cloudfront::000:distribution/E1"
        ));
    }

    #[test]
    fn extract_body_field_handles_json_and_xml() {
        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
        assert_eq!(
            extract_body_field(json, "Stage"),
            Some("BROKEN".to_string())
        );
        assert_eq!(extract_body_field(json, "MaxItems"), None);

        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
        assert_eq!(
            extract_body_field(xml, "Domain"),
            Some("example.com".to_string())
        );
        assert_eq!(extract_body_field(xml, "Missing"), None);

        assert_eq!(extract_body_field(b"", "x"), None);
    }

    /// Builds a fresh, empty shared state for one test.
    fn make_state() -> SharedCloudFrontState {
        Arc::new(RwLock::new(CloudFrontAccounts::new()))
    }

    /// Builds a REST-style `AwsRequest` for the default account.
    ///
    /// `path` is stored both raw and split into non-empty segments; `query`
    /// is stored raw only (the parsed `query_params` map is left empty).
    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: Uuid::new_v4().to_string(),
            headers: HeaderMap::new(),
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: query.into(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Returns a small `DistributionConfig` XML document with one origin, an
    /// empty comment, `Enabled` set to true and the given caller reference.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    #[tokio::test]
    async fn create_then_get_then_delete_distribution() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("ref-1");
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        assert_eq!(create.status, StatusCode::CREATED);
        let etag = etag_of(&create.headers);
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        let id = text_between(xml, "<Id>", "</Id>").to_string();

        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        // Disable the distribution before deleting it; the update also
        // yields the fresh ETag that the delete must present in `If-Match`.
        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &disable_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        let new_etag = etag_of(&updated.headers);

        let mut del_req = make_request(
            http::Method::DELETE,
            &format!("/2020-05-31/distribution/{id}"),
            "",
            "",
        );
        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
        let del = svc.handle(del_req).await.unwrap();
        assert_eq!(del.status, StatusCode::NO_CONTENT);
    }

    /// Creates a distribution from the minimal config and returns the id
    /// found in the first `<Id>` element of the response.
    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
        let body = minimal_dist_config_xml(caller_ref);
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        text_between(xml, "<Id>", "</Id>").to_string()
    }

    /// Reads the stored status of distribution `id` in the default account
    /// straight from shared state; returns an empty string when it is absent.
    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
        let state = svc.state.read();
        state
            .accounts
            .get(DEFAULT_ACCOUNT)
            .and_then(|a| a.distributions.get(id))
            .map(|d| d.status.clone())
            .unwrap_or_default()
    }

    #[tokio::test]
    async fn create_distribution_starts_in_progress() {
        // A long delay so that any automatic status change cannot happen
        // while the test is still inspecting the create response.
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("status-ref");
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(create.body.expect_bytes())
            .unwrap()
            .to_string();
        assert!(
            xml.contains("<Status>InProgress</Status>"),
            "expected initial status InProgress, got: {xml}"
        );
    }

    #[tokio::test]
    async fn auto_transition_after_tick_marks_deployed() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_millis(50));
        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        // Wait well past the 50 ms delay configured above.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        assert_eq!(distribution_status(&svc, &id), "Deployed");
    }

    #[tokio::test]
    async fn set_distribution_status_via_admin_flips_synchronously() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let id = create_distribution_returning_id(&svc, "admin-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");
        assert!(svc.set_distribution_status(&id, "InProgress"));
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn set_distribution_status_unknown_id_returns_false() {
        let svc = CloudFrontService::new(make_state());
        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
    }

    #[tokio::test]
    async fn update_distribution_resets_to_in_progress() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("update-reset-ref");
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let etag = etag_of(&create.headers);
        let xml = std::str::from_utf8(create.body.expect_bytes())
            .unwrap()
            .to_string();
        let id = text_between(&xml, "<Id>", "</Id>").to_string();

        // Force the distribution to Deployed so the update below has a
        // visible state change to make.
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");

        let updated_body = body.replace(
            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
        );
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &updated_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn duplicate_caller_reference_is_rejected() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("dup-ref");
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/distribution",
            "",
            &body,
        ))
        .await
        .unwrap();
        let result = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await;
        let err = match result {
            Ok(_) => panic!("expected duplicate caller-reference to fail"),
            Err(e) => e,
        };
        assert_eq!(err.code(), "DistributionAlreadyExists");
        assert_eq!(err.status(), StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn invalidation_lifecycle() {
        let svc = CloudFrontService::new(make_state());
        let dist_id = create_distribution_returning_id(&svc, "inv-ref").await;
        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
  <CallerReference>inv-1</CallerReference>
</InvalidationBatch>"#;
        let inv_resp = svc
            .handle(make_request(
                http::Method::POST,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                inv_body,
            ))
            .await
            .unwrap();
        assert_eq!(inv_resp.status, StatusCode::CREATED);
        let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
        let inv_id = text_between(inv_xml, "<Id>", "</Id>");
        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);
        let list = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                "",
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
        assert!(xml.contains("<Quantity>1</Quantity>"));
    }

    #[tokio::test]
    async fn tags_roundtrip() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("tag-ref");
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        let arn = text_between(xml, "<ARN>", "</ARN>");
        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
</Tags>"#;
        let arn_q = format!("Operation=Tag&Resource={}", arn);
        let resp = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/tagging",
                &arn_q,
                tag_body,
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::NO_CONTENT);
        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={}", arn),
                "",
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
        assert!(xml.contains("<Key>env</Key>"));
        assert!(xml.contains("<Value>prod</Value>"));
    }

    #[tokio::test]
    async fn xml_metacharacters_in_user_input_are_escaped() {
        let svc = CloudFrontService::new(make_state());
        // CDATA lets the raw metacharacters reach the service inside a
        // well-formed request document.
        let body = minimal_dist_config_xml("escape-ref").replace(
            "<Comment></Comment>",
            "<Comment><![CDATA[a&b<c>d]]></Comment>",
        );
        let create = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
        let dist_id = text_between(xml, "<Id>", "</Id>");
        let arn = text_between(xml, "<ARN>", "</ARN>");

        // The tag value is the string `a&b<c>d`, entity-escaped so that the
        // request body is well-formed XML.
        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
</Tags>"#;
        let arn_q = format!("Operation=Tag&Resource={}", arn);
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/tagging",
            &arn_q,
            tag_body,
        ))
        .await
        .unwrap();

        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={}", arn),
                "",
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
        // The response must carry the escaped form and never the raw one.
        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
        assert!(!xml.contains("<Value>a&b<c>d</Value>"));

        // The distribution listing must not leak the raw comment either.
        let list_resp = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/distribution",
                "",
                "",
            ))
            .await
            .unwrap();
        let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
        assert!(!xml.contains("<Comment>a&b<c>d"));
        assert!(xml.contains(dist_id));
    }
}