1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::Utc;
8use http::header::{HeaderName, HeaderValue, ETAG, IF_MATCH, LOCATION};
9use http::{HeaderMap, StatusCode};
10use parking_lot::RwLock;
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError, ResponseBody};
15use fakecloud_persistence::SnapshotStore;
16
17use crate::model::{
18 DistributionConfig, DistributionConfigWithTags, InvalidationBatch, TagKeys, Tags as ModelTags,
19};
20use crate::router::{route, Route};
21use crate::state::{
22 CloudFrontAccounts, CloudFrontSnapshot, SharedCloudFrontState, StoredDistribution,
23 StoredInvalidation, Tag, CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
24};
25use crate::xml_io;
26
/// Reports whether `action` names an operation that changes service state.
///
/// The decision is made purely on the action name: any name that begins with
/// one of the verbs listed below is treated as mutating. The match is
/// case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_VERBS: [&str; 10] = [
        "Create",
        "Update",
        "Delete",
        "Copy",
        "Associate",
        "Disassociate",
        "Tag",
        "Untag",
        "Publish",
        "Put",
    ];
    for verb in MUTATING_VERBS.iter().copied() {
        if action.starts_with(verb) {
            return true;
        }
    }
    false
}
45
/// Account id used by the handlers in this file that look up state without
/// consulting the incoming request (for example `get_distribution`).
pub(crate) const DEFAULT_ACCOUNT: &str = "000000000000";
47
/// Action names returned by `CloudFrontService::supported_actions`.
///
/// Entries are grouped by resource family; the slice is handed out as-is, so
/// the order here is the order callers see.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Distributions, invalidations, tagging and aliases.
    "CreateDistribution",
    "CreateDistributionWithTags",
    "GetDistribution",
    "GetDistributionConfig",
    "UpdateDistribution",
    "DeleteDistribution",
    "ListDistributions",
    "CopyDistribution",
    "CreateInvalidation",
    "GetInvalidation",
    "ListInvalidations",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AssociateAlias",
    "ListConflictingAliases",
    // Filtered distribution listings.
    "ListDistributionsByCachePolicyId",
    "ListDistributionsByOriginRequestPolicyId",
    "ListDistributionsByResponseHeadersPolicyId",
    "ListDistributionsByKeyGroup",
    "ListDistributionsByWebACLId",
    "ListDistributionsByVpcOriginId",
    "ListDistributionsByAnycastIpListId",
    "ListDistributionsByConnectionMode",
    "ListDistributionsByConnectionFunction",
    "ListDistributionsByOwnedResource",
    "ListDistributionsByTrustStore",
    "ListDistributionsByRealtimeLogConfig",
    "AssociateDistributionWebACL",
    "DisassociateDistributionWebACL",
    // Origin access controls.
    "CreateOriginAccessControl",
    "GetOriginAccessControl",
    "GetOriginAccessControlConfig",
    "UpdateOriginAccessControl",
    "DeleteOriginAccessControl",
    "ListOriginAccessControls",
    // Cache policies.
    "CreateCachePolicy",
    "GetCachePolicy",
    "GetCachePolicyConfig",
    "UpdateCachePolicy",
    "DeleteCachePolicy",
    "ListCachePolicies",
    // Origin request policies.
    "CreateOriginRequestPolicy",
    "GetOriginRequestPolicy",
    "GetOriginRequestPolicyConfig",
    "UpdateOriginRequestPolicy",
    "DeleteOriginRequestPolicy",
    "ListOriginRequestPolicies",
    // Response headers policies.
    "CreateResponseHeadersPolicy",
    "GetResponseHeadersPolicy",
    "GetResponseHeadersPolicyConfig",
    "UpdateResponseHeadersPolicy",
    "DeleteResponseHeadersPolicy",
    "ListResponseHeadersPolicies",
    // Continuous deployment policies.
    "CreateContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicy",
    "GetContinuousDeploymentPolicyConfig",
    "UpdateContinuousDeploymentPolicy",
    "DeleteContinuousDeploymentPolicy",
    "ListContinuousDeploymentPolicies",
    // Functions.
    "CreateFunction",
    "DescribeFunction",
    "GetFunction",
    "UpdateFunction",
    "DeleteFunction",
    "ListFunctions",
    "PublishFunction",
    "TestFunction",
    // Public keys and key groups.
    "CreatePublicKey",
    "GetPublicKey",
    "GetPublicKeyConfig",
    "UpdatePublicKey",
    "DeletePublicKey",
    "ListPublicKeys",
    "CreateKeyGroup",
    "GetKeyGroup",
    "GetKeyGroupConfig",
    "UpdateKeyGroup",
    "DeleteKeyGroup",
    "ListKeyGroups",
    // Key value stores.
    "CreateKeyValueStore",
    "DescribeKeyValueStore",
    "UpdateKeyValueStore",
    "DeleteKeyValueStore",
    "ListKeyValueStores",
    // Origin access identities.
    "CreateCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentity",
    "GetCloudFrontOriginAccessIdentityConfig",
    "UpdateCloudFrontOriginAccessIdentity",
    "DeleteCloudFrontOriginAccessIdentity",
    "ListCloudFrontOriginAccessIdentities",
    // Monitoring subscriptions.
    "CreateMonitoringSubscription",
    "GetMonitoringSubscription",
    "DeleteMonitoringSubscription",
    // Streaming distributions.
    "CreateStreamingDistribution",
    "CreateStreamingDistributionWithTags",
    "GetStreamingDistribution",
    "GetStreamingDistributionConfig",
    "UpdateStreamingDistribution",
    "DeleteStreamingDistribution",
    "ListStreamingDistributions",
    // Field-level encryption configs and profiles.
    "CreateFieldLevelEncryptionConfig",
    "GetFieldLevelEncryption",
    "GetFieldLevelEncryptionConfig",
    "UpdateFieldLevelEncryptionConfig",
    "DeleteFieldLevelEncryptionConfig",
    "ListFieldLevelEncryptionConfigs",
    "CreateFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfile",
    "GetFieldLevelEncryptionProfileConfig",
    "UpdateFieldLevelEncryptionProfile",
    "DeleteFieldLevelEncryptionProfile",
    "ListFieldLevelEncryptionProfiles",
    // Realtime log configs.
    "CreateRealtimeLogConfig",
    "GetRealtimeLogConfig",
    "UpdateRealtimeLogConfig",
    "DeleteRealtimeLogConfig",
    "ListRealtimeLogConfigs",
    // VPC origins.
    "CreateVpcOrigin",
    "GetVpcOrigin",
    "UpdateVpcOrigin",
    "DeleteVpcOrigin",
    "ListVpcOrigins",
    // Anycast IP lists.
    "CreateAnycastIpList",
    "GetAnycastIpList",
    "UpdateAnycastIpList",
    "DeleteAnycastIpList",
    "ListAnycastIpLists",
    // Trust stores.
    "CreateTrustStore",
    "GetTrustStore",
    "UpdateTrustStore",
    "DeleteTrustStore",
    "ListTrustStores",
    // Resource policies.
    "GetResourcePolicy",
    "PutResourcePolicy",
    "DeleteResourcePolicy",
    // Connection groups and domain management.
    "CreateConnectionGroup",
    "GetConnectionGroup",
    "GetConnectionGroupByRoutingEndpoint",
    "UpdateConnectionGroup",
    "DeleteConnectionGroup",
    "ListConnectionGroups",
    "ListDomainConflicts",
    "UpdateDomainAssociation",
    "VerifyDnsConfiguration",
    "GetManagedCertificateDetails",
    "UpdateDistributionWithStagingConfig",
    // Distribution tenants.
    "CreateDistributionTenant",
    "GetDistributionTenant",
    "GetDistributionTenantByDomain",
    "UpdateDistributionTenant",
    "DeleteDistributionTenant",
    "ListDistributionTenants",
    "ListDistributionTenantsByCustomization",
    "AssociateDistributionTenantWebACL",
    "DisassociateDistributionTenantWebACL",
    "CreateInvalidationForDistributionTenant",
    "GetInvalidationForDistributionTenant",
    "ListInvalidationsForDistributionTenant",
    // Connection functions.
    "CreateConnectionFunction",
    "GetConnectionFunction",
    "DescribeConnectionFunction",
    "UpdateConnectionFunction",
    "DeleteConnectionFunction",
    "ListConnectionFunctions",
    "PublishConnectionFunction",
    "TestConnectionFunction",
];
217
/// CloudFront request handler: shared state plus deploy-delay and snapshot
/// settings.
pub struct CloudFrontService {
    /// Shared, lock-guarded CloudFront state covering every account.
    pub(crate) state: SharedCloudFrontState,
    /// How long a scheduled deploy task sleeps before flipping a resource
    /// from `InProgress` to `Deployed`.
    pub(crate) propagation_delay: std::time::Duration,
    /// Optional persistence backend; when `None`, snapshot saves are no-ops.
    pub(crate) snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held while a snapshot is captured and written.
    pub(crate) snapshot_lock: Arc<AsyncMutex<()>>,
}
233
/// Resolves the default status-propagation delay from the environment.
///
/// Reads `FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC`, ignoring surrounding
/// whitespace. The value may be a whole number of seconds (`"5"`) or a
/// non-negative fractional number of seconds (`"0.25"`).
///
/// Falls back to one second when the variable is unset or not valid Unicode,
/// or when the value is unparsable, negative, non-finite, or too large to be
/// represented as a `Duration`.
fn default_propagation_delay() -> std::time::Duration {
    const FALLBACK: std::time::Duration = std::time::Duration::from_secs(1);

    let Ok(raw) = std::env::var("FAKECLOUD_CLOUDFRONT_STATUS_DELAY_SEC") else {
        return FALLBACK;
    };
    let trimmed = raw.trim();
    if let Ok(secs) = trimmed.parse::<u64>() {
        return std::time::Duration::from_secs(secs);
    }
    // `Duration::from_secs_f64` panics on input that overflows `Duration`
    // (e.g. "1e300"), which a finite/non-negative check does not rule out.
    // The fallible constructor rejects negative, non-finite and overflowing
    // values alike, so a bad environment value can never abort startup.
    trimmed
        .parse::<f64>()
        .ok()
        .and_then(|secs| std::time::Duration::try_from_secs_f64(secs).ok())
        .unwrap_or(FALLBACK)
}
255
256impl CloudFrontService {
257 pub fn new(state: SharedCloudFrontState) -> Self {
258 Self {
259 state,
260 propagation_delay: default_propagation_delay(),
261 snapshot_store: None,
262 snapshot_lock: Arc::new(AsyncMutex::new(())),
263 }
264 }
265
266 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
267 self.snapshot_store = Some(store);
268 self
269 }
270
271 pub fn shared_state(&self) -> SharedCloudFrontState {
272 Arc::clone(&self.state)
273 }
274
275 async fn save_snapshot(&self) {
279 save_cloudfront_snapshot(
280 &self.state,
281 self.snapshot_store.clone(),
282 &self.snapshot_lock,
283 )
284 .await;
285 }
286
287 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
292 let store = self.snapshot_store.clone()?;
293 let state = self.state.clone();
294 let lock = self.snapshot_lock.clone();
295 Some(Arc::new(move || {
296 let state = state.clone();
297 let store = store.clone();
298 let lock = lock.clone();
299 Box::pin(async move {
300 save_cloudfront_snapshot(&state, Some(store), &lock).await;
301 })
302 }))
303 }
304
305 pub fn rearm_in_progress(&self) {
310 let (dists, tenants, groups, streaming) = {
311 let s = self.state.read();
312 let mut dists = Vec::new();
313 let mut tenants = Vec::new();
314 let mut groups = Vec::new();
315 let mut streaming = Vec::new();
316 for account in s.accounts.values() {
317 for (id, d) in &account.distributions {
318 if d.status == "InProgress" {
319 dists.push(id.clone());
320 }
321 }
322 for (id, t) in &account.distribution_tenants {
323 if t.status == "InProgress" {
324 tenants.push(id.clone());
325 }
326 }
327 for (id, g) in &account.connection_groups {
328 if g.status == "InProgress" {
329 groups.push(id.clone());
330 }
331 }
332 for (id, d) in &account.streaming_distributions {
333 if d.status == "InProgress" {
334 streaming.push(id.clone());
335 }
336 }
337 }
338 (dists, tenants, groups, streaming)
339 };
340 for id in dists {
341 self.schedule_distribution_deploy(id);
342 }
343 for id in tenants {
344 self.schedule_distribution_tenant_deploy(id);
345 }
346 for id in groups {
347 self.schedule_connection_group_deploy(id);
348 }
349 for id in streaming {
350 self.schedule_streaming_distribution_deploy(id);
351 }
352 }
353
354 pub fn with_propagation_delay(mut self, delay: std::time::Duration) -> Self {
360 self.propagation_delay = delay;
361 self
362 }
363
364 pub fn set_distribution_status(&self, id: &str, status: &str) -> bool {
370 let mut state = self.state.write();
371 for account in state.accounts.values_mut() {
372 if let Some(dist) = account.distributions.get_mut(id) {
373 dist.status = status.to_string();
374 return true;
375 }
376 }
377 false
378 }
379
380 pub async fn set_distribution_status_persistent(&self, id: &str, status: &str) -> bool {
383 let changed = self.set_distribution_status(id, status);
384 if changed {
385 self.save_snapshot().await;
386 }
387 changed
388 }
389}
390
391pub async fn save_cloudfront_snapshot(
397 state: &SharedCloudFrontState,
398 store: Option<Arc<dyn SnapshotStore>>,
399 lock: &AsyncMutex<()>,
400) {
401 let Some(store) = store else {
402 return;
403 };
404 let _guard = lock.lock().await;
405 let snapshot = CloudFrontSnapshot {
406 schema_version: CLOUDFRONT_SNAPSHOT_SCHEMA_VERSION,
407 accounts: Some(state.read().clone()),
408 };
409 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410 let bytes = serde_json::to_vec(&snapshot)
411 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412 store.save(&bytes)
413 })
414 .await;
415 match join {
416 Ok(Ok(())) => {}
417 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudfront snapshot"),
418 Err(err) => tracing::error!(%err, "cloudfront snapshot task panicked"),
419 }
420}
421
422impl Default for CloudFrontService {
423 fn default() -> Self {
424 Self::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
425 }
426}
427
/// `AwsService` implementation: resolves the route of each request and
/// dispatches it to the matching handler method.
#[async_trait]
impl AwsService for CloudFrontService {
    /// Service identifier: `"cloudfront"`.
    fn service_name(&self) -> &str {
        "cloudfront"
    }

