1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18 DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22 CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23 StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Reports whether `action` names a state-changing CloudFront operation.
///
/// The decision is made purely on the action name's leading verb; the match
/// is case-sensitive and anchored at the start of the string.
fn is_mutating_action(action: &str) -> bool {
    // Verbs that mark an operation as a write.
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in MUTATING_VERBS.iter().copied() {
        if action.starts_with(verb) {
            return true;
        }
    }
    false
}
45
/// Account id used as the lookup key by handlers in this file that read
/// `state.accounts` without consulting the request.
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Action names returned by `AwsService::supported_actions` for this service.
///
/// Each entry is expected to have a matching arm in the dispatch `match`
/// inside `handle`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront service handler: request dispatch over shared in-memory state,
/// with optional snapshot persistence.
pub struct CloudFrontService {
    /// Shared, lock-protected account state read and mutated by the handlers.
    pub(crate) state: SharedCloudFrontState,
    /// How long a scheduled deploy task sleeps before flipping a resource
    /// from `"InProgress"` to `"Deployed"`.
    pub(crate) propagation_delay: std::time::Duration,
    /// Destination for serialized snapshots; when `None`, saving is a no-op.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot write.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolves the propagation delay from the
/// `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC` environment variable.
///
/// The value is trimmed and accepted either as a whole number of seconds or
/// as a finite, non-negative fractional number of seconds. Anything else —
/// unset variable, non-UTF-8 or unparsable text, negative, NaN, infinite, or
/// a fractional value too large to fit in a `Duration` — yields the
/// one-second default.
fn default_propagation_delay() -> std::time::Duration {
    use std::time::Duration;

    const FALLBACK: Duration = Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return Duration::from_secs(secs);
    }
    trimmed
        .parse::<f64>()
        .ok()
        .filter(|secs| secs.is_finite() && *secs >= 0.0)
        // `from_secs_f64` panics on values that overflow `Duration`
        // (e.g. "1e20"); the fallible variant lets us fall back instead.
        .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
255
256impl CloudFrontService {
257 pub fn new(state: SharedCloudFrontState) -> Self {
258 Self {
259 state,
260 propagation_delay: default_propagation_delay(),
261 snapshot_store: None,
262 snapshot_lock: Arc::new(AsyncMutex::new(())),
263 }
264 }
265
266 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267 self.snapshot_store = Some(store);
268 self
269 }
270
271 pub fn shared_state(&self) -> SharedCloudFrontState {
272 Arc::clone(&self.state)
273 }
274
275 async fn save_snapshot(&self) {
279 save_cloudfront_snapshot(
280 &self.state,
281 self.snapshot_store.clone(),
282 &self.snapshot_lock,
283 )
284 .await;
285 }
286
287 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292 let store = self.snapshot_store.clone()?;
293 let state = self.state.clone();
294 let lock = self.snapshot_lock.clone();
295 Some(Arc::new(move || {
296 let state = state.clone();
297 let store = store.clone();
298 let lock = lock.clone();
299 Box::pin(async move {
300 save_cloudfront_snapshot(&state, Some(store), &lock).await;
301 })
302 }))
303 }
304
305 pub fn rearm_in_progress(&self) {
310 let (dists, tenants, groups, streaming) = {
311 let s = self.state.read();
312 let mut dists = Vec::new();
313 let mut tenants = Vec::new();
314 let mut groups = Vec::new();
315 let mut streaming = Vec::new();
316 for account in s.accounts.values() {
317 for (id, d) in &account.distributions {
318 if d.status == "InProgress" {
319 dists.push(id.clone());
320 }
321 }
322 for (id, t) in &account.distribution_tenants {
323 if t.status == "InProgress" {
324 tenants.push(id.clone());
325 }
326 }
327 for (id, g) in &account.connection_groups {
328 if g.status == "InProgress" {
329 groups.push(id.clone());
330 }
331 }
332 for (id, d) in &account.streaming_distributions {
333 if d.status == "InProgress" {
334 streaming.push(id.clone());
335 }
336 }
337 }
338 (dists, tenants, groups, streaming)
339 };
340 for id in dists {
341 self.schedule_distribution_deploy(id);
342 }
343 for id in tenants {
344 self.schedule_distribution_tenant_deploy(id);
345 }
346 for id in groups {
347 self.schedule_connection_group_deploy(id);
348 }
349 for id in streaming {
350 self.schedule_streaming_distribution_deploy(id);
351 }
352 }
353
354 pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360 self.propagation_delay = delay;
361 self
362 }
363
364 pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370 let mut state = self.state.write();
371 for account in state.accounts.values_mut() {
372 if let Some(dist) = account.distributions.get_mut(id) {
373 dist.status = status.to_string();
374 return true;
375 }
376 }
377 false
378 }
379
380 pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383 let changed = self.set_distribution_status(id, status);
384 if changed {
385 self.save_snapshot().await;
386 }
387 changed
388 }
389}
390
391pub async fn save_cloudfront_snapshot(
397 state: &SharedCloudFrontState,
398 store: Option<Arc<dyn SnapshotStore>>,
399 lock: &AsyncMutex<()>,
400) {
401 let Some(store) = store else {
402 return;
403 };
404 let _guard = lock.lock().await;
405 let snapshot = CloudFrontSnapshot {
406 schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407 accounts: Some(state.read().clone()),
408 };
409 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410 let bytes = serde_json::to_vec(&snapshot)
411 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412 store.save(&bytes)
413 })
414 .await;
415 match join {
416 Ok(Ok(())) => {}
417 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418 Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419 }
420}
421
422impl Default for CloudFrontService {
423 fn default() -> Self {
424 Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425 }
426}
427
428#[async_trait]
429impl AwsService for CloudFrontService {
430 fn service_name(&self) -> &str {
431 "cloudfront"
432 }
433
434 fn supported_actions(&self) -> &[&str] {
435 SUPPORTED_ACTIONS
436 }
437
438 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439 let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
440 Some(r) => r,
441 None => {
442 return Err(aws_error(
443 StatusCode::NOT_FOUND,
444 "InvalidArgument",
445 format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
446 ));
447 }
448 };
449
450 let mutates = is_mutating_action(resolved.action);
451 let result = match resolved.action {
452 "CreateDistribution" => self.create_distribution(&req, false),
453 "CreateDistributionWithTags" => self.create_distribution(&req, true),
454 "GetDistribution" => self.get_distribution(&resolved),
455 "GetDistributionConfig" => self.get_distribution_config(&resolved),
456 "UpdateDistribution" => self.update_distribution(&req, &resolved),
457 "DeleteDistribution" => self.delete_distribution(&req, &resolved),
458 "ListDistributions" => self.list_distributions(&req),
459 "CopyDistribution" => self.copy_distribution(&req, &resolved),
460 "CreateInvalidation" => self.create_invalidation(&req, &resolved),
461 "GetInvalidation" => self.get_invalidation(&resolved),
462 "ListInvalidations" => self.list_invalidations(&resolved),
463 "TagResource" => self.tag_resource(&req),
464 "UntagResource" => self.untag_resource(&req),
465 "ListTagsForResource" => self.list_tags_for_resource(&req),
466 "AssociateAlias" => self.associate_alias(&req, &resolved),
467 "ListConflictingAliases" => self.list_conflicting_aliases(&req),
468 "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
469 "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
470 "ListDistributionsByCachePolicyId"
471 | "ListDistributionsByOriginRequestPolicyId"
472 | "ListDistributionsByResponseHeadersPolicyId"
473 | "ListDistributionsByKeyGroup"
474 | "ListDistributionsByWebACLId"
475 | "ListDistributionsByVpcOriginId"
476 | "ListDistributionsByAnycastIpListId"
477 | "ListDistributionsByConnectionMode"
478 | "ListDistributionsByConnectionFunction"
479 | "ListDistributionsByOwnedResource"
480 | "ListDistributionsByTrustStore"
481 | "ListDistributionsByRealtimeLogConfig" => {
482 self.list_distributions_by(&req, &resolved, resolved.action)
483 }
484 "CreateOriginAccessControl" => self.create_origin_access_control(&req),
485 "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
486 "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
487 "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
488 "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
489 "ListOriginAccessControls" => self.list_origin_access_controls(&req),
490 "CreateCachePolicy" => self.create_cache_policy(&req),
491 "GetCachePolicy" => self.get_cache_policy(&resolved),
492 "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
493 "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
494 "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
495 "ListCachePolicies" => self.list_cache_policies(&req),
496 "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
497 "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
498 "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
499 "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
500 "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
501 "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
502 "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
503 "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
504 "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
505 "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
506 "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
507 "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
508 "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
509 "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
510 "GetContinuousDeploymentPolicyConfig" => {
511 self.get_continuous_deployment_policy_config(&resolved)
512 }
513 "UpdateContinuousDeploymentPolicy" => {
514 self.update_continuous_deployment_policy(&req, &resolved)
515 }
516 "DeleteContinuousDeploymentPolicy" => {
517 self.delete_continuous_deployment_policy(&req, &resolved)
518 }
519 "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
520 "CreateFunction" => self.create_function(&req),
521 "DescribeFunction" => self.describe_function(&req, &resolved),
522 "GetFunction" => self.get_function(&req, &resolved),
523 "UpdateFunction" => self.update_function(&req, &resolved),
524 "DeleteFunction" => self.delete_function(&req, &resolved),
525 "ListFunctions" => self.list_functions(&req),
526 "PublishFunction" => self.publish_function(&req, &resolved),
527 "TestFunction" => self.test_function(&req, &resolved),
528 "CreatePublicKey" => self.create_public_key(&req),
529 "GetPublicKey" => self.get_public_key(&resolved),
530 "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
531 "UpdatePublicKey" => self.update_public_key(&req, &resolved),
532 "DeletePublicKey" => self.delete_public_key(&req, &resolved),
533 "ListPublicKeys" => self.list_public_keys(&req),
534 "CreateKeyGroup" => self.create_key_group(&req),
535 "GetKeyGroup" => self.get_key_group(&resolved),
536 "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
537 "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
538 "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
539 "ListKeyGroups" => self.list_key_groups(&req),
540 "CreateKeyValueStore" => self.create_key_value_store(&req),
541 "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
542 "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
543 "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
544 "ListKeyValueStores" => self.list_key_value_stores(&req),
545 "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
546 "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
547 "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
548 "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
549 "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
550 "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
551 "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
552 "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
553 "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
554 "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
555 "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
556 "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
557 "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
558 "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
559 "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
560 "ListStreamingDistributions" => self.list_streaming_distributions(&req),
561 "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
562 "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
563 "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
564 "UpdateFieldLevelEncryptionConfig" => {
565 self.update_field_level_encryption_config(&req, &resolved)
566 }
567 "DeleteFieldLevelEncryptionConfig" => {
568 self.delete_field_level_encryption_config(&req, &resolved)
569 }
570 "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
571 "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
572 "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
573 "GetFieldLevelEncryptionProfileConfig" => {
574 self.get_field_level_encryption_profile_config(&resolved)
575 }
576 "UpdateFieldLevelEncryptionProfile" => {
577 self.update_field_level_encryption_profile(&req, &resolved)
578 }
579 "DeleteFieldLevelEncryptionProfile" => {
580 self.delete_field_level_encryption_profile(&req, &resolved)
581 }
582 "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
583 "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
584 "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
585 "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
586 "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
587 "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
588 "CreateVpcOrigin" => self.create_vpc_origin(&req),
589 "GetVpcOrigin" => self.get_vpc_origin(&resolved),
590 "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
591 "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
592 "ListVpcOrigins" => self.list_vpc_origins(&req),
593 "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
594 "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
595 "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
596 "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
597 "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
598 "CreateTrustStore" => self.create_trust_store(&req),
599 "GetTrustStore" => self.get_trust_store(&resolved),
600 "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
601 "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
602 "ListTrustStores" => self.list_trust_stores(&req),
603 "GetResourcePolicy" => self.get_resource_policy(&req),
604 "PutResourcePolicy" => self.put_resource_policy(&req),
605 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
606 "CreateConnectionGroup" => self.create_connection_group(&req),
607 "GetConnectionGroup" => self.get_connection_group(&resolved),
608 "GetConnectionGroupByRoutingEndpoint" => {
609 self.get_connection_group_by_routing_endpoint(&req)
610 }
611 "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
612 "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
613 "ListConnectionGroups" => self.list_connection_groups(&req),
614 "ListDomainConflicts" => self.list_domain_conflicts(&req),
615 "UpdateDomainAssociation" => self.update_domain_association(&req),
616 "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
617 "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
618 "UpdateDistributionWithStagingConfig" => {
619 self.update_distribution_with_staging_config(&req, &resolved)
620 }
621 "CreateDistributionTenant" => self.create_distribution_tenant(&req),
622 "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
623 "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
624 "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
625 "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
626 "ListDistributionTenants" => self.list_distribution_tenants(&req),
627 "ListDistributionTenantsByCustomization" => {
628 self.list_distribution_tenants_by_customization(&req)
629 }
630 "AssociateDistributionTenantWebACL" => {
631 self.associate_distribution_tenant_web_acl(&req, &resolved)
632 }
633 "DisassociateDistributionTenantWebACL" => {
634 self.disassociate_distribution_tenant_web_acl(&req, &resolved)
635 }
636 "CreateInvalidationForDistributionTenant" => {
637 self.create_invalidation_for_distribution_tenant(&req, &resolved)
638 }
639 "GetInvalidationForDistributionTenant" => {
640 self.get_invalidation_for_distribution_tenant(&resolved)
641 }
642 "ListInvalidationsForDistributionTenant" => {
643 self.list_invalidations_for_distribution_tenant(&resolved)
644 }
645 "CreateConnectionFunction" => self.create_connection_function(&req),
646 "GetConnectionFunction" => self.get_connection_function(&resolved),
647 "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
648 "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
649 "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
650 "ListConnectionFunctions" => self.list_connection_functions(&req),
651 "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
652 "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
653 other => Err(aws_error(
654 StatusCode::NOT_IMPLEMENTED,
655 "InvalidAction",
656 format!("CloudFront action {other} is not implemented yet"),
657 )),
658 };
659 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
660 self.save_snapshot().await;
661 }
662 result
663 }
664}
665
666impl CloudFrontService {
669 fn create_distribution(
670 &self,
671 req: &AwsRequest,
672 with_tags: bool,
673 ) -> Result<AwsResponse, AwsServiceError> {
674 let (config, tags) = if with_tags {
675 let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677 let tags = parsed
678 .tags
679 .items
680 .map(|i| {
681 i.tag
682 .into_iter()
683 .map(|t| Tag {
684 key: t.key,
685 value: t.value,
686 })
687 .collect()
688 })
689 .unwrap_or_default();
690 (parsed.distribution_config, tags)
691 } else {
692 let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694 (parsed, Vec::new())
695 };
696
697 validate_caller_reference(&config.caller_reference)?;
698 validate_origins(&config)?;
699
700 let mut state = self.state.write();
701 let account = state.entry(account_id(req));
702
703 if let Some(existing) = account
704 .distributions
705 .values()
706 .find(|d| d.config.caller_reference == config.caller_reference)
707 {
708 return Err(aws_error(
709 StatusCode::CONFLICT,
710 "DistributionAlreadyExists",
711 format!(
712 "Distribution with the same CallerReference exists: {}",
713 existing.id
714 ),
715 ));
716 }
717
718 let id = generate_distribution_id();
719 let now = Utc::now();
720 let etag = generate_etag();
721 let domain = format!("{}.cloudfront.net", id.to_lowercase());
722 let arn = format!(
723 "arn:aws:cloudfront::{}:distribution/{}",
724 account_id(req),
725 id
726 );
727
728 let stored = StoredDistribution {
729 id: id.clone(),
730 arn: arn.clone(),
731 status: "InProgress".to_string(),
735 last_modified_time: now,
736 domain_name: domain,
737 in_progress_invalidation_batches: 0,
738 etag: etag.clone(),
739 config,
740 };
741 account.distributions.insert(id.clone(), stored.clone());
742 if !tags.is_empty() {
743 account.tags.insert(arn.clone(), tags);
744 }
745 drop(state);
746
747 self.schedule_distribution_deploy(id.clone());
748
749 let body = build_distribution_xml(&stored);
750 let mut headers = HeaderMap::new();
751 set_header(&mut headers, ETAG, &etag);
752 set_header(&mut headers, LOCATION, &stored.arn);
753 Ok(xml_response(StatusCode::CREATED, body, headers))
754 }
755
756 fn schedule_distribution_deploy(&self, id: String) {
761 let state = Arc::clone(&self.state);
762 let delay = self.propagation_delay;
763 let store = self.snapshot_store.clone();
764 let lock = self.snapshot_lock.clone();
765 tokio::spawn(async move {
766 tokio::time::sleep(delay).await;
767 let flipped = {
768 let mut s = state.write();
769 let mut flipped = false;
770 for account in s.accounts.values_mut() {
771 if let Some(d) = account.distributions.get_mut(&id) {
772 if d.status == "InProgress" {
773 d.status = "Deployed".to_string();
774 flipped = true;
775 }
776 break;
777 }
778 }
779 flipped
780 };
781 if flipped {
782 save_cloudfront_snapshot(&state, store, &lock).await;
783 }
784 });
785 }
786
787 pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789 let state = Arc::clone(&self.state);
790 let delay = self.propagation_delay;
791 let store = self.snapshot_store.clone();
792 let lock = self.snapshot_lock.clone();
793 tokio::spawn(async move {
794 tokio::time::sleep(delay).await;
795 let flipped = {
796 let mut s = state.write();
797 let mut flipped = false;
798 for account in s.accounts.values_mut() {
799 if let Some(t) = account.distribution_tenants.get_mut(&id) {
800 if t.status == "InProgress" {
801 t.status = "Deployed".to_string();
802 flipped = true;
803 }
804 break;
805 }
806 }
807 flipped
808 };
809 if flipped {
810 save_cloudfront_snapshot(&state, store, &lock).await;
811 }
812 });
813 }
814
815 pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817 let state = Arc::clone(&self.state);
818 let delay = self.propagation_delay;
819 let store = self.snapshot_store.clone();
820 let lock = self.snapshot_lock.clone();
821 tokio::spawn(async move {
822 tokio::time::sleep(delay).await;
823 let flipped = {
824 let mut s = state.write();
825 let mut flipped = false;
826 for account in s.accounts.values_mut() {
827 if let Some(g) = account.connection_groups.get_mut(&id) {
828 if g.status == "InProgress" {
829 g.status = "Deployed".to_string();
830 flipped = true;
831 }
832 break;
833 }
834 }
835 flipped
836 };
837 if flipped {
838 save_cloudfront_snapshot(&state, store, &lock).await;
839 }
840 });
841 }
842
843 pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848 let state = Arc::clone(&self.state);
849 let delay = self.propagation_delay;
850 let store = self.snapshot_store.clone();
851 let lock = self.snapshot_lock.clone();
852 tokio::spawn(async move {
853 tokio::time::sleep(delay).await;
854 let flipped = {
855 let mut s = state.write();
856 let mut flipped = false;
857 for account in s.accounts.values_mut() {
858 if let Some(d) = account.streaming_distributions.get_mut(&id) {
859 if d.status == "InProgress" {
860 d.status = "Deployed".to_string();
861 flipped = true;
862 }
863 break;
864 }
865 }
866 flipped
867 };
868 if flipped {
869 save_cloudfront_snapshot(&state, store, &lock).await;
870 }
871 });
872 }
873
874 fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875 let id = route
876 .id
877 .as_deref()
878 .ok_or_else(|| invalid_argument("missing distribution id"))?;
879 let state = self.state.read();
880 let account = state
881 .accounts
882 .get(DEFAULT_ACCOUNT)
883 .ok_or_else(|| no_such_distribution(id))?;
884 let dist = account
885 .distributions
886 .get(id)
887 .ok_or_else(|| no_such_distribution(id))?
888 .clone();
889 drop(state);
890 let body = build_distribution_xml(&dist);
891 let mut headers = HeaderMap::new();
892 set_header(&mut headers, ETAG, &dist.etag);
893 Ok(xml_response(StatusCode::OK, body, headers))
894 }
895
896 fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897 let id = route
898 .id
899 .as_deref()
900 .ok_or_else(|| invalid_argument("missing distribution id"))?;
901 let state = self.state.read();
902 let account = state
903 .accounts
904 .get(DEFAULT_ACCOUNT)
905 .ok_or_else(|| no_such_distribution(id))?;
906 let dist = account
907 .distributions
908 .get(id)
909 .ok_or_else(|| no_such_distribution(id))?
910 .clone();
911 drop(state);
912 let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913 .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914 let mut headers = HeaderMap::new();
915 set_header(&mut headers, ETAG, &dist.etag);
916 Ok(xml_response(StatusCode::OK, body, headers))
917 }
918
919 fn update_distribution(
920 &self,
921 req: &AwsRequest,
922 route: &Route,
923 ) -> Result<AwsResponse, AwsServiceError> {
924 let id = route
925 .id
926 .as_deref()
927 .ok_or_else(|| invalid_argument("missing distribution id"))?;
928 let if_match = req
929 .headers
930 .get(IF_MATCH)
931 .and_then(|v| v.to_str().ok())
932 .ok_or_else(|| {
933 aws_error(
934 StatusCode::BAD_REQUEST,
935 "InvalidIfMatchVersion",
936 "Missing If-Match header for UpdateDistribution",
937 )
938 })?
939 .to_string();
940 let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941 .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942 validate_caller_reference(&new_config.caller_reference)?;
943 validate_origins(&new_config)?;
944
945 let mut state = self.state.write();
946 let account = state
947 .accounts
948 .get_mut(DEFAULT_ACCOUNT)
949 .ok_or_else(|| no_such_distribution(id))?;
950 let dist = account
951 .distributions
952 .get_mut(id)
953 .ok_or_else(|| no_such_distribution(id))?;
954 if dist.etag != if_match {
955 return Err(aws_error(
956 StatusCode::PRECONDITION_FAILED,
957 "PreconditionFailed",
958 "If-Match header does not match the current ETag",
959 ));
960 }
961 let config_changed = !configs_equal(&dist.config, &new_config);
966 if config_changed {
967 dist.config = new_config;
968 dist.etag = generate_etag();
969 dist.last_modified_time = Utc::now();
970 dist.status = "InProgress".to_string();
974 }
975 let snapshot = dist.clone();
976 drop(state);
977
978 if config_changed {
979 self.schedule_distribution_deploy(id.to_string());
980 }
981
982 let body = build_distribution_xml(&snapshot);
983 let mut headers = HeaderMap::new();
984 set_header(&mut headers, ETAG, &snapshot.etag);
985 Ok(xml_response(StatusCode::OK, body, headers))
986 }
987
988 fn delete_distribution(
989 &self,
990 req: &AwsRequest,
991 route: &Route,
992 ) -> Result<AwsResponse, AwsServiceError> {
993 let id = route
994 .id
995 .as_deref()
996 .ok_or_else(|| invalid_argument("missing distribution id"))?;
997 let if_match = req
998 .headers
999 .get(IF_MATCH)
1000 .and_then(|v| v.to_str().ok())
1001 .ok_or_else(|| {
1002 aws_error(
1003 StatusCode::BAD_REQUEST,
1004 "InvalidIfMatchVersion",
1005 "Missing If-Match header for DeleteDistribution",
1006 )
1007 })?
1008 .to_string();
1009 let mut state = self.state.write();
1010 let account = state
1011 .accounts
1012 .get_mut(DEFAULT_ACCOUNT)
1013 .ok_or_else(|| no_such_distribution(id))?;
1014 {
1015 let dist = account
1016 .distributions
1017 .get(id)
1018 .ok_or_else(|| no_such_distribution(id))?;
1019 if dist.etag != if_match {
1020 return Err(aws_error(
1021 StatusCode::PRECONDITION_FAILED,
1022 "PreconditionFailed",
1023 "If-Match header does not match the current ETag",
1024 ));
1025 }
1026 if dist.config.enabled {
1027 return Err(aws_error(
1028 StatusCode::PRECONDITION_FAILED,
1029 "DistributionNotDisabled",
1030 "Distribution must be disabled before delete",
1031 ));
1032 }
1033 }
1034 let removed = account.distributions.remove(id).unwrap();
1035 account.tags.remove(&removed.arn);
1036 Ok(empty_response(StatusCode::NO_CONTENT))
1037 }
1038
1039 fn list_distributions(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040 let state = self.state.read();
1041 let mut dists: Vec<StoredDistribution> = state
1042 .accounts
1043 .values()
1044 .flat_map(|a| a.distributions.values().cloned())
1045 .collect();
1046 dists.sort_by_key(|a| a.last_modified_time);
1047 drop(state);
1048 let body = build_distribution_list_xml(&dists, "DistributionList");
1049 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1050 }
1051
1052 fn list_distributions_by(
1053 &self,
1054 req: &AwsRequest,
1055 route: &Route,
1056 action: &str,
1057 ) -> Result<AwsResponse, AwsServiceError> {
1058 match action {
1065 "ListDistributionsByCachePolicyId"
1069 | "ListDistributionsByOriginRequestPolicyId"
1070 | "ListDistributionsByResponseHeadersPolicyId"
1071 | "ListDistributionsByKeyGroup"
1072 | "ListDistributionsByWebACLId"
1073 | "ListDistributionsByVpcOriginId"
1074 | "ListDistributionsByAnycastIpListId"
1075 | "ListDistributionsByOwnedResource" => {
1076 let id = route.id.as_deref().unwrap_or("");
1077 if is_placeholder_label(id) {
1078 return Err(invalid_argument(format!(
1079 "Required URL identifier for {action} is missing or invalid"
1080 )));
1081 }
1082 }
1083 "ListDistributionsByConnectionMode" => {
1084 let id = route.id.as_deref().unwrap_or("");
1085 if is_placeholder_label(id) {
1086 return Err(invalid_argument(
1087 "ConnectionMode is required for ListDistributionsByConnectionMode",
1088 ));
1089 }
1090 if id != "direct" && id != "tenant-only" {
1091 return Err(invalid_argument(format!(
1092 "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1093 )));
1094 }
1095 }
1096 "ListDistributionsByConnectionFunction"
1097 if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1098 {
1099 return Err(invalid_argument(
1100 "ConnectionFunctionIdentifier query parameter is required",
1101 ));
1102 }
1103 "ListDistributionsByTrustStore"
1104 if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1105 {
1106 return Err(invalid_argument(
1107 "TrustStoreIdentifier query parameter is required",
1108 ));
1109 }
1110 _ => {}
1111 }
1112
1113 let root = match action {
1118 "ListDistributionsByCachePolicyId"
1119 | "ListDistributionsByOriginRequestPolicyId"
1120 | "ListDistributionsByResponseHeadersPolicyId"
1121 | "ListDistributionsByKeyGroup"
1122 | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1123 "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1124 _ => "DistributionList",
1125 };
1126 let body = build_empty_distribution_id_list(root);
1127 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1128 }
1129
1130 fn copy_distribution(
1131 &self,
1132 req: &AwsRequest,
1133 route: &Route,
1134 ) -> Result<AwsResponse, AwsServiceError> {
1135 let primary_id = route
1136 .id
1137 .as_deref()
1138 .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1139 let if_match = req
1140 .headers
1141 .get(IF_MATCH)
1142 .and_then(|v| v.to_str().ok())
1143 .ok_or_else(|| {
1144 aws_error(
1145 StatusCode::BAD_REQUEST,
1146 "InvalidIfMatchVersion",
1147 "Missing If-Match header for CopyDistribution",
1148 )
1149 })?
1150 .to_string();
1151 let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1152 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1153 validate_caller_reference(&parsed.caller_reference)?;
1154 let mut state = self.state.write();
1155 let account = state
1156 .accounts
1157 .get_mut(DEFAULT_ACCOUNT)
1158 .ok_or_else(|| no_such_distribution(primary_id))?;
1159 let primary = account
1160 .distributions
1161 .get(primary_id)
1162 .ok_or_else(|| no_such_distribution(primary_id))?
1163 .clone();
1164 if primary.etag != if_match {
1165 return Err(aws_error(
1166 StatusCode::PRECONDITION_FAILED,
1167 "PreconditionFailed",
1168 "If-Match header does not match the current ETag",
1169 ));
1170 }
1171 if account
1172 .distributions
1173 .values()
1174 .any(|d| d.config.caller_reference == parsed.caller_reference)
1175 {
1176 return Err(aws_error(
1177 StatusCode::CONFLICT,
1178 "DistributionAlreadyExists",
1179 "Distribution with the same CallerReference exists",
1180 ));
1181 }
1182 let new_id = generate_distribution_id();
1183 let mut config = primary.config.clone();
1184 config.caller_reference = parsed.caller_reference;
1185 config.enabled = parsed.enabled.unwrap_or(false);
1186 config.staging = parsed.staging;
1187 let now = Utc::now();
1188 let etag = generate_etag();
1189 let arn = format!(
1190 "arn:aws:cloudfront::{}:distribution/{}",
1191 account_id(req),
1192 new_id
1193 );
1194 let stored = StoredDistribution {
1195 id: new_id.clone(),
1196 arn: arn.clone(),
1197 status: "InProgress".to_string(),
1198 last_modified_time: now,
1199 domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1200 in_progress_invalidation_batches: 0,
1201 etag: etag.clone(),
1202 config,
1203 };
1204 account.distributions.insert(new_id.clone(), stored.clone());
1205 drop(state);
1206 self.schedule_distribution_deploy(new_id);
1207 let body = build_distribution_xml(&stored);
1208 let mut headers = HeaderMap::new();
1209 set_header(&mut headers, ETAG, &etag);
1210 set_header(&mut headers, LOCATION, &stored.arn);
1211 Ok(xml_response(StatusCode::CREATED, body, headers))
1212 }
1213}
1214
1215#[derive(Debug, serde::Deserialize, Default)]
1216#[serde(rename_all = "PascalCase")]
1217struct CopyDistributionRequest {
1218 caller_reference: String,
1219 #[serde(default)]
1220 enabled: Option<bool>,
1221 #[serde(default)]
1222 staging: Option<bool>,
1223}
1224
1225impl CloudFrontService {
1228 fn create_invalidation(
1229 &self,
1230 req: &AwsRequest,
1231 route: &Route,
1232 ) -> Result<AwsResponse, AwsServiceError> {
1233 let dist_id = route
1234 .id
1235 .as_deref()
1236 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1237 let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1238 .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1239 if batch.caller_reference.is_empty() {
1240 return Err(invalid_argument("CallerReference is required"));
1241 }
1242 if batch.paths.quantity < 1 {
1243 return Err(invalid_argument(
1244 "InvalidationBatch.Paths must be non-empty",
1245 ));
1246 }
1247 let mut state = self.state.write();
1248 let account = state.entry(DEFAULT_ACCOUNT);
1249 if !account.distributions.contains_key(dist_id) {
1250 return Err(no_such_distribution(dist_id));
1251 }
1252 let id = generate_invalidation_id();
1253 let stored = StoredInvalidation {
1254 id: id.clone(),
1255 distribution_id: dist_id.to_string(),
1256 status: "Completed".to_string(),
1257 create_time: Utc::now(),
1258 batch: batch.clone(),
1259 };
1260 account.invalidations.insert(id.clone(), stored.clone());
1261 drop(state);
1262 let body = build_invalidation_xml(&stored);
1263 let mut headers = HeaderMap::new();
1264 set_header(
1265 &mut headers,
1266 LOCATION,
1267 &format!(
1268 "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1269 stored.id
1270 ),
1271 );
1272 Ok(xml_response(StatusCode::CREATED, body, headers))
1273 }
1274
1275 fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1276 let dist_id = route
1277 .id
1278 .as_deref()
1279 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1280 let inv_id = route
1281 .second_id
1282 .as_deref()
1283 .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1284 let state = self.state.read();
1285 let account = state
1286 .accounts
1287 .get(DEFAULT_ACCOUNT)
1288 .ok_or_else(|| no_such_invalidation(inv_id))?;
1289 if !account.distributions.contains_key(dist_id) {
1290 return Err(no_such_distribution(dist_id));
1291 }
1292 let inv = account
1293 .invalidations
1294 .get(inv_id)
1295 .filter(|i| i.distribution_id == dist_id)
1296 .ok_or_else(|| no_such_invalidation(inv_id))?
1297 .clone();
1298 drop(state);
1299 let body = build_invalidation_xml(&inv);
1300 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1301 }
1302
1303 fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1304 let dist_id = route
1305 .id
1306 .as_deref()
1307 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1308 let state = self.state.read();
1309 let account = state
1310 .accounts
1311 .get(DEFAULT_ACCOUNT)
1312 .ok_or_else(|| no_such_distribution(dist_id))?;
1313 if !account.distributions.contains_key(dist_id) {
1314 return Err(no_such_distribution(dist_id));
1315 }
1316 let mut items: Vec<&StoredInvalidation> = account
1317 .invalidations
1318 .values()
1319 .filter(|i| i.distribution_id == dist_id)
1320 .collect();
1321 items.sort_by_key(|a| a.create_time);
1322 let body = build_invalidation_list_xml(&items);
1323 drop(state);
1324 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1325 }
1326}
1327
1328impl CloudFrontService {
1331 fn parse_arn_query(query: &str) -> Option<String> {
1332 for pair in query.split('&').filter(|p| !p.is_empty()) {
1333 if let Some(rest) = pair.strip_prefix("Resource=") {
1334 return Some(percent_decode(rest));
1335 }
1336 }
1337 None
1338 }
1339
1340 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1341 let arn = Self::parse_arn_query(&req.raw_query)
1342 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1343 let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1344 .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1345 let new_tags: Vec<Tag> = parsed
1346 .items
1347 .map(|i| {
1348 i.tag
1349 .into_iter()
1350 .map(|t| Tag {
1351 key: t.key,
1352 value: t.value,
1353 })
1354 .collect()
1355 })
1356 .unwrap_or_default();
1357 let mut state = self.state.write();
1358 let account = state.entry(DEFAULT_ACCOUNT);
1359 let entry = account.tags.entry(arn).or_default();
1360 for tag in new_tags {
1361 if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1362 existing.value = tag.value;
1363 } else {
1364 entry.push(tag);
1365 }
1366 }
1367 Ok(empty_response(StatusCode::NO_CONTENT))
1368 }
1369
1370 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371 let arn = Self::parse_arn_query(&req.raw_query)
1372 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1373 let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1374 .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1375 let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1376 let mut state = self.state.write();
1377 let account = state.entry(DEFAULT_ACCOUNT);
1378 if let Some(existing) = account.tags.get_mut(&arn) {
1379 existing.retain(|t| !keys.contains(&t.key));
1380 }
1381 Ok(empty_response(StatusCode::NO_CONTENT))
1382 }
1383
1384 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385 let arn = Self::parse_arn_query(&req.raw_query)
1386 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1387 let state = self.state.read();
1388 let tags = state
1389 .accounts
1390 .get(DEFAULT_ACCOUNT)
1391 .and_then(|a| a.tags.get(&arn))
1392 .cloned()
1393 .unwrap_or_default();
1394 drop(state);
1395 let body = build_tags_xml(&tags);
1396 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1397 }
1398}
1399
1400impl CloudFrontService {
1403 fn associate_alias(
1404 &self,
1405 req: &AwsRequest,
1406 route: &Route,
1407 ) -> Result<AwsResponse, AwsServiceError> {
1408 let id = route
1409 .id
1410 .as_deref()
1411 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1412 let alias = parse_query_value(&req.raw_query, "Alias")
1413 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1414 let mut state = self.state.write();
1415 let account = state
1416 .accounts
1417 .get_mut(DEFAULT_ACCOUNT)
1418 .ok_or_else(|| no_such_distribution(id))?;
1419 if let Some(other) = account.distributions.values().find(|d| {
1421 d.id != id
1422 && d.config
1423 .aliases
1424 .as_ref()
1425 .and_then(|a| a.items.as_ref())
1426 .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1427 }) {
1428 return Err(aws_error(
1429 StatusCode::CONFLICT,
1430 "CNAMEAlreadyExists",
1431 format!(
1432 "Alias {alias} is already associated with distribution {}",
1433 other.id
1434 ),
1435 ));
1436 }
1437 let dist = account
1438 .distributions
1439 .get_mut(id)
1440 .ok_or_else(|| no_such_distribution(id))?;
1441 let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1442 let items = aliases
1443 .items
1444 .get_or_insert_with(crate::model::AliasItems::default);
1445 if !items.cname.iter().any(|c| c == &alias) {
1446 items.cname.push(alias.clone());
1447 aliases.quantity = items.cname.len() as i32;
1448 }
1449 dist.etag = generate_etag();
1450 dist.last_modified_time = Utc::now();
1451 Ok(empty_response(StatusCode::OK))
1452 }
1453
1454 fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1455 let alias = parse_query_value(&req.raw_query, "Alias")
1456 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1457 let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1458 .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1459 if alias.len() > 253 {
1464 return Err(invalid_argument(format!(
1465 "Alias length {} exceeds maximum 253",
1466 alias.len()
1467 )));
1468 }
1469 if dist_id.len() > 25 {
1470 return Err(invalid_argument(format!(
1471 "DistributionId length {} exceeds maximum 25",
1472 dist_id.len()
1473 )));
1474 }
1475 if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1476 let n: i64 = max_items.parse().map_err(|_| {
1477 invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1478 })?;
1479 if n > 100 {
1480 return Err(invalid_argument(format!(
1481 "MaxItems {n} exceeds maximum 100"
1482 )));
1483 }
1484 }
1485 let body = format!(
1488 "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1489 NS = crate::NAMESPACE,
1490 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1491 );
1492 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1493 }
1494
1495 fn associate_web_acl(
1496 &self,
1497 req: &AwsRequest,
1498 route: &Route,
1499 ) -> Result<AwsResponse, AwsServiceError> {
1500 let id = route
1501 .id
1502 .as_deref()
1503 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1504 let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1505 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1506 let mut state = self.state.write();
1507 let account = state
1508 .accounts
1509 .get_mut(DEFAULT_ACCOUNT)
1510 .ok_or_else(|| no_such_distribution(id))?;
1511 let dist = account
1512 .distributions
1513 .get_mut(id)
1514 .ok_or_else(|| no_such_distribution(id))?;
1515 dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1516 dist.etag = generate_etag();
1517 dist.last_modified_time = Utc::now();
1518 let body = format!(
1519 "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1520 esc(id), esc(&parsed.web_acl_arn),
1521 NS = crate::NAMESPACE,
1522 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1523 );
1524 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1525 }
1526
1527 fn disassociate_web_acl(
1528 &self,
1529 _req: &AwsRequest,
1530 route: &Route,
1531 ) -> Result<AwsResponse, AwsServiceError> {
1532 let id = route
1533 .id
1534 .as_deref()
1535 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1536 let entity_not_found = || {
1539 aws_error(
1540 StatusCode::NOT_FOUND,
1541 "EntityNotFound",
1542 format!("The specified distribution does not exist: {id}"),
1543 )
1544 };
1545 let mut state = self.state.write();
1546 let account = state
1547 .accounts
1548 .get_mut(DEFAULT_ACCOUNT)
1549 .ok_or_else(entity_not_found)?;
1550 let dist = account
1551 .distributions
1552 .get_mut(id)
1553 .ok_or_else(entity_not_found)?;
1554 dist.config.web_acl_id = None;
1555 dist.etag = generate_etag();
1556 dist.last_modified_time = Utc::now();
1557 let body = format!(
1558 "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1559 esc(id),
1560 NS = crate::NAMESPACE,
1561 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1562 );
1563 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1564 }
1565}
1566
1567#[derive(serde::Deserialize, Default, Debug)]
1568#[serde(rename_all = "PascalCase")]
1569struct AssociateAliasRequest {
1570 #[serde(rename = "WebACLArn", default)]
1571 web_acl_arn: String,
1572}
1573
/// Escapes a string for inclusion in XML text or attribute content.
///
/// The five XML-reserved characters are replaced with their predefined
/// entities (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`); every other
/// character is copied through unchanged.
pub(crate) fn esc(s: &str) -> String {
    // Escaping can only grow the string, so the input length is a lower bound.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            _ => out.push(c),
        }
    }
    out
}
1595
1596pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1597 let mut out = String::with_capacity(2048);
1598 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1599 out.push_str(&format!(
1600 "<Distribution xmlns=\"{ns}\">",
1601 ns = crate::NAMESPACE
1602 ));
1603 out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1604 out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1605 out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1606 out.push_str(&format!(
1607 "<LastModifiedTime>{}</LastModifiedTime>",
1608 rfc3339(&dist.last_modified_time)
1609 ));
1610 out.push_str(&format!(
1611 "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1612 dist.in_progress_invalidation_batches
1613 ));
1614 out.push_str(&format!(
1615 "<DomainName>{}</DomainName>",
1616 esc(&dist.domain_name)
1617 ));
1618 out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1619 out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1620 let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1621 .unwrap_or_else(|_| String::new());
1622 out.push_str(&inner);
1623 out.push_str("</Distribution>");
1624 out
1625}
1626
1627fn build_distribution_list_xml(dists: &[StoredDistribution], root: &str) -> String {
1628 let mut out = String::with_capacity(2048);
1629 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1630 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1631 out.push_str("<Marker></Marker>");
1632 out.push_str(&format!("<MaxItems>{}</MaxItems>", dists.len().max(100)));
1633 out.push_str("<IsTruncated>false</IsTruncated>");
1634 out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1635 if dists.is_empty() {
1636 out.push_str(&format!("</{root}>"));
1637 return out;
1638 }
1639 out.push_str("<Items>");
1640 for d in dists {
1641 out.push_str("<DistributionSummary>");
1642 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1643 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1644 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1645 out.push_str(&format!(
1646 "<LastModifiedTime>{}</LastModifiedTime>",
1647 rfc3339(&d.last_modified_time)
1648 ));
1649 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1650 let aliases = d.config.aliases.clone().unwrap_or_default();
1651 out.push_str(&render_inline("Aliases", &aliases));
1652 let origins = d.config.origins.clone();
1653 out.push_str(&render_inline("Origins", &origins));
1654 let dcb = d.config.default_cache_behavior.clone();
1655 out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1656 let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1657 out.push_str(&render_inline("CacheBehaviors", &cb));
1658 let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1659 out.push_str(&render_inline("CustomErrorResponses", &cer));
1660 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1661 out.push_str(&format!(
1662 "<PriceClass>{}</PriceClass>",
1663 esc(&d
1664 .config
1665 .price_class
1666 .clone()
1667 .unwrap_or_else(|| "PriceClass_All".to_string()))
1668 ));
1669 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1670 out.push_str(&render_inline(
1671 "ViewerCertificate",
1672 &d.config.viewer_certificate.clone().unwrap_or_default(),
1673 ));
1674 out.push_str(&render_inline(
1675 "Restrictions",
1676 &d.config.restrictions.clone().unwrap_or_default(),
1677 ));
1678 out.push_str(&format!(
1679 "<WebACLId>{}</WebACLId>",
1680 esc(&d.config.web_acl_id.clone().unwrap_or_default())
1681 ));
1682 out.push_str(&format!(
1683 "<HttpVersion>{}</HttpVersion>",
1684 esc(&d
1685 .config
1686 .http_version
1687 .clone()
1688 .unwrap_or_else(|| "http2".to_string()))
1689 ));
1690 out.push_str(&format!(
1691 "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1692 d.config.is_ipv6_enabled.unwrap_or(true)
1693 ));
1694 out.push_str("<Staging>false</Staging>");
1695 out.push_str("</DistributionSummary>");
1696 }
1697 out.push_str("</Items>");
1698 out.push_str(&format!("</{root}>"));
1699 out
1700}
1701
1702fn build_empty_distribution_id_list(root: &str) -> String {
1703 let mut out = String::with_capacity(256);
1704 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1705 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1706 out.push_str("<Marker></Marker>");
1707 out.push_str("<MaxItems>100</MaxItems>");
1708 out.push_str("<IsTruncated>false</IsTruncated>");
1709 out.push_str("<Quantity>0</Quantity>");
1710 out.push_str(&format!("</{root}>"));
1711 out
1712}
1713
1714fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1715 let mut out = String::with_capacity(512);
1716 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1717 out.push_str(&format!(
1718 "<Invalidation xmlns=\"{ns}\">",
1719 ns = crate::NAMESPACE
1720 ));
1721 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1722 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1723 out.push_str(&format!(
1724 "<CreateTime>{}</CreateTime>",
1725 rfc3339(&inv.create_time)
1726 ));
1727 out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1728 out.push_str("</Invalidation>");
1729 out
1730}
1731
1732fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1733 let mut out = String::with_capacity(1024);
1734 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1735 out.push_str(&format!(
1736 "<InvalidationList xmlns=\"{ns}\">",
1737 ns = crate::NAMESPACE
1738 ));
1739 out.push_str("<Marker></Marker>");
1740 out.push_str("<MaxItems>100</MaxItems>");
1741 out.push_str("<IsTruncated>false</IsTruncated>");
1742 out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1743 if !items.is_empty() {
1744 out.push_str("<Items>");
1745 for inv in items {
1746 out.push_str("<InvalidationSummary>");
1747 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1748 out.push_str(&format!(
1749 "<CreateTime>{}</CreateTime>",
1750 rfc3339(&inv.create_time)
1751 ));
1752 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1753 out.push_str("</InvalidationSummary>");
1754 }
1755 out.push_str("</Items>");
1756 }
1757 out.push_str("</InvalidationList>");
1758 out
1759}
1760
1761fn build_tags_xml(tags: &[Tag]) -> String {
1762 let mut out = String::with_capacity(256);
1763 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1764 out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1765 out.push_str("<Items>");
1766 for t in tags {
1767 out.push_str("<Tag>");
1768 out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1769 if let Some(v) = &t.value {
1770 out.push_str(&format!("<Value>{}</Value>", esc(v)));
1771 }
1772 out.push_str("</Tag>");
1773 }
1774 out.push_str("</Items>");
1775 out.push_str("</Tags>");
1776 out
1777}
1778
1779fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1780 quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1781}
1782
1783fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1786 if s.is_empty() {
1787 return Err(invalid_argument("CallerReference is required"));
1788 }
1789 Ok(())
1790}
1791
1792fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1793 if config.origins.quantity < 1 {
1794 return Err(invalid_argument(
1795 "DistributionConfig.Origins must contain at least one origin",
1796 ));
1797 }
1798 Ok(())
1799}
1800
1801fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1807 let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1808 return false;
1809 };
1810 let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1811 return false;
1812 };
1813 a == b
1814}
1815
1816fn account_id(_req: &AwsRequest) -> &'static str {
1817 DEFAULT_ACCOUNT
1822}
1823
1824fn generate_distribution_id() -> String {
1825 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1827 format!("E{}", &raw[..13])
1828}
1829
1830fn generate_invalidation_id() -> String {
1831 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1832 format!("I{}", &raw[..13])
1833}
1834
1835pub(crate) fn generate_etag() -> String {
1836 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1837 format!("E{}", &raw[..13])
1838}
1839
1840pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1845 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1846 let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1847 format!("{prefix}{}", &raw[..suffix_len])
1848}
1849
1850fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1851 t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1852}
1853
1854pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1855 aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1856}
1857
1858fn no_such_distribution(id: &str) -> AwsServiceError {
1859 aws_error(
1860 StatusCode::NOT_FOUND,
1861 "NoSuchDistribution",
1862 format!("The specified distribution does not exist: {id}"),
1863 )
1864}
1865
1866fn no_such_invalidation(id: &str) -> AwsServiceError {
1867 aws_error(
1868 StatusCode::NOT_FOUND,
1869 "NoSuchInvalidation",
1870 format!("The specified invalidation does not exist: {id}"),
1871 )
1872}
1873
1874fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1875 aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1876}
1877
1878pub(crate) fn aws_error(
1879 status: StatusCode,
1880 code: impl Into<String>,
1881 msg: impl Into<String>,
1882) -> AwsServiceError {
1883 AwsServiceError::aws_error(status, code.into(), msg)
1884}
1885
1886fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1887 if let Ok(v) = HeaderValue::from_str(value) {
1888 headers.insert(name, v);
1889 }
1890}
1891
1892pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1893 AwsResponse {
1894 status,
1895 content_type: "text/xml".to_string(),
1896 body: ResponseBody::Bytes(Bytes::from(body)),
1897 headers,
1898 }
1899}
1900
1901fn empty_response(status: StatusCode) -> AwsResponse {
1902 AwsResponse {
1903 status,
1904 content_type: "text/xml".to_string(),
1905 body: ResponseBody::Bytes(Bytes::new()),
1906 headers: HeaderMap::new(),
1907 }
1908}
1909
/// Decodes a percent-encoded query-string component.
///
/// `%XX` escapes (either hex case) become the byte they encode and `+` becomes
/// a space. A `%` that is not followed by two hex digits is kept literally.
/// The decoded bytes are then interpreted as UTF-8, so multi-byte sequences
/// such as `%C3%A9` yield the intended character; byte sequences that are not
/// valid UTF-8 are replaced with U+FFFD rather than causing a failure.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    // Decode into raw bytes first: pushing each byte as its own `char` would
    // turn every non-ASCII byte into a separate Latin-1 character.
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the percent sign as-is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Returns the numeric value (0-15) of an ASCII hex digit, or `None` for any
/// other byte.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1941
/// Reports whether a URL label is missing or an unsubstituted template
/// placeholder.
///
/// Returns `true` for the empty string, for values starting with `{`, and for
/// values starting with the percent-encoded brace `%7B` (either hex case).
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    match value.as_bytes() {
        [] => true,
        [b'{', ..] => true,
        [b'%', b'7', b'b' | b'B', ..] => true,
        _ => false,
    }
}
1955
1956pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
1964 if let Ok(s) = std::str::from_utf8(body) {
1965 let trimmed = s.trim_start();
1966 if trimmed.starts_with('{') {
1967 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1968 if let Some(field) = v.get(key) {
1969 return match field {
1970 serde_json::Value::String(s) => Some(s.clone()),
1971 serde_json::Value::Number(n) => Some(n.to_string()),
1972 serde_json::Value::Bool(b) => Some(b.to_string()),
1973 _ => None,
1974 };
1975 }
1976 return None;
1977 }
1978 }
1979 let open = format!("<{key}>");
1981 let close = format!("</{key}>");
1982 if let Some(start) = s.find(&open) {
1983 let after = start + open.len();
1984 if let Some(end_rel) = s[after..].find(&close) {
1985 return Some(s[after..after + end_rel].to_string());
1986 }
1987 }
1988 }
1989 None
1990}
1991
1992fn parse_query_value(query: &str, key: &str) -> Option<String> {
1993 let prefix = format!("{key}=");
1994 for pair in query.split('&').filter(|p| !p.is_empty()) {
1995 if let Some(rest) = pair.strip_prefix(&prefix) {
1996 return Some(percent_decode(rest));
1997 }
1998 }
1999 None
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004 use super::*;
2005
2006 #[test]
2007 fn placeholder_label_detects_braces_and_percent_encoding() {
2008 assert!(is_placeholder_label(""));
2009 assert!(is_placeholder_label("{Identifier}"));
2010 assert!(is_placeholder_label("%7BIdentifier%7D"));
2011 assert!(is_placeholder_label("%7bidentifier%7d"));
2012 assert!(!is_placeholder_label("E1234567890ABC"));
2013 assert!(!is_placeholder_label(
2014 "arn:aws:cloudfront::000:distribution/E1"
2015 ));
2016 }
2017
2018 #[test]
2019 fn extract_body_field_handles_json_and_xml() {
2020 let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
2021 assert_eq!(
2022 extract_body_field(json, "Stage"),
2023 Some("BROKEN".to_string())
2024 );
2025 assert_eq!(extract_body_field(json, "MaxItems"), None);
2026
2027 let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
2028 assert_eq!(
2029 extract_body_field(xml, "Domain"),
2030 Some("example.com".to_string())
2031 );
2032 assert_eq!(extract_body_field(xml, "Missing"), None);
2033
2034 assert_eq!(extract_body_field(b"", "x"), None);
2035 }
2036
2037 fn make_state() -> SharedCloudFrontState {
2038 Arc::new(RwLock::new(CloudFrontAccounts::new()))
2039 }
2040
2041 fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
2042 AwsRequest {
2043 service: "cloudfront".into(),
2044 action: String::new(),
2045 region: "us-east-1".into(),
2046 account_id: DEFAULT_ACCOUNT.into(),
2047 request_id: Uuid::new_v4().to_string(),
2048 headers: HeaderMap::new(),
2049 query_params: std::collections::HashMap::new(),
2050 body_stream: parking_lot::Mutex::new(None),
2051 body: Bytes::from(body.to_string()),
2052 path_segments: path
2053 .split('/')
2054 .filter(|s| !s.is_empty())
2055 .map(String::from)
2056 .collect(),
2057 raw_path: path.into(),
2058 raw_query: query.into(),
2059 method,
2060 is_query_protocol: false,
2061 access_key_id: None,
2062 principal: None,
2063 }
2064 }
2065
    /// Returns a small `DistributionConfig` XML document used as the request
    /// body by the distribution tests.
    ///
    /// The document declares one origin (`primary`, domain `example.com`), a
    /// default cache behavior targeting that origin with the `allow-all`
    /// viewer protocol policy, an empty `Comment`, and `Enabled` set to `true`.
    /// `caller_ref` is interpolated verbatim (no XML escaping) as the
    /// `CallerReference`.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
 <CallerReference>{caller_ref}</CallerReference>
 <Origins>
 <Quantity>1</Quantity>
 <Items>
 <Origin>
 <Id>primary</Id>
 <DomainName>example.com</DomainName>
 </Origin>
 </Items>
 </Origins>
 <DefaultCacheBehavior>
 <TargetOriginId>primary</TargetOriginId>
 <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
 </DefaultCacheBehavior>
 <Comment></Comment>
 <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }
