Skip to main content

rustack_cloudfront_http/
dispatch.rs

1//! Dispatch identified operations to the provider.
2//!
3//! This is the "business" layer of the HTTP crate: it receives a
4//! `RouteMatch`, parses the request body / query, calls into the
5//! `RustackCloudFront` provider, and turns the domain result into an HTTP
6//! response with correct status codes and ETag/Location headers.
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use http::{HeaderMap, HeaderValue, Method, Response, StatusCode, Uri};
12use rustack_cloudfront_core::RustackCloudFront;
13use rustack_cloudfront_model::{
14    CloudFrontError, CloudFrontOriginAccessIdentityConfig, FunctionConfig, KeyValueStore,
15};
16
17use crate::{
18    request::{
19        parse_cache_policy_config, parse_distribution_config, parse_fle_config,
20        parse_fle_profile_config, parse_function_config, parse_invalidation_batch,
21        parse_key_group_config, parse_oac_config, parse_oai_config,
22        parse_origin_request_policy_config, parse_public_key_config, parse_realtime_log_config,
23        parse_response_headers_policy_config, parse_root, parse_tag_keys, parse_tag_set,
24    },
25    response::{bytes_response, empty_204, error_response, xml_response},
26    router::{Operation, PathParams, RouteMatch},
27    service::HttpBody,
28    xml::ser,
29};
30
/// Handler trait — exists for symmetry with other services. The built-in
/// implementation dispatches directly to `RustackCloudFront`.
///
/// The only capability an implementor must offer is handing out a shared,
/// owned reference to the provider via [`provider_arc`](Self::provider_arc).
pub trait CloudFrontHandler: Send + Sync + 'static {
    /// Get the underlying provider as an owned `Arc`.
    ///
    /// `create_distribution`, `copy_distribution`, `update_distribution`,
    /// and `create_invalidation` take `self: &Arc<Self>` so they can spawn
    /// propagation tasks that outlive the caller. The service layer stores
    /// an `Arc<H>`; this method converts that into the owning `Arc<RustackCloudFront>`
    /// needed by the provider.
    ///
    /// # Returns
    ///
    /// An `Arc<RustackCloudFront>` owned by the caller.
    fn provider_arc(&self) -> Arc<RustackCloudFront>;
}
43
44/// The canonical handler: an `Arc<RustackCloudFront>` wrapped in the service's
45/// outer `Arc`. The double-Arc is required because the hyper service owns the
46/// handler by `Arc<H>` while the provider's methods need `Arc<RustackCloudFront>`.
47impl CloudFrontHandler for Arc<RustackCloudFront> {
48    fn provider_arc(&self) -> Arc<RustackCloudFront> {
49        Arc::clone(self)
50    }
51}
52
53/// Dispatch a parsed route to the provider and produce an HTTP response.
54#[allow(clippy::too_many_lines)]
55pub async fn dispatch(
56    handler: &dyn CloudFrontHandler,
57    route: RouteMatch,
58    uri: &Uri,
59    _headers: &HeaderMap,
60    if_match: Option<&str>,
61    body: Bytes,
62    request_id: &str,
63) -> Response<HttpBody> {
64    let provider = handler.provider_arc();
65    match handle(&provider, route, uri, if_match, body).await {
66        Ok(resp) => resp,
67        Err(err) => error_response(&err, request_id),
68    }
69}
70
71async fn handle(
72    provider: &Arc<RustackCloudFront>,
73    route: RouteMatch,
74    uri: &Uri,
75    if_match: Option<&str>,
76    body: Bytes,
77) -> Result<Response<HttpBody>, CloudFrontError> {
78    let p = &route.path_params;
79    match route.operation {
80        // Distribution
81        Operation::CreateDistribution | Operation::CreateDistributionWithTags => {
82            let root = parse_root(&body)?;
83            let (config_node, tags) = if root.name == "DistributionConfigWithTags" {
84                let cn = root.child("DistributionConfig").ok_or_else(|| {
85                    CloudFrontError::MalformedInput(
86                        "DistributionConfigWithTags missing DistributionConfig".into(),
87                    )
88                })?;
89                let tags = root.child("Tags").map(parse_tag_set).unwrap_or_default();
90                (cn.clone(), tags)
91            } else {
92                (root.clone(), Vec::new())
93            };
94            let cfg = parse_distribution_config(&config_node);
95            let dist = provider.create_distribution(cfg, tags)?;
96            let body = ser::distribution_xml(&dist);
97            let mut resp = xml_response(StatusCode::CREATED, body, Some(&dist.etag));
98            let loc = format!(
99                "https://cloudfront.amazonaws.com/2020-05-31/distribution/{}",
100                dist.id
101            );
102            if let Ok(hv) = HeaderValue::from_str(&loc) {
103                resp.headers_mut().insert(http::header::LOCATION, hv);
104            }
105            Ok(resp)
106        }
107        Operation::GetDistribution => {
108            let d = provider.get_distribution(&p.id)?;
109            Ok(xml_response(
110                StatusCode::OK,
111                ser::distribution_xml(&d),
112                Some(&d.etag),
113            ))
114        }
115        Operation::GetDistributionConfig => {
116            let d = provider.get_distribution(&p.id)?;
117            Ok(xml_response(
118                StatusCode::OK,
119                ser::distribution_config_xml(&d.config),
120                Some(&d.etag),
121            ))
122        }
123        Operation::UpdateDistribution => {
124            let root = parse_root(&body)?;
125            let cfg = parse_distribution_config(&root);
126            let d = provider.update_distribution(&p.id, if_match, cfg)?;
127            Ok(xml_response(
128                StatusCode::OK,
129                ser::distribution_xml(&d),
130                Some(&d.etag),
131            ))
132        }
133        Operation::DeleteDistribution => {
134            provider.delete_distribution(&p.id, if_match)?;
135            Ok(empty_204())
136        }
137        Operation::ListDistributions => {
138            let items = provider.list_distributions();
139            let max = query_max_items(uri);
140            Ok(xml_response(
141                StatusCode::OK,
142                ser::distribution_list_xml(&items, max),
143                None,
144            ))
145        }
146        Operation::CopyDistribution => {
147            let root = parse_root(&body)?;
148            let caller_ref = root.child_text("CallerReference").to_owned();
149            let staging = root.child_bool("Staging");
150            let d = provider.copy_distribution(&p.id, &caller_ref, staging)?;
151            Ok(xml_response(
152                StatusCode::CREATED,
153                ser::distribution_xml(&d),
154                Some(&d.etag),
155            ))
156        }
157
158        // Invalidation
159        Operation::CreateInvalidation => {
160            let root = parse_root(&body)?;
161            let batch = parse_invalidation_batch(&root);
162            let inv = provider.create_invalidation(&p.id, batch)?;
163            Ok(xml_response(
164                StatusCode::CREATED,
165                ser::invalidation_xml(&inv),
166                None,
167            ))
168        }
169        Operation::GetInvalidation => {
170            let inv = provider.get_invalidation(&p.id, &p.secondary_id)?;
171            Ok(xml_response(
172                StatusCode::OK,
173                ser::invalidation_xml(&inv),
174                None,
175            ))
176        }
177        Operation::ListInvalidations => {
178            let items = provider.list_invalidations(&p.id);
179            let max = query_max_items(uri);
180            Ok(xml_response(
181                StatusCode::OK,
182                ser::invalidation_list_xml(&items, max),
183                None,
184            ))
185        }
186
187        // OAC
188        Operation::CreateOriginAccessControl => {
189            let root = parse_root(&body)?;
190            let cfg = parse_oac_config(&root);
191            let o = provider.create_oac(cfg)?;
192            let xml = ser::oac_xml(&o);
193            let mut resp = xml_response(StatusCode::CREATED, xml, Some(&o.etag));
194            let loc = format!(
195                "https://cloudfront.amazonaws.com/2020-05-31/origin-access-control/{}",
196                o.id
197            );
198            if let Ok(hv) = HeaderValue::from_str(&loc) {
199                resp.headers_mut().insert(http::header::LOCATION, hv);
200            }
201            Ok(resp)
202        }
203        Operation::GetOriginAccessControl => {
204            let o = provider.get_oac(&p.id)?;
205            Ok(xml_response(
206                StatusCode::OK,
207                ser::oac_xml(&o),
208                Some(&o.etag),
209            ))
210        }
211        Operation::GetOriginAccessControlConfig => {
212            let o = provider.get_oac(&p.id)?;
213            Ok(xml_response(
214                StatusCode::OK,
215                ser::oac_config_xml(&o.config),
216                Some(&o.etag),
217            ))
218        }
219        Operation::UpdateOriginAccessControl => {
220            let root = parse_root(&body)?;
221            let cfg = parse_oac_config(&root);
222            let o = provider.update_oac(&p.id, if_match, cfg)?;
223            Ok(xml_response(
224                StatusCode::OK,
225                ser::oac_xml(&o),
226                Some(&o.etag),
227            ))
228        }
229        Operation::DeleteOriginAccessControl => {
230            provider.delete_oac(&p.id, if_match)?;
231            Ok(empty_204())
232        }
233        Operation::ListOriginAccessControls => {
234            let items = provider.list_oacs();
235            let max = query_max_items(uri);
236            Ok(xml_response(
237                StatusCode::OK,
238                ser::oac_list_xml(&items, max),
239                None,
240            ))
241        }
242
243        // OAI
244        Operation::CreateCloudFrontOriginAccessIdentity => {
245            let root = parse_root(&body)?;
246            let cfg: CloudFrontOriginAccessIdentityConfig = parse_oai_config(&root);
247            let o = provider.create_oai(cfg)?;
248            let mut resp = xml_response(StatusCode::CREATED, ser::oai_xml(&o), Some(&o.etag));
249            let loc = format!(
250                "https://cloudfront.amazonaws.com/2020-05-31/origin-access-identity/cloudfront/{}",
251                o.id
252            );
253            if let Ok(hv) = HeaderValue::from_str(&loc) {
254                resp.headers_mut().insert(http::header::LOCATION, hv);
255            }
256            Ok(resp)
257        }
258        Operation::GetCloudFrontOriginAccessIdentity => {
259            let o = provider.get_oai(&p.id)?;
260            Ok(xml_response(
261                StatusCode::OK,
262                ser::oai_xml(&o),
263                Some(&o.etag),
264            ))
265        }
266        Operation::GetCloudFrontOriginAccessIdentityConfig => {
267            let o = provider.get_oai(&p.id)?;
268            Ok(xml_response(
269                StatusCode::OK,
270                ser::oai_config_xml(&o.config),
271                Some(&o.etag),
272            ))
273        }
274        Operation::UpdateCloudFrontOriginAccessIdentity => {
275            let root = parse_root(&body)?;
276            let cfg = parse_oai_config(&root);
277            let o = provider.update_oai(&p.id, if_match, cfg)?;
278            Ok(xml_response(
279                StatusCode::OK,
280                ser::oai_xml(&o),
281                Some(&o.etag),
282            ))
283        }
284        Operation::DeleteCloudFrontOriginAccessIdentity => {
285            provider.delete_oai(&p.id, if_match)?;
286            Ok(empty_204())
287        }
288        Operation::ListCloudFrontOriginAccessIdentities => {
289            let items = provider.list_oais();
290            Ok(xml_response(
291                StatusCode::OK,
292                ser::oai_list_xml(&items, query_max_items(uri)),
293                None,
294            ))
295        }
296
297        // Cache policies
298        Operation::CreateCachePolicy => {
299            let root = parse_root(&body)?;
300            let cfg = parse_cache_policy_config(&root);
301            let p = provider.create_cache_policy(cfg)?;
302            Ok(xml_response(
303                StatusCode::CREATED,
304                ser::cache_policy_xml(&p),
305                Some(&p.etag),
306            ))
307        }
308        Operation::GetCachePolicy => {
309            let pol = provider.get_cache_policy(&p.id)?;
310            Ok(xml_response(
311                StatusCode::OK,
312                ser::cache_policy_xml(&pol),
313                Some(&pol.etag),
314            ))
315        }
316        Operation::GetCachePolicyConfig => {
317            let pol = provider.get_cache_policy(&p.id)?;
318            Ok(xml_response(
319                StatusCode::OK,
320                ser::cache_policy_config_xml(&pol.config),
321                Some(&pol.etag),
322            ))
323        }
324        Operation::UpdateCachePolicy => {
325            let root = parse_root(&body)?;
326            let cfg = parse_cache_policy_config(&root);
327            let pol = provider.update_cache_policy(&p.id, if_match, cfg)?;
328            Ok(xml_response(
329                StatusCode::OK,
330                ser::cache_policy_xml(&pol),
331                Some(&pol.etag),
332            ))
333        }
334        Operation::DeleteCachePolicy => {
335            provider.delete_cache_policy(&p.id, if_match)?;
336            Ok(empty_204())
337        }
338        Operation::ListCachePolicies => {
339            let items = provider.list_cache_policies();
340            Ok(xml_response(
341                StatusCode::OK,
342                ser::cache_policy_list_xml(&items, query_max_items(uri)),
343                None,
344            ))
345        }
346
347        // Origin request policy
348        Operation::CreateOriginRequestPolicy => {
349            let root = parse_root(&body)?;
350            let cfg = parse_origin_request_policy_config(&root);
351            let pol = provider.create_origin_request_policy(cfg)?;
352            Ok(xml_response(
353                StatusCode::CREATED,
354                ser::origin_request_policy_xml(&pol),
355                Some(&pol.etag),
356            ))
357        }
358        Operation::GetOriginRequestPolicy => {
359            let pol = provider.get_origin_request_policy(&p.id)?;
360            Ok(xml_response(
361                StatusCode::OK,
362                ser::origin_request_policy_xml(&pol),
363                Some(&pol.etag),
364            ))
365        }
366        Operation::GetOriginRequestPolicyConfig => {
367            let pol = provider.get_origin_request_policy(&p.id)?;
368            Ok(xml_response(
369                StatusCode::OK,
370                ser::origin_request_policy_config_xml(&pol.config),
371                Some(&pol.etag),
372            ))
373        }
374        Operation::UpdateOriginRequestPolicy => {
375            let root = parse_root(&body)?;
376            let cfg = parse_origin_request_policy_config(&root);
377            let pol = provider.update_origin_request_policy(&p.id, if_match, cfg)?;
378            Ok(xml_response(
379                StatusCode::OK,
380                ser::origin_request_policy_xml(&pol),
381                Some(&pol.etag),
382            ))
383        }
384        Operation::DeleteOriginRequestPolicy => {
385            provider.delete_origin_request_policy(&p.id, if_match)?;
386            Ok(empty_204())
387        }
388        Operation::ListOriginRequestPolicies => {
389            let items = provider.list_origin_request_policies();
390            Ok(xml_response(
391                StatusCode::OK,
392                ser::origin_request_policy_list_xml(&items, query_max_items(uri)),
393                None,
394            ))
395        }
396
397        // Response headers policy
398        Operation::CreateResponseHeadersPolicy => {
399            let root = parse_root(&body)?;
400            let cfg = parse_response_headers_policy_config(&root);
401            let pol = provider.create_response_headers_policy(cfg)?;
402            Ok(xml_response(
403                StatusCode::CREATED,
404                ser::response_headers_policy_xml(&pol),
405                Some(&pol.etag),
406            ))
407        }
408        Operation::GetResponseHeadersPolicy => {
409            let pol = provider.get_response_headers_policy(&p.id)?;
410            Ok(xml_response(
411                StatusCode::OK,
412                ser::response_headers_policy_xml(&pol),
413                Some(&pol.etag),
414            ))
415        }
416        Operation::GetResponseHeadersPolicyConfig => {
417            let pol = provider.get_response_headers_policy(&p.id)?;
418            Ok(xml_response(
419                StatusCode::OK,
420                ser::response_headers_policy_config_xml(&pol.config),
421                Some(&pol.etag),
422            ))
423        }
424        Operation::UpdateResponseHeadersPolicy => {
425            let root = parse_root(&body)?;
426            let cfg = parse_response_headers_policy_config(&root);
427            let pol = provider.update_response_headers_policy(&p.id, if_match, cfg)?;
428            Ok(xml_response(
429                StatusCode::OK,
430                ser::response_headers_policy_xml(&pol),
431                Some(&pol.etag),
432            ))
433        }
434        Operation::DeleteResponseHeadersPolicy => {
435            provider.delete_response_headers_policy(&p.id, if_match)?;
436            Ok(empty_204())
437        }
438        Operation::ListResponseHeadersPolicies => {
439            let items = provider.list_response_headers_policies();
440            Ok(xml_response(
441                StatusCode::OK,
442                ser::response_headers_policy_list_xml(&items, query_max_items(uri)),
443                None,
444            ))
445        }
446
447        // Key group
448        Operation::CreateKeyGroup => {
449            let root = parse_root(&body)?;
450            let cfg = parse_key_group_config(&root);
451            let kg = provider.create_key_group(cfg)?;
452            Ok(xml_response(
453                StatusCode::CREATED,
454                ser::key_group_xml(&kg),
455                Some(&kg.etag),
456            ))
457        }
458        Operation::GetKeyGroup | Operation::GetKeyGroupConfig => {
459            let kg = provider.get_key_group(&p.id)?;
460            Ok(xml_response(
461                StatusCode::OK,
462                ser::key_group_xml(&kg),
463                Some(&kg.etag),
464            ))
465        }
466        Operation::UpdateKeyGroup => {
467            let root = parse_root(&body)?;
468            let cfg = parse_key_group_config(&root);
469            let kg = provider.update_key_group(&p.id, if_match, cfg)?;
470            Ok(xml_response(
471                StatusCode::OK,
472                ser::key_group_xml(&kg),
473                Some(&kg.etag),
474            ))
475        }
476        Operation::DeleteKeyGroup => {
477            provider.delete_key_group(&p.id, if_match)?;
478            Ok(empty_204())
479        }
480        Operation::ListKeyGroups => {
481            let items = provider.list_key_groups();
482            Ok(xml_response(
483                StatusCode::OK,
484                ser::key_group_list_xml(&items, query_max_items(uri)),
485                None,
486            ))
487        }
488
489        // Public key
490        Operation::CreatePublicKey => {
491            let root = parse_root(&body)?;
492            let cfg = parse_public_key_config(&root);
493            let k = provider.create_public_key(cfg)?;
494            Ok(xml_response(
495                StatusCode::CREATED,
496                ser::public_key_xml(&k),
497                Some(&k.etag),
498            ))
499        }
500        Operation::GetPublicKey | Operation::GetPublicKeyConfig => {
501            let k = provider.get_public_key(&p.id)?;
502            Ok(xml_response(
503                StatusCode::OK,
504                ser::public_key_xml(&k),
505                Some(&k.etag),
506            ))
507        }
508        Operation::UpdatePublicKey => {
509            let root = parse_root(&body)?;
510            let cfg = parse_public_key_config(&root);
511            let k = provider.update_public_key(&p.id, if_match, cfg)?;
512            Ok(xml_response(
513                StatusCode::OK,
514                ser::public_key_xml(&k),
515                Some(&k.etag),
516            ))
517        }
518        Operation::DeletePublicKey => {
519            provider.delete_public_key(&p.id, if_match)?;
520            Ok(empty_204())
521        }
522        Operation::ListPublicKeys => {
523            let items = provider.list_public_keys();
524            Ok(xml_response(
525                StatusCode::OK,
526                ser::public_key_list_xml(&items, query_max_items(uri)),
527                None,
528            ))
529        }
530
531        // Functions
532        Operation::CreateFunction => {
533            let root = parse_root(&body)?;
534            let name = root.child_text("Name").to_owned();
535            let cfg_node = root.child("FunctionConfig");
536            let cfg: FunctionConfig = cfg_node.map(parse_function_config).unwrap_or_default();
537            let code = base64_decode_or_raw(root.child_text("FunctionCode"));
538            let f = provider.create_function(name, cfg, code)?;
539            Ok(xml_response(
540                StatusCode::CREATED,
541                ser::function_xml(&f),
542                Some(&f.etag),
543            ))
544        }
545        Operation::DescribeFunction => {
546            let f = provider.get_function(&route.path_params.name)?;
547            Ok(xml_response(
548                StatusCode::OK,
549                ser::function_xml(&f),
550                Some(&f.etag),
551            ))
552        }
553        Operation::GetFunction => {
554            let f = provider.get_function(&route.path_params.name)?;
555            Ok(bytes_response(
556                StatusCode::OK,
557                Bytes::from(f.code),
558                "application/octet-stream",
559                Some(&f.etag),
560            ))
561        }
562        Operation::UpdateFunction => {
563            let root = parse_root(&body)?;
564            let cfg = root
565                .child("FunctionConfig")
566                .map(parse_function_config)
567                .unwrap_or_default();
568            let code = base64_decode_or_raw(root.child_text("FunctionCode"));
569            let f = provider.update_function(&route.path_params.name, if_match, cfg, code)?;
570            Ok(xml_response(
571                StatusCode::OK,
572                ser::function_xml(&f),
573                Some(&f.etag),
574            ))
575        }
576        Operation::DeleteFunction => {
577            provider.delete_function(&route.path_params.name, if_match)?;
578            Ok(empty_204())
579        }
580        Operation::PublishFunction => {
581            let f = provider.publish_function(&route.path_params.name, if_match)?;
582            Ok(xml_response(
583                StatusCode::OK,
584                ser::function_xml(&f),
585                Some(&f.etag),
586            ))
587        }
588        Operation::TestFunction => {
589            let root = parse_root(&body)?;
590            let event = root.child_text("EventObject").as_bytes().to_vec();
591            let (result, util) = provider.test_function(&route.path_params.name, &event)?;
592            let s = String::from_utf8_lossy(&result);
593            let mut w = crate::xml::XmlWriter::new();
594            w.declaration();
595            w.open_root(
596                "TestResult",
597                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
598            );
599            w.element("FunctionSummary", "");
600            w.element("ComputeUtilization", &util);
601            w.open("FunctionExecutionLogList");
602            w.close("FunctionExecutionLogList");
603            w.element("FunctionErrorMessage", "");
604            w.element("FunctionOutput", &s);
605            w.close("TestResult");
606            Ok(xml_response(StatusCode::OK, w.finish(), None))
607        }
608        Operation::ListFunctions => {
609            let items = provider.list_functions();
610            Ok(xml_response(
611                StatusCode::OK,
612                ser::function_list_xml(&items, query_max_items(uri)),
613                None,
614            ))
615        }
616
617        // FLE
618        Operation::CreateFieldLevelEncryptionConfig => {
619            let root = parse_root(&body)?;
620            let cfg = parse_fle_config(&root);
621            let f = provider.create_fle_config(cfg)?;
622            Ok(xml_response(
623                StatusCode::CREATED,
624                ser::fle_xml(&f),
625                Some(&f.etag),
626            ))
627        }
628        Operation::GetFieldLevelEncryption | Operation::GetFieldLevelEncryptionConfig => {
629            let f = provider.get_fle_config(&p.id)?;
630            Ok(xml_response(
631                StatusCode::OK,
632                ser::fle_xml(&f),
633                Some(&f.etag),
634            ))
635        }
636        Operation::UpdateFieldLevelEncryptionConfig => {
637            let root = parse_root(&body)?;
638            let cfg = parse_fle_config(&root);
639            let f = provider.update_fle_config(&p.id, if_match, cfg)?;
640            Ok(xml_response(
641                StatusCode::OK,
642                ser::fle_xml(&f),
643                Some(&f.etag),
644            ))
645        }
646        Operation::DeleteFieldLevelEncryptionConfig => {
647            provider.delete_fle_config(&p.id, if_match)?;
648            Ok(empty_204())
649        }
650        Operation::ListFieldLevelEncryptionConfigs => {
651            let items = provider.list_fle_configs();
652            let mut w = crate::xml::XmlWriter::new();
653            w.declaration();
654            w.open_root(
655                "FieldLevelEncryptionList",
656                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
657            );
658            w.element_display("Quantity", items.len());
659            w.close("FieldLevelEncryptionList");
660            Ok(xml_response(StatusCode::OK, w.finish(), None))
661        }
662        Operation::CreateFieldLevelEncryptionProfile => {
663            let root = parse_root(&body)?;
664            let cfg = parse_fle_profile_config(&root);
665            let f = provider.create_fle_profile(cfg)?;
666            Ok(xml_response(
667                StatusCode::CREATED,
668                ser::fle_profile_xml(&f),
669                Some(&f.etag),
670            ))
671        }
672        Operation::GetFieldLevelEncryptionProfile
673        | Operation::GetFieldLevelEncryptionProfileConfig => {
674            let f = provider.get_fle_profile(&p.id)?;
675            Ok(xml_response(
676                StatusCode::OK,
677                ser::fle_profile_xml(&f),
678                Some(&f.etag),
679            ))
680        }
681        Operation::UpdateFieldLevelEncryptionProfile => {
682            let root = parse_root(&body)?;
683            let cfg = parse_fle_profile_config(&root);
684            let f = provider.update_fle_profile(&p.id, if_match, cfg)?;
685            Ok(xml_response(
686                StatusCode::OK,
687                ser::fle_profile_xml(&f),
688                Some(&f.etag),
689            ))
690        }
691        Operation::DeleteFieldLevelEncryptionProfile => {
692            provider.delete_fle_profile(&p.id, if_match)?;
693            Ok(empty_204())
694        }
695        Operation::ListFieldLevelEncryptionProfiles => {
696            let items = provider.list_fle_profiles();
697            let mut w = crate::xml::XmlWriter::new();
698            w.declaration();
699            w.open_root(
700                "FieldLevelEncryptionProfileList",
701                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
702            );
703            w.element_display("Quantity", items.len());
704            w.close("FieldLevelEncryptionProfileList");
705            Ok(xml_response(StatusCode::OK, w.finish(), None))
706        }
707
708        // Monitoring subscription
709        Operation::CreateMonitoringSubscription => {
710            let root = parse_root(&body)?;
711            let enabled = root
712                .child("RealtimeMetricsSubscriptionConfig")
713                .map(|r| r.child_text("RealtimeMetricsSubscriptionStatus") == "Enabled")
714                .unwrap_or(false);
715            let sub = provider.create_monitoring_subscription(&p.id, enabled)?;
716            let mut w = crate::xml::XmlWriter::new();
717            w.declaration();
718            w.open_root(
719                "MonitoringSubscription",
720                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
721            );
722            w.open("RealtimeMetricsSubscriptionConfig");
723            w.element(
724                "RealtimeMetricsSubscriptionStatus",
725                &sub.realtime_metrics_subscription_status,
726            );
727            w.close("RealtimeMetricsSubscriptionConfig");
728            w.close("MonitoringSubscription");
729            Ok(xml_response(StatusCode::CREATED, w.finish(), None))
730        }
731        Operation::GetMonitoringSubscription => {
732            let sub = provider.get_monitoring_subscription(&p.id)?;
733            let mut w = crate::xml::XmlWriter::new();
734            w.declaration();
735            w.open_root(
736                "MonitoringSubscription",
737                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
738            );
739            w.open("RealtimeMetricsSubscriptionConfig");
740            w.element(
741                "RealtimeMetricsSubscriptionStatus",
742                &sub.realtime_metrics_subscription_status,
743            );
744            w.close("RealtimeMetricsSubscriptionConfig");
745            w.close("MonitoringSubscription");
746            Ok(xml_response(StatusCode::OK, w.finish(), None))
747        }
748        Operation::DeleteMonitoringSubscription => {
749            provider.delete_monitoring_subscription(&p.id)?;
750            Ok(empty_204())
751        }
752
753        // KVS
754        Operation::CreateKeyValueStore => {
755            let root = parse_root(&body)?;
756            let name = root.child_text("Name").to_owned();
757            let comment = root.child_text("Comment").to_owned();
758            let k: KeyValueStore = provider.create_kvs(name, comment)?;
759            Ok(xml_response(
760                StatusCode::CREATED,
761                ser::kvs_xml(&k),
762                Some(&k.etag),
763            ))
764        }
765        Operation::DescribeKeyValueStore => {
766            let k = provider.get_kvs(&p.id)?;
767            Ok(xml_response(
768                StatusCode::OK,
769                ser::kvs_xml(&k),
770                Some(&k.etag),
771            ))
772        }
773        Operation::UpdateKeyValueStore => {
774            let root = parse_root(&body)?;
775            let comment = root.child_text("Comment").to_owned();
776            let k = provider.update_kvs(&p.id, if_match, comment)?;
777            Ok(xml_response(
778                StatusCode::OK,
779                ser::kvs_xml(&k),
780                Some(&k.etag),
781            ))
782        }
783        Operation::DeleteKeyValueStore => {
784            provider.delete_kvs(&p.id, if_match)?;
785            Ok(empty_204())
786        }
787        Operation::ListKeyValueStores => {
788            let items = provider.list_kvs();
789            let mut w = crate::xml::XmlWriter::new();
790            w.declaration();
791            w.open_root(
792                "KeyValueStoreList",
793                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
794            );
795            w.element_display("MaxItems", query_max_items(uri));
796            w.element_display("Quantity", items.len());
797            if !items.is_empty() {
798                w.open("Items");
799                for k in &items {
800                    w.open("KeyValueStore");
801                    w.element("Name", &k.name);
802                    w.element("Id", &k.id);
803                    w.element("Comment", &k.comment);
804                    w.element("ARN", &k.arn);
805                    w.element("Status", &k.status);
806                    w.element("LastModifiedTime", &ser::iso8601(&k.last_modified_time));
807                    w.close("KeyValueStore");
808                }
809                w.close("Items");
810            }
811            w.close("KeyValueStoreList");
812            Ok(xml_response(StatusCode::OK, w.finish(), None))
813        }
814
815        // Realtime log
816        Operation::CreateRealtimeLogConfig | Operation::UpdateRealtimeLogConfig => {
817            let root = parse_root(&body)?;
818            let cfg = parse_realtime_log_config(&root);
819            let r = match route.operation {
820                Operation::CreateRealtimeLogConfig => provider.create_realtime_log_config(cfg)?,
821                _ => provider.update_realtime_log_config(cfg)?,
822            };
823            Ok(xml_response(
824                StatusCode::OK,
825                ser::realtime_log_config_xml(&r),
826                None,
827            ))
828        }
829        Operation::GetRealtimeLogConfig => {
830            let root = parse_root(&body)?;
831            let name = root.child_text("Name");
832            let r = provider.get_realtime_log_config(name)?;
833            Ok(xml_response(
834                StatusCode::OK,
835                ser::realtime_log_config_xml(&r),
836                None,
837            ))
838        }
839        Operation::DeleteRealtimeLogConfig => {
840            // Delete is POST-ish with name in body normally; accept query
841            // `Name=` or body `<Name>`.
842            let name = if let Some(q) = uri.query() {
843                q.split('&')
844                    .find_map(|kv| kv.strip_prefix("Name="))
845                    .unwrap_or("")
846                    .to_owned()
847            } else {
848                let root = parse_root(&body)?;
849                root.child_text("Name").to_owned()
850            };
851            provider.delete_realtime_log_config(&name)?;
852            Ok(empty_204())
853        }
854        Operation::ListRealtimeLogConfigs => {
855            let items = provider.list_realtime_log_configs();
856            let mut w = crate::xml::XmlWriter::new();
857            w.declaration();
858            w.open_root(
859                "RealtimeLogConfigs",
860                Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
861            );
862            w.element_display("MaxItems", query_max_items(uri));
863            w.element_display("Quantity", items.len());
864            if !items.is_empty() {
865                w.open("Items");
866                for r in &items {
867                    w.raw(
868                        &ser::realtime_log_config_xml(r)
869                            .trim_start_matches("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"),
870                    );
871                }
872                w.close("Items");
873            }
874            w.close("RealtimeLogConfigs");
875            Ok(xml_response(StatusCode::OK, w.finish(), None))
876        }
877
878        // Tagging
879        Operation::TagResource => {
880            let arn = query_resource(uri)?;
881            let root = parse_root(&body)?;
882            let tags = parse_tag_set(&root);
883            provider.tag_resource(&arn, &tags)?;
884            Ok(empty_204())
885        }
886        Operation::UntagResource => {
887            let arn = query_resource(uri)?;
888            let root = parse_root(&body)?;
889            let keys = parse_tag_keys(&root);
890            provider.untag_resource(&arn, &keys)?;
891            Ok(empty_204())
892        }
893        Operation::ListTagsForResource => {
894            let arn = query_resource(uri)?;
895            let tags = provider.list_tags_for_resource(&arn)?;
896            Ok(xml_response(StatusCode::OK, ser::tags_xml(&tags), None))
897        }
898
899        // Phase 4 stubs — return empty success/list responses.
900        op => stub_response(op, uri, &route.path_params),
901    }
902}
903
904fn base64_decode_or_raw(s: &str) -> Vec<u8> {
905    use base64::{Engine, engine::general_purpose::STANDARD};
906    STANDARD.decode(s).unwrap_or_else(|_| s.as_bytes().to_vec())
907}
908
909fn query_max_items(uri: &Uri) -> i32 {
910    uri.query()
911        .and_then(|q| {
912            q.split('&')
913                .find_map(|kv| kv.strip_prefix("MaxItems="))
914                .and_then(|v| v.parse().ok())
915        })
916        .unwrap_or(100)
917}
918
919fn query_resource(uri: &Uri) -> Result<String, CloudFrontError> {
920    let q = uri.query().unwrap_or("");
921    for kv in q.split('&') {
922        if let Some(v) = kv.strip_prefix("Resource=") {
923            return Ok(percent_decode(v));
924        }
925    }
926    Err(CloudFrontError::MissingArgument(
927        "Resource query parameter is required".into(),
928    ))
929}
930
931fn percent_decode(s: &str) -> String {
932    percent_encoding::percent_decode_str(s)
933        .decode_utf8_lossy()
934        .into_owned()
935}
936
937fn stub_response(
938    op: Operation,
939    _uri: &Uri,
940    _p: &PathParams,
941) -> Result<Response<HttpBody>, CloudFrontError> {
942    let root_name = match op {
943        Operation::ListConflictingAliases => "ConflictingAliasesList",
944        Operation::ListDistributionsByCachePolicyId
945        | Operation::ListDistributionsByKeyGroup
946        | Operation::ListDistributionsByOriginRequestPolicyId
947        | Operation::ListDistributionsByRealtimeLogConfig
948        | Operation::ListDistributionsByResponseHeadersPolicyId
949        | Operation::ListDistributionsByVpcOriginId
950        | Operation::ListDistributionsByWebACLId
951        | Operation::ListDistributionsByAnycastIpListId => "DistributionIdList",
952        Operation::ListStreamingDistributions => "StreamingDistributionList",
953        Operation::ListContinuousDeploymentPolicies => "ContinuousDeploymentPolicyList",
954        Operation::ListAnycastIpLists => "AnycastIpListCollection",
955        Operation::ListVpcOrigins => "VpcOriginList",
956        Operation::ListTrustStores => "TrustStoreList",
957        Operation::ListDomainConflicts => "DomainConflictList",
958        _ => "StubResult",
959    };
960    let mut w = crate::xml::XmlWriter::new();
961    w.declaration();
962    w.open_root(
963        root_name,
964        Some(rustack_cloudfront_model::CLOUDFRONT_XML_NAMESPACE),
965    );
966    w.element_display("Quantity", 0);
967    w.bool("IsTruncated", false);
968    w.element_display("MaxItems", 100);
969    w.close(root_name);
970    Ok(xml_response(StatusCode::OK, w.finish(), None))
971}
972
973// Silence unused import warnings when Method is not otherwise used in this file.
974#[allow(dead_code)]
975fn _force_method_usage(_m: Method) {}