    /// Returns the static `SUPPORTED_ACTIONS` list.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }

    /// Routes `req` to its handler and returns the handler's result.
    ///
    /// # Errors
    ///
    /// * `InvalidArgument` (404) when `route` cannot resolve the method,
    ///   path and query to an action.
    /// * `InvalidAction` (501) when the resolved action has no arm below.
    /// * Otherwise whatever the selected handler returns.
    ///
    /// A snapshot is saved after the handler runs only when the action name
    /// is classified as mutating by `is_mutating_action` and the handler
    /// returned a response with a success status.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let resolved = match route(&req.method, &req.raw_path, &req.raw_query) {
            Some(r) => r,
            None => {
                return Err(aws_error(
                    StatusCode::NOT_FOUND,
                    "InvalidArgument",
                    format!("Unknown CloudFront route: {} {}", req.method, req.raw_path),
                ));
            }
        };

        // Classified up front, from the action name alone.
        let mutates = is_mutating_action(resolved.action);
        let result = match resolved.action {
            // Distributions, invalidations, tagging and aliases.
            "CreateDistribution" => self.create_distribution(&req, false),
            "CreateDistributionWithTags" => self.create_distribution(&req, true),
            "GetDistribution" => self.get_distribution(&resolved),
            "GetDistributionConfig" => self.get_distribution_config(&resolved),
            "UpdateDistribution" => self.update_distribution(&req, &resolved),
            "DeleteDistribution" => self.delete_distribution(&req, &resolved),
            "ListDistributions" => self.list_distributions(&req),
            "CopyDistribution" => self.copy_distribution(&req, &resolved),
            "CreateInvalidation" => self.create_invalidation(&req, &resolved),
            "GetInvalidation" => self.get_invalidation(&resolved),
            "ListInvalidations" => self.list_invalidations(&resolved),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "AssociateAlias" => self.associate_alias(&req, &resolved),
            "ListConflictingAliases" => self.list_conflicting_aliases(&req),
            "AssociateDistributionWebACL" => self.associate_web_acl(&req, &resolved),
            "DisassociateDistributionWebACL" => self.disassociate_web_acl(&req, &resolved),
            // All filtered listings share one handler, which receives the
            // action name as its third argument.
            "ListDistributionsByCachePolicyId"
            | "ListDistributionsByOriginRequestPolicyId"
            | "ListDistributionsByResponseHeadersPolicyId"
            | "ListDistributionsByKeyGroup"
            | "ListDistributionsByWebACLId"
            | "ListDistributionsByVpcOriginId"
            | "ListDistributionsByAnycastIpListId"
            | "ListDistributionsByConnectionMode"
            | "ListDistributionsByConnectionFunction"
            | "ListDistributionsByOwnedResource"
            | "ListDistributionsByTrustStore"
            | "ListDistributionsByRealtimeLogConfig" => {
                self.list_distributions_by(&req, &resolved, resolved.action)
            }
            // Origin access controls.
            "CreateOriginAccessControl" => self.create_origin_access_control(&req),
            "GetOriginAccessControl" => self.get_origin_access_control(&resolved),
            "GetOriginAccessControlConfig" => self.get_origin_access_control_config(&resolved),
            "UpdateOriginAccessControl" => self.update_origin_access_control(&req, &resolved),
            "DeleteOriginAccessControl" => self.delete_origin_access_control(&req, &resolved),
            "ListOriginAccessControls" => self.list_origin_access_controls(&req),
            // Cache policies.
            "CreateCachePolicy" => self.create_cache_policy(&req),
            "GetCachePolicy" => self.get_cache_policy(&resolved),
            "GetCachePolicyConfig" => self.get_cache_policy_config(&resolved),
            "UpdateCachePolicy" => self.update_cache_policy(&req, &resolved),
            "DeleteCachePolicy" => self.delete_cache_policy(&req, &resolved),
            "ListCachePolicies" => self.list_cache_policies(&req),
            // Origin request policies.
            "CreateOriginRequestPolicy" => self.create_origin_request_policy(&req),
            "GetOriginRequestPolicy" => self.get_origin_request_policy(&resolved),
            "GetOriginRequestPolicyConfig" => self.get_origin_request_policy_config(&resolved),
            "UpdateOriginRequestPolicy" => self.update_origin_request_policy(&req, &resolved),
            "DeleteOriginRequestPolicy" => self.delete_origin_request_policy(&req, &resolved),
            "ListOriginRequestPolicies" => self.list_origin_request_policies(&req),
            // Response headers policies.
            "CreateResponseHeadersPolicy" => self.create_response_headers_policy(&req),
            "GetResponseHeadersPolicy" => self.get_response_headers_policy(&resolved),
            "GetResponseHeadersPolicyConfig" => self.get_response_headers_policy_config(&resolved),
            "UpdateResponseHeadersPolicy" => self.update_response_headers_policy(&req, &resolved),
            "DeleteResponseHeadersPolicy" => self.delete_response_headers_policy(&req, &resolved),
            "ListResponseHeadersPolicies" => self.list_response_headers_policies(&req),
            // Continuous deployment policies.
            "CreateContinuousDeploymentPolicy" => self.create_continuous_deployment_policy(&req),
            "GetContinuousDeploymentPolicy" => self.get_continuous_deployment_policy(&resolved),
            "GetContinuousDeploymentPolicyConfig" => {
                self.get_continuous_deployment_policy_config(&resolved)
            }
            "UpdateContinuousDeploymentPolicy" => {
                self.update_continuous_deployment_policy(&req, &resolved)
            }
            "DeleteContinuousDeploymentPolicy" => {
                self.delete_continuous_deployment_policy(&req, &resolved)
            }
            "ListContinuousDeploymentPolicies" => self.list_continuous_deployment_policies(&req),
            // Functions.
            "CreateFunction" => self.create_function(&req),
            "DescribeFunction" => self.describe_function(&req, &resolved),
            "GetFunction" => self.get_function(&req, &resolved),
            "UpdateFunction" => self.update_function(&req, &resolved),
            "DeleteFunction" => self.delete_function(&req, &resolved),
            "ListFunctions" => self.list_functions(&req),
            "PublishFunction" => self.publish_function(&req, &resolved),
            "TestFunction" => self.test_function(&req, &resolved),
            // Public keys and key groups.
            "CreatePublicKey" => self.create_public_key(&req),
            "GetPublicKey" => self.get_public_key(&resolved),
            "GetPublicKeyConfig" => self.get_public_key_config(&resolved),
            "UpdatePublicKey" => self.update_public_key(&req, &resolved),
            "DeletePublicKey" => self.delete_public_key(&req, &resolved),
            "ListPublicKeys" => self.list_public_keys(&req),
            "CreateKeyGroup" => self.create_key_group(&req),
            "GetKeyGroup" => self.get_key_group(&resolved),
            "GetKeyGroupConfig" => self.get_key_group_config(&resolved),
            "UpdateKeyGroup" => self.update_key_group(&req, &resolved),
            "DeleteKeyGroup" => self.delete_key_group(&req, &resolved),
            "ListKeyGroups" => self.list_key_groups(&req),
            // Key value stores.
            "CreateKeyValueStore" => self.create_key_value_store(&req),
            "DescribeKeyValueStore" => self.describe_key_value_store(&resolved),
            "UpdateKeyValueStore" => self.update_key_value_store(&req, &resolved),
            "DeleteKeyValueStore" => self.delete_key_value_store(&req, &resolved),
            "ListKeyValueStores" => self.list_key_value_stores(&req),
            // Origin access identities.
            "CreateCloudFrontOriginAccessIdentity" => self.create_oai(&req),
            "GetCloudFrontOriginAccessIdentity" => self.get_oai(&resolved),
            "GetCloudFrontOriginAccessIdentityConfig" => self.get_oai_config(&resolved),
            "UpdateCloudFrontOriginAccessIdentity" => self.update_oai(&req, &resolved),
            "DeleteCloudFrontOriginAccessIdentity" => self.delete_oai(&req, &resolved),
            "ListCloudFrontOriginAccessIdentities" => self.list_oai(&req),
            // Monitoring subscriptions.
            "CreateMonitoringSubscription" => self.create_monitoring_subscription(&req, &resolved),
            "GetMonitoringSubscription" => self.get_monitoring_subscription(&resolved),
            "DeleteMonitoringSubscription" => self.delete_monitoring_subscription(&resolved),
            // Streaming distributions.
            "CreateStreamingDistribution" => self.create_streaming_distribution(&req, false),
            "CreateStreamingDistributionWithTags" => self.create_streaming_distribution(&req, true),
            "GetStreamingDistribution" => self.get_streaming_distribution(&resolved),
            "GetStreamingDistributionConfig" => self.get_streaming_distribution_config(&resolved),
            "UpdateStreamingDistribution" => self.update_streaming_distribution(&req, &resolved),
            "DeleteStreamingDistribution" => self.delete_streaming_distribution(&req, &resolved),
            "ListStreamingDistributions" => self.list_streaming_distributions(&req),
            // Field-level encryption configs and profiles.
            "CreateFieldLevelEncryptionConfig" => self.create_field_level_encryption_config(&req),
            "GetFieldLevelEncryption" => self.get_field_level_encryption(&resolved),
            "GetFieldLevelEncryptionConfig" => self.get_field_level_encryption_config(&resolved),
            "UpdateFieldLevelEncryptionConfig" => {
                self.update_field_level_encryption_config(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionConfig" => {
                self.delete_field_level_encryption_config(&req, &resolved)
            }
            "ListFieldLevelEncryptionConfigs" => self.list_field_level_encryption_configs(&req),
            "CreateFieldLevelEncryptionProfile" => self.create_field_level_encryption_profile(&req),
            "GetFieldLevelEncryptionProfile" => self.get_field_level_encryption_profile(&resolved),
            "GetFieldLevelEncryptionProfileConfig" => {
                self.get_field_level_encryption_profile_config(&resolved)
            }
            "UpdateFieldLevelEncryptionProfile" => {
                self.update_field_level_encryption_profile(&req, &resolved)
            }
            "DeleteFieldLevelEncryptionProfile" => {
                self.delete_field_level_encryption_profile(&req, &resolved)
            }
            "ListFieldLevelEncryptionProfiles" => self.list_field_level_encryption_profiles(&req),
            // Realtime log configs.
            "CreateRealtimeLogConfig" => self.create_realtime_log_config(&req),
            "GetRealtimeLogConfig" => self.get_realtime_log_config(&req),
            "UpdateRealtimeLogConfig" => self.update_realtime_log_config(&req),
            "DeleteRealtimeLogConfig" => self.delete_realtime_log_config(&req),
            "ListRealtimeLogConfigs" => self.list_realtime_log_configs(&req),
            // VPC origins.
            "CreateVpcOrigin" => self.create_vpc_origin(&req),
            "GetVpcOrigin" => self.get_vpc_origin(&resolved),
            "UpdateVpcOrigin" => self.update_vpc_origin(&req, &resolved),
            "DeleteVpcOrigin" => self.delete_vpc_origin(&req, &resolved),
            "ListVpcOrigins" => self.list_vpc_origins(&req),
            // Anycast IP lists.
            "CreateAnycastIpList" => self.create_anycast_ip_list(&req),
            "GetAnycastIpList" => self.get_anycast_ip_list(&resolved),
            "UpdateAnycastIpList" => self.update_anycast_ip_list(&req, &resolved),
            "DeleteAnycastIpList" => self.delete_anycast_ip_list(&req, &resolved),
            "ListAnycastIpLists" => self.list_anycast_ip_lists(&req),
            // Trust stores.
            "CreateTrustStore" => self.create_trust_store(&req),
            "GetTrustStore" => self.get_trust_store(&resolved),
            "UpdateTrustStore" => self.update_trust_store(&req, &resolved),
            "DeleteTrustStore" => self.delete_trust_store(&req, &resolved),
            "ListTrustStores" => self.list_trust_stores(&req),
            // Resource policies.
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            // Connection groups and domain management.
            "CreateConnectionGroup" => self.create_connection_group(&req),
            "GetConnectionGroup" => self.get_connection_group(&resolved),
            "GetConnectionGroupByRoutingEndpoint" => {
                self.get_connection_group_by_routing_endpoint(&req)
            }
            "UpdateConnectionGroup" => self.update_connection_group(&req, &resolved),
            "DeleteConnectionGroup" => self.delete_connection_group(&req, &resolved),
            "ListConnectionGroups" => self.list_connection_groups(&req),
            "ListDomainConflicts" => self.list_domain_conflicts(&req),
            "UpdateDomainAssociation" => self.update_domain_association(&req),
            "VerifyDnsConfiguration" => self.verify_dns_configuration(&req),
            "GetManagedCertificateDetails" => self.get_managed_certificate_details(&resolved),
            "UpdateDistributionWithStagingConfig" => {
                self.update_distribution_with_staging_config(&req, &resolved)
            }
            // Distribution tenants.
            "CreateDistributionTenant" => self.create_distribution_tenant(&req),
            "GetDistributionTenant" => self.get_distribution_tenant(&resolved),
            "GetDistributionTenantByDomain" => self.get_distribution_tenant_by_domain(&req),
            "UpdateDistributionTenant" => self.update_distribution_tenant(&req, &resolved),
            "DeleteDistributionTenant" => self.delete_distribution_tenant(&req, &resolved),
            "ListDistributionTenants" => self.list_distribution_tenants(&req),
            "ListDistributionTenantsByCustomization" => {
                self.list_distribution_tenants_by_customization(&req)
            }
            "AssociateDistributionTenantWebACL" => {
                self.associate_distribution_tenant_web_acl(&req, &resolved)
            }
            "DisassociateDistributionTenantWebACL" => {
                self.disassociate_distribution_tenant_web_acl(&req, &resolved)
            }
            "CreateInvalidationForDistributionTenant" => {
                self.create_invalidation_for_distribution_tenant(&req, &resolved)
            }
            "GetInvalidationForDistributionTenant" => {
                self.get_invalidation_for_distribution_tenant(&resolved)
            }
            "ListInvalidationsForDistributionTenant" => {
                self.list_invalidations_for_distribution_tenant(&resolved)
            }
            // Connection functions.
            "CreateConnectionFunction" => self.create_connection_function(&req),
            "GetConnectionFunction" => self.get_connection_function(&resolved),
            "DescribeConnectionFunction" => self.describe_connection_function(&resolved),
            "UpdateConnectionFunction" => self.update_connection_function(&req, &resolved),
            "DeleteConnectionFunction" => self.delete_connection_function(&req, &resolved),
            "ListConnectionFunctions" => self.list_connection_functions(&req),
            "PublishConnectionFunction" => self.publish_connection_function(&req, &resolved),
            "TestConnectionFunction" => self.test_connection_function(&req, &resolved),
            // The router resolved an action that has no handler here.
            other => Err(aws_error(
                StatusCode::NOT_IMPLEMENTED,
                "InvalidAction",
                format!("CloudFront action {other} is not implemented yet"),
            )),
        };
        // Persist only after a mutating action that actually succeeded.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }
}
665
666impl CloudFrontService {
669 fn create_distribution(
670 &self,
671 req: &AwsRequest,
672 with_tags: bool,
673 ) -> Result<AwsResponse, AwsServiceError> {
674 let (config, tags) = if with_tags {
675 let parsed: DistributionConfigWithTags = xml_io::from_xml_root(&req.body)
676 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
677 let tags = parsed
678 .tags
679 .items
680 .map(|i| {
681 i.tag
682 .into_iter()
683 .map(|t| Tag {
684 key: t.key,
685 value: t.value,
686 })
687 .collect()
688 })
689 .unwrap_or_default();
690 (parsed.distribution_config, tags)
691 } else {
692 let parsed: DistributionConfig = xml_io::from_xml_root(&req.body)
693 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
694 (parsed, Vec::new())
695 };
696
697 validate_caller_reference(&config.caller_reference)?;
698 validate_origins(&config)?;
699
700 let mut state = self.state.write();
701 let account = state.entry(account_id(req));
702
703 if let Some(existing) = account
704 .distributions
705 .values()
706 .find(|d| d.config.caller_reference == config.caller_reference)
707 {
708 return Err(aws_error(
709 StatusCode::CONFLICT,
710 "DistributionAlreadyExists",
711 format!(
712 "Distribution with the same CallerReference exists: {}",
713 existing.id
714 ),
715 ));
716 }
717
718 let id = generate_distribution_id();
719 let now = Utc::now();
720 let etag = generate_etag();
721 let domain = format!("{}.cloudfront.net", id.to_lowercase());
722 let arn = format!(
723 "arn:aws:cloudfront::{}:distribution/{}",
724 account_id(req),
725 id
726 );
727
728 let stored = StoredDistribution {
729 id: id.clone(),
730 arn: arn.clone(),
731 status: "InProgress".to_string(),
735 last_modified_time: now,
736 domain_name: domain,
737 in_progress_invalidation_batches: 0,
738 etag: etag.clone(),
739 config,
740 };
741 account.distributions.insert(id.clone(), stored.clone());
742 if !tags.is_empty() {
743 account.tags.insert(arn.clone(), tags);
744 }
745 drop(state);
746
747 self.schedule_distribution_deploy(id.clone());
748
749 let body = build_distribution_xml(&stored);
750 let mut headers = HeaderMap::new();
751 set_header(&mut headers, ETAG, &etag);
752 set_header(&mut headers, LOCATION, &stored.arn);
753 Ok(xml_response(StatusCode::CREATED, body, headers))
754 }
755
756 fn schedule_distribution_deploy(&self, id: String) {
761 let state = Arc::clone(&self.state);
762 let delay = self.propagation_delay;
763 let store = self.snapshot_store.clone();
764 let lock = self.snapshot_lock.clone();
765 tokio::spawn(async move {
766 tokio::time::sleep(delay).await;
767 let flipped = {
768 let mut s = state.write();
769 let mut flipped = false;
770 for account in s.accounts.values_mut() {
771 if let Some(d) = account.distributions.get_mut(&id) {
772 if d.status == "InProgress" {
773 d.status = "Deployed".to_string();
774 flipped = true;
775 }
776 break;
777 }
778 }
779 flipped
780 };
781 if flipped {
782 save_cloudfront_snapshot(&state, store, &lock).await;
783 }
784 });
785 }
786
787 pub(crate) fn schedule_distribution_tenant_deploy(&self, id: String) {
789 let state = Arc::clone(&self.state);
790 let delay = self.propagation_delay;
791 let store = self.snapshot_store.clone();
792 let lock = self.snapshot_lock.clone();
793 tokio::spawn(async move {
794 tokio::time::sleep(delay).await;
795 let flipped = {
796 let mut s = state.write();
797 let mut flipped = false;
798 for account in s.accounts.values_mut() {
799 if let Some(t) = account.distribution_tenants.get_mut(&id) {
800 if t.status == "InProgress" {
801 t.status = "Deployed".to_string();
802 flipped = true;
803 }
804 break;
805 }
806 }
807 flipped
808 };
809 if flipped {
810 save_cloudfront_snapshot(&state, store, &lock).await;
811 }
812 });
813 }
814
815 pub(crate) fn schedule_connection_group_deploy(&self, id: String) {
817 let state = Arc::clone(&self.state);
818 let delay = self.propagation_delay;
819 let store = self.snapshot_store.clone();
820 let lock = self.snapshot_lock.clone();
821 tokio::spawn(async move {
822 tokio::time::sleep(delay).await;
823 let flipped = {
824 let mut s = state.write();
825 let mut flipped = false;
826 for account in s.accounts.values_mut() {
827 if let Some(g) = account.connection_groups.get_mut(&id) {
828 if g.status == "InProgress" {
829 g.status = "Deployed".to_string();
830 flipped = true;
831 }
832 break;
833 }
834 }
835 flipped
836 };
837 if flipped {
838 save_cloudfront_snapshot(&state, store, &lock).await;
839 }
840 });
841 }
842
843 pub(crate) fn schedule_streaming_distribution_deploy(&self, id: String) {
848 let state = Arc::clone(&self.state);
849 let delay = self.propagation_delay;
850 let store = self.snapshot_store.clone();
851 let lock = self.snapshot_lock.clone();
852 tokio::spawn(async move {
853 tokio::time::sleep(delay).await;
854 let flipped = {
855 let mut s = state.write();
856 let mut flipped = false;
857 for account in s.accounts.values_mut() {
858 if let Some(d) = account.streaming_distributions.get_mut(&id) {
859 if d.status == "InProgress" {
860 d.status = "Deployed".to_string();
861 flipped = true;
862 }
863 break;
864 }
865 }
866 flipped
867 };
868 if flipped {
869 save_cloudfront_snapshot(&state, store, &lock).await;
870 }
871 });
872 }
873
874 fn get_distribution(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
875 let id = route
876 .id
877 .as_deref()
878 .ok_or_else(|| invalid_argument("missing distribution id"))?;
879 let state = self.state.read();
880 let account = state
881 .accounts
882 .get(DEFAULT_ACCOUNT)
883 .ok_or_else(|| no_such_distribution(id))?;
884 let dist = account
885 .distributions
886 .get(id)
887 .ok_or_else(|| no_such_distribution(id))?
888 .clone();
889 drop(state);
890 let body = build_distribution_xml(&dist);
891 let mut headers = HeaderMap::new();
892 set_header(&mut headers, ETAG, &dist.etag);
893 Ok(xml_response(StatusCode::OK, body, headers))
894 }
895
896 fn get_distribution_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
897 let id = route
898 .id
899 .as_deref()
900 .ok_or_else(|| invalid_argument("missing distribution id"))?;
901 let state = self.state.read();
902 let account = state
903 .accounts
904 .get(DEFAULT_ACCOUNT)
905 .ok_or_else(|| no_such_distribution(id))?;
906 let dist = account
907 .distributions
908 .get(id)
909 .ok_or_else(|| no_such_distribution(id))?
910 .clone();
911 drop(state);
912 let body = xml_io::to_xml_root("DistributionConfig", &dist.config)
913 .map_err(|e| internal_error(format!("xml encode failed: {e}")))?;
914 let mut headers = HeaderMap::new();
915 set_header(&mut headers, ETAG, &dist.etag);
916 Ok(xml_response(StatusCode::OK, body, headers))
917 }
918
919 fn update_distribution(
920 &self,
921 req: &AwsRequest,
922 route: &Route,
923 ) -> Result<AwsResponse, AwsServiceError> {
924 let id = route
925 .id
926 .as_deref()
927 .ok_or_else(|| invalid_argument("missing distribution id"))?;
928 let if_match = req
929 .headers
930 .get(IF_MATCH)
931 .and_then(|v| v.to_str().ok())
932 .ok_or_else(|| {
933 aws_error(
934 StatusCode::BAD_REQUEST,
935 "InvalidIfMatchVersion",
936 "Missing If-Match header for UpdateDistribution",
937 )
938 })?
939 .to_string();
940 let new_config: DistributionConfig = xml_io::from_xml_root(&req.body)
941 .map_err(|e| invalid_argument(format!("invalid DistributionConfig XML: {e}")))?;
942 validate_caller_reference(&new_config.caller_reference)?;
943 validate_origins(&new_config)?;
944
945 let mut state = self.state.write();
946 let account = state
947 .accounts
948 .get_mut(DEFAULT_ACCOUNT)
949 .ok_or_else(|| no_such_distribution(id))?;
950 let dist = account
951 .distributions
952 .get_mut(id)
953 .ok_or_else(|| no_such_distribution(id))?;
954 if dist.etag != if_match {
955 return Err(aws_error(
956 StatusCode::PRECONDITION_FAILED,
957 "PreconditionFailed",
958 "If-Match header does not match the current ETag",
959 ));
960 }
961 let config_changed = !configs_equal(&dist.config, &new_config);
966 if config_changed {
967 dist.config = new_config;
968 dist.etag = generate_etag();
969 dist.last_modified_time = Utc::now();
970 dist.status = "InProgress".to_string();
974 }
975 let snapshot = dist.clone();
976 drop(state);
977
978 if config_changed {
979 self.schedule_distribution_deploy(id.to_string());
980 }
981
982 let body = build_distribution_xml(&snapshot);
983 let mut headers = HeaderMap::new();
984 set_header(&mut headers, ETAG, &snapshot.etag);
985 Ok(xml_response(StatusCode::OK, body, headers))
986 }
987
988 fn delete_distribution(
989 &self,
990 req: &AwsRequest,
991 route: &Route,
992 ) -> Result<AwsResponse, AwsServiceError> {
993 let id = route
994 .id
995 .as_deref()
996 .ok_or_else(|| invalid_argument("missing distribution id"))?;
997 let if_match = req
998 .headers
999 .get(IF_MATCH)
1000 .and_then(|v| v.to_str().ok())
1001 .ok_or_else(|| {
1002 aws_error(
1003 StatusCode::BAD_REQUEST,
1004 "InvalidIfMatchVersion",
1005 "Missing If-Match header for DeleteDistribution",
1006 )
1007 })?
1008 .to_string();
1009 let mut state = self.state.write();
1010 let account = state
1011 .accounts
1012 .get_mut(DEFAULT_ACCOUNT)
1013 .ok_or_else(|| no_such_distribution(id))?;
1014 {
1015 let dist = account
1016 .distributions
1017 .get(id)
1018 .ok_or_else(|| no_such_distribution(id))?;
1019 if dist.etag != if_match {
1020 return Err(aws_error(
1021 StatusCode::PRECONDITION_FAILED,
1022 "PreconditionFailed",
1023 "If-Match header does not match the current ETag",
1024 ));
1025 }
1026 if dist.config.enabled {
1027 return Err(aws_error(
1028 StatusCode::PRECONDITION_FAILED,
1029 "DistributionNotDisabled",
1030 "Distribution must be disabled before delete",
1031 ));
1032 }
1033 }
1034 let removed = account.distributions.remove(id).unwrap();
1035 account.tags.remove(&removed.arn);
1036 Ok(empty_response(StatusCode::NO_CONTENT))
1037 }
1038
1039 fn list_distributions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1040 let state = self.state.read();
1041 let mut dists: Vec<StoredDistribution> = state
1042 .accounts
1043 .values()
1044 .flat_map(|a| a.distributions.values().cloned())
1045 .collect();
1046 dists.sort_by_key(|a| a.last_modified_time);
1047 drop(state);
1048
1049 let max_items = req
1051 .query_params
1052 .get("MaxItems")
1053 .or_else(|| req.query_params.get("maxitems"))
1054 .and_then(|v| v.parse::<usize>().ok())
1055 .filter(|n| *n > 0)
1056 .unwrap_or(100);
1057 let marker = req
1058 .query_params
1059 .get("Marker")
1060 .or_else(|| req.query_params.get("marker"));
1061 let start_idx = match marker {
1062 Some(m) if !m.is_empty() => {
1063 dists.iter().position(|d| &d.id == m).unwrap_or(dists.len())
1064 }
1065 _ => 0,
1066 };
1067 let page: Vec<StoredDistribution> = dists
1068 .iter()
1069 .skip(start_idx)
1070 .take(max_items)
1071 .cloned()
1072 .collect();
1073 let next_marker = dists.get(start_idx + page.len()).map(|d| d.id.clone());
1074
1075 let body = build_distribution_list_xml(
1076 &page,
1077 "DistributionList",
1078 marker.map(String::as_str).unwrap_or(""),
1079 max_items,
1080 next_marker.as_deref(),
1081 );
1082 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1083 }
1084
1085 fn list_distributions_by(
1086 &self,
1087 req: &AwsRequest,
1088 route: &Route,
1089 action: &str,
1090 ) -> Result<AwsResponse, AwsServiceError> {
1091 match action {
1098 "ListDistributionsByCachePolicyId"
1102 | "ListDistributionsByOriginRequestPolicyId"
1103 | "ListDistributionsByResponseHeadersPolicyId"
1104 | "ListDistributionsByKeyGroup"
1105 | "ListDistributionsByWebACLId"
1106 | "ListDistributionsByVpcOriginId"
1107 | "ListDistributionsByAnycastIpListId"
1108 | "ListDistributionsByOwnedResource" => {
1109 let id = route.id.as_deref().unwrap_or("");
1110 if is_placeholder_label(id) {
1111 return Err(invalid_argument(format!(
1112 "Required URL identifier for {action} is missing or invalid"
1113 )));
1114 }
1115 }
1116 "ListDistributionsByConnectionMode" => {
1117 let id = route.id.as_deref().unwrap_or("");
1118 if is_placeholder_label(id) {
1119 return Err(invalid_argument(
1120 "ConnectionMode is required for ListDistributionsByConnectionMode",
1121 ));
1122 }
1123 if id != "direct" && id != "tenant-only" {
1124 return Err(invalid_argument(format!(
1125 "ConnectionMode must be 'direct' or 'tenant-only', got '{id}'"
1126 )));
1127 }
1128 }
1129 "ListDistributionsByConnectionFunction"
1130 if parse_query_value(&req.raw_query, "ConnectionFunctionIdentifier").is_none() =>
1131 {
1132 return Err(invalid_argument(
1133 "ConnectionFunctionIdentifier query parameter is required",
1134 ));
1135 }
1136 "ListDistributionsByTrustStore"
1137 if parse_query_value(&req.raw_query, "TrustStoreIdentifier").is_none() =>
1138 {
1139 return Err(invalid_argument(
1140 "TrustStoreIdentifier query parameter is required",
1141 ));
1142 }
1143 _ => {}
1144 }
1145
1146 let root = match action {
1151 "ListDistributionsByCachePolicyId"
1152 | "ListDistributionsByOriginRequestPolicyId"
1153 | "ListDistributionsByResponseHeadersPolicyId"
1154 | "ListDistributionsByKeyGroup"
1155 | "ListDistributionsByVpcOriginId" => "DistributionIdList",
1156 "ListDistributionsByOwnedResource" => "DistributionIdOwnerList",
1157 _ => "DistributionList",
1158 };
1159 let body = build_empty_distribution_id_list(root);
1160 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1161 }
1162
1163 fn copy_distribution(
1164 &self,
1165 req: &AwsRequest,
1166 route: &Route,
1167 ) -> Result<AwsResponse, AwsServiceError> {
1168 let primary_id = route
1169 .id
1170 .as_deref()
1171 .ok_or_else(|| invalid_argument("missing primary distribution id"))?;
1172 let if_match = req
1173 .headers
1174 .get(IF_MATCH)
1175 .and_then(|v| v.to_str().ok())
1176 .ok_or_else(|| {
1177 aws_error(
1178 StatusCode::BAD_REQUEST,
1179 "InvalidIfMatchVersion",
1180 "Missing If-Match header for CopyDistribution",
1181 )
1182 })?
1183 .to_string();
1184 let parsed: CopyDistributionRequest = xml_io::from_xml_root(&req.body)
1185 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1186 validate_caller_reference(&parsed.caller_reference)?;
1187 let mut state = self.state.write();
1188 let account = state
1189 .accounts
1190 .get_mut(DEFAULT_ACCOUNT)
1191 .ok_or_else(|| no_such_distribution(primary_id))?;
1192 let primary = account
1193 .distributions
1194 .get(primary_id)
1195 .ok_or_else(|| no_such_distribution(primary_id))?
1196 .clone();
1197 if primary.etag != if_match {
1198 return Err(aws_error(
1199 StatusCode::PRECONDITION_FAILED,
1200 "PreconditionFailed",
1201 "If-Match header does not match the current ETag",
1202 ));
1203 }
1204 if account
1205 .distributions
1206 .values()
1207 .any(|d| d.config.caller_reference == parsed.caller_reference)
1208 {
1209 return Err(aws_error(
1210 StatusCode::CONFLICT,
1211 "DistributionAlreadyExists",
1212 "Distribution with the same CallerReference exists",
1213 ));
1214 }
1215 let new_id = generate_distribution_id();
1216 let mut config = primary.config.clone();
1217 config.caller_reference = parsed.caller_reference;
1218 config.enabled = parsed.enabled.unwrap_or(false);
1219 config.staging = parsed.staging;
1220 let now = Utc::now();
1221 let etag = generate_etag();
1222 let arn = format!(
1223 "arn:aws:cloudfront::{}:distribution/{}",
1224 account_id(req),
1225 new_id
1226 );
1227 let stored = StoredDistribution {
1228 id: new_id.clone(),
1229 arn: arn.clone(),
1230 status: "InProgress".to_string(),
1231 last_modified_time: now,
1232 domain_name: format!("{}.cloudfront.net", new_id.to_lowercase()),
1233 in_progress_invalidation_batches: 0,
1234 etag: etag.clone(),
1235 config,
1236 };
1237 account.distributions.insert(new_id.clone(), stored.clone());
1238 drop(state);
1239 self.schedule_distribution_deploy(new_id);
1240 let body = build_distribution_xml(&stored);
1241 let mut headers = HeaderMap::new();
1242 set_header(&mut headers, ETAG, &etag);
1243 set_header(&mut headers, LOCATION, &stored.arn);
1244 Ok(xml_response(StatusCode::CREATED, body, headers))
1245 }
1246}
1247
1248#[derive(Debug, serde::Deserialize, Default)]
1249#[serde(rename_all = "PascalCase")]
1250struct CopyDistributionRequest {
1251 caller_reference: String,
1252 #[serde(default)]
1253 enabled: Option<bool>,
1254 #[serde(default)]
1255 staging: Option<bool>,
1256}
1257
1258impl CloudFrontService {
1261 fn create_invalidation(
1262 &self,
1263 req: &AwsRequest,
1264 route: &Route,
1265 ) -> Result<AwsResponse, AwsServiceError> {
1266 let dist_id = route
1267 .id
1268 .as_deref()
1269 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1270 let batch: InvalidationBatch = xml_io::from_xml_root(&req.body)
1271 .map_err(|e| invalid_argument(format!("invalid InvalidationBatch XML: {e}")))?;
1272 if batch.caller_reference.is_empty() {
1273 return Err(invalid_argument("CallerReference is required"));
1274 }
1275 if batch.paths.quantity < 1 {
1276 return Err(invalid_argument(
1277 "InvalidationBatch.Paths must be non-empty",
1278 ));
1279 }
1280 let mut state = self.state.write();
1281 let account = state.entry(DEFAULT_ACCOUNT);
1282 if !account.distributions.contains_key(dist_id) {
1283 return Err(no_such_distribution(dist_id));
1284 }
1285 let id = generate_invalidation_id();
1286 let stored = StoredInvalidation {
1287 id: id.clone(),
1288 distribution_id: dist_id.to_string(),
1289 status: "Completed".to_string(),
1290 create_time: Utc::now(),
1291 batch: batch.clone(),
1292 };
1293 account.invalidations.insert(id.clone(), stored.clone());
1294 drop(state);
1295 let body = build_invalidation_xml(&stored);
1296 let mut headers = HeaderMap::new();
1297 set_header(
1298 &mut headers,
1299 LOCATION,
1300 &format!(
1301 "/2020-05-31/distribution/{dist_id}/invalidation/{}",
1302 stored.id
1303 ),
1304 );
1305 Ok(xml_response(StatusCode::CREATED, body, headers))
1306 }
1307
1308 fn get_invalidation(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1309 let dist_id = route
1310 .id
1311 .as_deref()
1312 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1313 let inv_id = route
1314 .second_id
1315 .as_deref()
1316 .ok_or_else(|| invalid_argument("missing invalidation id"))?;
1317 let state = self.state.read();
1318 let account = state
1319 .accounts
1320 .get(DEFAULT_ACCOUNT)
1321 .ok_or_else(|| no_such_invalidation(inv_id))?;
1322 if !account.distributions.contains_key(dist_id) {
1323 return Err(no_such_distribution(dist_id));
1324 }
1325 let inv = account
1326 .invalidations
1327 .get(inv_id)
1328 .filter(|i| i.distribution_id == dist_id)
1329 .ok_or_else(|| no_such_invalidation(inv_id))?
1330 .clone();
1331 drop(state);
1332 let body = build_invalidation_xml(&inv);
1333 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1334 }
1335
1336 fn list_invalidations(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
1337 let dist_id = route
1338 .id
1339 .as_deref()
1340 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1341 let state = self.state.read();
1342 let account = state
1343 .accounts
1344 .get(DEFAULT_ACCOUNT)
1345 .ok_or_else(|| no_such_distribution(dist_id))?;
1346 if !account.distributions.contains_key(dist_id) {
1347 return Err(no_such_distribution(dist_id));
1348 }
1349 let mut items: Vec<&StoredInvalidation> = account
1350 .invalidations
1351 .values()
1352 .filter(|i| i.distribution_id == dist_id)
1353 .collect();
1354 items.sort_by_key(|a| a.create_time);
1355 let body = build_invalidation_list_xml(&items);
1356 drop(state);
1357 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1358 }
1359}
1360
1361impl CloudFrontService {
1364 fn parse_arn_query(query: &str) -> Option<String> {
1365 for pair in query.split('&').filter(|p| !p.is_empty()) {
1366 if let Some(rest) = pair.strip_prefix("Resource=") {
1367 return Some(percent_decode(rest));
1368 }
1369 }
1370 None
1371 }
1372
1373 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1374 let arn = Self::parse_arn_query(&req.raw_query)
1375 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1376 let parsed: ModelTags = xml_io::from_xml_root(&req.body)
1377 .map_err(|e| invalid_argument(format!("invalid Tags XML: {e}")))?;
1378 let new_tags: Vec<Tag> = parsed
1379 .items
1380 .map(|i| {
1381 i.tag
1382 .into_iter()
1383 .map(|t| Tag {
1384 key: t.key,
1385 value: t.value,
1386 })
1387 .collect()
1388 })
1389 .unwrap_or_default();
1390 let mut state = self.state.write();
1391 let account = state.entry(DEFAULT_ACCOUNT);
1392 let entry = account.tags.entry(arn).or_default();
1393 for tag in new_tags {
1394 if let Some(existing) = entry.iter_mut().find(|t| t.key == tag.key) {
1395 existing.value = tag.value;
1396 } else {
1397 entry.push(tag);
1398 }
1399 }
1400 Ok(empty_response(StatusCode::NO_CONTENT))
1401 }
1402
1403 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1404 let arn = Self::parse_arn_query(&req.raw_query)
1405 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1406 let parsed: TagKeys = xml_io::from_xml_root(&req.body)
1407 .map_err(|e| invalid_argument(format!("invalid TagKeys XML: {e}")))?;
1408 let keys: Vec<String> = parsed.items.map(|k| k.key).unwrap_or_default();
1409 let mut state = self.state.write();
1410 let account = state.entry(DEFAULT_ACCOUNT);
1411 if let Some(existing) = account.tags.get_mut(&arn) {
1412 existing.retain(|t| !keys.contains(&t.key));
1413 }
1414 Ok(empty_response(StatusCode::NO_CONTENT))
1415 }
1416
1417 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1418 let arn = Self::parse_arn_query(&req.raw_query)
1419 .ok_or_else(|| invalid_argument("Resource query parameter is required"))?;
1420 let state = self.state.read();
1421 let tags = state
1422 .accounts
1423 .get(DEFAULT_ACCOUNT)
1424 .and_then(|a| a.tags.get(&arn))
1425 .cloned()
1426 .unwrap_or_default();
1427 drop(state);
1428 let body = build_tags_xml(&tags);
1429 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1430 }
1431}
1432
1433impl CloudFrontService {
1436 fn associate_alias(
1437 &self,
1438 req: &AwsRequest,
1439 route: &Route,
1440 ) -> Result<AwsResponse, AwsServiceError> {
1441 let id = route
1442 .id
1443 .as_deref()
1444 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1445 let alias = parse_query_value(&req.raw_query, "Alias")
1446 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1447 let mut state = self.state.write();
1448 let account = state
1449 .accounts
1450 .get_mut(DEFAULT_ACCOUNT)
1451 .ok_or_else(|| no_such_distribution(id))?;
1452 if let Some(other) = account.distributions.values().find(|d| {
1454 d.id != id
1455 && d.config
1456 .aliases
1457 .as_ref()
1458 .and_then(|a| a.items.as_ref())
1459 .is_some_and(|i| i.cname.iter().any(|c| c == &alias))
1460 }) {
1461 return Err(aws_error(
1462 StatusCode::CONFLICT,
1463 "CNAMEAlreadyExists",
1464 format!(
1465 "Alias {alias} is already associated with distribution {}",
1466 other.id
1467 ),
1468 ));
1469 }
1470 let dist = account
1471 .distributions
1472 .get_mut(id)
1473 .ok_or_else(|| no_such_distribution(id))?;
1474 let aliases = dist.config.aliases.get_or_insert_with(Default::default);
1475 let items = aliases
1476 .items
1477 .get_or_insert_with(crate::model::AliasItems::default);
1478 if !items.cname.iter().any(|c| c == &alias) {
1479 items.cname.push(alias.clone());
1480 aliases.quantity = items.cname.len() as i32;
1481 }
1482 dist.etag = generate_etag();
1483 dist.last_modified_time = Utc::now();
1484 Ok(empty_response(StatusCode::OK))
1485 }
1486
1487 fn list_conflicting_aliases(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1488 let alias = parse_query_value(&req.raw_query, "Alias")
1489 .ok_or_else(|| invalid_argument("Alias query parameter is required"))?;
1490 let dist_id = parse_query_value(&req.raw_query, "DistributionId")
1491 .ok_or_else(|| invalid_argument("DistributionId query parameter is required"))?;
1492 if alias.len() > 253 {
1497 return Err(invalid_argument(format!(
1498 "Alias length {} exceeds maximum 253",
1499 alias.len()
1500 )));
1501 }
1502 if dist_id.len() > 25 {
1503 return Err(invalid_argument(format!(
1504 "DistributionId length {} exceeds maximum 25",
1505 dist_id.len()
1506 )));
1507 }
1508 if let Some(max_items) = parse_query_value(&req.raw_query, "MaxItems") {
1509 let n: i64 = max_items.parse().map_err(|_| {
1510 invalid_argument(format!("MaxItems must be an integer, got '{max_items}'"))
1511 })?;
1512 if n > 100 {
1513 return Err(invalid_argument(format!(
1514 "MaxItems {n} exceeds maximum 100"
1515 )));
1516 }
1517 }
1518 let body = format!(
1521 "{XML_DECL}<ConflictingAliasesList xmlns=\"{NS}\"><Quantity>0</Quantity></ConflictingAliasesList>",
1522 NS = crate::NAMESPACE,
1523 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1524 );
1525 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1526 }
1527
1528 fn associate_web_acl(
1529 &self,
1530 req: &AwsRequest,
1531 route: &Route,
1532 ) -> Result<AwsResponse, AwsServiceError> {
1533 let id = route
1534 .id
1535 .as_deref()
1536 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1537 let parsed: AssociateAliasRequest = xml_io::from_xml_root(&req.body)
1538 .map_err(|e| invalid_argument(format!("invalid request XML: {e}")))?;
1539 let mut state = self.state.write();
1540 let account = state
1541 .accounts
1542 .get_mut(DEFAULT_ACCOUNT)
1543 .ok_or_else(|| no_such_distribution(id))?;
1544 let dist = account
1545 .distributions
1546 .get_mut(id)
1547 .ok_or_else(|| no_such_distribution(id))?;
1548 dist.config.web_acl_id = Some(parsed.web_acl_arn.clone());
1549 dist.etag = generate_etag();
1550 dist.last_modified_time = Utc::now();
1551 let body = format!(
1552 "{XML_DECL}<AssociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id><WebACLArn>{}</WebACLArn></AssociateDistributionWebACLResult>",
1553 esc(id), esc(&parsed.web_acl_arn),
1554 NS = crate::NAMESPACE,
1555 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1556 );
1557 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1558 }
1559
1560 fn disassociate_web_acl(
1561 &self,
1562 _req: &AwsRequest,
1563 route: &Route,
1564 ) -> Result<AwsResponse, AwsServiceError> {
1565 let id = route
1566 .id
1567 .as_deref()
1568 .ok_or_else(|| invalid_argument("missing distribution id"))?;
1569 let entity_not_found = || {
1572 aws_error(
1573 StatusCode::NOT_FOUND,
1574 "EntityNotFound",
1575 format!("The specified distribution does not exist: {id}"),
1576 )
1577 };
1578 let mut state = self.state.write();
1579 let account = state
1580 .accounts
1581 .get_mut(DEFAULT_ACCOUNT)
1582 .ok_or_else(entity_not_found)?;
1583 let dist = account
1584 .distributions
1585 .get_mut(id)
1586 .ok_or_else(entity_not_found)?;
1587 dist.config.web_acl_id = None;
1588 dist.etag = generate_etag();
1589 dist.last_modified_time = Utc::now();
1590 let body = format!(
1591 "{XML_DECL}<DisassociateDistributionWebACLResult xmlns=\"{NS}\"><Id>{}</Id></DisassociateDistributionWebACLResult>",
1592 esc(id),
1593 NS = crate::NAMESPACE,
1594 XML_DECL = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
1595 );
1596 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1597 }
1598}
1599
1600#[derive(serde::Deserialize, Default, Debug)]
1601#[serde(rename_all = "PascalCase")]
1602struct AssociateAliasRequest {
1603 #[serde(rename = "WebACLArn", default)]
1604 web_acl_arn: String,
1605}
1606
/// Escapes text for inclusion in XML element content or attribute values.
///
/// The five characters with predefined XML entities are replaced:
/// `&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`, `"` → `&quot;`,
/// `'` → `&apos;`. Every other character is copied through unchanged.
pub(crate) fn esc(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            _ => out.push(c),
        }
    }
    out
}
1628
1629pub(crate) fn build_distribution_xml(dist: &StoredDistribution) -> String {
1630 let mut out = String::with_capacity(2048);
1631 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1632 out.push_str(&format!(
1633 "<Distribution xmlns=\"{ns}\">",
1634 ns = crate::NAMESPACE
1635 ));
1636 out.push_str(&format!("<Id>{}</Id>", esc(&dist.id)));
1637 out.push_str(&format!("<ARN>{}</ARN>", esc(&dist.arn)));
1638 out.push_str(&format!("<Status>{}</Status>", esc(&dist.status)));
1639 out.push_str(&format!(
1640 "<LastModifiedTime>{}</LastModifiedTime>",
1641 rfc3339(&dist.last_modified_time)
1642 ));
1643 out.push_str(&format!(
1644 "<InProgressInvalidationBatches>{}</InProgressInvalidationBatches>",
1645 dist.in_progress_invalidation_batches
1646 ));
1647 out.push_str(&format!(
1648 "<DomainName>{}</DomainName>",
1649 esc(&dist.domain_name)
1650 ));
1651 out.push_str("<ActiveTrustedSigners><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedSigners>");
1652 out.push_str("<ActiveTrustedKeyGroups><Enabled>false</Enabled><Quantity>0</Quantity></ActiveTrustedKeyGroups>");
1653 let inner = quick_xml::se::to_string_with_root("DistributionConfig", &dist.config)
1654 .unwrap_or_else(|_| String::new());
1655 out.push_str(&inner);
1656 out.push_str("</Distribution>");
1657 out
1658}
1659
1660fn build_distribution_list_xml(
1661 dists: &[StoredDistribution],
1662 root: &str,
1663 marker: &str,
1664 max_items: usize,
1665 next_marker: Option<&str>,
1666) -> String {
1667 let mut out = String::with_capacity(2048);
1668 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1669 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1670 out.push_str(&format!("<Marker>{}</Marker>", esc(marker)));
1671 if let Some(nm) = next_marker {
1672 out.push_str(&format!("<NextMarker>{}</NextMarker>", esc(nm)));
1673 }
1674 out.push_str(&format!("<MaxItems>{max_items}</MaxItems>"));
1675 out.push_str(&format!(
1676 "<IsTruncated>{}</IsTruncated>",
1677 next_marker.is_some()
1678 ));
1679 out.push_str(&format!("<Quantity>{}</Quantity>", dists.len()));
1680 if dists.is_empty() {
1681 out.push_str(&format!("</{root}>"));
1682 return out;
1683 }
1684 out.push_str("<Items>");
1685 for d in dists {
1686 out.push_str("<DistributionSummary>");
1687 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
1688 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
1689 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
1690 out.push_str(&format!(
1691 "<LastModifiedTime>{}</LastModifiedTime>",
1692 rfc3339(&d.last_modified_time)
1693 ));
1694 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
1695 let aliases = d.config.aliases.clone().unwrap_or_default();
1696 out.push_str(&render_inline("Aliases", &aliases));
1697 let origins = d.config.origins.clone();
1698 out.push_str(&render_inline("Origins", &origins));
1699 let dcb = d.config.default_cache_behavior.clone();
1700 out.push_str(&render_inline("DefaultCacheBehavior", &dcb));
1701 let cb = d.config.cache_behaviors.clone().unwrap_or_default();
1702 out.push_str(&render_inline("CacheBehaviors", &cb));
1703 let cer = d.config.custom_error_responses.clone().unwrap_or_default();
1704 out.push_str(&render_inline("CustomErrorResponses", &cer));
1705 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
1706 out.push_str(&format!(
1707 "<PriceClass>{}</PriceClass>",
1708 esc(&d
1709 .config
1710 .price_class
1711 .clone()
1712 .unwrap_or_else(|| "PriceClass_All".to_string()))
1713 ));
1714 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
1715 out.push_str(&render_inline(
1716 "ViewerCertificate",
1717 &d.config.viewer_certificate.clone().unwrap_or_default(),
1718 ));
1719 out.push_str(&render_inline(
1720 "Restrictions",
1721 &d.config.restrictions.clone().unwrap_or_default(),
1722 ));
1723 out.push_str(&format!(
1724 "<WebACLId>{}</WebACLId>",
1725 esc(&d.config.web_acl_id.clone().unwrap_or_default())
1726 ));
1727 out.push_str(&format!(
1728 "<HttpVersion>{}</HttpVersion>",
1729 esc(&d
1730 .config
1731 .http_version
1732 .clone()
1733 .unwrap_or_else(|| "http2".to_string()))
1734 ));
1735 out.push_str(&format!(
1736 "<IsIPV6Enabled>{}</IsIPV6Enabled>",
1737 d.config.is_ipv6_enabled.unwrap_or(true)
1738 ));
1739 out.push_str(&format!(
1740 "<Staging>{}</Staging>",
1741 d.config.staging.unwrap_or(false)
1742 ));
1743 out.push_str("</DistributionSummary>");
1744 }
1745 out.push_str("</Items>");
1746 out.push_str(&format!("</{root}>"));
1747 out
1748}
1749
1750fn build_empty_distribution_id_list(root: &str) -> String {
1751 let mut out = String::with_capacity(256);
1752 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1753 out.push_str(&format!("<{root} xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1754 out.push_str("<Marker></Marker>");
1755 out.push_str("<MaxItems>100</MaxItems>");
1756 out.push_str("<IsTruncated>false</IsTruncated>");
1757 out.push_str("<Quantity>0</Quantity>");
1758 out.push_str(&format!("</{root}>"));
1759 out
1760}
1761
1762fn build_invalidation_xml(inv: &StoredInvalidation) -> String {
1763 let mut out = String::with_capacity(512);
1764 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1765 out.push_str(&format!(
1766 "<Invalidation xmlns=\"{ns}\">",
1767 ns = crate::NAMESPACE
1768 ));
1769 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1770 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1771 out.push_str(&format!(
1772 "<CreateTime>{}</CreateTime>",
1773 rfc3339(&inv.create_time)
1774 ));
1775 out.push_str(&render_inline("InvalidationBatch", &inv.batch));
1776 out.push_str("</Invalidation>");
1777 out
1778}
1779
1780fn build_invalidation_list_xml(items: &[&StoredInvalidation]) -> String {
1781 let mut out = String::with_capacity(1024);
1782 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1783 out.push_str(&format!(
1784 "<InvalidationList xmlns=\"{ns}\">",
1785 ns = crate::NAMESPACE
1786 ));
1787 out.push_str("<Marker></Marker>");
1788 out.push_str("<MaxItems>100</MaxItems>");
1789 out.push_str("<IsTruncated>false</IsTruncated>");
1790 out.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
1791 if !items.is_empty() {
1792 out.push_str("<Items>");
1793 for inv in items {
1794 out.push_str("<InvalidationSummary>");
1795 out.push_str(&format!("<Id>{}</Id>", esc(&inv.id)));
1796 out.push_str(&format!(
1797 "<CreateTime>{}</CreateTime>",
1798 rfc3339(&inv.create_time)
1799 ));
1800 out.push_str(&format!("<Status>{}</Status>", esc(&inv.status)));
1801 out.push_str("</InvalidationSummary>");
1802 }
1803 out.push_str("</Items>");
1804 }
1805 out.push_str("</InvalidationList>");
1806 out
1807}
1808
1809fn build_tags_xml(tags: &[Tag]) -> String {
1810 let mut out = String::with_capacity(256);
1811 out.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1812 out.push_str(&format!("<Tags xmlns=\"{ns}\">", ns = crate::NAMESPACE));
1813 out.push_str("<Items>");
1814 for t in tags {
1815 out.push_str("<Tag>");
1816 out.push_str(&format!("<Key>{}</Key>", esc(&t.key)));
1817 if let Some(v) = &t.value {
1818 out.push_str(&format!("<Value>{}</Value>", esc(v)));
1819 }
1820 out.push_str("</Tag>");
1821 }
1822 out.push_str("</Items>");
1823 out.push_str("</Tags>");
1824 out
1825}
1826
1827fn render_inline<T: serde::Serialize>(root: &str, value: &T) -> String {
1828 quick_xml::se::to_string_with_root(root, value).unwrap_or_default()
1829}
1830
1831fn validate_caller_reference(s: &str) -> Result<(), AwsServiceError> {
1834 if s.is_empty() {
1835 return Err(invalid_argument("CallerReference is required"));
1836 }
1837 Ok(())
1838}
1839
1840fn validate_origins(config: &DistributionConfig) -> Result<(), AwsServiceError> {
1841 if config.origins.quantity < 1 {
1842 return Err(invalid_argument(
1843 "DistributionConfig.Origins must contain at least one origin",
1844 ));
1845 }
1846 Ok(())
1847}
1848
1849fn configs_equal(lhs: &DistributionConfig, rhs: &DistributionConfig) -> bool {
1855 let Ok(a) = xml_io::to_xml_root("DistributionConfig", lhs) else {
1856 return false;
1857 };
1858 let Ok(b) = xml_io::to_xml_root("DistributionConfig", rhs) else {
1859 return false;
1860 };
1861 a == b
1862}
1863
1864fn account_id(_req: &AwsRequest) -> &'static str {
1865 DEFAULT_ACCOUNT
1870}
1871
1872fn generate_distribution_id() -> String {
1873 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1875 format!("E{}", &raw[..13])
1876}
1877
1878fn generate_invalidation_id() -> String {
1879 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1880 format!("I{}", &raw[..13])
1881}
1882
1883pub(crate) fn generate_etag() -> String {
1884 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1885 format!("E{}", &raw[..13])
1886}
1887
1888pub(crate) fn generate_id_with_prefix(prefix: &str) -> String {
1893 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
1894 let suffix_len = 14usize.saturating_sub(prefix.len()).min(raw.len());
1895 format!("{prefix}{}", &raw[..suffix_len])
1896}
1897
1898fn rfc3339(t: &chrono::DateTime<chrono::Utc>) -> String {
1899 t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1900}
1901
1902pub(crate) fn invalid_argument(msg: impl Into<String>) -> AwsServiceError {
1903 aws_error(StatusCode::BAD_REQUEST, "InvalidArgument", msg)
1904}
1905
1906fn no_such_distribution(id: &str) -> AwsServiceError {
1907 aws_error(
1908 StatusCode::NOT_FOUND,
1909 "NoSuchDistribution",
1910 format!("The specified distribution does not exist: {id}"),
1911 )
1912}
1913
1914fn no_such_invalidation(id: &str) -> AwsServiceError {
1915 aws_error(
1916 StatusCode::NOT_FOUND,
1917 "NoSuchInvalidation",
1918 format!("The specified invalidation does not exist: {id}"),
1919 )
1920}
1921
1922fn internal_error(msg: impl Into<String>) -> AwsServiceError {
1923 aws_error(StatusCode::INTERNAL_SERVER_ERROR, "InternalError", msg)
1924}
1925
1926pub(crate) fn aws_error(
1927 status: StatusCode,
1928 code: impl Into<String>,
1929 msg: impl Into<String>,
1930) -> AwsServiceError {
1931 AwsServiceError::aws_error(status, code.into(), msg)
1932}
1933
1934fn set_header(headers: &mut HeaderMap, name: HeaderName, value: &str) {
1935 if let Ok(v) = HeaderValue::from_str(value) {
1936 headers.insert(name, v);
1937 }
1938}
1939
1940pub(crate) fn xml_response(status: StatusCode, body: String, headers: HeaderMap) -> AwsResponse {
1941 AwsResponse {
1942 status,
1943 content_type: "text/xml".to_string(),
1944 body: ResponseBody::Bytes(Bytes::from(body)),
1945 headers,
1946 }
1947}
1948
1949fn empty_response(status: StatusCode) -> AwsResponse {
1950 AwsResponse {
1951 status,
1952 content_type: "text/xml".to_string(),
1953 body: ResponseBody::Bytes(Bytes::new()),
1954 headers: HeaderMap::new(),
1955 }
1956}
1957
/// Decodes a URL query component.
///
/// * `%XX` escapes with two hexadecimal digits are decoded to the byte they
///   denote.
/// * `+` is decoded to a space.
/// * A `%` that is not followed by two hexadecimal digits is kept literally.
///
/// Decoding works on bytes and the result is then interpreted as UTF-8, so
/// multi-byte characters survive whether they arrive percent-encoded
/// (`%C3%A9`) or raw. Byte sequences that are not valid UTF-8 are replaced
/// with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_digit);
                let lo = bytes.get(i + 2).copied().and_then(hex_digit);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Not a well-formed escape: keep the percent sign as is.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, or
/// `None` for any other byte. Both upper- and lower-case digits are
/// accepted.
fn hex_digit(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}
1989
/// Reports whether a URL label is empty or still an unsubstituted
/// placeholder.
///
/// Returns `true` for an empty string, for a value starting with `{`, and
/// for a value starting with the percent-encoded brace `%7B` in either
/// letter case.
pub(crate) fn is_placeholder_label(value: &str) -> bool {
    let bytes = value.as_bytes();
    match bytes.first() {
        None => true,
        Some(b'{') => true,
        Some(_) => bytes
            .get(..3)
            .is_some_and(|head| head.eq_ignore_ascii_case(b"%7b")),
    }
}
2003
2004pub(crate) fn extract_body_field(body: &[u8], key: &str) -> Option<String> {
2012 if let Ok(s) = std::str::from_utf8(body) {
2013 let trimmed = s.trim_start();
2014 if trimmed.starts_with('{') {
2015 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
2016 if let Some(field) = v.get(key) {
2017 return match field {
2018 serde_json::Value::String(s) => Some(s.clone()),
2019 serde_json::Value::Number(n) => Some(n.to_string()),
2020 serde_json::Value::Bool(b) => Some(b.to_string()),
2021 _ => None,
2022 };
2023 }
2024 return None;
2025 }
2026 }
2027 let open = format!("<{key}>");
2029 let close = format!("</{key}>");
2030 if let Some(start) = s.find(&open) {
2031 let after = start + open.len();
2032 if let Some(end_rel) = s[after..].find(&close) {
2033 return Some(s[after..after + end_rel].to_string());
2034 }
2035 }
2036 }
2037 None
2038}
2039
2040fn parse_query_value(query: &str, key: &str) -> Option<String> {
2041 let prefix = format!("{key}=");
2042 for pair in query.split('&').filter(|p| !p.is_empty()) {
2043 if let Some(rest) = pair.strip_prefix(&prefix) {
2044 return Some(percent_decode(rest));
2045 }
2046 }
2047 None
2048}
2049
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // Shared fixtures and helpers
    // ------------------------------------------------------------------

    /// Fresh, empty shared state for a service under test.
    fn make_state() -> SharedCloudFrontState {
        Arc::new(RwLock::new(CloudFrontAccounts::new()))
    }

    /// Builds a REST-style `AwsRequest` for the default account with no
    /// headers. `path_segments` is derived from `path`; `query` is stored raw.
    fn make_request(method: http::Method, path: &str, query: &str, body: &str) -> AwsRequest {
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: Uuid::new_v4().to_string(),
            headers: HeaderMap::new(),
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: query.into(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Smallest `DistributionConfig` document the tests need: one origin, a
    /// default cache behavior, an empty comment, and `Enabled=true`.
    fn minimal_dist_config_xml(caller_ref: &str) -> String {
        format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<DistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <CallerReference>{caller_ref}</CallerReference>
  <Origins>
    <Quantity>1</Quantity>
    <Items>
      <Origin>
        <Id>primary</Id>
        <DomainName>example.com</DomainName>
      </Origin>
    </Items>
  </Origins>
  <DefaultCacheBehavior>
    <TargetOriginId>primary</TargetOriginId>
    <ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>
  </DefaultCacheBehavior>
  <Comment></Comment>
  <Enabled>true</Enabled>
</DistributionConfig>"#
        )
    }

    /// Response body as UTF-8 text; panics if the body is not in-memory bytes
    /// or is not valid UTF-8.
    fn response_xml(resp: &AwsResponse) -> &str {
        std::str::from_utf8(resp.body.expect_bytes()).unwrap()
    }

    /// `ETag` header of a response as an owned string; panics if absent.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string()
    }

    /// Text between the first `<tag>` and the `</tag>` that follows it.
    /// Panics when `<tag>` does not occur in `xml`.
    fn element_text<'a>(xml: &'a str, tag: &str) -> &'a str {
        let open = format!("<{tag}>");
        let close = format!("</{tag}>");
        let after_open = xml
            .split(open.as_str())
            .nth(1)
            .unwrap_or_else(|| panic!("no <{tag}> element in: {xml}"));
        after_open
            .split(close.as_str())
            .next()
            .unwrap_or(after_open)
    }

    /// POSTs `config_xml` to the create-distribution endpoint and returns the
    /// (successful) response.
    async fn create_distribution(svc: &CloudFrontService, config_xml: &str) -> AwsResponse {
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/distribution",
            "",
            config_xml,
        ))
        .await
        .unwrap()
    }

    /// Creates a minimal distribution and returns its generated id.
    async fn create_distribution_returning_id(svc: &CloudFrontService, caller_ref: &str) -> String {
        let body = minimal_dist_config_xml(caller_ref);
        let create = create_distribution(svc, &body).await;
        element_text(response_xml(&create), "Id").to_string()
    }

    /// Reads a distribution's stored status straight from service state;
    /// yields an empty string when the account or distribution is missing.
    fn distribution_status(svc: &CloudFrontService, id: &str) -> String {
        let state = svc.state.read();
        state
            .accounts
            .get(DEFAULT_ACCOUNT)
            .and_then(|a| a.distributions.get(id))
            .map(|d| d.status.clone())
            .unwrap_or_default()
    }

    // ------------------------------------------------------------------
    // Pure helpers
    // ------------------------------------------------------------------

    #[test]
    fn distribution_list_xml_renders_pagination_fields() {
        // Truncated page: marker, max-items and next-marker are all rendered.
        let xml =
            build_distribution_list_xml(&[], "DistributionList", "EDFDVBD6", 2, Some("E2NEXT"));
        assert!(xml.contains("<Marker>EDFDVBD6</Marker>"));
        assert!(xml.contains("<MaxItems>2</MaxItems>"));
        assert!(xml.contains("<IsTruncated>true</IsTruncated>"));
        assert!(xml.contains("<NextMarker>E2NEXT</NextMarker>"));

        // Final page: not truncated and no NextMarker element at all.
        let xml = build_distribution_list_xml(&[], "DistributionList", "", 100, None);
        assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
        assert!(!xml.contains("<NextMarker>"));
    }

    #[test]
    fn placeholder_label_detects_braces_and_percent_encoding() {
        assert!(is_placeholder_label(""));
        assert!(is_placeholder_label("{Identifier}"));
        assert!(is_placeholder_label("%7BIdentifier%7D"));
        assert!(is_placeholder_label("%7bidentifier%7d"));
        assert!(!is_placeholder_label("E1234567890ABC"));
        assert!(!is_placeholder_label(
            "arn:aws:cloudfront::000:distribution/E1"
        ));
    }

    #[test]
    fn extract_body_field_handles_json_and_xml() {
        let json = br#"{"Stage":"BROKEN","Marker":"x"}"#;
        assert_eq!(
            extract_body_field(json, "Stage"),
            Some("BROKEN".to_string())
        );
        assert_eq!(extract_body_field(json, "MaxItems"), None);

        let xml = br#"<?xml version="1.0"?><Body><Domain>example.com</Domain></Body>"#;
        assert_eq!(
            extract_body_field(xml, "Domain"),
            Some("example.com".to_string())
        );
        assert_eq!(extract_body_field(xml, "Missing"), None);

        assert_eq!(extract_body_field(b"", "x"), None);
    }

    // ------------------------------------------------------------------
    // Distribution lifecycle
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn create_then_get_then_delete_distribution() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("ref-1");
        let create = create_distribution(&svc, &body).await;
        assert_eq!(create.status, StatusCode::CREATED);
        let etag = etag_of(&create);
        let id = element_text(response_xml(&create), "Id").to_string();

        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        // Disable the distribution first, then delete it using the ETag the
        // update handed back.
        let disable_body = body.replace("<Enabled>true</Enabled>", "<Enabled>false</Enabled>");
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &disable_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        let new_etag = etag_of(&updated);

        let mut del_req = make_request(
            http::Method::DELETE,
            &format!("/2020-05-31/distribution/{id}"),
            "",
            "",
        );
        del_req.headers.insert(IF_MATCH, new_etag.parse().unwrap());
        let del = svc.handle(del_req).await.unwrap();
        assert_eq!(del.status, StatusCode::NO_CONTENT);
    }

    #[tokio::test]
    async fn create_distribution_starts_in_progress() {
        // A long propagation delay keeps the automatic transition (exercised
        // in `auto_transition_after_tick_marks_deployed`) from firing mid-test.
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("status-ref");
        let create = create_distribution(&svc, &body).await;
        let xml = response_xml(&create);
        assert!(
            xml.contains("<Status>InProgress</Status>"),
            "expected initial status InProgress, got: {xml}"
        );
    }

    #[tokio::test]
    async fn auto_transition_after_tick_marks_deployed() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_millis(50));
        let id = create_distribution_returning_id(&svc, "auto-tick-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        // Wait comfortably longer than the 50 ms propagation delay.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        assert_eq!(distribution_status(&svc, &id), "Deployed");
    }

    #[tokio::test]
    async fn set_distribution_status_via_admin_flips_synchronously() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let id = create_distribution_returning_id(&svc, "admin-ref").await;
        assert_eq!(distribution_status(&svc, &id), "InProgress");
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");
        assert!(svc.set_distribution_status(&id, "InProgress"));
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn set_distribution_status_unknown_id_returns_false() {
        let svc = CloudFrontService::new(make_state());
        assert!(!svc.set_distribution_status("E-DOES-NOT-EXIST", "Deployed"));
    }

    #[tokio::test]
    async fn update_distribution_resets_to_in_progress() {
        let svc = CloudFrontService::new(make_state())
            .with_propagation_delay(std::time::Duration::from_secs(60));
        let body = minimal_dist_config_xml("update-reset-ref");
        let create = create_distribution(&svc, &body).await;
        let etag = etag_of(&create);
        let id = element_text(response_xml(&create), "Id").to_string();

        // Force the distribution to Deployed so the reset is observable.
        assert!(svc.set_distribution_status(&id, "Deployed"));
        assert_eq!(distribution_status(&svc, &id), "Deployed");

        let updated_body = body.replace(
            "<ViewerProtocolPolicy>allow-all</ViewerProtocolPolicy>",
            "<ViewerProtocolPolicy>https-only</ViewerProtocolPolicy>",
        );
        let mut update_req = make_request(
            http::Method::PUT,
            &format!("/2020-05-31/distribution/{id}/config"),
            "",
            &updated_body,
        );
        update_req.headers.insert(IF_MATCH, etag.parse().unwrap());
        let updated = svc.handle(update_req).await.unwrap();
        assert_eq!(updated.status, StatusCode::OK);
        assert_eq!(distribution_status(&svc, &id), "InProgress");
    }

    #[tokio::test]
    async fn duplicate_caller_reference_is_rejected() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("dup-ref");
        create_distribution(&svc, &body).await;

        // Same CallerReference again: must be refused.
        let result = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/distribution",
                "",
                &body,
            ))
            .await;
        let err = match result {
            Ok(_) => panic!("expected duplicate caller-reference to fail"),
            Err(e) => e,
        };
        assert_eq!(err.code(), "DistributionAlreadyExists");
        assert_eq!(err.status(), StatusCode::CONFLICT);
    }

    // ------------------------------------------------------------------
    // Invalidations
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn invalidation_lifecycle() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("inv-ref");
        let create = create_distribution(&svc, &body).await;
        let dist_id = element_text(response_xml(&create), "Id");

        let inv_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Paths><Quantity>1</Quantity><Items><Path>/*</Path></Items></Paths>
  <CallerReference>inv-1</CallerReference>
</InvalidationBatch>"#;
        let inv_resp = svc
            .handle(make_request(
                http::Method::POST,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                inv_body,
            ))
            .await
            .unwrap();
        assert_eq!(inv_resp.status, StatusCode::CREATED);
        let inv_id = element_text(response_xml(&inv_resp), "Id");

        let get = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation/{inv_id}"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert_eq!(get.status, StatusCode::OK);

        let list = svc
            .handle(make_request(
                http::Method::GET,
                &format!("/2020-05-31/distribution/{dist_id}/invalidation"),
                "",
                "",
            ))
            .await
            .unwrap();
        assert!(response_xml(&list).contains("<Quantity>1</Quantity>"));
    }

    // ------------------------------------------------------------------
    // Tagging and XML escaping
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn tags_roundtrip() {
        let svc = CloudFrontService::new(make_state());
        let body = minimal_dist_config_xml("tag-ref");
        let create = create_distribution(&svc, &body).await;
        let arn = element_text(response_xml(&create), "ARN");

        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>prod</Value></Tag></Items>
</Tags>"#;
        let resp = svc
            .handle(make_request(
                http::Method::POST,
                "/2020-05-31/tagging",
                &format!("Operation=Tag&Resource={arn}"),
                tag_body,
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::NO_CONTENT);

        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={arn}"),
                "",
            ))
            .await
            .unwrap();
        let xml = response_xml(&list);
        assert!(xml.contains("<Key>env</Key>"));
        assert!(xml.contains("<Value>prod</Value>"));
    }

    #[tokio::test]
    async fn xml_metacharacters_in_user_input_are_escaped() {
        let svc = CloudFrontService::new(make_state());
        // CDATA lets the comment carry raw metacharacters in a well-formed
        // request document.
        let body = minimal_dist_config_xml("escape-ref").replace(
            "<Comment></Comment>",
            "<Comment><![CDATA[a&b<c>d]]></Comment>",
        );
        let create = create_distribution(&svc, &body).await;
        let xml = response_xml(&create);
        let dist_id = element_text(xml, "Id");
        let arn = element_text(xml, "ARN");

        // The request must itself be well-formed XML, so the tag value is
        // sent entity-escaped; its logical value is `a&b<c>d`.
        let tag_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<Tags xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/">
  <Items><Tag><Key>env</Key><Value>a&amp;b&lt;c&gt;d</Value></Tag></Items>
</Tags>"#;
        svc.handle(make_request(
            http::Method::POST,
            "/2020-05-31/tagging",
            &format!("Operation=Tag&Resource={arn}"),
            tag_body,
        ))
        .await
        .unwrap();

        let list = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/tagging",
                &format!("Resource={arn}"),
                "",
            ))
            .await
            .unwrap();
        let xml = response_xml(&list);
        // The value must come back escaped, never as raw markup.
        assert!(xml.contains("<Value>a&amp;b&lt;c&gt;d</Value>"));
        assert!(!xml.contains("<Value>a&b<c>d</Value>"));

        let list_resp = svc
            .handle(make_request(
                http::Method::GET,
                "/2020-05-31/distribution",
                "",
                "",
            ))
            .await
            .unwrap();
        let xml = response_xml(&list_resp);
        // The comment must not leak into the listing unescaped.
        assert!(!xml.contains("<Comment>a&b<c>d"));
        assert!(xml.contains(dist_id));
    }
}