Skip to main content

rustack_cloudfront_http/
dispatch.rs

1//! Dispatch identified operations to the provider.
2//!
3//! This is the "business" layer of the HTTP crate: it receives a
4//! `RouteMatch`, parses the request body / query, calls into the
5//! `RustackCloudFront` provider, and turns the domain result into an HTTP
6//! response with correct status codes and ETag/Location headers.
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use http::{HeaderMap, HeaderValue, Method, Response, StatusCode, Uri};
12use rustack_cloudfront_core::RustackCloudFront;
13use rustack_cloudfront_model::{
14    CloudFrontError, CloudFrontOriginAccessIdentityConfig, FunctionConfig, KeyValueStore,
15};
16
17use crate::{
18    request::{
19        parse_cache_policy_config, parse_distribution_config, parse_fle_config,
20        parse_fle_profile_config, parse_function_config, parse_invalidation_batch,
21        parse_key_group_config, parse_oac_config, parse_oai_config,
22        parse_origin_request_policy_config, parse_public_key_config, parse_realtime_log_config,
23        parse_response_headers_policy_config, parse_root, parse_tag_keys, parse_tag_set,
24    },
25    response::{empty_204, error_response, xml_response},
26    router::{Operation, PathParams, RouteMatch},
27    service::HttpBody,
28    xml::ser,
29};
30
31/// Handler trait — exists for symmetry with other services. The built-in
32/// implementation dispatches directly to `RustackCloudFront`.
33pub trait CloudFrontHandler: Send + Sync + 'static {
34    /// Get the underlying provider as an owned `Arc`.
35    ///
36    /// `create_distribution`, `copy_distribution`, `update_distribution`,
37    /// and `create_invalidation` take `self: &Arc<Self>` so they can spawn
38    /// propagation tasks that outlive the caller. The service layer stores
39    /// an `Arc<H>`; this method converts that into the owning `Arc<RustackCloudFront>`
40    /// needed by the provider.
41    fn provider_arc(&self) -> Arc<RustackCloudFront>;
42}
43
44/// The canonical handler: an `Arc<RustackCloudFront>` wrapped in the service's
45/// outer `Arc`. The double-Arc is required because the hyper service owns the
46/// handler by `Arc<H>` while the provider's methods need `Arc<RustackCloudFront>`.
47impl CloudFrontHandler for Arc<RustackCloudFront> {
48    fn provider_arc(&self) -> Arc<RustackCloudFront> {
49        Arc::clone(self)
50    }
51}
52
53/// Dispatch a parsed route to the provider and produce an HTTP response.
54#[allow(clippy::too_many_lines)]
55pub async fn dispatch(
56    handler: &dyn CloudFrontHandler,
57    route: RouteMatch,
58    uri: &Uri,
59    _headers: &HeaderMap,
60    if_match: Option<&str>,
61    body: Bytes,
62    request_id: &str,
63) -> Response<HttpBody> {
64    let provider = handler.provider_arc();
65    match handle(&provider, route, uri, if_match, body).await {
66        Ok(resp) => resp,
67        Err(err) => error_response(&err, request_id),
68    }
69}
70
71async fn handle(
72    provider: &Arc<RustackCloudFront>,
73    route: RouteMatch,
74    uri: &Uri,
75    if_match: Option<&str>,
76    body: Bytes,
77) -> Result<Response<HttpBody>, CloudFrontError> {
78    let p = &route.path_params;
79    match route.operation {
80        // Distribution
81        Operation::CreateDistribution | Operation::CreateDistributionWithTags => {
82            let root = parse_root(&body)?;
83            let (config_node, tags) = if root.name == "DistributionConfigWithTags" {
84                let cn = root.child("DistributionConfig").ok_or_else(|| {
85                    CloudFrontError::MalformedInput(
86                        "DistributionConfigWithTags missing DistributionConfig".into(),
87                    )
88                })?;
89                let tags = root.child("Tags").map(parse_tag_set).unwrap_or_default();
90                (cn.clone(), tags)
91            } else {
92                (root.clone(), Vec::new())
93            };
94            let cfg = parse_distribution_config(&config_node);
95            let dist = provider.create_distribution(cfg, tags)?;
96            let body = ser::distribution_xml(&dist);
97            let mut resp = xml_response(StatusCode::CREATED, body, Some(&dist.etag));
98            let loc = format!(
99                "https://cloudfront.amazonaws.com/2020-05-31/distribution/{}",
100                dist.id
101            );
102            if let Ok(hv) = HeaderValue::from_str(&loc) {
103                resp.headers_mut().insert(http::header::LOCATION, hv);
104            }
105            Ok(resp)
106        }
107        Operation::GetDistribution => {
108            let d = provider.get_distribution(&p.id)?;
109            Ok(xml_response(
110                StatusCode::OK,
111                ser::distribution_xml(&d),
112                Some(&d.etag),
113            ))
114        }
115        Operation::GetDistributionConfig => {
116            let d = provider.get_distribution(&p.id)?;
117            Ok(xml_response(
118                StatusCode::OK,
119                ser::distribution_config_xml(&d.config),
120                Some(&d.etag),
121            ))
122        }
123        Operation::UpdateDistribution => {
124            let root = parse_root(&body)?;
125            let cfg = parse_distribution_config(&root);
126            let d = provider.update_distribution(&p.id, if_match, cfg)?;
127            Ok(xml_response(
128                StatusCode::OK,
129                ser::distribution_xml(&d),
130                Some(&d.etag),
131            ))
132        }
133        Operation::DeleteDistribution => {
134            provider.delete_distribution(&p.id, if_match)?;
135            Ok(empty_204())
136        }
137        Operation::ListDistributions => {
138            let items = provider.list_distributions();
139            let max = query_max_items(uri);
140            Ok(xml_response(
141                StatusCode::OK,
142                ser::distribution_list_xml(&items, max),
143                None,
144            ))
145        }
146        Operation::CopyDistribution => {
147            let root = parse_root(&body)?;
148            let caller_ref = root.child_text("CallerReference").to_owned();
149            let staging = root.child_bool("Staging");
150            let d = provider.copy_distribution(&p.id, &caller_ref, staging)?;
151            Ok(xml_response(
152                StatusCode::CREATED,
153                ser::distribution_xml(&d),
154                Some(&d.etag),
155            ))
156        }
157
158        // Invalidation
159        Operation::CreateInvalidation => {
160            let root = parse_root(&body)?;
161            let batch = parse_invalidation_batch(&root);
162            let inv = provider.create_invalidation(&p.id, batch)?;
163            Ok(xml_response(
164                StatusCode::CREATED,
165                ser::invalidation_xml(&inv),
166                None,
167            ))
168        }
169        Operation::GetInvalidation => {
170            let inv = provider.get_invalidation(&p.id, &p.secondary_id)?;
171            Ok(xml_response(
172                StatusCode::OK,
173                ser::invalidation_xml(&inv),
174                None,
175            ))
176        }
177        Operation::ListInvalidations => {
178            let items = provider.list_invalidations(&p.id);
179            let max = query_max_items(uri);
180            Ok(xml_response(
181                StatusCode::OK,
182                ser::invalidation_list_xml(&items, max),
183                None,
184            ))
185        }
186
187        // OAC
188        Operation::CreateOriginAccessControl => {
189            let root = parse_root(&body)?;
190            let cfg = parse_oac_config(&root);
191            let o = provider.create_oac(cfg)?;
192            let xml = ser::oac_xml(&o);
193            let mut resp = xml_response(StatusCode::CREATED, xml, Some(&o.etag));
194            let loc = format!(
195                "https://cloudfront.amazonaws.com/2020-05-31/origin-access-control/{}",
196                o.id
197            );
198            if let Ok(hv) = HeaderValue::from_str(&loc) {
199                resp.headers_mut().insert(http::header::LOCATION, hv);
200            }
201            Ok(resp)
202        }
203        Operation::GetOriginAccessControl => {
204            let o = provider.get_oac(&p.id)?;
205            Ok(xml_response(
206                StatusCode::OK,
207                ser::oac_xml(&o),
208                Some(&o.etag),
209            ))
210        }
211        Operation::GetOriginAccessControlConfig => {
212            let o = provider.get_oac(&p.id)?;
213            Ok(xml_response(
214                StatusCode::OK,
215                ser::oac_config_xml(&o.config),
216                Some(&o.etag),
217            ))
218        }
219        Operation::UpdateOriginAccessControl => {
220            let root = parse_root(&body)?;
221            let cfg = parse_oac_config(&root);
222            let o = provider.update_oac(&p.id, if_match, cfg)?;
223            Ok(xml_response(
224                StatusCode::OK,
225                ser::oac_xml(&o),
226                Some(&o.etag),
227            ))
228        }
229        Operation::DeleteOriginAccessControl => {
230            provider.delete_oac(&p.id, if_match)?;
231            Ok(empty_204())
232        }
233        Operation::ListOriginAccessControls => {
234            let items = provider.list_oacs();
235            let max = query_max_items(uri);
236            Ok(xml_response(
237                StatusCode::OK,
238                ser::oac_list_xml(&items, max),
239                None,
240            ))
241        }
242
243        // OAI
244        Operation::CreateCloudFrontOriginAccessIdentity => {
245            let root = parse_root(&body)?;
246            let cfg: CloudFrontOriginAccessIdentityConfig = parse_oai_config(&root);
247            let o = provider.create_oai(cfg)?;
248            let mut resp = xml_response(StatusCode::CREATED, ser::oai_xml(&o), Some(&o.etag));
249            let loc = format!(
250                "https://cloudfront.amazonaws.com/2020-05-31/origin-access-identity/cloudfront/{}",
251                o.id
252            );
253            if let Ok(hv) = HeaderValue::from_str(&loc) {
254                resp.headers_mut().insert(http::header::LOCATION, hv);
255            }
256            Ok(resp)
257        }
258        Operation::GetCloudFrontOriginAccessIdentity => {
259            let o = provider.get_oai(&p.id)?;
260            Ok(xml_response(
261                StatusCode::OK,
262                ser::oai_xml(&o),
263                Some(&o.etag),
264            ))
265        }
266        Operation::GetCloudFrontOriginAccessIdentityConfig => {
267            let o = provider.get_oai(&p.id)?;
268            Ok(xml_response(
269                StatusCode::OK,
270                ser::oai_config_xml(&o.config),
271                Some(&o.etag),
272            ))
273        }
274        Operation::UpdateCloudFrontOriginAccessIdentity => {
275            let root = parse_root(&body)?;
276            let cfg = parse_oai_config(&root);
277            let o = provider.update_oai(&p.id, if_match, cfg)?;
278            Ok(xml_response(
279                StatusCode::OK,
280                ser::oai_xml(&o),
281                Some(&o.etag),
282            ))
283        }
284        Operation::DeleteCloudFrontOriginAccessIdentity => {
285            provider.delete_oai(&p.id, if_match)?;
286            Ok(empty_204())
287        }
288        Operation::ListCloudFrontOriginAccessIdentities => {
289            let items = provider.list_oais();
290            Ok(xml_response(
291                StatusCode::OK,
292                ser::oai_list_xml(&items, query_max_items(uri)),
293                None,
294            ))
295        }
296
297        // Cache policies
298        Operation::CreateCachePolicy => {
299            let root = parse_root(&body)?;
300            let cfg = parse_cache_policy_config(&root);
301            let p = provider.create_cache_policy(cfg)?;
302            Ok(xml_response(
303                StatusCode::CREATED,
304                ser::cache_policy_xml(&p),
305                Some(&p.etag),
306            ))
307        }
308        Operation::GetCachePolicy => {
309            let pol = provider.get_cache_policy(&p.id)?;
310            Ok(xml_response(
311                StatusCode::OK,
312                ser::cache_policy_xml(&pol),
313                Some(&pol.etag),
314            ))
315        }
316        Operation::GetCachePolicyConfig => {
317            let pol = provider.get_cache_policy(&p.id)?;
318            Ok(xml_response(
319                StatusCode::OK,
320                ser::cache_policy_config_xml(&pol.config),
321                Some(&pol.etag),
322            ))
323        }
324        Operation::UpdateCachePolicy => {
325            let root = parse_root(&body)?;
326            let cfg = parse_cache_policy_config(&root);
327            let pol = provider.update_cache_policy(&p.id, if_match, cfg)?;
328            Ok(xml_response(
329                StatusCode::OK,
330                ser::cache_policy_xml(&pol),
331                Some(&pol.etag),
332            ))
333        }
334        Operation::DeleteCachePolicy => {
335            provider.delete_cache_policy(&p.id, if_match)?;
336            Ok(empty_204())
337        }
338        Operation::ListCachePolicies => {
339            let items = provider.list_cache_policies();
340            Ok(xml_response(
341                StatusCode::OK,
342                ser::cache_policy_list_xml(&items, query_max_items(uri)),
343                None,
344            ))
345        }
346
347        // Origin request policy
348        Operation::CreateOriginRequestPolicy => {
349            let root = parse_root(&body)?;
350            let cfg = parse_origin_request_policy_config(&root);
351            let pol = provider.create_origin_request_policy(cfg)?;
352            Ok(xml_response(
353                StatusCode::CREATED,
354                ser::origin_request_policy_xml(&pol),
355                Some(&pol.etag),
356            ))
357        }
358        Operation::GetOriginRequestPolicy => {
359            let pol = provider.get_origin_request_policy(&p.id)?;
360            Ok(xml_response(
361                StatusCode::OK,
362                ser::origin_request_policy_xml(&pol),
363                Some(&pol.etag),
364            ))
365        }
366        Operation::GetOriginRequestPolicyConfig => {
367            let pol = provider.get_origin_request_policy(&p.id)?;
368            Ok(xml_response(
369                StatusCode::OK,
370                ser::origin_request_policy_config_xml(&pol.config),
371                Some(&pol.etag),
372            ))
373        }
374        Operation::UpdateOriginRequestPolicy => {
375            let root = parse_root(&body)?;
376            let cfg = parse_origin_request_policy_config(&root);
377            let pol = provider.update_origin_request_policy(&p.id, if_match, cfg)?;
378            Ok(xml_response(
379                StatusCode::OK,
380                ser::origin_request_policy_xml(&pol),
381                Some(&pol.etag),
382            ))
383        }
384        Operation::DeleteOriginRequestPolicy => {
385            provider.delete_origin_request_policy(&p.id, if_match)?;
386            Ok(empty_204())
387        }
388        Operation::ListOriginRequestPolicies => {
389            let items = provider.list_origin_request_policies();
390            Ok(xml_response(
391                StatusCode::OK,
392                ser::origin_request_policy_list_xml(&items, query_max_items(uri)),
393                None,
394            ))
395        }
396
397        // Response headers policy
398        Operation::CreateResponseHeadersPolicy => {
399            let root = parse_root(&body)?;
400            let cfg = parse_response_headers_policy_config(&root);
401            let pol = provider.create_response_headers_policy(cfg)?;
402            Ok(xml_response(
403                StatusCode::CREATED,
404                ser::response_headers_policy_xml(&pol),
405                Some(&pol.etag),
406            ))
407        }
408        Operation::GetResponseHeadersPolicy => {
409            let pol = provider.get_response_headers_policy(&p.id)?;
410            Ok(xml_response(
411                StatusCode::OK,
412                ser::response_headers_policy_xml(&pol),
413                Some(&pol.etag),
414            ))
415        }
416        Operation::GetResponseHeadersPolicyConfig => {
417            let pol = provider.get_response_headers_policy(&p.id)?;
418            Ok(xml_response(
419                StatusCode::OK,
420                ser::response_headers_policy_config_xml(&pol.config),
421                Some(&pol.etag),
422            ))
423        }
424        Operation::UpdateResponseHeadersPolicy => {
425            let root = parse_root(&body)?;
426            let cfg = parse_response_headers_policy_config(&root);
427            let pol = provider.update_response_headers_policy(&p.id, if_match, cfg)?;
428            Ok(xml_response(
429                StatusCode::OK,
430                ser::response_headers_policy_xml(&pol),
431                Some(&pol.etag),
432            ))
433        }
434        Operation::DeleteResponseHeadersPolicy => {
435            provider.delete_response_headers_policy(&p.id, if_match)?;
436            Ok(empty_204())
437        }
438        Operation::ListResponseHeadersPolicies => {
439            let items = provider.list_response_headers_policies();
440            Ok(xml_response(
441                StatusCode::OK,
442                ser::response_headers_policy_list_xml(&items, query_max_items(uri)),
443                None,
444            ))
445        }
446
447        // Key group
448        Operation::CreateKeyGroup => {
449            let root = parse_root(&body)?;
450            let cfg = parse_key_group_config(&root);
451            let kg = provider.create_key_group(cfg)?;
452            Ok(xml_response(
453                StatusCode::CREATED,
454                ser::key_group_xml(&kg),
455                Some(&kg.etag),
456            ))
457        }
458        Operation::GetKeyGroup | Operation::GetKeyGroupConfig => {
459            let kg = provider.get_key_group(&p.id)?;
460            Ok(xml_response(
461                StatusCode::OK,
462                ser::key_group_xml(&kg),
463                Some(&kg.etag),
464            ))
465        }
466        Operation::UpdateKeyGroup => {
467            let root = parse_root(&body)?;
468            let cfg = parse_key_group_config(&root);
469            let kg = provider.update_key_group(&p.id, if_match, cfg)?;
470            Ok(xml_response(
471                StatusCode::OK,
472                ser::key_group_xml(&kg),
473                Some(&kg.etag),
474            ))
475        }
476        Operation::DeleteKeyGroup => {
477            provider.delete_key_group(&p.id, if_match)?;
478            Ok(empty_204())
479        }
480        Operation::ListKeyGroups => {
481            let items = provider.list_key_groups();
482            Ok(xml_response(
483                StatusCode::OK,
484                ser::key_group_list_xml(&items, query_max_items(uri)),
485                None,
486            ))
487        }
488
489        // Public key
490        Operation::CreatePublicKey => {
491            let root = parse_root(&body)?;
492            let cfg = parse_public_key_config(&root);
493            let k = provider.create_public_key(cfg)?;
494            Ok(xml_response(
495                StatusCode::CREATED,
496                ser::public_key_xml(&k),
497                Some(&k.etag),
498            ))
499        }
500        Operation::GetPublicKey | Operation::GetPublicKeyConfig => {
501            let k = provider.get_public_key(&p.id)?;
502            Ok(xml_response(
503                StatusCode::OK,
504                ser::public_key_xml(&k),
505                Some(&k.etag),
506            ))
507        }
508        Operation::UpdatePublicKey => {
509            let root = parse_root(&body)?;
510            let cfg = parse_public_key_config(&root);
511            let k = provider.update_public_key(&p.id, if_match, cfg)?;
512            Ok(xml_response(
513                StatusCode::OK,
514                ser::public_key_xml(&k),
515                Some(&k.etag),
516            ))
517        }
518        Operation::DeletePublicKey => {
519            provider.delete_public_key(&p.id, if_match)?;
520            Ok(empty_204())
521        }
522        Operation::ListPublicKeys => {
523            let items = provider.list_public_keys();
524            Ok(xml_response(
525                StatusCode::OK,
526                ser::public_key_list_xml(&items, query_max_items(uri)),
527                None,
528            ))
529        }
530
531        // Functions
532        Operation::CreateFunction => {
533            let root = parse_root(&body)?;
534            let name = root.child_text("Name").to_owned();
535            let cfg_node = root.child("FunctionConfig");
536            let cfg: FunctionConfig = cfg_node.map(parse_function_config).unwrap_or_default();
537            let code = base64_decode_or_raw(root.child_text("FunctionCode"));
538            let f = provider.create_function(name, cfg, code)?;
539            Ok(xml_response(
540                StatusCode::CREATED,
541                ser::function_xml(&f),
542                Some(&f.etag),
543            ))
544        }
545        Operation::DescribeFunction | Operation::GetFunction => {
546            let f = provider.get_function(&route.path_params.name)?;
547            Ok(xml_response(
548                StatusCode::OK,
549                ser::function_xml(&f),
550                Some(&f.etag),
551            ))
552        }
553        Operation::UpdateFunction => {
554            let root = parse_root(&body)?;
555            let cfg = root
556                .child("FunctionConfig")
557                .map(parse_function_config)
558                .unwrap_or_default();
559            let code = base64_decode_or_raw(root.child_text("FunctionCode"));
560            let f = provider.update_function(&route.path_params.name, if_match, cfg, code)?;
561            Ok(xml_response(
562                StatusCode::OK,
563                ser::function_xml(&f),
564                Some(&f.etag),
565            ))
566        }
567        Operation::DeleteFunction => {
568            provider.delete_function(&route.path_params.name, if_match)?;
569            Ok(empty_204())
570        }
571        Operation::PublishFunction => {
572            let f = provider.publish_function(&route.path_params.name, if_match)?;
573            Ok(xml_response(
574                StatusCode::OK,
575                ser::function_xml(&f),
576                Some(&f.etag),
577            ))
578        }
579        Operation::TestFunction => {
580            let root = parse_root(&body)?;
581            let event = root.child_text("EventObject").as_bytes().to_vec();
582            let (result, util) = provider.test_function(&route.path_params.name, &event)?;
583            let s = String::from_utf8_lossy(&result);
584            let mut w = crate::xml::XmlWriter::new();
585            w.declaration();
586            w.open_root(
587                "TestResult",
588                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
589            );
590            w.element("FunctionSummary", "");
591            w.element("ComputeUtilization", &util);
592            w.open("FunctionExecutionLogList");
593            w.close("FunctionExecutionLogList");
594            w.element("FunctionErrorMessage", "");
595            w.element("FunctionOutput", &s);
596            w.close("TestResult");
597            Ok(xml_response(StatusCode::OK, w.finish(), None))
598        }
599        Operation::ListFunctions => {
600            let items = provider.list_functions();
601            Ok(xml_response(
602                StatusCode::OK,
603                ser::function_list_xml(&items, query_max_items(uri)),
604                None,
605            ))
606        }
607
608        // FLE
609        Operation::CreateFieldLevelEncryptionConfig => {
610            let root = parse_root(&body)?;
611            let cfg = parse_fle_config(&root);
612            let f = provider.create_fle_config(cfg)?;
613            Ok(xml_response(
614                StatusCode::CREATED,
615                ser::fle_xml(&f),
616                Some(&f.etag),
617            ))
618        }
619        Operation::GetFieldLevelEncryption | Operation::GetFieldLevelEncryptionConfig => {
620            let f = provider.get_fle_config(&p.id)?;
621            Ok(xml_response(
622                StatusCode::OK,
623                ser::fle_xml(&f),
624                Some(&f.etag),
625            ))
626        }
627        Operation::UpdateFieldLevelEncryptionConfig => {
628            let root = parse_root(&body)?;
629            let cfg = parse_fle_config(&root);
630            let f = provider.update_fle_config(&p.id, if_match, cfg)?;
631            Ok(xml_response(
632                StatusCode::OK,
633                ser::fle_xml(&f),
634                Some(&f.etag),
635            ))
636        }
637        Operation::DeleteFieldLevelEncryptionConfig => {
638            provider.delete_fle_config(&p.id, if_match)?;
639            Ok(empty_204())
640        }
641        Operation::ListFieldLevelEncryptionConfigs => {
642            let items = provider.list_fle_configs();
643            let mut w = crate::xml::XmlWriter::new();
644            w.declaration();
645            w.open_root(
646                "FieldLevelEncryptionList",
647                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
648            );
649            w.element_display("Quantity", items.len());
650            w.close("FieldLevelEncryptionList");
651            Ok(xml_response(StatusCode::OK, w.finish(), None))
652        }
653        Operation::CreateFieldLevelEncryptionProfile => {
654            let root = parse_root(&body)?;
655            let cfg = parse_fle_profile_config(&root);
656            let f = provider.create_fle_profile(cfg)?;
657            Ok(xml_response(
658                StatusCode::CREATED,
659                ser::fle_profile_xml(&f),
660                Some(&f.etag),
661            ))
662        }
663        Operation::GetFieldLevelEncryptionProfile
664        | Operation::GetFieldLevelEncryptionProfileConfig => {
665            let f = provider.get_fle_profile(&p.id)?;
666            Ok(xml_response(
667                StatusCode::OK,
668                ser::fle_profile_xml(&f),
669                Some(&f.etag),
670            ))
671        }
672        Operation::UpdateFieldLevelEncryptionProfile => {
673            let root = parse_root(&body)?;
674            let cfg = parse_fle_profile_config(&root);
675            let f = provider.update_fle_profile(&p.id, if_match, cfg)?;
676            Ok(xml_response(
677                StatusCode::OK,
678                ser::fle_profile_xml(&f),
679                Some(&f.etag),
680            ))
681        }
682        Operation::DeleteFieldLevelEncryptionProfile => {
683            provider.delete_fle_profile(&p.id, if_match)?;
684            Ok(empty_204())
685        }
686        Operation::ListFieldLevelEncryptionProfiles => {
687            let items = provider.list_fle_profiles();
688            let mut w = crate::xml::XmlWriter::new();
689            w.declaration();
690            w.open_root(
691                "FieldLevelEncryptionProfileList",
692                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
693            );
694            w.element_display("Quantity", items.len());
695            w.close("FieldLevelEncryptionProfileList");
696            Ok(xml_response(StatusCode::OK, w.finish(), None))
697        }
698
699        // Monitoring subscription
700        Operation::CreateMonitoringSubscription => {
701            let root = parse_root(&body)?;
702            let enabled = root
703                .child("RealtimeMetricsSubscriptionConfig")
704                .map(|r| r.child_text("RealtimeMetricsSubscriptionStatus") == "Enabled")
705                .unwrap_or(false);
706            let sub = provider.create_monitoring_subscription(&p.id, enabled)?;
707            let mut w = crate::xml::XmlWriter::new();
708            w.declaration();
709            w.open_root(
710                "MonitoringSubscription",
711                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
712            );
713            w.open("RealtimeMetricsSubscriptionConfig");
714            w.element(
715                "RealtimeMetricsSubscriptionStatus",
716                &sub.realtime_metrics_subscription_status,
717            );
718            w.close("RealtimeMetricsSubscriptionConfig");
719            w.close("MonitoringSubscription");
720            Ok(xml_response(StatusCode::CREATED, w.finish(), None))
721        }
722        Operation::GetMonitoringSubscription => {
723            let sub = provider.get_monitoring_subscription(&p.id)?;
724            let mut w = crate::xml::XmlWriter::new();
725            w.declaration();
726            w.open_root(
727                "MonitoringSubscription",
728                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
729            );
730            w.open("RealtimeMetricsSubscriptionConfig");
731            w.element(
732                "RealtimeMetricsSubscriptionStatus",
733                &sub.realtime_metrics_subscription_status,
734            );
735            w.close("RealtimeMetricsSubscriptionConfig");
736            w.close("MonitoringSubscription");
737            Ok(xml_response(StatusCode::OK, w.finish(), None))
738        }
739        Operation::DeleteMonitoringSubscription => {
740            provider.delete_monitoring_subscription(&p.id)?;
741            Ok(empty_204())
742        }
743
744        // KVS
745        Operation::CreateKeyValueStore => {
746            let root = parse_root(&body)?;
747            let name = root.child_text("Name").to_owned();
748            let comment = root.child_text("Comment").to_owned();
749            let k: KeyValueStore = provider.create_kvs(name, comment)?;
750            Ok(xml_response(
751                StatusCode::CREATED,
752                ser::kvs_xml(&k),
753                Some(&k.etag),
754            ))
755        }
756        Operation::DescribeKeyValueStore => {
757            let k = provider.get_kvs(&p.id)?;
758            Ok(xml_response(
759                StatusCode::OK,
760                ser::kvs_xml(&k),
761                Some(&k.etag),
762            ))
763        }
764        Operation::UpdateKeyValueStore => {
765            let root = parse_root(&body)?;
766            let comment = root.child_text("Comment").to_owned();
767            let k = provider.update_kvs(&p.id, if_match, comment)?;
768            Ok(xml_response(
769                StatusCode::OK,
770                ser::kvs_xml(&k),
771                Some(&k.etag),
772            ))
773        }
774        Operation::DeleteKeyValueStore => {
775            provider.delete_kvs(&p.id, if_match)?;
776            Ok(empty_204())
777        }
778        Operation::ListKeyValueStores => {
779            let items = provider.list_kvs();
780            let mut w = crate::xml::XmlWriter::new();
781            w.declaration();
782            w.open_root(
783                "KeyValueStoreList",
784                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
785            );
786            w.element_display("MaxItems", query_max_items(uri));
787            w.element_display("Quantity", items.len());
788            if !items.is_empty() {
789                w.open("Items");
790                for k in &items {
791                    w.open("KeyValueStore");
792                    w.element("Name", &k.name);
793                    w.element("Id", &k.id);
794                    w.element("Comment", &k.comment);
795                    w.element("ARN", &k.arn);
796                    w.element("Status", &k.status);
797                    w.element("LastModifiedTime", &ser::iso8601(&k.last_modified_time));
798                    w.close("KeyValueStore");
799                }
800                w.close("Items");
801            }
802            w.close("KeyValueStoreList");
803            Ok(xml_response(StatusCode::OK, w.finish(), None))
804        }
805
806        // Realtime log
807        Operation::CreateRealtimeLogConfig | Operation::UpdateRealtimeLogConfig => {
808            let root = parse_root(&body)?;
809            let cfg = parse_realtime_log_config(&root);
810            let r = match route.operation {
811                Operation::CreateRealtimeLogConfig => provider.create_realtime_log_config(cfg)?,
812                _ => provider.update_realtime_log_config(cfg)?,
813            };
814            Ok(xml_response(
815                StatusCode::OK,
816                ser::realtime_log_config_xml(&r),
817                None,
818            ))
819        }
820        Operation::GetRealtimeLogConfig => {
821            let root = parse_root(&body)?;
822            let name = root.child_text("Name");
823            let r = provider.get_realtime_log_config(name)?;
824            Ok(xml_response(
825                StatusCode::OK,
826                ser::realtime_log_config_xml(&r),
827                None,
828            ))
829        }
830        Operation::DeleteRealtimeLogConfig => {
831            // Delete is POST-ish with name in body normally; accept query
832            // `Name=` or body `<Name>`.
833            let name = if let Some(q) = uri.query() {
834                q.split('&')
835                    .find_map(|kv| kv.strip_prefix("Name="))
836                    .unwrap_or("")
837                    .to_owned()
838            } else {
839                let root = parse_root(&body)?;
840                root.child_text("Name").to_owned()
841            };
842            provider.delete_realtime_log_config(&name)?;
843            Ok(empty_204())
844        }
845        Operation::ListRealtimeLogConfigs => {
846            let items = provider.list_realtime_log_configs();
847            let mut w = crate::xml::XmlWriter::new();
848            w.declaration();
849            w.open_root(
850                "RealtimeLogConfigs",
851                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
852            );
853            w.element_display("MaxItems", query_max_items(uri));
854            w.element_display("Quantity", items.len());
855            if !items.is_empty() {
856                w.open("Items");
857                for r in &items {
858                    w.raw(
859                        &ser::realtime_log_config_xml(r)
860                            .trim_start_matches("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"),
861                    );
862                }
863                w.close("Items");
864            }
865            w.close("RealtimeLogConfigs");
866            Ok(xml_response(StatusCode::OK, w.finish(), None))
867        }
868
869        // Tagging
870        Operation::TagResource => {
871            let arn = query_resource(uri)?;
872            let root = parse_root(&body)?;
873            let tags = parse_tag_set(&root);
874            provider.tag_resource(&arn, &tags)?;
875            Ok(empty_204())
876        }
877        Operation::UntagResource => {
878            let arn = query_resource(uri)?;
879            let root = parse_root(&body)?;
880            let keys = parse_tag_keys(&root);
881            provider.untag_resource(&arn, &keys)?;
882            Ok(empty_204())
883        }
884        Operation::ListTagsForResource => {
885            let arn = query_resource(uri)?;
886            let tags = provider.list_tags_for_resource(&arn)?;
887            Ok(xml_response(StatusCode::OK, ser::tags_xml(&tags), None))
888        }
889
890        // Phase 4 stubs — return empty success/list responses.
891        op => stub_response(op, uri, &route.path_params),
892    }
893}
894
895fn base64_decode_or_raw(s: &str) -> Vec<u8> {
896    use base64::{Engine, engine::general_purpose::STANDARD};
897    STANDARD.decode(s).unwrap_or_else(|_| s.as_bytes().to_vec())
898}
899
900fn query_max_items(uri: &Uri) -> i32 {
901    uri.query()
902        .and_then(|q| {
903            q.split('&')
904                .find_map(|kv| kv.strip_prefix("MaxItems="))
905                .and_then(|v| v.parse().ok())
906        })
907        .unwrap_or(100)
908}
909
910fn query_resource(uri: &Uri) -> Result<String, CloudFrontError> {
911    let q = uri.query().unwrap_or("");
912    for kv in q.split('&') {
913        if let Some(v) = kv.strip_prefix("Resource=") {
914            return Ok(percent_decode(v));
915        }
916    }
917    Err(CloudFrontError::MissingArgument(
918        "Resource query parameter is required".into(),
919    ))
920}
921
922fn percent_decode(s: &str) -> String {
923    percent_encoding::percent_decode_str(s)
924        .decode_utf8_lossy()
925        .into_owned()
926}
927
928fn stub_response(
929    op: Operation,
930    _uri: &Uri,
931    _p: &PathParams,
932) -> Result<Response<HttpBody>, CloudFrontError> {
933    let root_name = match op {
934        Operation::ListConflictingAliases => "ConflictingAliasesList",
935        Operation::ListDistributionsByCachePolicyId
936        | Operation::ListDistributionsByKeyGroup
937        | Operation::ListDistributionsByOriginRequestPolicyId
938        | Operation::ListDistributionsByRealtimeLogConfig
939        | Operation::ListDistributionsByResponseHeadersPolicyId
940        | Operation::ListDistributionsByVpcOriginId
941        | Operation::ListDistributionsByWebACLId
942        | Operation::ListDistributionsByAnycastIpListId => "DistributionIdList",
943        Operation::ListStreamingDistributions => "StreamingDistributionList",
944        Operation::ListContinuousDeploymentPolicies => "ContinuousDeploymentPolicyList",
945        Operation::ListAnycastIpLists => "AnycastIpListCollection",
946        Operation::ListVpcOrigins => "VpcOriginList",
947        Operation::ListTrustStores => "TrustStoreList",
948        Operation::ListDomainConflicts => "DomainConflictList",
949        _ => "StubResult",
950    };
951    let mut w = crate::xml::XmlWriter::new();
952    w.declaration();
953    w.open_root(
954        root_name,
955        Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
956    );
957    w.element_display("Quantity", 0);
958    w.bool("IsTruncated", false);
959    w.element_display("MaxItems", 100);
960    w.close(root_name);
961    Ok(xml_response(StatusCode::OK, w.finish(), None))
962}
963
964// Silence unused import warnings when Method is not otherwise used in this file.
965#[allow(dead_code)]
966fn _force_method_usage(_m: Method) {}