2089
2090 #[tokio::test]
2091 async fn create_then_get_then_delete_distribution() {
2092 let svc = CloudFrontService::new(make_state());
2093 let body = minimal_dist_config_xml("ref-1");
2094 let create = svc
2095 .handle(make_request(
2096 http::Method::POST,
2097 "/2020-05-31/distribution",
2098 "",
2099 &body,
2100 ))
2101 .await
2102 .unwrap();
2103 assert_eq!(create.status, StatusCode::CREATED);
2104 let etag = create
2105 .headers
2106 .get(ETAG)
2107 .unwrap()
2108 .to_str()
2109 .unwrap()
2110 .to_string();
2111 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2112 let id = xml
2113 .split("<Id>")
2114 .nth(1)
2115 .unwrap()
2116 .split("</Id>")
2117 .next()
2118 .unwrap()
2119 .to_string();
2120
2121 let get = svc
2122 .handle(make_request(
2123 http::Method::GET,
2124 &format!("/2020-05-31/distribution/{id}"),
2125 "",
2126 "",
2127 ))
2128 .await
2129 .unwrap();
2130 assert_eq!(get.status, StatusCode::OK);
2131
2132 let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
2134 let mut update_req = make_request(
2135 http::Method::PUT,
2136 &format!("/2020-05-31/distribution/{id}/config"),
2137 "",
2138 &disable_body,
2139 );
2140 update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2141 let updated = svc.handle(update_req).await.unwrap();
2142 assert_eq!(updated.status, StatusCode::OK);
2143 let new_etag = updated
2144 .headers
2145 .get(ETAG)
2146 .unwrap()
2147 .to_str()
2148 .unwrap()
2149 .to_string();
2150
2151 let mut del_req = make_request(
2152 http::Method::DELETE,
2153 &format!("/2020-05-31/distribution/{id}"),
2154 "",
2155 "",
2156 );
2157 del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
2158 let del = svc.handle(del_req).await.unwrap();
2159 assert_eq!(del.status, StatusCode::NO_CONTENT);
2160 }
2161
2162 async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
2163 let body = minimal_dist_config_xml(caller_ref);
2164 let create = svc
2165 .handle(make_request(
2166 http::Method::POST,
2167 "/2020-05-31/distribution",
2168 "",
2169 &body,
2170 ))
2171 .await
2172 .unwrap();
2173 let xml = std::str::from_utf8(create.body.expect_bytes())
2174 .unwrap()
2175 .to_string();
2176 xml.split("<Id>")
2177 .nth(1)
2178 .unwrap()
2179 .split("</Id>")
2180 .next()
2181 .unwrap()
2182 .to_string()
2183 }
2184
2185 fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
2186 let state = svc.state.read();
2187 state
2188 .accounts
2189 .get(DEFAULT_ACCOUNT)
2190 .and_then(|a| a.distributions.get(id))
2191 .map(|d| d.status.clone())
2192 .unwrap_or_default()
2193 }
2194
2195 #[tokio::test]
2196 async fn create_distribution_starts_in_progress() {
2197 let svc = CloudFrontService::new(make_state())
2199 .with_propagation_delay(std::time::Duration::from_secs(60));
2200 let body = minimal_dist_config_xml("status-ref");
2201 let create = svc
2202 .handle(make_request(
2203 http::Method::POST,
2204 "/2020-05-31/distribution",
2205 "",
2206 &body,
2207 ))
2208 .await
2209 .unwrap();
2210 let xml = std::str::from_utf8(create.body.expect_bytes())
2211 .unwrap()
2212 .to_string();
2213 assert!(
2214 xml.contains("<Status>InProgress</Status>"),
2215 "expected initial status InProgress, got: {xml}"
2216 );
2217 }
2218
2219 #[tokio::test]
2220 async fn auto_transition_after_tick_marks_deployed() {
2221 let svc = CloudFrontService::new(make_state())
2222 .with_propagation_delay(std::time::Duration::from_millis(50));
2223 let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
2224 assert_eq!(distribution_status(&svc, &id), "InProgress");
2225 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2226 assert_eq!(distribution_status(&svc, &id), "Deployed");
2227 }
2228
2229 #[tokio::test]
2230 async fn set_distribution_status_via_admin_flips_synchronously() {
2231 let svc = CloudFrontService::new(make_state())
2232 .with_propagation_delay(std::time::Duration::from_secs(60));
2233 let id = create_distribution_returning_id(&svc, "admin-ref").await;
2234 assert_eq!(distribution_status(&svc, &id), "InProgress");
2235 assert!(svc.set_distribution_status(&id, "Deployed"));
2236 assert_eq!(distribution_status(&svc, &id), "Deployed");
2237 assert!(svc.set_distribution_status(&id, "InProgress"));
2238 assert_eq!(distribution_status(&svc, &id), "InProgress");
2239 }
2240
2241 #[tokio::test]
2242 async fn set_distribution_status_unknown_id_returns_false() {
2243 let svc = CloudFrontService::new(make_state());
2244 assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
2245 }
2246
2247 #[tokio::test]
2248 async fn update_distribution_resets_to_in_progress() {
2249 let svc = CloudFrontService::new(make_state())
2250 .with_propagation_delay(std::time::Duration::from_secs(60));
2251 let body = minimal_dist_config_xml("update-reset-ref");
2252 let create = svc
2253 .handle(make_request(
2254 http::Method::POST,
2255 "/2020-05-31/distribution",
2256 "",
2257 &body,
2258 ))
2259 .await
2260 .unwrap();
2261 let etag = create
2262 .headers
2263 .get(ETAG)
2264 .unwrap()
2265 .to_str()
2266 .unwrap()
2267 .to_string();
2268 let xml = std::str::from_utf8(create.body.expect_bytes())
2269 .unwrap()
2270 .to_string();
2271 let id = xml
2272 .split("<Id>")
2273 .nth(1)
2274 .unwrap()
2275 .split("</Id>")
2276 .next()
2277 .unwrap()
2278 .to_string();
2279 assert!(svc.set_distribution_status(&id, "Deployed"));
2282 assert_eq!(distribution_status(&svc, &id), "Deployed");
2283
2284 let updated_body = body.replace(
2285 "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
2286 "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
2287 );
2288 let mut update_req = make_request(
2289 http::Method::PUT,
2290 &format!("/2020-05-31/distribution/{id}/config"),
2291 "",
2292 &updated_body,
2293 );
2294 update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
2295 let updated = svc.handle(update_req).await.unwrap();
2296 assert_eq!(updated.status, StatusCode::OK);
2297 assert_eq!(distribution_status(&svc, &id), "InProgress");
2298 }
2299
2300 #[tokio::test]
2301 async fn duplicate_caller_reference_is_rejected() {
2302 let svc = CloudFrontService::new(make_state());
2303 let body = minimal_dist_config_xml("dup-ref");
2304 svc.handle(make_request(
2305 http::Method::POST,
2306 "/2020-05-31/distribution",
2307 "",
2308 &body,
2309 ))
2310 .await
2311 .unwrap();
2312 let result = svc
2313 .handle(make_request(
2314 http::Method::POST,
2315 "/2020-05-31/distribution",
2316 "",
2317 &body,
2318 ))
2319 .await;
2320 let err = match result {
2321 Ok(_) => panic!("expected duplicate caller-reference to fail"),
2322 Err(e) => e,
2323 };
2324 assert_eq!(err.code(), "DistributionAlreadyExists");
2325 assert_eq!(err.status(), StatusCode::CONFLICT);
2326 }
2327
2328 #[tokio::test]
2329 async fn invalidation_lifecycle() {
2330 let svc = CloudFrontService::new(make_state());
2331 let body = minimal_dist_config_xml("inv-ref");
2332 let create = svc
2333 .handle(make_request(
2334 http::Method::POST,
2335 "/2020-05-31/distribution",
2336 "",
2337 &body,
2338 ))
2339 .await
2340 .unwrap();
2341 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2342 let dist_id = xml
2343 .split("<Id>")
2344 .nth(1)
2345 .unwrap()
2346 .split("</Id>")
2347 .next()
2348 .unwrap();
2349 let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2350<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2351 <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
2352 <CallerReference>inv-1</CallerReference>
2353</InvalidationBatch>"#;
2354 let inv_resp = svc
2355 .handle(make_request(
2356 http::Method::POST,
2357 &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2358 "",
2359 inv_body,
2360 ))
2361 .await
2362 .unwrap();
2363 assert_eq!(inv_resp.status, StatusCode::CREATED);
2364 let inv_xml = std::str::from_utf8(inv_resp.body.expect_bytes()).unwrap();
2365 let inv_id = inv_xml
2366 .split("<Id>")
2367 .nth(1)
2368 .unwrap()
2369 .split("</Id>")
2370 .next()
2371 .unwrap();
2372 let get = svc
2373 .handle(make_request(
2374 http::Method::GET,
2375 &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
2376 "",
2377 "",
2378 ))
2379 .await
2380 .unwrap();
2381 assert_eq!(get.status, StatusCode::OK);
2382 let list = svc
2383 .handle(make_request(
2384 http::Method::GET,
2385 &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
2386 "",
2387 "",
2388 ))
2389 .await
2390 .unwrap();
2391 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2392 assert!(xml.contains("<Quantity>1</Quantity>"));
2393 }
2394
2395 #[tokio::test]
2396 async fn tags_roundtrip() {
2397 let svc = CloudFrontService::new(make_state());
2398 let body = minimal_dist_config_xml("tag-ref");
2399 let create = svc
2400 .handle(make_request(
2401 http::Method::POST,
2402 "/2020-05-31/distribution",
2403 "",
2404 &body,
2405 ))
2406 .await
2407 .unwrap();
2408 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2409 let arn = xml
2410 .split("<ARN>")
2411 .nth(1)
2412 .unwrap()
2413 .split("</ARN>")
2414 .next()
2415 .unwrap();
2416 let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2417<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2418 <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
2419</Tags>"#;
2420 let arn_q = format!("Operation=Tag&Resource={}", arn);
2421 let resp = svc
2422 .handle(make_request(
2423 http::Method::POST,
2424 "/2020-05-31/tagging",
2425 &arn_q,
2426 tag_body,
2427 ))
2428 .await
2429 .unwrap();
2430 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2431 let list = svc
2432 .handle(make_request(
2433 http::Method::GET,
2434 "/2020-05-31/tagging",
2435 &format!("Resource={}", arn),
2436 "",
2437 ))
2438 .await
2439 .unwrap();
2440 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2441 assert!(xml.contains("<Key>env</Key>"));
2442 assert!(xml.contains("<Value>prod</Value>"));
2443 }
2444
2445 #[tokio::test]
2446 async fn xml_metacharacters_in_user_input_are_escaped() {
2447 let svc = CloudFrontService::new(make_state());
2448 let body = minimal_dist_config_xml("escape-ref").replace(
2449 "<Comment></Comment>",
2450 "<Comment><![CDATA[a&b<c>d]]></Comment>",
2451 );
2452 let create = svc
2453 .handle(make_request(
2454 http::Method::POST,
2455 "/2020-05-31/distribution",
2456 "",
2457 &body,
2458 ))
2459 .await
2460 .unwrap();
2461 let xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
2462 let dist_id = xml
2463 .split("<Id>")
2464 .nth(1)
2465 .unwrap()
2466 .split("</Id>")
2467 .next()
2468 .unwrap();
2469 let arn = xml
2470 .split("<ARN>")
2471 .nth(1)
2472 .unwrap()
2473 .split("</ARN>")
2474 .next()
2475 .unwrap();
2476
2477 let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
2478<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
2479 <Items><Tag><Key>env</Key><Value>a&b<c>d</Value></Tag></Items>
2480</Tags>"#;
2481 let arn_q = format!("Operation=Tag&Resource={}", arn);
2482 svc.handle(make_request(
2483 http::Method::POST,
2484 "/2020-05-31/tagging",
2485 &arn_q,
2486 tag_body,
2487 ))
2488 .await
2489 .unwrap();
2490
2491 let list = svc
2492 .handle(make_request(
2493 http::Method::GET,
2494 "/2020-05-31/tagging",
2495 &format!("Resource={}", arn),
2496 "",
2497 ))
2498 .await
2499 .unwrap();
2500 let xml = std::str::from_utf8(list.body.expect_bytes()).unwrap();
2501 assert!(xml.contains("<Value>a&b<c>d</Value>"));
2502 assert!(!xml.contains("<Value>a&b<c>d</Value>"));
2503
2504 let list_resp = svc
2507 .handle(make_request(
2508 http::Method::GET,
2509 "/2020-05-31/distribution",
2510 "",
2511 "",
2512 ))
2513 .await
2514 .unwrap();
2515 let xml = std::str::from_utf8(list_resp.body.expect_bytes()).unwrap();
2516 assert!(!xml.contains("<Comment>a&b<c>d"));
2519 assert!(xml.contains(dist_id));
2520 }
2521}