Skip to main content

fakecloud_cloudfront/
functions_service.rs

1//! Handlers for CloudFront Batch 3 resources: Functions, Public Keys,
2//! Key Groups, Key Value Stores, Origin Access Identities (legacy),
3//! Monitoring Subscriptions.
4
5use base64::Engine;
6use chrono::Utc;
7use http::header::ETAG;
8use http::{HeaderMap, StatusCode};
9use uuid::Uuid;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::functions::{
14    CloudFrontOriginAccessIdentityConfig, FunctionConfig, ImportSource, KeyGroupConfig,
15    MonitoringSubscriptionBody, PublicKeyConfig, StoredFunction, StoredKeyGroup,
16    StoredKeyValueStore, StoredMonitoringSubscription, StoredOriginAccessIdentity, StoredPublicKey,
17};
18use crate::policies::{
19    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
20};
21use crate::router::Route;
22use crate::service::{
23    aws_error, esc, generate_id_with_prefix, invalid_argument, xml_response, CloudFrontService,
24    DEFAULT_ACCOUNT,
25};
26use crate::xml_io;
27
28const NS: &str = crate::NAMESPACE;
29const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
30
31// ─── CloudFront Functions ─────────────────────────────────────────────
32
33impl CloudFrontService {
34    pub(crate) fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
35        // Body shape: <CreateFunctionRequest><Name/><FunctionConfig/><FunctionCode/></CreateFunctionRequest>
36        let parsed: CreateFunctionRequest = xml_io::from_xml_root(&req.body)
37            .map_err(|e| invalid_argument(format!("invalid CreateFunctionRequest XML: {e}")))?;
38        if parsed.name.is_empty() {
39            return Err(invalid_argument("CreateFunctionRequest.Name is required"));
40        }
41
42        let mut state = self.state.write();
43        let account = state
44            .accounts
45            .entry(DEFAULT_ACCOUNT.to_string())
46            .or_default();
47        if account.functions.contains_key(&parsed.name) {
48            return Err(aws_error(
49                StatusCode::CONFLICT,
50                "FunctionAlreadyExists",
51                format!("Function {} already exists", parsed.name),
52            ));
53        }
54        let now = Utc::now();
55        let etag = generate_id_with_prefix("E");
56        let function_arn = format!(
57            "arn:aws:cloudfront::{}:function/{}",
58            DEFAULT_ACCOUNT, parsed.name
59        );
60        let stored = StoredFunction {
61            name: parsed.name.clone(),
62            etag: etag.clone(),
63            status: "UNPUBLISHED".to_string(),
64            stage: "DEVELOPMENT".to_string(),
65            function_arn: function_arn.clone(),
66            created_time: now,
67            last_modified_time: now,
68            config: parsed.function_config,
69            function_code: parsed.function_code,
70            // No published snapshot until PublishFunction is called.
71            live_function_code: None,
72        };
73        account
74            .functions
75            .insert(parsed.name.clone(), stored.clone());
76        drop(state);
77
78        let body = render_function_summary(&stored, "CreateFunctionResult");
79        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
80    }
81
82    pub(crate) fn describe_function(
83        &self,
84        req: &AwsRequest,
85        route: &Route,
86    ) -> Result<AwsResponse, AwsServiceError> {
87        let name = route_id(route, "Function")?;
88        let stage = parse_stage_query(&req.raw_query);
89        let state = self.state.read();
90        let f = state
91            .accounts
92            .get(DEFAULT_ACCOUNT)
93            .and_then(|a| a.functions.get(&name).cloned())
94            .ok_or_else(|| not_found("Function", &name))?;
95        drop(state);
96        let view = stage_view(&f, &stage);
97        let body = render_function_summary(&view, "DescribeFunctionResult");
98        Ok(xml_with_etag(StatusCode::OK, body, &view.etag, None))
99    }
100
101    pub(crate) fn get_function(
102        &self,
103        req: &AwsRequest,
104        route: &Route,
105    ) -> Result<AwsResponse, AwsServiceError> {
106        let name = route_id(route, "Function")?;
107        let stage = parse_stage_query(&req.raw_query);
108        let state = self.state.read();
109        let f = state
110            .accounts
111            .get(DEFAULT_ACCOUNT)
112            .and_then(|a| a.functions.get(&name).cloned())
113            .ok_or_else(|| not_found("Function", &name))?;
114        drop(state);
115        let view = stage_view(&f, &stage);
116        let mut headers = HeaderMap::new();
117        headers.insert(ETAG, view.etag.parse().unwrap());
118        let bytes = base64::engine::general_purpose::STANDARD
119            .decode(view.function_code.as_bytes())
120            .unwrap_or_default();
121        Ok(AwsResponse {
122            status: StatusCode::OK,
123            headers,
124            content_type: "application/octet-stream".to_string(),
125            body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(bytes)),
126        })
127    }
128
129    pub(crate) fn update_function(
130        &self,
131        req: &AwsRequest,
132        route: &Route,
133    ) -> Result<AwsResponse, AwsServiceError> {
134        let name = route_id(route, "Function")?;
135        let if_match = require_if_match(req)?;
136        let parsed: UpdateFunctionRequest = xml_io::from_xml_root(&req.body)
137            .map_err(|e| invalid_argument(format!("invalid UpdateFunctionRequest XML: {e}")))?;
138        let mut state = self.state.write();
139        let account = state
140            .accounts
141            .get_mut(DEFAULT_ACCOUNT)
142            .ok_or_else(|| not_found("Function", &name))?;
143        let f = account
144            .functions
145            .get_mut(&name)
146            .ok_or_else(|| not_found("Function", &name))?;
147        if f.etag != if_match {
148            return Err(precondition_failed());
149        }
150        f.config = parsed.function_config;
151        f.function_code = parsed.function_code;
152        f.etag = generate_id_with_prefix("E");
153        f.last_modified_time = Utc::now();
154        f.status = "UNPUBLISHED".to_string();
155        f.stage = "DEVELOPMENT".to_string();
156        let snap = f.clone();
157        drop(state);
158        let body = render_function_summary(&snap, "UpdateFunctionResult");
159        // SDK has a known typo on UpdateFunctionOutput: it deserializes
160        // the etag from header `ETtag`, not `ETag`. Send both so any
161        // SDK version can read it.
162        let mut headers = HeaderMap::new();
163        if let Ok(v) = http::HeaderValue::from_str(&snap.etag) {
164            headers.insert(ETAG, v.clone());
165            headers.insert("ETtag", v);
166        }
167        Ok(xml_response(StatusCode::OK, body, headers))
168    }
169
170    pub(crate) fn delete_function(
171        &self,
172        req: &AwsRequest,
173        route: &Route,
174    ) -> Result<AwsResponse, AwsServiceError> {
175        let name = route_id(route, "Function")?;
176        let if_match = require_if_match(req)?;
177        let mut state = self.state.write();
178        let account = state
179            .accounts
180            .get_mut(DEFAULT_ACCOUNT)
181            .ok_or_else(|| not_found("Function", &name))?;
182        let f = account
183            .functions
184            .get(&name)
185            .ok_or_else(|| not_found("Function", &name))?;
186        if f.etag != if_match {
187            return Err(precondition_failed());
188        }
189        account.functions.remove(&name);
190        drop(state);
191        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
192    }
193
194    pub(crate) fn list_functions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
195        let stage = parse_stage_query(&req.raw_query);
196        let state = self.state.read();
197        let mut items: Vec<StoredFunction> = state
198            .accounts
199            .get(DEFAULT_ACCOUNT)
200            .map(|a| a.functions.values().cloned().collect())
201            .unwrap_or_default();
202        drop(state);
203        items.sort_by(|a, b| a.name.cmp(&b.name));
204
205        let mut body = String::with_capacity(512);
206        body.push_str(XML_DECL);
207        body.push_str(&format!("<FunctionList xmlns=\"{NS}\">"));
208        body.push_str("<Marker></Marker>");
209        body.push_str("<MaxItems>100</MaxItems>");
210        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
211        body.push_str("<Items>");
212        for f in &items {
213            let view = stage_view(f, &stage);
214            body.push_str(&render_function_summary_inner(&view));
215        }
216        body.push_str("</Items>");
217        body.push_str("</FunctionList>");
218        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
219    }
220
221    pub(crate) fn publish_function(
222        &self,
223        req: &AwsRequest,
224        route: &Route,
225    ) -> Result<AwsResponse, AwsServiceError> {
226        let name = route_id(route, "Function")?;
227        let if_match = require_if_match(req)?;
228        let mut state = self.state.write();
229        let account = state
230            .accounts
231            .get_mut(DEFAULT_ACCOUNT)
232            .ok_or_else(|| not_found("Function", &name))?;
233        let f = account
234            .functions
235            .get_mut(&name)
236            .ok_or_else(|| not_found("Function", &name))?;
237        if f.etag != if_match {
238            return Err(precondition_failed());
239        }
240        f.status = "DEPLOYED".to_string();
241        f.stage = "LIVE".to_string();
242        f.last_modified_time = Utc::now();
243        // Freeze the current development code as the LIVE snapshot.
244        // Subsequent UpdateFunction calls mutate `function_code` but
245        // leave this alone, so `TestFunction(Stage=LIVE)` keeps running
246        // the published version until the next Publish.
247        f.live_function_code = Some(f.function_code.clone());
248        let snap = f.clone();
249        drop(state);
250        let body = render_function_summary(&snap, "PublishFunctionResult");
251        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
252    }
253
254    pub(crate) fn test_function(
255        &self,
256        req: &AwsRequest,
257        route: &Route,
258    ) -> Result<AwsResponse, AwsServiceError> {
259        let name = route_id(route, "Function")?;
260        let if_match = require_if_match(req)?;
261        let parsed: TestFunctionRequest = xml_io::from_xml_root(&req.body)
262            .map_err(|e| invalid_argument(format!("invalid TestFunctionRequest XML: {e}")))?;
263        let event_bytes = base64::engine::general_purpose::STANDARD
264            .decode(parsed.event_object.trim().as_bytes())
265            .map_err(|e| invalid_argument(format!("EventObject is not valid base64: {e}")))?;
266
267        let state = self.state.read();
268        let f = state
269            .accounts
270            .get(DEFAULT_ACCOUNT)
271            .and_then(|a| a.functions.get(&name).cloned())
272            .ok_or_else(|| {
273                aws_error(
274                    StatusCode::NOT_FOUND,
275                    "NoSuchFunctionExists",
276                    format!("The specified function does not exist: {name}"),
277                )
278            })?;
279        drop(state);
280        if f.etag != if_match {
281            return Err(precondition_failed());
282        }
283
284        // AWS lets callers pick which version to run. DEVELOPMENT (the
285        // default) is the latest CreateFunction / UpdateFunction body;
286        // LIVE is the snapshot taken at PublishFunction. Falling back to
287        // DEVELOPMENT when no snapshot exists matches the AWS error
288        // shape ("function not yet published") less closely, but is
289        // strictly nicer for tests against unpublished functions.
290        let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
291        let source_b64 = if stage.eq_ignore_ascii_case("LIVE") {
292            f.live_function_code.as_deref().unwrap_or(&f.function_code)
293        } else {
294            f.function_code.as_str()
295        };
296        let code_bytes = base64::engine::general_purpose::STANDARD
297            .decode(source_b64.as_bytes())
298            .unwrap_or_else(|_| source_b64.as_bytes().to_vec());
299        let code = String::from_utf8(code_bytes)
300            .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
301        let exec = crate::js_runtime::run_handler(&code, &event_bytes);
302
303        let mut body = String::with_capacity(1024);
304        body.push_str(XML_DECL);
305        body.push_str(&format!("<TestResult xmlns=\"{NS}\">"));
306        body.push_str(&render_function_summary_inner(&f));
307        body.push_str(&format!(
308            "<ComputeUtilization>{}</ComputeUtilization>",
309            exec.compute_utilization
310        ));
311        body.push_str("<FunctionExecutionLogs>");
312        for line in &exec.logs {
313            body.push_str(&format!("<member>{}</member>", esc(line)));
314        }
315        body.push_str("</FunctionExecutionLogs>");
316        body.push_str(&format!(
317            "<FunctionErrorMessage>{}</FunctionErrorMessage>",
318            esc(exec.error.as_deref().unwrap_or(""))
319        ));
320        body.push_str(&format!(
321            "<FunctionOutput>{}</FunctionOutput>",
322            esc(exec.output.as_deref().unwrap_or(""))
323        ));
324        body.push_str("</TestResult>");
325        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
326    }
327}
328
329// ─── Public Keys ──────────────────────────────────────────────────────
330
331impl CloudFrontService {
332    pub(crate) fn create_public_key(
333        &self,
334        req: &AwsRequest,
335    ) -> Result<AwsResponse, AwsServiceError> {
336        let cfg: PublicKeyConfig = xml_io::from_xml_root(&req.body)
337            .map_err(|e| invalid_argument(format!("invalid PublicKeyConfig XML: {e}")))?;
338        if cfg.name.is_empty() {
339            return Err(invalid_argument("PublicKeyConfig.Name is required"));
340        }
341        if cfg.encoded_key.is_empty() {
342            return Err(invalid_argument("PublicKeyConfig.EncodedKey is required"));
343        }
344        let mut state = self.state.write();
345        let account = state
346            .accounts
347            .entry(DEFAULT_ACCOUNT.to_string())
348            .or_default();
349        if let Some(existing) = account
350            .public_keys
351            .values()
352            .find(|p| p.config.caller_reference == cfg.caller_reference)
353        {
354            return Err(aws_error(
355                StatusCode::CONFLICT,
356                "PublicKeyAlreadyExists",
357                format!(
358                    "PublicKey with same CallerReference exists: {}",
359                    existing.id
360                ),
361            ));
362        }
363        let id = generate_id_with_prefix("K");
364        let etag = generate_id_with_prefix("E");
365        let stored = StoredPublicKey {
366            id: id.clone(),
367            etag: etag.clone(),
368            created_time: Utc::now(),
369            config: cfg,
370        };
371        account.public_keys.insert(id.clone(), stored.clone());
372        drop(state);
373        let body = render_public_key(&stored, "PublicKey");
374        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
375    }
376
377    pub(crate) fn get_public_key(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
378        let id = route_id(route, "PublicKey")?;
379        let state = self.state.read();
380        let p = state
381            .accounts
382            .get(DEFAULT_ACCOUNT)
383            .and_then(|a| a.public_keys.get(&id).cloned())
384            .ok_or_else(|| not_found("PublicKey", &id))?;
385        drop(state);
386        let body = render_public_key(&p, "PublicKey");
387        Ok(xml_with_etag(StatusCode::OK, body, &p.etag, None))
388    }
389
390    pub(crate) fn get_public_key_config(
391        &self,
392        route: &Route,
393    ) -> Result<AwsResponse, AwsServiceError> {
394        let id = route_id(route, "PublicKey")?;
395        let state = self.state.read();
396        let p = state
397            .accounts
398            .get(DEFAULT_ACCOUNT)
399            .and_then(|a| a.public_keys.get(&id).cloned())
400            .ok_or_else(|| not_found("PublicKey", &id))?;
401        drop(state);
402        let body = config_xml("PublicKeyConfig", &p.config)?;
403        Ok(xml_with_etag(StatusCode::OK, body, &p.etag, None))
404    }
405
406    pub(crate) fn update_public_key(
407        &self,
408        req: &AwsRequest,
409        route: &Route,
410    ) -> Result<AwsResponse, AwsServiceError> {
411        let id = route_id(route, "PublicKey")?;
412        let if_match = require_if_match(req)?;
413        let cfg: PublicKeyConfig = xml_io::from_xml_root(&req.body)
414            .map_err(|e| invalid_argument(format!("invalid PublicKeyConfig XML: {e}")))?;
415        if cfg.name.is_empty() {
416            return Err(invalid_argument("PublicKeyConfig.Name is required"));
417        }
418        let mut state = self.state.write();
419        let account = state
420            .accounts
421            .get_mut(DEFAULT_ACCOUNT)
422            .ok_or_else(|| not_found("PublicKey", &id))?;
423        let p = account
424            .public_keys
425            .get_mut(&id)
426            .ok_or_else(|| not_found("PublicKey", &id))?;
427        if p.etag != if_match {
428            return Err(precondition_failed());
429        }
430        // CallerReference is immutable per AWS.
431        if p.config.caller_reference != cfg.caller_reference {
432            return Err(invalid_argument(
433                "CallerReference cannot change on UpdatePublicKey",
434            ));
435        }
436        p.config = cfg;
437        p.etag = generate_id_with_prefix("E");
438        let snap = p.clone();
439        drop(state);
440        let body = render_public_key(&snap, "PublicKey");
441        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
442    }
443
444    pub(crate) fn delete_public_key(
445        &self,
446        req: &AwsRequest,
447        route: &Route,
448    ) -> Result<AwsResponse, AwsServiceError> {
449        let id = route_id(route, "PublicKey")?;
450        let if_match = require_if_match(req)?;
451        let mut state = self.state.write();
452        let account = state
453            .accounts
454            .get_mut(DEFAULT_ACCOUNT)
455            .ok_or_else(|| not_found("PublicKey", &id))?;
456        let p = account
457            .public_keys
458            .get(&id)
459            .ok_or_else(|| not_found("PublicKey", &id))?;
460        if p.etag != if_match {
461            return Err(precondition_failed());
462        }
463        account.public_keys.remove(&id);
464        drop(state);
465        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
466    }
467
468    pub(crate) fn list_public_keys(
469        &self,
470        _req: &AwsRequest,
471    ) -> Result<AwsResponse, AwsServiceError> {
472        let state = self.state.read();
473        let mut items: Vec<StoredPublicKey> = state
474            .accounts
475            .get(DEFAULT_ACCOUNT)
476            .map(|a| a.public_keys.values().cloned().collect())
477            .unwrap_or_default();
478        drop(state);
479        items.sort_by(|a, b| a.id.cmp(&b.id));
480
481        let mut body = String::with_capacity(512);
482        body.push_str(XML_DECL);
483        body.push_str(&format!("<PublicKeyList xmlns=\"{NS}\">"));
484        body.push_str("<Marker></Marker>");
485        body.push_str("<MaxItems>100</MaxItems>");
486        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
487        body.push_str("<Items>");
488        for p in &items {
489            body.push_str("<PublicKeySummary>");
490            body.push_str(&format!("<Id>{}</Id>", esc(&p.id)));
491            body.push_str(&format!("<Name>{}</Name>", esc(&p.config.name)));
492            body.push_str(&format!(
493                "<CreatedTime>{}</CreatedTime>",
494                rfc3339(&p.created_time)
495            ));
496            body.push_str(&format!(
497                "<EncodedKey>{}</EncodedKey>",
498                esc(&p.config.encoded_key)
499            ));
500            if let Some(c) = &p.config.comment {
501                body.push_str(&format!("<Comment>{}</Comment>", esc(c)));
502            }
503            body.push_str("</PublicKeySummary>");
504        }
505        body.push_str("</Items>");
506        body.push_str("</PublicKeyList>");
507        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
508    }
509}
510
511// ─── Key Groups ───────────────────────────────────────────────────────
512
513impl CloudFrontService {
514    pub(crate) fn create_key_group(
515        &self,
516        req: &AwsRequest,
517    ) -> Result<AwsResponse, AwsServiceError> {
518        let cfg: KeyGroupConfig = xml_io::from_xml_root(&req.body)
519            .map_err(|e| invalid_argument(format!("invalid KeyGroupConfig XML: {e}")))?;
520        if cfg.name.is_empty() {
521            return Err(invalid_argument("KeyGroupConfig.Name is required"));
522        }
523        let mut state = self.state.write();
524        let account = state
525            .accounts
526            .entry(DEFAULT_ACCOUNT.to_string())
527            .or_default();
528        let id = generate_id_with_prefix("K");
529        let etag = generate_id_with_prefix("E");
530        let stored = StoredKeyGroup {
531            id: id.clone(),
532            etag: etag.clone(),
533            last_modified_time: Utc::now(),
534            config: cfg,
535        };
536        account.key_groups.insert(id.clone(), stored.clone());
537        drop(state);
538        let body = render_key_group(&stored, "KeyGroup");
539        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
540    }
541
542    pub(crate) fn get_key_group(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
543        let id = route_id(route, "KeyGroup")?;
544        let state = self.state.read();
545        let g = state
546            .accounts
547            .get(DEFAULT_ACCOUNT)
548            .and_then(|a| a.key_groups.get(&id).cloned())
549            .ok_or_else(|| not_found("KeyGroup", &id))?;
550        drop(state);
551        let body = render_key_group(&g, "KeyGroup");
552        Ok(xml_with_etag(StatusCode::OK, body, &g.etag, None))
553    }
554
555    pub(crate) fn get_key_group_config(
556        &self,
557        route: &Route,
558    ) -> Result<AwsResponse, AwsServiceError> {
559        let id = route_id(route, "KeyGroup")?;
560        let state = self.state.read();
561        let g = state
562            .accounts
563            .get(DEFAULT_ACCOUNT)
564            .and_then(|a| a.key_groups.get(&id).cloned())
565            .ok_or_else(|| not_found("KeyGroup", &id))?;
566        drop(state);
567        let body = config_xml("KeyGroupConfig", &g.config)?;
568        Ok(xml_with_etag(StatusCode::OK, body, &g.etag, None))
569    }
570
571    pub(crate) fn update_key_group(
572        &self,
573        req: &AwsRequest,
574        route: &Route,
575    ) -> Result<AwsResponse, AwsServiceError> {
576        let id = route_id(route, "KeyGroup")?;
577        let if_match = require_if_match(req)?;
578        let cfg: KeyGroupConfig = xml_io::from_xml_root(&req.body)
579            .map_err(|e| invalid_argument(format!("invalid KeyGroupConfig XML: {e}")))?;
580        if cfg.name.is_empty() {
581            return Err(invalid_argument("KeyGroupConfig.Name is required"));
582        }
583        let mut state = self.state.write();
584        let account = state
585            .accounts
586            .get_mut(DEFAULT_ACCOUNT)
587            .ok_or_else(|| not_found("KeyGroup", &id))?;
588        let g = account
589            .key_groups
590            .get_mut(&id)
591            .ok_or_else(|| not_found("KeyGroup", &id))?;
592        if g.etag != if_match {
593            return Err(precondition_failed());
594        }
595        g.config = cfg;
596        g.etag = generate_id_with_prefix("E");
597        g.last_modified_time = Utc::now();
598        let snap = g.clone();
599        drop(state);
600        let body = render_key_group(&snap, "KeyGroup");
601        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
602    }
603
604    pub(crate) fn delete_key_group(
605        &self,
606        req: &AwsRequest,
607        route: &Route,
608    ) -> Result<AwsResponse, AwsServiceError> {
609        let id = route_id(route, "KeyGroup")?;
610        let if_match = require_if_match(req)?;
611        let mut state = self.state.write();
612        let account = state
613            .accounts
614            .get_mut(DEFAULT_ACCOUNT)
615            .ok_or_else(|| not_found("KeyGroup", &id))?;
616        let g = account
617            .key_groups
618            .get(&id)
619            .ok_or_else(|| not_found("KeyGroup", &id))?;
620        if g.etag != if_match {
621            return Err(precondition_failed());
622        }
623        account.key_groups.remove(&id);
624        drop(state);
625        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
626    }
627
628    pub(crate) fn list_key_groups(
629        &self,
630        _req: &AwsRequest,
631    ) -> Result<AwsResponse, AwsServiceError> {
632        let state = self.state.read();
633        let mut items: Vec<StoredKeyGroup> = state
634            .accounts
635            .get(DEFAULT_ACCOUNT)
636            .map(|a| a.key_groups.values().cloned().collect())
637            .unwrap_or_default();
638        drop(state);
639        items.sort_by(|a, b| a.config.name.cmp(&b.config.name));
640
641        let mut body = String::with_capacity(512);
642        body.push_str(XML_DECL);
643        body.push_str(&format!("<KeyGroupList xmlns=\"{NS}\">"));
644        body.push_str("<Marker></Marker>");
645        body.push_str("<MaxItems>100</MaxItems>");
646        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
647        body.push_str("<Items>");
648        for g in &items {
649            body.push_str("<KeyGroupSummary>");
650            body.push_str("<KeyGroup>");
651            push_key_group_inner(&mut body, g);
652            body.push_str("</KeyGroup>");
653            body.push_str("</KeyGroupSummary>");
654        }
655        body.push_str("</Items>");
656        body.push_str("</KeyGroupList>");
657        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
658    }
659}
660
661// ─── Key Value Stores ─────────────────────────────────────────────────
662
663impl CloudFrontService {
664    pub(crate) fn create_key_value_store(
665        &self,
666        req: &AwsRequest,
667    ) -> Result<AwsResponse, AwsServiceError> {
668        let parsed: CreateKeyValueStoreRequest = xml_io::from_xml_root(&req.body)
669            .map_err(|e| invalid_argument(format!("invalid CreateKeyValueStore XML: {e}")))?;
670        if parsed.name.is_empty() {
671            return Err(invalid_argument("Name is required"));
672        }
673        let mut state = self.state.write();
674        let account = state
675            .accounts
676            .entry(DEFAULT_ACCOUNT.to_string())
677            .or_default();
678        if account.key_value_stores.contains_key(&parsed.name) {
679            return Err(aws_error(
680                StatusCode::CONFLICT,
681                "EntityAlreadyExists",
682                format!("KeyValueStore {} already exists", parsed.name),
683            ));
684        }
685        let now = Utc::now();
686        let id = Uuid::new_v4().to_string();
687        let etag = generate_id_with_prefix("E");
688        let arn = format!(
689            "arn:aws:cloudfront::{}:key-value-store/{}",
690            DEFAULT_ACCOUNT, id
691        );
692        let stored = StoredKeyValueStore {
693            name: parsed.name.clone(),
694            id,
695            etag: etag.clone(),
696            arn,
697            status: "READY".to_string(),
698            created_time: now,
699            last_modified_time: now,
700            comment: parsed.comment,
701            import_source: parsed.import_source,
702        };
703        account
704            .key_value_stores
705            .insert(parsed.name.clone(), stored.clone());
706        drop(state);
707        let body = render_key_value_store(&stored, "CreateKeyValueStoreResult");
708        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
709    }
710
711    pub(crate) fn describe_key_value_store(
712        &self,
713        route: &Route,
714    ) -> Result<AwsResponse, AwsServiceError> {
715        let name = route_id(route, "KeyValueStore")?;
716        let state = self.state.read();
717        let kvs = state
718            .accounts
719            .get(DEFAULT_ACCOUNT)
720            .and_then(|a| a.key_value_stores.get(&name).cloned())
721            .ok_or_else(|| not_found("KeyValueStore", &name))?;
722        drop(state);
723        let body = render_key_value_store(&kvs, "DescribeKeyValueStoreResult");
724        Ok(xml_with_etag(StatusCode::OK, body, &kvs.etag, None))
725    }
726
727    pub(crate) fn update_key_value_store(
728        &self,
729        req: &AwsRequest,
730        route: &Route,
731    ) -> Result<AwsResponse, AwsServiceError> {
732        let name = route_id(route, "KeyValueStore")?;
733        let if_match = require_if_match(req)?;
734        let parsed: UpdateKeyValueStoreRequest = xml_io::from_xml_root(&req.body)
735            .map_err(|e| invalid_argument(format!("invalid UpdateKeyValueStore XML: {e}")))?;
736        let mut state = self.state.write();
737        let account = state
738            .accounts
739            .get_mut(DEFAULT_ACCOUNT)
740            .ok_or_else(|| not_found("KeyValueStore", &name))?;
741        let kvs = account
742            .key_value_stores
743            .get_mut(&name)
744            .ok_or_else(|| not_found("KeyValueStore", &name))?;
745        if kvs.etag != if_match {
746            return Err(precondition_failed());
747        }
748        kvs.comment = Some(parsed.comment);
749        kvs.etag = generate_id_with_prefix("E");
750        kvs.last_modified_time = Utc::now();
751        let snap = kvs.clone();
752        drop(state);
753        let body = render_key_value_store(&snap, "UpdateKeyValueStoreResult");
754        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
755    }
756
757    pub(crate) fn delete_key_value_store(
758        &self,
759        req: &AwsRequest,
760        route: &Route,
761    ) -> Result<AwsResponse, AwsServiceError> {
762        let name = route_id(route, "KeyValueStore")?;
763        let if_match = require_if_match(req)?;
764        let mut state = self.state.write();
765        let account = state
766            .accounts
767            .get_mut(DEFAULT_ACCOUNT)
768            .ok_or_else(|| not_found("KeyValueStore", &name))?;
769        let kvs = account
770            .key_value_stores
771            .get(&name)
772            .ok_or_else(|| not_found("KeyValueStore", &name))?;
773        if kvs.etag != if_match {
774            return Err(precondition_failed());
775        }
776        account.key_value_stores.remove(&name);
777        drop(state);
778        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
779    }
780
781    pub(crate) fn list_key_value_stores(
782        &self,
783        _req: &AwsRequest,
784    ) -> Result<AwsResponse, AwsServiceError> {
785        let state = self.state.read();
786        let mut items: Vec<StoredKeyValueStore> = state
787            .accounts
788            .get(DEFAULT_ACCOUNT)
789            .map(|a| a.key_value_stores.values().cloned().collect())
790            .unwrap_or_default();
791        drop(state);
792        items.sort_by(|a, b| a.name.cmp(&b.name));
793
794        let mut body = String::with_capacity(512);
795        body.push_str(XML_DECL);
796        body.push_str(&format!("<KeyValueStoreList xmlns=\"{NS}\">"));
797        body.push_str("<NextMarker></NextMarker>");
798        body.push_str("<MaxItems>100</MaxItems>");
799        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
800        body.push_str("<Items>");
801        for kvs in &items {
802            body.push_str("<KeyValueStore>");
803            push_kvs_inner(&mut body, kvs);
804            body.push_str("</KeyValueStore>");
805        }
806        body.push_str("</Items>");
807        body.push_str("</KeyValueStoreList>");
808        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
809    }
810}
811
812// ─── Origin Access Identities (legacy) ────────────────────────────────
813
814impl CloudFrontService {
815    pub(crate) fn create_oai(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
816        let cfg: CloudFrontOriginAccessIdentityConfig = xml_io::from_xml_root(&req.body)
817            .map_err(|e| invalid_argument(format!("invalid OAI config XML: {e}")))?;
818        if cfg.caller_reference.is_empty() {
819            return Err(invalid_argument("CallerReference is required"));
820        }
821        let mut state = self.state.write();
822        let account = state
823            .accounts
824            .entry(DEFAULT_ACCOUNT.to_string())
825            .or_default();
826        if let Some(existing) = account
827            .origin_access_identities
828            .values()
829            .find(|o| o.config.caller_reference == cfg.caller_reference)
830        {
831            return Err(aws_error(
832                StatusCode::CONFLICT,
833                "CloudFrontOriginAccessIdentityAlreadyExists",
834                format!("OAI with same CallerReference exists: {}", existing.id),
835            ));
836        }
837        let id = format!(
838            "E{}",
839            Uuid::new_v4()
840                .simple()
841                .to_string()
842                .to_uppercase()
843                .chars()
844                .take(13)
845                .collect::<String>()
846        );
847        let etag = generate_id_with_prefix("E");
848        let canonical = Uuid::new_v4().simple().to_string();
849        let stored = StoredOriginAccessIdentity {
850            id: id.clone(),
851            etag: etag.clone(),
852            s3_canonical_user_id: canonical,
853            config: cfg,
854        };
855        account
856            .origin_access_identities
857            .insert(id.clone(), stored.clone());
858        drop(state);
859        let body = render_oai(&stored, "CloudFrontOriginAccessIdentity");
860        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
861    }
862
863    pub(crate) fn get_oai(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
864        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
865        let state = self.state.read();
866        let oai = state
867            .accounts
868            .get(DEFAULT_ACCOUNT)
869            .and_then(|a| a.origin_access_identities.get(&id).cloned())
870            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
871        drop(state);
872        let body = render_oai(&oai, "CloudFrontOriginAccessIdentity");
873        Ok(xml_with_etag(StatusCode::OK, body, &oai.etag, None))
874    }
875
876    pub(crate) fn get_oai_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
877        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
878        let state = self.state.read();
879        let oai = state
880            .accounts
881            .get(DEFAULT_ACCOUNT)
882            .and_then(|a| a.origin_access_identities.get(&id).cloned())
883            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
884        drop(state);
885        let body = config_xml("CloudFrontOriginAccessIdentityConfig", &oai.config)?;
886        Ok(xml_with_etag(StatusCode::OK, body, &oai.etag, None))
887    }
888
889    pub(crate) fn update_oai(
890        &self,
891        req: &AwsRequest,
892        route: &Route,
893    ) -> Result<AwsResponse, AwsServiceError> {
894        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
895        let if_match = require_if_match(req)?;
896        let cfg: CloudFrontOriginAccessIdentityConfig = xml_io::from_xml_root(&req.body)
897            .map_err(|e| invalid_argument(format!("invalid OAI config XML: {e}")))?;
898        let mut state = self.state.write();
899        let account = state
900            .accounts
901            .get_mut(DEFAULT_ACCOUNT)
902            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
903        let oai = account
904            .origin_access_identities
905            .get_mut(&id)
906            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
907        if oai.etag != if_match {
908            return Err(precondition_failed());
909        }
910        if oai.config.caller_reference != cfg.caller_reference {
911            return Err(invalid_argument(
912                "CallerReference cannot change on UpdateCloudFrontOriginAccessIdentity",
913            ));
914        }
915        oai.config = cfg;
916        oai.etag = generate_id_with_prefix("E");
917        let snap = oai.clone();
918        drop(state);
919        let body = render_oai(&snap, "CloudFrontOriginAccessIdentity");
920        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
921    }
922
923    pub(crate) fn delete_oai(
924        &self,
925        req: &AwsRequest,
926        route: &Route,
927    ) -> Result<AwsResponse, AwsServiceError> {
928        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
929        let if_match = require_if_match(req)?;
930        let mut state = self.state.write();
931        let account = state
932            .accounts
933            .get_mut(DEFAULT_ACCOUNT)
934            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
935        let oai = account
936            .origin_access_identities
937            .get(&id)
938            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
939        if oai.etag != if_match {
940            return Err(precondition_failed());
941        }
942        account.origin_access_identities.remove(&id);
943        drop(state);
944        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
945    }
946
947    pub(crate) fn list_oai(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948        let state = self.state.read();
949        let mut items: Vec<StoredOriginAccessIdentity> = state
950            .accounts
951            .get(DEFAULT_ACCOUNT)
952            .map(|a| a.origin_access_identities.values().cloned().collect())
953            .unwrap_or_default();
954        drop(state);
955        items.sort_by(|a, b| a.id.cmp(&b.id));
956
957        let mut body = String::with_capacity(512);
958        body.push_str(XML_DECL);
959        body.push_str(&format!(
960            "<CloudFrontOriginAccessIdentityList xmlns=\"{NS}\">"
961        ));
962        body.push_str("<Marker></Marker>");
963        body.push_str("<MaxItems>100</MaxItems>");
964        body.push_str("<IsTruncated>false</IsTruncated>");
965        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
966        body.push_str("<Items>");
967        for oai in &items {
968            body.push_str("<CloudFrontOriginAccessIdentitySummary>");
969            body.push_str(&format!("<Id>{}</Id>", esc(&oai.id)));
970            body.push_str(&format!(
971                "<S3CanonicalUserId>{}</S3CanonicalUserId>",
972                esc(&oai.s3_canonical_user_id)
973            ));
974            body.push_str(&format!("<Comment>{}</Comment>", esc(&oai.config.comment)));
975            body.push_str("</CloudFrontOriginAccessIdentitySummary>");
976        }
977        body.push_str("</Items>");
978        body.push_str("</CloudFrontOriginAccessIdentityList>");
979        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
980    }
981}
982
983// ─── Monitoring Subscriptions ─────────────────────────────────────────
984
985impl CloudFrontService {
986    pub(crate) fn create_monitoring_subscription(
987        &self,
988        req: &AwsRequest,
989        route: &Route,
990    ) -> Result<AwsResponse, AwsServiceError> {
991        let dist_id = route_id(route, "Distribution")?;
992        let parsed: MonitoringSubscriptionBody = xml_io::from_xml_root(&req.body)
993            .map_err(|e| invalid_argument(format!("invalid MonitoringSubscription XML: {e}")))?;
994        let mut state = self.state.write();
995        let account = state
996            .accounts
997            .entry(DEFAULT_ACCOUNT.to_string())
998            .or_default();
999        if !account.distributions.contains_key(&dist_id) {
1000            return Err(not_found("Distribution", &dist_id));
1001        }
1002        let stored = StoredMonitoringSubscription {
1003            distribution_id: dist_id.clone(),
1004            config: parsed.realtime_metrics_subscription_config,
1005        };
1006        account
1007            .monitoring_subscriptions
1008            .insert(dist_id.clone(), stored.clone());
1009        drop(state);
1010        let body = render_monitoring(&stored);
1011        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1012    }
1013
1014    pub(crate) fn get_monitoring_subscription(
1015        &self,
1016        route: &Route,
1017    ) -> Result<AwsResponse, AwsServiceError> {
1018        let dist_id = route_id(route, "Distribution")?;
1019        let state = self.state.read();
1020        let m = state
1021            .accounts
1022            .get(DEFAULT_ACCOUNT)
1023            .and_then(|a| a.monitoring_subscriptions.get(&dist_id).cloned())
1024            .ok_or_else(|| {
1025                aws_error(
1026                    StatusCode::NOT_FOUND,
1027                    "NoSuchMonitoringSubscription",
1028                    format!("No monitoring subscription for distribution {dist_id}"),
1029                )
1030            })?;
1031        drop(state);
1032        let body = render_monitoring(&m);
1033        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1034    }
1035
1036    pub(crate) fn delete_monitoring_subscription(
1037        &self,
1038        route: &Route,
1039    ) -> Result<AwsResponse, AwsServiceError> {
1040        let dist_id = route_id(route, "Distribution")?;
1041        let mut state = self.state.write();
1042        let account = state
1043            .accounts
1044            .get_mut(DEFAULT_ACCOUNT)
1045            .ok_or_else(|| not_found("Distribution", &dist_id))?;
1046        if account.monitoring_subscriptions.remove(&dist_id).is_none() {
1047            return Err(aws_error(
1048                StatusCode::NOT_FOUND,
1049                "NoSuchMonitoringSubscription",
1050                format!("No monitoring subscription for distribution {dist_id}"),
1051            ));
1052        }
1053        drop(state);
1054        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
1055    }
1056}
1057
1058// ─── Helpers ──────────────────────────────────────────────────────────
1059
1060#[derive(Debug, serde::Deserialize)]
1061#[serde(rename_all = "PascalCase")]
1062struct CreateFunctionRequest {
1063    name: String,
1064    function_config: FunctionConfig,
1065    /// Base64-encoded source.
1066    function_code: String,
1067}
1068
1069#[derive(Debug, serde::Deserialize)]
1070#[serde(rename_all = "PascalCase")]
1071struct UpdateFunctionRequest {
1072    function_config: FunctionConfig,
1073    function_code: String,
1074}
1075
1076#[derive(Debug, serde::Deserialize)]
1077#[serde(rename_all = "PascalCase")]
1078struct TestFunctionRequest {
1079    #[serde(default)]
1080    event_object: String,
1081    #[serde(default)]
1082    stage: Option<String>,
1083}
1084
1085#[derive(Debug, serde::Deserialize)]
1086#[serde(rename_all = "PascalCase")]
1087struct CreateKeyValueStoreRequest {
1088    name: String,
1089    #[serde(default)]
1090    comment: Option<String>,
1091    #[serde(default)]
1092    import_source: Option<ImportSource>,
1093}
1094
1095#[derive(Debug, serde::Deserialize)]
1096#[serde(rename_all = "PascalCase")]
1097struct UpdateKeyValueStoreRequest {
1098    comment: String,
1099}
1100
1101fn config_xml<T: serde::Serialize>(root: &str, cfg: &T) -> Result<String, AwsServiceError> {
1102    let inner = quick_xml::se::to_string_with_root(root, cfg).map_err(|e| {
1103        aws_error(
1104            StatusCode::INTERNAL_SERVER_ERROR,
1105            "InternalError",
1106            format!("xml encode failed: {e}"),
1107        )
1108    })?;
1109    let stamped = inner.replacen(
1110        &format!("<{root}>"),
1111        &format!("<{root} xmlns=\"{NS}\">", NS = crate::NAMESPACE),
1112        1,
1113    );
1114    Ok(format!("{XML_DECL}{stamped}"))
1115}
1116
1117fn parse_stage_query(query: &str) -> Option<String> {
1118    use std::collections::HashMap;
1119    let pairs: HashMap<&str, &str> = query.split('&').filter_map(|p| p.split_once('=')).collect();
1120    pairs.get("Stage").map(|s| s.to_string())
1121}
1122
1123fn stage_view(f: &StoredFunction, stage: &Option<String>) -> StoredFunction {
1124    let mut clone = f.clone();
1125    if stage.as_deref() == Some("LIVE") {
1126        clone.stage = "LIVE".into();
1127    }
1128    clone
1129}
1130
1131fn render_function_summary(f: &StoredFunction, _root: &str) -> String {
1132    // CloudFront returns FunctionSummary as the root for Create/Describe/
1133    // Update/Publish — there is no operation-specific wrapper element.
1134    let mut out = String::with_capacity(512);
1135    out.push_str(XML_DECL);
1136    out.push_str(&render_function_summary_inner_with_ns(f));
1137    out
1138}
1139
1140fn render_function_summary_inner_with_ns(f: &StoredFunction) -> String {
1141    let mut out = String::with_capacity(512);
1142    out.push_str(&format!("<FunctionSummary xmlns=\"{NS}\">"));
1143    out.push_str(&render_function_summary_body(f));
1144    out.push_str("</FunctionSummary>");
1145    out
1146}
1147
1148fn render_function_summary_inner(f: &StoredFunction) -> String {
1149    let mut out = String::with_capacity(512);
1150    out.push_str("<FunctionSummary>");
1151    out.push_str(&render_function_summary_body(f));
1152    out.push_str("</FunctionSummary>");
1153    out
1154}
1155
1156fn render_function_summary_body(f: &StoredFunction) -> String {
1157    let mut out = String::with_capacity(512);
1158    out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
1159    out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
1160    out.push_str("<FunctionConfig>");
1161    if let Some(c) = &f.config.comment {
1162        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1163    } else {
1164        out.push_str("<Comment></Comment>");
1165    }
1166    out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.config.runtime)));
1167    out.push_str("</FunctionConfig>");
1168    out.push_str("<FunctionMetadata>");
1169    out.push_str(&format!(
1170        "<FunctionARN>{}</FunctionARN>",
1171        esc(&f.function_arn)
1172    ));
1173    out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
1174    out.push_str(&format!(
1175        "<CreatedTime>{}</CreatedTime>",
1176        rfc3339(&f.created_time)
1177    ));
1178    out.push_str(&format!(
1179        "<LastModifiedTime>{}</LastModifiedTime>",
1180        rfc3339(&f.last_modified_time)
1181    ));
1182    out.push_str("</FunctionMetadata>");
1183    out
1184}
1185
1186fn render_public_key(p: &StoredPublicKey, root: &str) -> String {
1187    let mut out = String::with_capacity(512);
1188    out.push_str(XML_DECL);
1189    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1190    out.push_str(&format!("<Id>{}</Id>", esc(&p.id)));
1191    out.push_str(&format!(
1192        "<CreatedTime>{}</CreatedTime>",
1193        rfc3339(&p.created_time)
1194    ));
1195    out.push_str("<PublicKeyConfig>");
1196    out.push_str(&format!(
1197        "<CallerReference>{}</CallerReference>",
1198        esc(&p.config.caller_reference)
1199    ));
1200    out.push_str(&format!("<Name>{}</Name>", esc(&p.config.name)));
1201    out.push_str(&format!(
1202        "<EncodedKey>{}</EncodedKey>",
1203        esc(&p.config.encoded_key)
1204    ));
1205    if let Some(c) = &p.config.comment {
1206        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1207    }
1208    out.push_str("</PublicKeyConfig>");
1209    out.push_str(&format!("</{root}>"));
1210    out
1211}
1212
1213fn push_key_group_inner(out: &mut String, g: &StoredKeyGroup) {
1214    out.push_str(&format!("<Id>{}</Id>", esc(&g.id)));
1215    out.push_str(&format!(
1216        "<LastModifiedTime>{}</LastModifiedTime>",
1217        rfc3339(&g.last_modified_time)
1218    ));
1219    out.push_str("<KeyGroupConfig>");
1220    out.push_str(&format!("<Name>{}</Name>", esc(&g.config.name)));
1221    out.push_str("<Items>");
1222    for k in &g.config.items.public_key {
1223        out.push_str(&format!("<PublicKey>{}</PublicKey>", esc(k)));
1224    }
1225    out.push_str("</Items>");
1226    if let Some(c) = &g.config.comment {
1227        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1228    }
1229    out.push_str("</KeyGroupConfig>");
1230}
1231
1232fn render_key_group(g: &StoredKeyGroup, root: &str) -> String {
1233    let mut out = String::with_capacity(512);
1234    out.push_str(XML_DECL);
1235    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1236    push_key_group_inner(&mut out, g);
1237    out.push_str(&format!("</{root}>"));
1238    out
1239}
1240
1241fn push_kvs_inner(out: &mut String, kvs: &StoredKeyValueStore) {
1242    out.push_str(&format!("<Name>{}</Name>", esc(&kvs.name)));
1243    out.push_str(&format!("<Id>{}</Id>", esc(&kvs.id)));
1244    out.push_str(&format!(
1245        "<Comment>{}</Comment>",
1246        esc(kvs.comment.as_deref().unwrap_or(""))
1247    ));
1248    out.push_str(&format!("<ARN>{}</ARN>", esc(&kvs.arn)));
1249    out.push_str(&format!("<Status>{}</Status>", esc(&kvs.status)));
1250    out.push_str(&format!(
1251        "<LastModifiedTime>{}</LastModifiedTime>",
1252        rfc3339(&kvs.last_modified_time)
1253    ));
1254}
1255
1256fn render_key_value_store(kvs: &StoredKeyValueStore, _root: &str) -> String {
1257    // SDK expects KeyValueStore as root for Create/Describe/Update.
1258    let mut out = String::with_capacity(512);
1259    out.push_str(XML_DECL);
1260    out.push_str(&format!("<KeyValueStore xmlns=\"{NS}\">"));
1261    push_kvs_inner(&mut out, kvs);
1262    out.push_str("</KeyValueStore>");
1263    out
1264}
1265
1266fn render_oai(oai: &StoredOriginAccessIdentity, root: &str) -> String {
1267    let mut out = String::with_capacity(512);
1268    out.push_str(XML_DECL);
1269    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1270    out.push_str(&format!("<Id>{}</Id>", esc(&oai.id)));
1271    out.push_str(&format!(
1272        "<S3CanonicalUserId>{}</S3CanonicalUserId>",
1273        esc(&oai.s3_canonical_user_id)
1274    ));
1275    out.push_str("<CloudFrontOriginAccessIdentityConfig>");
1276    out.push_str(&format!(
1277        "<CallerReference>{}</CallerReference>",
1278        esc(&oai.config.caller_reference)
1279    ));
1280    out.push_str(&format!("<Comment>{}</Comment>", esc(&oai.config.comment)));
1281    out.push_str("</CloudFrontOriginAccessIdentityConfig>");
1282    out.push_str(&format!("</{root}>"));
1283    out
1284}
1285
1286fn render_monitoring(m: &StoredMonitoringSubscription) -> String {
1287    let mut out = String::with_capacity(256);
1288    out.push_str(XML_DECL);
1289    out.push_str(&format!("<MonitoringSubscription xmlns=\"{NS}\">"));
1290    out.push_str("<RealtimeMetricsSubscriptionConfig>");
1291    out.push_str(&format!(
1292        "<RealtimeMetricsSubscriptionStatus>{}</RealtimeMetricsSubscriptionStatus>",
1293        esc(&m.config.realtime_metrics_subscription_status)
1294    ));
1295    out.push_str("</RealtimeMetricsSubscriptionConfig>");
1296    out.push_str("</MonitoringSubscription>");
1297    out
1298}
1299
1300#[cfg(test)]
1301mod tests {
1302    use super::*;
1303    use crate::service::CloudFrontService;
1304    use crate::state::CloudFrontAccounts;
1305    use bytes::Bytes;
1306    use fakecloud_core::service::AwsService;
1307    use http::HeaderValue;
1308    use parking_lot::RwLock;
1309    use std::sync::Arc;
1310
1311    fn svc() -> CloudFrontService {
1312        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
1313    }
1314
1315    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
1316        let mut headers = HeaderMap::new();
1317        if let Some(v) = if_match {
1318            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
1319        }
1320        AwsRequest {
1321            service: "cloudfront".into(),
1322            action: String::new(),
1323            region: "us-east-1".into(),
1324            account_id: DEFAULT_ACCOUNT.into(),
1325            request_id: uuid::Uuid::new_v4().to_string(),
1326            headers,
1327            query_params: std::collections::HashMap::new(),
1328            body_stream: parking_lot::Mutex::new(None),
1329            body: Bytes::from(body.to_string()),
1330            path_segments: path
1331                .split('/')
1332                .filter(|s| !s.is_empty())
1333                .map(String::from)
1334                .collect(),
1335            raw_path: path.into(),
1336            raw_query: String::new(),
1337            method,
1338            is_query_protocol: false,
1339            access_key_id: None,
1340            principal: None,
1341        }
1342    }
1343
1344    async fn create_function(svc: &CloudFrontService, name: &str, code: &str) -> String {
1345        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
1346        let body = format!(
1347            r#"<?xml version="1.0"?>
1348<CreateFunctionRequest xmlns="{NS}">
1349  <Name>{name}</Name>
1350  <FunctionConfig>
1351    <Comment>t</Comment>
1352    <Runtime>cloudfront-js-2.0</Runtime>
1353  </FunctionConfig>
1354  <FunctionCode>{code_b64}</FunctionCode>
1355</CreateFunctionRequest>"#
1356        );
1357        let resp = svc
1358            .handle(req(http::Method::POST, "/2020-05-31/function", &body, None))
1359            .await
1360            .unwrap();
1361        assert_eq!(resp.status, StatusCode::CREATED);
1362        resp.headers
1363            .get(http::header::ETAG)
1364            .unwrap()
1365            .to_str()
1366            .unwrap()
1367            .to_string()
1368    }
1369
1370    fn test_function_request_xml(event_json: &str) -> String {
1371        test_function_request_xml_with_stage(event_json, "DEVELOPMENT")
1372    }
1373
1374    fn test_function_request_xml_with_stage(event_json: &str, stage: &str) -> String {
1375        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
1376        format!(
1377            r#"<?xml version="1.0"?>
1378<TestFunctionRequest xmlns="{NS}">
1379  <Stage>{stage}</Stage>
1380  <EventObject>{event_b64}</EventObject>
1381</TestFunctionRequest>"#
1382        )
1383    }
1384
1385    async fn update_function(
1386        svc: &CloudFrontService,
1387        name: &str,
1388        code: &str,
1389        if_match: &str,
1390    ) -> String {
1391        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
1392        let body = format!(
1393            r#"<?xml version="1.0"?>
1394<UpdateFunctionRequest xmlns="{NS}">
1395  <FunctionConfig>
1396    <Comment>t</Comment>
1397    <Runtime>cloudfront-js-2.0</Runtime>
1398  </FunctionConfig>
1399  <FunctionCode>{code_b64}</FunctionCode>
1400</UpdateFunctionRequest>"#
1401        );
1402        let resp = svc
1403            .handle(req(
1404                http::Method::PUT,
1405                &format!("/2020-05-31/function/{name}"),
1406                &body,
1407                Some(if_match),
1408            ))
1409            .await
1410            .unwrap();
1411        assert_eq!(resp.status, StatusCode::OK);
1412        resp.headers
1413            .get(http::header::ETAG)
1414            .unwrap()
1415            .to_str()
1416            .unwrap()
1417            .to_string()
1418    }
1419
1420    async fn publish_function(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
1421        let resp = svc
1422            .handle(req(
1423                http::Method::POST,
1424                &format!("/2020-05-31/function/{name}/publish"),
1425                "",
1426                Some(if_match),
1427            ))
1428            .await
1429            .unwrap();
1430        assert_eq!(resp.status, StatusCode::OK);
1431        resp.headers
1432            .get(http::header::ETAG)
1433            .unwrap()
1434            .to_str()
1435            .unwrap()
1436            .to_string()
1437    }
1438
1439    #[tokio::test]
1440    async fn test_function_executes_handler_and_returns_result() {
1441        let svc = svc();
1442        let etag = create_function(
1443            &svc,
1444            "fn-ok",
1445            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
1446        )
1447        .await;
1448        let body = test_function_request_xml(r#"{"headers":{}}"#);
1449        let resp = svc
1450            .handle(req(
1451                http::Method::POST,
1452                "/2020-05-31/function/fn-ok/test",
1453                &body,
1454                Some(&etag),
1455            ))
1456            .await
1457            .unwrap();
1458        assert_eq!(resp.status, StatusCode::OK);
1459        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1460        assert!(
1461            xml.contains("&quot;x&quot;:&quot;y&quot;"),
1462            "expected x:y in FunctionOutput, got {xml}"
1463        );
1464        assert!(xml.contains("<FunctionErrorMessage></FunctionErrorMessage>"));
1465    }
1466
1467    #[tokio::test]
1468    async fn test_function_propagates_js_error_into_message() {
1469        let svc = svc();
1470        let etag = create_function(
1471            &svc,
1472            "fn-err",
1473            r#"function handler() { throw new Error("boom"); }"#,
1474        )
1475        .await;
1476        let body = test_function_request_xml("{}");
1477        let resp = svc
1478            .handle(req(
1479                http::Method::POST,
1480                "/2020-05-31/function/fn-err/test",
1481                &body,
1482                Some(&etag),
1483            ))
1484            .await
1485            .unwrap();
1486        assert_eq!(resp.status, StatusCode::OK);
1487        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1488        assert!(
1489            xml.contains("boom"),
1490            "expected boom in error msg, got {xml}"
1491        );
1492        assert!(xml.contains("<FunctionOutput></FunctionOutput>"));
1493    }
1494
1495    #[tokio::test]
1496    async fn test_function_unknown_name_returns_error() {
1497        let svc = svc();
1498        let body = test_function_request_xml("{}");
1499        let err = match svc
1500            .handle(req(
1501                http::Method::POST,
1502                "/2020-05-31/function/missing/test",
1503                &body,
1504                Some("E0"),
1505            ))
1506            .await
1507        {
1508            Err(e) => e,
1509            Ok(_) => panic!("expected NoSuchFunctionExists, got Ok"),
1510        };
1511        assert_eq!(err.status(), StatusCode::NOT_FOUND);
1512        assert_eq!(err.code(), "NoSuchFunctionExists");
1513    }
1514
1515    #[tokio::test]
1516    async fn test_function_modifies_aws_request_shape() {
1517        // Mirrors the canonical CloudFront Functions example from the
1518        // AWS docs: handler rewrites a request header and returns the
1519        // request, fakecloud passes the JSON shape straight through.
1520        let svc = svc();
1521        let etag = create_function(
1522            &svc,
1523            "fn-aws-shape",
1524            r#"function handler(event) { event.request.headers["x-foo"] = {value: "bar"}; return event.request; }"#,
1525        )
1526        .await;
1527        let body = test_function_request_xml(
1528            r#"{"version":"1.0","context":{},"viewer":{},"request":{"method":"GET","uri":"/","querystring":{},"headers":{},"cookies":{}}}"#,
1529        );
1530        let resp = svc
1531            .handle(req(
1532                http::Method::POST,
1533                "/2020-05-31/function/fn-aws-shape/test",
1534                &body,
1535                Some(&etag),
1536            ))
1537            .await
1538            .unwrap();
1539        assert_eq!(resp.status, StatusCode::OK);
1540        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1541        assert!(
1542            xml.contains("x-foo"),
1543            "expected request header rewrite in output, got {xml}"
1544        );
1545        assert!(
1546            xml.contains("bar"),
1547            "expected header value in output, got {xml}"
1548        );
1549        assert!(
1550            xml.contains("<FunctionErrorMessage></FunctionErrorMessage>"),
1551            "expected empty error, got {xml}"
1552        );
1553    }
1554
1555    #[tokio::test]
1556    async fn test_function_logs_error_and_marks_compute_over_100() {
1557        let svc = svc();
1558        let etag = create_function(
1559            &svc,
1560            "fn-throws",
1561            r#"function handler() { throw new Error("kaboom"); }"#,
1562        )
1563        .await;
1564        let body = test_function_request_xml("{}");
1565        let resp = svc
1566            .handle(req(
1567                http::Method::POST,
1568                "/2020-05-31/function/fn-throws/test",
1569                &body,
1570                Some(&etag),
1571            ))
1572            .await
1573            .unwrap();
1574        assert_eq!(resp.status, StatusCode::OK);
1575        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1576        // Error appears in both the dedicated message and in the logs
1577        assert!(xml.contains("kaboom"), "expected kaboom in body, got {xml}");
1578        assert!(
1579            xml.contains("<FunctionExecutionLogs>")
1580                && xml.contains("<member>ERROR: ")
1581                && xml.contains("kaboom"),
1582            "expected error log line, got {xml}"
1583        );
1584        // ComputeUtilization is rendered as a plain integer; on failure
1585        // we saturate past 100.
1586        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
1587        let cu_close = xml.find("</ComputeUtilization>").unwrap();
1588        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
1589        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
1590    }
1591
1592    #[tokio::test]
1593    async fn test_function_stage_selects_published_or_development_code() {
1594        // Publish freezes a copy of the code; subsequent UpdateFunction
1595        // mutates DEVELOPMENT but leaves LIVE pinned to the published
1596        // snapshot. Each stage's TestFunction must run the matching
1597        // version.
1598        let svc = svc();
1599        let etag =
1600            create_function(&svc, "fn-stage", r#"function handler() { return "v1"; }"#).await;
1601        let pub_etag = publish_function(&svc, "fn-stage", &etag).await;
1602        let _new_etag = update_function(
1603            &svc,
1604            "fn-stage",
1605            r#"function handler() { return "v2"; }"#,
1606            &pub_etag,
1607        )
1608        .await;
1609
1610        // DEVELOPMENT runs v2 (the latest update body).
1611        let dev_body = test_function_request_xml_with_stage("{}", "DEVELOPMENT");
1612        let resp = svc
1613            .handle(req(
1614                http::Method::POST,
1615                "/2020-05-31/function/fn-stage/test",
1616                &dev_body,
1617                Some("E_NOT_MATCHING"),
1618            ))
1619            .await;
1620        // We deliberately allow stale If-Match here so we exercise the
1621        // stage path; the precondition fires before we get to JS, so
1622        // grab the latest etag and retry.
1623        assert!(resp.is_err(), "stale If-Match must be rejected");
1624        let described = svc
1625            .handle(req(
1626                http::Method::GET,
1627                "/2020-05-31/function/fn-stage",
1628                "",
1629                None,
1630            ))
1631            .await
1632            .unwrap();
1633        let live_etag = described
1634            .headers
1635            .get(http::header::ETAG)
1636            .unwrap()
1637            .to_str()
1638            .unwrap()
1639            .to_string();
1640
1641        let dev_resp = svc
1642            .handle(req(
1643                http::Method::POST,
1644                "/2020-05-31/function/fn-stage/test",
1645                &dev_body,
1646                Some(&live_etag),
1647            ))
1648            .await
1649            .unwrap();
1650        let dev_xml = std::str::from_utf8(dev_resp.body.expect_bytes()).unwrap();
1651        assert!(
1652            dev_xml.contains("&quot;v2&quot;"),
1653            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
1654        );
1655
1656        // LIVE runs v1 (the published snapshot, not affected by the
1657        // post-publish update).
1658        let live_body = test_function_request_xml_with_stage("{}", "LIVE");
1659        let live_resp = svc
1660            .handle(req(
1661                http::Method::POST,
1662                "/2020-05-31/function/fn-stage/test",
1663                &live_body,
1664                Some(&live_etag),
1665            ))
1666            .await
1667            .unwrap();
1668        let live_xml = std::str::from_utf8(live_resp.body.expect_bytes()).unwrap();
1669        assert!(
1670            live_xml.contains("&quot;v1&quot;"),
1671            "LIVE should run published snapshot (v1), got {live_xml}"
1672        );
1673    }
1674
1675    #[tokio::test]
1676    async fn test_function_infinite_loop_is_killed() {
1677        let svc = svc();
1678        let etag = create_function(&svc, "fn-loop", r#"function handler() { while(1){} }"#).await;
1679        let body = test_function_request_xml("{}");
1680        let resp = svc
1681            .handle(req(
1682                http::Method::POST,
1683                "/2020-05-31/function/fn-loop/test",
1684                &body,
1685                Some(&etag),
1686            ))
1687            .await
1688            .unwrap();
1689        assert_eq!(resp.status, StatusCode::OK);
1690        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1691        assert!(
1692            xml.contains("<FunctionOutput></FunctionOutput>"),
1693            "expected empty output, got {xml}"
1694        );
1695        assert!(
1696            xml.contains("ERROR:") && xml.contains("limit"),
1697            "expected timeout/limit error in logs, got {xml}"
1698        );
1699        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
1700        let cu_close = xml.find("</ComputeUtilization>").unwrap();
1701        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
1702        assert!(pct > 100, "expected pct > 100 after kill, got {pct}");
1703    }
1704}