Skip to main content

fakecloud_cloudfront/
functions_service.rs

1//! Handlers for CloudFront Batch 3 resources: Functions, Public Keys,
2//! Key Groups, Key Value Stores, Origin Access Identities (legacy),
3//! Monitoring Subscriptions.
4
5use base64::Engine;
6use chrono::Utc;
7use http::header::ETAG;
8use http::{HeaderMap, StatusCode};
9use uuid::Uuid;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::functions::{
14    CloudFrontOriginAccessIdentityConfig, FunctionConfig, ImportSource, KeyGroupConfig,
15    MonitoringSubscriptionBody, PublicKeyConfig, StoredFunction, StoredKeyGroup,
16    StoredKeyValueStore, StoredMonitoringSubscription, StoredOriginAccessIdentity, StoredPublicKey,
17};
18use crate::policies::{
19    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
20};
21use crate::router::Route;
22use crate::service::{
23    aws_error, esc, generate_id_with_prefix, invalid_argument, xml_response, CloudFrontService,
24    DEFAULT_ACCOUNT,
25};
26use crate::xml_io;
27
/// XML namespace written as the `xmlns` attribute on the root element of the
/// hand-assembled response documents in this file.
const NS: &str = crate::NAMESPACE;
/// XML declaration pushed at the very start of each hand-assembled response body.
const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
30
31// ─── CloudFront Functions ─────────────────────────────────────────────
32
33impl CloudFrontService {
34    pub(crate) fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
35        // Body shape: <CreateFunctionRequest><Name/><FunctionConfig/><FunctionCode/></CreateFunctionRequest>
36        let parsed: CreateFunctionRequest = xml_io::from_xml_root(&req.body)
37            .map_err(|e| invalid_argument(format!("invalid CreateFunctionRequest XML: {e}")))?;
38        if parsed.name.is_empty() {
39            return Err(invalid_argument("CreateFunctionRequest.Name is required"));
40        }
41
42        let mut state = self.state.write();
43        let account = state
44            .accounts
45            .entry(DEFAULT_ACCOUNT.to_string())
46            .or_default();
47        if account.functions.contains_key(&parsed.name) {
48            return Err(aws_error(
49                StatusCode::CONFLICT,
50                "FunctionAlreadyExists",
51                format!("Function {} already exists", parsed.name),
52            ));
53        }
54        let now = Utc::now();
55        let etag = generate_id_with_prefix("E");
56        let function_arn = format!(
57            "arn:aws:cloudfront::{}:function/{}",
58            DEFAULT_ACCOUNT, parsed.name
59        );
60        let stored = StoredFunction {
61            name: parsed.name.clone(),
62            etag: etag.clone(),
63            status: "UNPUBLISHED".to_string(),
64            stage: "DEVELOPMENT".to_string(),
65            function_arn: function_arn.clone(),
66            created_time: now,
67            last_modified_time: now,
68            config: parsed.function_config,
69            function_code: parsed.function_code,
70            // No published snapshot until PublishFunction is called.
71            live_function_code: None,
72        };
73        account
74            .functions
75            .insert(parsed.name.clone(), stored.clone());
76        drop(state);
77
78        let body = render_function_summary(&stored, "CreateFunctionResult");
79        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
80    }
81
82    pub(crate) fn describe_function(
83        &self,
84        req: &AwsRequest,
85        route: &Route,
86    ) -> Result<AwsResponse, AwsServiceError> {
87        let name = route_id(route, "Function")?;
88        let stage = parse_stage_query(&req.raw_query);
89        let state = self.state.read();
90        let f = state
91            .accounts
92            .get(DEFAULT_ACCOUNT)
93            .and_then(|a| a.functions.get(&name).cloned())
94            .ok_or_else(|| not_found("Function", &name))?;
95        drop(state);
96        let view = stage_view(&f, &stage);
97        let body = render_function_summary(&view, "DescribeFunctionResult");
98        Ok(xml_with_etag(StatusCode::OK, body, &view.etag, None))
99    }
100
101    pub(crate) fn get_function(
102        &self,
103        req: &AwsRequest,
104        route: &Route,
105    ) -> Result<AwsResponse, AwsServiceError> {
106        let name = route_id(route, "Function")?;
107        let stage = parse_stage_query(&req.raw_query);
108        let state = self.state.read();
109        let f = state
110            .accounts
111            .get(DEFAULT_ACCOUNT)
112            .and_then(|a| a.functions.get(&name).cloned())
113            .ok_or_else(|| not_found("Function", &name))?;
114        drop(state);
115        let view = stage_view(&f, &stage);
116        let mut headers = HeaderMap::new();
117        headers.insert(ETAG, view.etag.parse().unwrap());
118        let bytes = base64::engine::general_purpose::STANDARD
119            .decode(view.function_code.as_bytes())
120            .unwrap_or_default();
121        Ok(AwsResponse {
122            status: StatusCode::OK,
123            headers,
124            content_type: "application/octet-stream".to_string(),
125            body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(bytes)),
126        })
127    }
128
129    pub(crate) fn update_function(
130        &self,
131        req: &AwsRequest,
132        route: &Route,
133    ) -> Result<AwsResponse, AwsServiceError> {
134        let name = route_id(route, "Function")?;
135        let if_match = require_if_match(req)?;
136        let parsed: UpdateFunctionRequest = xml_io::from_xml_root(&req.body)
137            .map_err(|e| invalid_argument(format!("invalid UpdateFunctionRequest XML: {e}")))?;
138        let mut state = self.state.write();
139        let account = state
140            .accounts
141            .get_mut(DEFAULT_ACCOUNT)
142            .ok_or_else(|| not_found("Function", &name))?;
143        let f = account
144            .functions
145            .get_mut(&name)
146            .ok_or_else(|| not_found("Function", &name))?;
147        if f.etag != if_match {
148            return Err(precondition_failed());
149        }
150        f.config = parsed.function_config;
151        f.function_code = parsed.function_code;
152        f.etag = generate_id_with_prefix("E");
153        f.last_modified_time = Utc::now();
154        f.status = "UNPUBLISHED".to_string();
155        f.stage = "DEVELOPMENT".to_string();
156        let snap = f.clone();
157        drop(state);
158        let body = render_function_summary(&snap, "UpdateFunctionResult");
159        // SDK has a known typo on UpdateFunctionOutput: it deserializes
160        // the etag from header `ETtag`, not `ETag`. Send both so any
161        // SDK version can read it.
162        let mut headers = HeaderMap::new();
163        if let Ok(v) = http::HeaderValue::from_str(&snap.etag) {
164            headers.insert(ETAG, v.clone());
165            headers.insert("ETtag", v);
166        }
167        Ok(xml_response(StatusCode::OK, body, headers))
168    }
169
170    pub(crate) fn delete_function(
171        &self,
172        req: &AwsRequest,
173        route: &Route,
174    ) -> Result<AwsResponse, AwsServiceError> {
175        let name = route_id(route, "Function")?;
176        let if_match = require_if_match(req)?;
177        let mut state = self.state.write();
178        let account = state
179            .accounts
180            .get_mut(DEFAULT_ACCOUNT)
181            .ok_or_else(|| not_found("Function", &name))?;
182        let f = account
183            .functions
184            .get(&name)
185            .ok_or_else(|| not_found("Function", &name))?;
186        if f.etag != if_match {
187            return Err(precondition_failed());
188        }
189        account.functions.remove(&name);
190        drop(state);
191        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
192    }
193
194    pub(crate) fn list_functions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
195        let stage = validate_stage_query(&req.raw_query)?;
196        let state = self.state.read();
197        let mut items: Vec<StoredFunction> = state
198            .accounts
199            .get(DEFAULT_ACCOUNT)
200            .map(|a| a.functions.values().cloned().collect())
201            .unwrap_or_default();
202        drop(state);
203        items.sort_by(|a, b| a.name.cmp(&b.name));
204
205        let mut body = String::with_capacity(512);
206        body.push_str(XML_DECL);
207        body.push_str(&format!("<FunctionList xmlns=\"{NS}\">"));
208        body.push_str("<Marker></Marker>");
209        body.push_str("<MaxItems>100</MaxItems>");
210        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
211        body.push_str("<Items>");
212        for f in &items {
213            let view = stage_view(f, &stage);
214            body.push_str(&render_function_summary_inner(&view));
215        }
216        body.push_str("</Items>");
217        body.push_str("</FunctionList>");
218        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
219    }
220
221    pub(crate) fn publish_function(
222        &self,
223        req: &AwsRequest,
224        route: &Route,
225    ) -> Result<AwsResponse, AwsServiceError> {
226        let name = route_id(route, "Function")?;
227        let if_match = require_if_match(req)?;
228        let mut state = self.state.write();
229        let account = state
230            .accounts
231            .get_mut(DEFAULT_ACCOUNT)
232            .ok_or_else(|| not_found("Function", &name))?;
233        let f = account
234            .functions
235            .get_mut(&name)
236            .ok_or_else(|| not_found("Function", &name))?;
237        if f.etag != if_match {
238            return Err(precondition_failed());
239        }
240        f.status = "DEPLOYED".to_string();
241        f.stage = "LIVE".to_string();
242        f.last_modified_time = Utc::now();
243        // Freeze the current development code as the LIVE snapshot.
244        // Subsequent UpdateFunction calls mutate `function_code` but
245        // leave this alone, so `TestFunction(Stage=LIVE)` keeps running
246        // the published version until the next Publish.
247        f.live_function_code = Some(f.function_code.clone());
248        let snap = f.clone();
249        drop(state);
250        let body = render_function_summary(&snap, "PublishFunctionResult");
251        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
252    }
253
254    pub(crate) fn test_function(
255        &self,
256        req: &AwsRequest,
257        route: &Route,
258    ) -> Result<AwsResponse, AwsServiceError> {
259        let name = route_id(route, "Function")?;
260        let if_match = require_if_match(req)?;
261        let parsed: TestFunctionRequest = xml_io::from_xml_root(&req.body)
262            .map_err(|e| invalid_argument(format!("invalid TestFunctionRequest XML: {e}")))?;
263        let event_bytes = base64::engine::general_purpose::STANDARD
264            .decode(parsed.event_object.trim().as_bytes())
265            .map_err(|e| invalid_argument(format!("EventObject is not valid base64: {e}")))?;
266
267        let state = self.state.read();
268        let f = state
269            .accounts
270            .get(DEFAULT_ACCOUNT)
271            .and_then(|a| a.functions.get(&name).cloned())
272            .ok_or_else(|| {
273                aws_error(
274                    StatusCode::NOT_FOUND,
275                    "NoSuchFunctionExists",
276                    format!("The specified function does not exist: {name}"),
277                )
278            })?;
279        drop(state);
280        if f.etag != if_match {
281            return Err(precondition_failed());
282        }
283
284        // AWS lets callers pick which version to run. DEVELOPMENT (the
285        // default) is the latest CreateFunction / UpdateFunction body;
286        // LIVE is the snapshot taken at PublishFunction. Falling back to
287        // DEVELOPMENT when no snapshot exists matches the AWS error
288        // shape ("function not yet published") less closely, but is
289        // strictly nicer for tests against unpublished functions.
290        let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
291        let source_b64 = if stage.eq_ignore_ascii_case("LIVE") {
292            f.live_function_code.as_deref().unwrap_or(&f.function_code)
293        } else {
294            f.function_code.as_str()
295        };
296        let code_bytes = base64::engine::general_purpose::STANDARD
297            .decode(source_b64.as_bytes())
298            .unwrap_or_else(|_| source_b64.as_bytes().to_vec());
299        let code = String::from_utf8(code_bytes)
300            .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
301        let exec = crate::js_runtime::run_handler(&code, &event_bytes);
302
303        let mut body = String::with_capacity(1024);
304        body.push_str(XML_DECL);
305        body.push_str(&format!("<TestResult xmlns=\"{NS}\">"));
306        body.push_str(&render_function_summary_inner(&f));
307        body.push_str(&format!(
308            "<ComputeUtilization>{}</ComputeUtilization>",
309            exec.compute_utilization
310        ));
311        body.push_str("<FunctionExecutionLogs>");
312        for line in &exec.logs {
313            body.push_str(&format!("<member>{}</member>", esc(line)));
314        }
315        body.push_str("</FunctionExecutionLogs>");
316        body.push_str(&format!(
317            "<FunctionErrorMessage>{}</FunctionErrorMessage>",
318            esc(exec.error.as_deref().unwrap_or(""))
319        ));
320        body.push_str(&format!(
321            "<FunctionOutput>{}</FunctionOutput>",
322            esc(exec.output.as_deref().unwrap_or(""))
323        ));
324        body.push_str("</TestResult>");
325        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
326    }
327}
328
329// ─── Public Keys ──────────────────────────────────────────────────────
330
331impl CloudFrontService {
332    pub(crate) fn create_public_key(
333        &self,
334        req: &AwsRequest,
335    ) -> Result<AwsResponse, AwsServiceError> {
336        let cfg: PublicKeyConfig = xml_io::from_xml_root(&req.body)
337            .map_err(|e| invalid_argument(format!("invalid PublicKeyConfig XML: {e}")))?;
338        if cfg.name.is_empty() {
339            return Err(invalid_argument("PublicKeyConfig.Name is required"));
340        }
341        if cfg.encoded_key.is_empty() {
342            return Err(invalid_argument("PublicKeyConfig.EncodedKey is required"));
343        }
344        let mut state = self.state.write();
345        let account = state
346            .accounts
347            .entry(DEFAULT_ACCOUNT.to_string())
348            .or_default();
349        if let Some(existing) = account
350            .public_keys
351            .values()
352            .find(|p| p.config.caller_reference == cfg.caller_reference)
353        {
354            return Err(aws_error(
355                StatusCode::CONFLICT,
356                "PublicKeyAlreadyExists",
357                format!(
358                    "PublicKey with same CallerReference exists: {}",
359                    existing.id
360                ),
361            ));
362        }
363        let id = generate_id_with_prefix("K");
364        let etag = generate_id_with_prefix("E");
365        let stored = StoredPublicKey {
366            id: id.clone(),
367            etag: etag.clone(),
368            created_time: Utc::now(),
369            config: cfg,
370        };
371        account.public_keys.insert(id.clone(), stored.clone());
372        drop(state);
373        let body = render_public_key(&stored, "PublicKey");
374        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
375    }
376
377    pub(crate) fn get_public_key(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
378        let id = route_id(route, "PublicKey")?;
379        let state = self.state.read();
380        let p = state
381            .accounts
382            .get(DEFAULT_ACCOUNT)
383            .and_then(|a| a.public_keys.get(&id).cloned())
384            .ok_or_else(|| not_found("PublicKey", &id))?;
385        drop(state);
386        let body = render_public_key(&p, "PublicKey");
387        Ok(xml_with_etag(StatusCode::OK, body, &p.etag, None))
388    }
389
390    pub(crate) fn get_public_key_config(
391        &self,
392        route: &Route,
393    ) -> Result<AwsResponse, AwsServiceError> {
394        let id = route_id(route, "PublicKey")?;
395        let state = self.state.read();
396        let p = state
397            .accounts
398            .get(DEFAULT_ACCOUNT)
399            .and_then(|a| a.public_keys.get(&id).cloned())
400            .ok_or_else(|| not_found("PublicKey", &id))?;
401        drop(state);
402        let body = config_xml("PublicKeyConfig", &p.config)?;
403        Ok(xml_with_etag(StatusCode::OK, body, &p.etag, None))
404    }
405
406    pub(crate) fn update_public_key(
407        &self,
408        req: &AwsRequest,
409        route: &Route,
410    ) -> Result<AwsResponse, AwsServiceError> {
411        let id = route_id(route, "PublicKey")?;
412        let if_match = require_if_match(req)?;
413        let cfg: PublicKeyConfig = xml_io::from_xml_root(&req.body)
414            .map_err(|e| invalid_argument(format!("invalid PublicKeyConfig XML: {e}")))?;
415        if cfg.name.is_empty() {
416            return Err(invalid_argument("PublicKeyConfig.Name is required"));
417        }
418        let mut state = self.state.write();
419        let account = state
420            .accounts
421            .get_mut(DEFAULT_ACCOUNT)
422            .ok_or_else(|| not_found("PublicKey", &id))?;
423        let p = account
424            .public_keys
425            .get_mut(&id)
426            .ok_or_else(|| not_found("PublicKey", &id))?;
427        if p.etag != if_match {
428            return Err(precondition_failed());
429        }
430        // CallerReference is immutable per AWS.
431        if p.config.caller_reference != cfg.caller_reference {
432            return Err(invalid_argument(
433                "CallerReference cannot change on UpdatePublicKey",
434            ));
435        }
436        p.config = cfg;
437        p.etag = generate_id_with_prefix("E");
438        let snap = p.clone();
439        drop(state);
440        let body = render_public_key(&snap, "PublicKey");
441        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
442    }
443
444    pub(crate) fn delete_public_key(
445        &self,
446        req: &AwsRequest,
447        route: &Route,
448    ) -> Result<AwsResponse, AwsServiceError> {
449        let id = route_id(route, "PublicKey")?;
450        let if_match = require_if_match(req)?;
451        let mut state = self.state.write();
452        let account = state
453            .accounts
454            .get_mut(DEFAULT_ACCOUNT)
455            .ok_or_else(|| not_found("PublicKey", &id))?;
456        let p = account
457            .public_keys
458            .get(&id)
459            .ok_or_else(|| not_found("PublicKey", &id))?;
460        if p.etag != if_match {
461            return Err(precondition_failed());
462        }
463        account.public_keys.remove(&id);
464        drop(state);
465        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
466    }
467
468    pub(crate) fn list_public_keys(
469        &self,
470        _req: &AwsRequest,
471    ) -> Result<AwsResponse, AwsServiceError> {
472        let state = self.state.read();
473        let mut items: Vec<StoredPublicKey> = state
474            .accounts
475            .get(DEFAULT_ACCOUNT)
476            .map(|a| a.public_keys.values().cloned().collect())
477            .unwrap_or_default();
478        drop(state);
479        items.sort_by(|a, b| a.id.cmp(&b.id));
480
481        let mut body = String::with_capacity(512);
482        body.push_str(XML_DECL);
483        body.push_str(&format!("<PublicKeyList xmlns=\"{NS}\">"));
484        body.push_str("<Marker></Marker>");
485        body.push_str("<MaxItems>100</MaxItems>");
486        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
487        body.push_str("<Items>");
488        for p in &items {
489            body.push_str("<PublicKeySummary>");
490            body.push_str(&format!("<Id>{}</Id>", esc(&p.id)));
491            body.push_str(&format!("<Name>{}</Name>", esc(&p.config.name)));
492            body.push_str(&format!(
493                "<CreatedTime>{}</CreatedTime>",
494                rfc3339(&p.created_time)
495            ));
496            body.push_str(&format!(
497                "<EncodedKey>{}</EncodedKey>",
498                esc(&p.config.encoded_key)
499            ));
500            if let Some(c) = &p.config.comment {
501                body.push_str(&format!("<Comment>{}</Comment>", esc(c)));
502            }
503            body.push_str("</PublicKeySummary>");
504        }
505        body.push_str("</Items>");
506        body.push_str("</PublicKeyList>");
507        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
508    }
509}
510
511// ─── Key Groups ───────────────────────────────────────────────────────
512
513impl CloudFrontService {
514    pub(crate) fn create_key_group(
515        &self,
516        req: &AwsRequest,
517    ) -> Result<AwsResponse, AwsServiceError> {
518        let cfg: KeyGroupConfig = xml_io::from_xml_root(&req.body)
519            .map_err(|e| invalid_argument(format!("invalid KeyGroupConfig XML: {e}")))?;
520        if cfg.name.is_empty() {
521            return Err(invalid_argument("KeyGroupConfig.Name is required"));
522        }
523        let mut state = self.state.write();
524        let account = state
525            .accounts
526            .entry(DEFAULT_ACCOUNT.to_string())
527            .or_default();
528        let id = generate_id_with_prefix("K");
529        let etag = generate_id_with_prefix("E");
530        let stored = StoredKeyGroup {
531            id: id.clone(),
532            etag: etag.clone(),
533            last_modified_time: Utc::now(),
534            config: cfg,
535        };
536        account.key_groups.insert(id.clone(), stored.clone());
537        drop(state);
538        let body = render_key_group(&stored, "KeyGroup");
539        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
540    }
541
542    pub(crate) fn get_key_group(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
543        let id = route_id(route, "KeyGroup")?;
544        let state = self.state.read();
545        let g = state
546            .accounts
547            .get(DEFAULT_ACCOUNT)
548            .and_then(|a| a.key_groups.get(&id).cloned())
549            .ok_or_else(|| not_found("KeyGroup", &id))?;
550        drop(state);
551        let body = render_key_group(&g, "KeyGroup");
552        Ok(xml_with_etag(StatusCode::OK, body, &g.etag, None))
553    }
554
555    pub(crate) fn get_key_group_config(
556        &self,
557        route: &Route,
558    ) -> Result<AwsResponse, AwsServiceError> {
559        let id = route_id(route, "KeyGroup")?;
560        let state = self.state.read();
561        let g = state
562            .accounts
563            .get(DEFAULT_ACCOUNT)
564            .and_then(|a| a.key_groups.get(&id).cloned())
565            .ok_or_else(|| not_found("KeyGroup", &id))?;
566        drop(state);
567        let body = config_xml("KeyGroupConfig", &g.config)?;
568        Ok(xml_with_etag(StatusCode::OK, body, &g.etag, None))
569    }
570
571    pub(crate) fn update_key_group(
572        &self,
573        req: &AwsRequest,
574        route: &Route,
575    ) -> Result<AwsResponse, AwsServiceError> {
576        let id = route_id(route, "KeyGroup")?;
577        let if_match = require_if_match(req)?;
578        let cfg: KeyGroupConfig = xml_io::from_xml_root(&req.body)
579            .map_err(|e| invalid_argument(format!("invalid KeyGroupConfig XML: {e}")))?;
580        if cfg.name.is_empty() {
581            return Err(invalid_argument("KeyGroupConfig.Name is required"));
582        }
583        let mut state = self.state.write();
584        let account = state
585            .accounts
586            .get_mut(DEFAULT_ACCOUNT)
587            .ok_or_else(|| not_found("KeyGroup", &id))?;
588        let g = account
589            .key_groups
590            .get_mut(&id)
591            .ok_or_else(|| not_found("KeyGroup", &id))?;
592        if g.etag != if_match {
593            return Err(precondition_failed());
594        }
595        g.config = cfg;
596        g.etag = generate_id_with_prefix("E");
597        g.last_modified_time = Utc::now();
598        let snap = g.clone();
599        drop(state);
600        let body = render_key_group(&snap, "KeyGroup");
601        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
602    }
603
604    pub(crate) fn delete_key_group(
605        &self,
606        req: &AwsRequest,
607        route: &Route,
608    ) -> Result<AwsResponse, AwsServiceError> {
609        let id = route_id(route, "KeyGroup")?;
610        let if_match = require_if_match(req)?;
611        let mut state = self.state.write();
612        let account = state
613            .accounts
614            .get_mut(DEFAULT_ACCOUNT)
615            .ok_or_else(|| not_found("KeyGroup", &id))?;
616        let g = account
617            .key_groups
618            .get(&id)
619            .ok_or_else(|| not_found("KeyGroup", &id))?;
620        if g.etag != if_match {
621            return Err(precondition_failed());
622        }
623        account.key_groups.remove(&id);
624        drop(state);
625        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
626    }
627
628    pub(crate) fn list_key_groups(
629        &self,
630        _req: &AwsRequest,
631    ) -> Result<AwsResponse, AwsServiceError> {
632        let state = self.state.read();
633        let mut items: Vec<StoredKeyGroup> = state
634            .accounts
635            .get(DEFAULT_ACCOUNT)
636            .map(|a| a.key_groups.values().cloned().collect())
637            .unwrap_or_default();
638        drop(state);
639        items.sort_by(|a, b| a.config.name.cmp(&b.config.name));
640
641        let mut body = String::with_capacity(512);
642        body.push_str(XML_DECL);
643        body.push_str(&format!("<KeyGroupList xmlns=\"{NS}\">"));
644        body.push_str("<Marker></Marker>");
645        body.push_str("<MaxItems>100</MaxItems>");
646        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
647        body.push_str("<Items>");
648        for g in &items {
649            body.push_str("<KeyGroupSummary>");
650            body.push_str("<KeyGroup>");
651            push_key_group_inner(&mut body, g);
652            body.push_str("</KeyGroup>");
653            body.push_str("</KeyGroupSummary>");
654        }
655        body.push_str("</Items>");
656        body.push_str("</KeyGroupList>");
657        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
658    }
659}
660
661// ─── Key Value Stores ─────────────────────────────────────────────────
662
663impl CloudFrontService {
664    pub(crate) fn create_key_value_store(
665        &self,
666        req: &AwsRequest,
667    ) -> Result<AwsResponse, AwsServiceError> {
668        let parsed: CreateKeyValueStoreRequest = xml_io::from_xml_root(&req.body)
669            .map_err(|e| invalid_argument(format!("invalid CreateKeyValueStore XML: {e}")))?;
670        if parsed.name.is_empty() {
671            return Err(invalid_argument("Name is required"));
672        }
673        let mut state = self.state.write();
674        let account = state
675            .accounts
676            .entry(DEFAULT_ACCOUNT.to_string())
677            .or_default();
678        if account.key_value_stores.contains_key(&parsed.name) {
679            return Err(aws_error(
680                StatusCode::CONFLICT,
681                "EntityAlreadyExists",
682                format!("KeyValueStore {} already exists", parsed.name),
683            ));
684        }
685        let now = Utc::now();
686        let id = Uuid::new_v4().to_string();
687        let etag = generate_id_with_prefix("E");
688        let arn = format!(
689            "arn:aws:cloudfront::{}:key-value-store/{}",
690            DEFAULT_ACCOUNT, id
691        );
692        let stored = StoredKeyValueStore {
693            name: parsed.name.clone(),
694            id,
695            etag: etag.clone(),
696            arn,
697            status: "READY".to_string(),
698            created_time: now,
699            last_modified_time: now,
700            comment: parsed.comment,
701            import_source: parsed.import_source,
702        };
703        account
704            .key_value_stores
705            .insert(parsed.name.clone(), stored.clone());
706        drop(state);
707        let body = render_key_value_store(&stored, "CreateKeyValueStoreResult");
708        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
709    }
710
711    pub(crate) fn describe_key_value_store(
712        &self,
713        route: &Route,
714    ) -> Result<AwsResponse, AwsServiceError> {
715        let name = route_id(route, "KeyValueStore")?;
716        let state = self.state.read();
717        let kvs = state
718            .accounts
719            .get(DEFAULT_ACCOUNT)
720            .and_then(|a| a.key_value_stores.get(&name).cloned())
721            .ok_or_else(|| not_found("KeyValueStore", &name))?;
722        drop(state);
723        let body = render_key_value_store(&kvs, "DescribeKeyValueStoreResult");
724        Ok(xml_with_etag(StatusCode::OK, body, &kvs.etag, None))
725    }
726
727    pub(crate) fn update_key_value_store(
728        &self,
729        req: &AwsRequest,
730        route: &Route,
731    ) -> Result<AwsResponse, AwsServiceError> {
732        let name = route_id(route, "KeyValueStore")?;
733        let if_match = require_if_match(req)?;
734        let parsed: UpdateKeyValueStoreRequest = xml_io::from_xml_root(&req.body)
735            .map_err(|e| invalid_argument(format!("invalid UpdateKeyValueStore XML: {e}")))?;
736        let mut state = self.state.write();
737        let account = state
738            .accounts
739            .get_mut(DEFAULT_ACCOUNT)
740            .ok_or_else(|| not_found("KeyValueStore", &name))?;
741        let kvs = account
742            .key_value_stores
743            .get_mut(&name)
744            .ok_or_else(|| not_found("KeyValueStore", &name))?;
745        if kvs.etag != if_match {
746            return Err(precondition_failed());
747        }
748        kvs.comment = Some(parsed.comment);
749        kvs.etag = generate_id_with_prefix("E");
750        kvs.last_modified_time = Utc::now();
751        let snap = kvs.clone();
752        drop(state);
753        let body = render_key_value_store(&snap, "UpdateKeyValueStoreResult");
754        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
755    }
756
757    pub(crate) fn delete_key_value_store(
758        &self,
759        req: &AwsRequest,
760        route: &Route,
761    ) -> Result<AwsResponse, AwsServiceError> {
762        let name = route_id(route, "KeyValueStore")?;
763        let if_match = require_if_match(req)?;
764        let mut state = self.state.write();
765        let account = state
766            .accounts
767            .get_mut(DEFAULT_ACCOUNT)
768            .ok_or_else(|| not_found("KeyValueStore", &name))?;
769        let kvs = account
770            .key_value_stores
771            .get(&name)
772            .ok_or_else(|| not_found("KeyValueStore", &name))?;
773        if kvs.etag != if_match {
774            return Err(precondition_failed());
775        }
776        account.key_value_stores.remove(&name);
777        drop(state);
778        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
779    }
780
781    pub(crate) fn list_key_value_stores(
782        &self,
783        _req: &AwsRequest,
784    ) -> Result<AwsResponse, AwsServiceError> {
785        let state = self.state.read();
786        let mut items: Vec<StoredKeyValueStore> = state
787            .accounts
788            .get(DEFAULT_ACCOUNT)
789            .map(|a| a.key_value_stores.values().cloned().collect())
790            .unwrap_or_default();
791        drop(state);
792        items.sort_by(|a, b| a.name.cmp(&b.name));
793
794        let mut body = String::with_capacity(512);
795        body.push_str(XML_DECL);
796        body.push_str(&format!("<KeyValueStoreList xmlns=\"{NS}\">"));
797        body.push_str("<NextMarker></NextMarker>");
798        body.push_str("<MaxItems>100</MaxItems>");
799        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
800        body.push_str("<Items>");
801        for kvs in &items {
802            body.push_str("<KeyValueStore>");
803            push_kvs_inner(&mut body, kvs);
804            body.push_str("</KeyValueStore>");
805        }
806        body.push_str("</Items>");
807        body.push_str("</KeyValueStoreList>");
808        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
809    }
810}
811
812// ─── Origin Access Identities (legacy) ────────────────────────────────
813
814impl CloudFrontService {
815    pub(crate) fn create_oai(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
816        let cfg: CloudFrontOriginAccessIdentityConfig = xml_io::from_xml_root(&req.body)
817            .map_err(|e| invalid_argument(format!("invalid OAI config XML: {e}")))?;
818        if cfg.caller_reference.is_empty() {
819            return Err(invalid_argument("CallerReference is required"));
820        }
821        let mut state = self.state.write();
822        let account = state
823            .accounts
824            .entry(DEFAULT_ACCOUNT.to_string())
825            .or_default();
826        if let Some(existing) = account
827            .origin_access_identities
828            .values()
829            .find(|o| o.config.caller_reference == cfg.caller_reference)
830        {
831            return Err(aws_error(
832                StatusCode::CONFLICT,
833                "CloudFrontOriginAccessIdentityAlreadyExists",
834                format!("OAI with same CallerReference exists: {}", existing.id),
835            ));
836        }
837        let id = format!(
838            "E{}",
839            Uuid::new_v4()
840                .simple()
841                .to_string()
842                .to_uppercase()
843                .chars()
844                .take(13)
845                .collect::<String>()
846        );
847        let etag = generate_id_with_prefix("E");
848        let canonical = Uuid::new_v4().simple().to_string();
849        let stored = StoredOriginAccessIdentity {
850            id: id.clone(),
851            etag: etag.clone(),
852            s3_canonical_user_id: canonical,
853            config: cfg,
854        };
855        account
856            .origin_access_identities
857            .insert(id.clone(), stored.clone());
858        drop(state);
859        let body = render_oai(&stored, "CloudFrontOriginAccessIdentity");
860        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, Some(&id)))
861    }
862
863    pub(crate) fn get_oai(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
864        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
865        let state = self.state.read();
866        let oai = state
867            .accounts
868            .get(DEFAULT_ACCOUNT)
869            .and_then(|a| a.origin_access_identities.get(&id).cloned())
870            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
871        drop(state);
872        let body = render_oai(&oai, "CloudFrontOriginAccessIdentity");
873        Ok(xml_with_etag(StatusCode::OK, body, &oai.etag, None))
874    }
875
876    pub(crate) fn get_oai_config(&self, route: &Route) -> Result<AwsResponse, AwsServiceError> {
877        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
878        let state = self.state.read();
879        let oai = state
880            .accounts
881            .get(DEFAULT_ACCOUNT)
882            .and_then(|a| a.origin_access_identities.get(&id).cloned())
883            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
884        drop(state);
885        let body = config_xml("CloudFrontOriginAccessIdentityConfig", &oai.config)?;
886        Ok(xml_with_etag(StatusCode::OK, body, &oai.etag, None))
887    }
888
889    pub(crate) fn update_oai(
890        &self,
891        req: &AwsRequest,
892        route: &Route,
893    ) -> Result<AwsResponse, AwsServiceError> {
894        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
895        let if_match = require_if_match(req)?;
896        let cfg: CloudFrontOriginAccessIdentityConfig = xml_io::from_xml_root(&req.body)
897            .map_err(|e| invalid_argument(format!("invalid OAI config XML: {e}")))?;
898        let mut state = self.state.write();
899        let account = state
900            .accounts
901            .get_mut(DEFAULT_ACCOUNT)
902            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
903        let oai = account
904            .origin_access_identities
905            .get_mut(&id)
906            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
907        if oai.etag != if_match {
908            return Err(precondition_failed());
909        }
910        if oai.config.caller_reference != cfg.caller_reference {
911            return Err(invalid_argument(
912                "CallerReference cannot change on UpdateCloudFrontOriginAccessIdentity",
913            ));
914        }
915        oai.config = cfg;
916        oai.etag = generate_id_with_prefix("E");
917        let snap = oai.clone();
918        drop(state);
919        let body = render_oai(&snap, "CloudFrontOriginAccessIdentity");
920        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
921    }
922
923    pub(crate) fn delete_oai(
924        &self,
925        req: &AwsRequest,
926        route: &Route,
927    ) -> Result<AwsResponse, AwsServiceError> {
928        let id = route_id(route, "CloudFrontOriginAccessIdentity")?;
929        let if_match = require_if_match(req)?;
930        let mut state = self.state.write();
931        let account = state
932            .accounts
933            .get_mut(DEFAULT_ACCOUNT)
934            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
935        let oai = account
936            .origin_access_identities
937            .get(&id)
938            .ok_or_else(|| not_found("CloudFrontOriginAccessIdentity", &id))?;
939        if oai.etag != if_match {
940            return Err(precondition_failed());
941        }
942        account.origin_access_identities.remove(&id);
943        drop(state);
944        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
945    }
946
947    pub(crate) fn list_oai(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948        let state = self.state.read();
949        let mut items: Vec<StoredOriginAccessIdentity> = state
950            .accounts
951            .get(DEFAULT_ACCOUNT)
952            .map(|a| a.origin_access_identities.values().cloned().collect())
953            .unwrap_or_default();
954        drop(state);
955        items.sort_by(|a, b| a.id.cmp(&b.id));
956
957        let mut body = String::with_capacity(512);
958        body.push_str(XML_DECL);
959        body.push_str(&format!(
960            "<CloudFrontOriginAccessIdentityList xmlns=\"{NS}\">"
961        ));
962        body.push_str("<Marker></Marker>");
963        body.push_str("<MaxItems>100</MaxItems>");
964        body.push_str("<IsTruncated>false</IsTruncated>");
965        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
966        body.push_str("<Items>");
967        for oai in &items {
968            body.push_str("<CloudFrontOriginAccessIdentitySummary>");
969            body.push_str(&format!("<Id>{}</Id>", esc(&oai.id)));
970            body.push_str(&format!(
971                "<S3CanonicalUserId>{}</S3CanonicalUserId>",
972                esc(&oai.s3_canonical_user_id)
973            ));
974            body.push_str(&format!("<Comment>{}</Comment>", esc(&oai.config.comment)));
975            body.push_str("</CloudFrontOriginAccessIdentitySummary>");
976        }
977        body.push_str("</Items>");
978        body.push_str("</CloudFrontOriginAccessIdentityList>");
979        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
980    }
981}
982
983// ─── Monitoring Subscriptions ─────────────────────────────────────────
984
985impl CloudFrontService {
986    pub(crate) fn create_monitoring_subscription(
987        &self,
988        req: &AwsRequest,
989        route: &Route,
990    ) -> Result<AwsResponse, AwsServiceError> {
991        let dist_id = route_id(route, "Distribution")?;
992        // Check the distribution exists before parsing the body. Synthetic
993        // probes hit this op against a placeholder distribution ID; surfacing
994        // NoSuchDistribution (declared in the Smithy errors list) before any
995        // XML-parse InvalidArgument keeps the conformance probe honest.
996        {
997            let state = self.state.read();
998            let has_dist = state
999                .accounts
1000                .get(DEFAULT_ACCOUNT)
1001                .is_some_and(|a| a.distributions.contains_key(&dist_id));
1002            if !has_dist {
1003                return Err(not_found("Distribution", &dist_id));
1004            }
1005        }
1006        let parsed: MonitoringSubscriptionBody = xml_io::from_xml_root(&req.body)
1007            .map_err(|e| invalid_argument(format!("invalid MonitoringSubscription XML: {e}")))?;
1008        let mut state = self.state.write();
1009        let account = state
1010            .accounts
1011            .entry(DEFAULT_ACCOUNT.to_string())
1012            .or_default();
1013        if !account.distributions.contains_key(&dist_id) {
1014            return Err(not_found("Distribution", &dist_id));
1015        }
1016        let stored = StoredMonitoringSubscription {
1017            distribution_id: dist_id.clone(),
1018            config: parsed.realtime_metrics_subscription_config,
1019        };
1020        account
1021            .monitoring_subscriptions
1022            .insert(dist_id.clone(), stored.clone());
1023        drop(state);
1024        let body = render_monitoring(&stored);
1025        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1026    }
1027
1028    pub(crate) fn get_monitoring_subscription(
1029        &self,
1030        route: &Route,
1031    ) -> Result<AwsResponse, AwsServiceError> {
1032        let dist_id = route_id(route, "Distribution")?;
1033        let state = self.state.read();
1034        let m = state
1035            .accounts
1036            .get(DEFAULT_ACCOUNT)
1037            .and_then(|a| a.monitoring_subscriptions.get(&dist_id).cloned())
1038            .ok_or_else(|| {
1039                aws_error(
1040                    StatusCode::NOT_FOUND,
1041                    "NoSuchMonitoringSubscription",
1042                    format!("No monitoring subscription for distribution {dist_id}"),
1043                )
1044            })?;
1045        drop(state);
1046        let body = render_monitoring(&m);
1047        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
1048    }
1049
1050    pub(crate) fn delete_monitoring_subscription(
1051        &self,
1052        route: &Route,
1053    ) -> Result<AwsResponse, AwsServiceError> {
1054        let dist_id = route_id(route, "Distribution")?;
1055        let mut state = self.state.write();
1056        let account = state
1057            .accounts
1058            .get_mut(DEFAULT_ACCOUNT)
1059            .ok_or_else(|| not_found("Distribution", &dist_id))?;
1060        if account.monitoring_subscriptions.remove(&dist_id).is_none() {
1061            return Err(aws_error(
1062                StatusCode::NOT_FOUND,
1063                "NoSuchMonitoringSubscription",
1064                format!("No monitoring subscription for distribution {dist_id}"),
1065            ));
1066        }
1067        drop(state);
1068        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
1069    }
1070}
1071
1072// ─── Helpers ──────────────────────────────────────────────────────────
1073
/// Deserialized body of a `CreateFunctionRequest` document, as parsed by
/// `create_function`.
///
/// Field names map to PascalCase XML elements (`Name`, `FunctionConfig`,
/// `FunctionCode`) via `rename_all`. None of the fields has a serde
/// default.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CreateFunctionRequest {
    name: String,
    function_config: FunctionConfig,
    /// Base64-encoded source.
    function_code: String,
}
1082
/// Deserialized body of an `UpdateFunctionRequest` document.
///
/// Field names map to PascalCase XML elements (`FunctionConfig`,
/// `FunctionCode`) via `rename_all`. Neither field has a serde default.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct UpdateFunctionRequest {
    function_config: FunctionConfig,
    // NOTE(review): presumably base64-encoded like
    // `CreateFunctionRequest::function_code` — confirm against the handler.
    function_code: String,
}
1089
/// Deserialized body of a `TestFunctionRequest` document.
///
/// Field names map to PascalCase XML elements (`EventObject`, `Stage`) via
/// `rename_all`. Both fields fall back to their `Default` value when the
/// element is absent.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TestFunctionRequest {
    // NOTE(review): the tests in this file send a base64-encoded JSON event
    // here — confirm the handler decodes it.
    #[serde(default)]
    event_object: String,
    #[serde(default)]
    stage: Option<String>,
}
1098
/// Deserialized body of a `CreateKeyValueStoreRequest` document.
///
/// Field names map to PascalCase XML elements (`Name`, `Comment`,
/// `ImportSource`) via `rename_all`. `Name` has no serde default; the other
/// two default to `None` when absent.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CreateKeyValueStoreRequest {
    name: String,
    #[serde(default)]
    comment: Option<String>,
    #[serde(default)]
    import_source: Option<ImportSource>,
}
1108
/// Deserialized body of an `UpdateKeyValueStoreRequest` document, as parsed
/// by `update_key_value_store`.
///
/// The single `Comment` element has no serde default; its value replaces
/// the store's comment.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct UpdateKeyValueStoreRequest {
    comment: String,
}
1114
1115fn config_xml<T: serde::Serialize>(root: &str, cfg: &T) -> Result<String, AwsServiceError> {
1116    let inner = quick_xml::se::to_string_with_root(root, cfg).map_err(|e| {
1117        aws_error(
1118            StatusCode::INTERNAL_SERVER_ERROR,
1119            "InternalError",
1120            format!("xml encode failed: {e}"),
1121        )
1122    })?;
1123    let stamped = inner.replacen(
1124        &format!("<{root}>"),
1125        &format!("<{root} xmlns=\"{NS}\">", NS = crate::NAMESPACE),
1126        1,
1127    );
1128    Ok(format!("{XML_DECL}{stamped}"))
1129}
1130
/// Extracts the value of the `Stage` parameter from a raw query string.
///
/// Segments without `=` are ignored, the key match is case-sensitive, and
/// no percent-decoding is performed. When `Stage` appears more than once
/// the last occurrence wins.
fn parse_stage_query(query: &str) -> Option<String> {
    query
        .split('&')
        // Walk from the end so the first hit is the last occurrence.
        .rev()
        .filter_map(|segment| segment.split_once('='))
        .find(|(key, _)| *key == "Stage")
        .map(|(_, value)| value.to_string())
}
1136
1137/// Validate the `Stage` query against the FunctionStage enum
1138/// (DEVELOPMENT | LIVE). Returns InvalidArgument for unknown values; absent
1139/// or valid is OK.
1140pub(crate) fn validate_stage_query(
1141    query: &str,
1142) -> Result<Option<String>, fakecloud_core::service::AwsServiceError> {
1143    let stage = parse_stage_query(query);
1144    if let Some(ref v) = stage {
1145        if v != "DEVELOPMENT" && v != "LIVE" {
1146            return Err(crate::service::invalid_argument(format!(
1147                "Stage must be one of 'DEVELOPMENT' or 'LIVE', got '{v}'"
1148            )));
1149        }
1150    }
1151    Ok(stage)
1152}
1153
1154fn stage_view(f: &StoredFunction, stage: &Option<String>) -> StoredFunction {
1155    let mut clone = f.clone();
1156    if stage.as_deref() == Some("LIVE") {
1157        clone.stage = "LIVE".into();
1158        // GetFunction/TestFunction against the LIVE stage must serve the
1159        // code frozen at PublishFunction, not the mutable DEVELOPMENT
1160        // working copy. If the function was never published there is no
1161        // LIVE snapshot, so fall back to the development code unchanged.
1162        if let Some(live) = &f.live_function_code {
1163            clone.function_code = live.clone();
1164        }
1165    }
1166    clone
1167}
1168
1169fn render_function_summary(f: &StoredFunction, _root: &str) -> String {
1170    // CloudFront returns FunctionSummary as the root for Create/Describe/
1171    // Update/Publish — there is no operation-specific wrapper element.
1172    let mut out = String::with_capacity(512);
1173    out.push_str(XML_DECL);
1174    out.push_str(&render_function_summary_inner_with_ns(f));
1175    out
1176}
1177
1178fn render_function_summary_inner_with_ns(f: &StoredFunction) -> String {
1179    let mut out = String::with_capacity(512);
1180    out.push_str(&format!("<FunctionSummary xmlns=\"{NS}\">"));
1181    out.push_str(&render_function_summary_body(f));
1182    out.push_str("</FunctionSummary>");
1183    out
1184}
1185
1186fn render_function_summary_inner(f: &StoredFunction) -> String {
1187    let mut out = String::with_capacity(512);
1188    out.push_str("<FunctionSummary>");
1189    out.push_str(&render_function_summary_body(f));
1190    out.push_str("</FunctionSummary>");
1191    out
1192}
1193
1194fn render_function_summary_body(f: &StoredFunction) -> String {
1195    let mut out = String::with_capacity(512);
1196    out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
1197    out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
1198    out.push_str("<FunctionConfig>");
1199    if let Some(c) = &f.config.comment {
1200        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1201    } else {
1202        out.push_str("<Comment></Comment>");
1203    }
1204    out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.config.runtime)));
1205    if let Some(kvsa) = &f.config.key_value_store_associations {
1206        out.push_str("<KeyValueStoreAssociations>");
1207        out.push_str(&format!("<Quantity>{}</Quantity>", kvsa.quantity));
1208        if let Some(items) = &kvsa.items {
1209            out.push_str("<Items>");
1210            for a in &items.key_value_store_association {
1211                out.push_str("<KeyValueStoreAssociation>");
1212                out.push_str(&format!(
1213                    "<KeyValueStoreARN>{}</KeyValueStoreARN>",
1214                    esc(&a.key_value_store_arn)
1215                ));
1216                out.push_str("</KeyValueStoreAssociation>");
1217            }
1218            out.push_str("</Items>");
1219        }
1220        out.push_str("</KeyValueStoreAssociations>");
1221    }
1222    out.push_str("</FunctionConfig>");
1223    out.push_str("<FunctionMetadata>");
1224    out.push_str(&format!(
1225        "<FunctionARN>{}</FunctionARN>",
1226        esc(&f.function_arn)
1227    ));
1228    out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
1229    out.push_str(&format!(
1230        "<CreatedTime>{}</CreatedTime>",
1231        rfc3339(&f.created_time)
1232    ));
1233    out.push_str(&format!(
1234        "<LastModifiedTime>{}</LastModifiedTime>",
1235        rfc3339(&f.last_modified_time)
1236    ));
1237    out.push_str("</FunctionMetadata>");
1238    out
1239}
1240
1241fn render_public_key(p: &StoredPublicKey, root: &str) -> String {
1242    let mut out = String::with_capacity(512);
1243    out.push_str(XML_DECL);
1244    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1245    out.push_str(&format!("<Id>{}</Id>", esc(&p.id)));
1246    out.push_str(&format!(
1247        "<CreatedTime>{}</CreatedTime>",
1248        rfc3339(&p.created_time)
1249    ));
1250    out.push_str("<PublicKeyConfig>");
1251    out.push_str(&format!(
1252        "<CallerReference>{}</CallerReference>",
1253        esc(&p.config.caller_reference)
1254    ));
1255    out.push_str(&format!("<Name>{}</Name>", esc(&p.config.name)));
1256    out.push_str(&format!(
1257        "<EncodedKey>{}</EncodedKey>",
1258        esc(&p.config.encoded_key)
1259    ));
1260    if let Some(c) = &p.config.comment {
1261        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1262    }
1263    out.push_str("</PublicKeyConfig>");
1264    out.push_str(&format!("</{root}>"));
1265    out
1266}
1267
1268fn push_key_group_inner(out: &mut String, g: &StoredKeyGroup) {
1269    out.push_str(&format!("<Id>{}</Id>", esc(&g.id)));
1270    out.push_str(&format!(
1271        "<LastModifiedTime>{}</LastModifiedTime>",
1272        rfc3339(&g.last_modified_time)
1273    ));
1274    out.push_str("<KeyGroupConfig>");
1275    out.push_str(&format!("<Name>{}</Name>", esc(&g.config.name)));
1276    out.push_str("<Items>");
1277    for k in &g.config.items.public_key {
1278        out.push_str(&format!("<PublicKey>{}</PublicKey>", esc(k)));
1279    }
1280    out.push_str("</Items>");
1281    if let Some(c) = &g.config.comment {
1282        out.push_str(&format!("<Comment>{}</Comment>", esc(c)));
1283    }
1284    out.push_str("</KeyGroupConfig>");
1285}
1286
1287fn render_key_group(g: &StoredKeyGroup, root: &str) -> String {
1288    let mut out = String::with_capacity(512);
1289    out.push_str(XML_DECL);
1290    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1291    push_key_group_inner(&mut out, g);
1292    out.push_str(&format!("</{root}>"));
1293    out
1294}
1295
1296fn push_kvs_inner(out: &mut String, kvs: &StoredKeyValueStore) {
1297    out.push_str(&format!("<Name>{}</Name>", esc(&kvs.name)));
1298    out.push_str(&format!("<Id>{}</Id>", esc(&kvs.id)));
1299    out.push_str(&format!(
1300        "<Comment>{}</Comment>",
1301        esc(kvs.comment.as_deref().unwrap_or(""))
1302    ));
1303    out.push_str(&format!("<ARN>{}</ARN>", esc(&kvs.arn)));
1304    out.push_str(&format!("<Status>{}</Status>", esc(&kvs.status)));
1305    out.push_str(&format!(
1306        "<LastModifiedTime>{}</LastModifiedTime>",
1307        rfc3339(&kvs.last_modified_time)
1308    ));
1309}
1310
1311fn render_key_value_store(kvs: &StoredKeyValueStore, _root: &str) -> String {
1312    // SDK expects KeyValueStore as root for Create/Describe/Update.
1313    let mut out = String::with_capacity(512);
1314    out.push_str(XML_DECL);
1315    out.push_str(&format!("<KeyValueStore xmlns=\"{NS}\">"));
1316    push_kvs_inner(&mut out, kvs);
1317    out.push_str("</KeyValueStore>");
1318    out
1319}
1320
1321fn render_oai(oai: &StoredOriginAccessIdentity, root: &str) -> String {
1322    let mut out = String::with_capacity(512);
1323    out.push_str(XML_DECL);
1324    out.push_str(&format!("<{root} xmlns=\"{NS}\">"));
1325    out.push_str(&format!("<Id>{}</Id>", esc(&oai.id)));
1326    out.push_str(&format!(
1327        "<S3CanonicalUserId>{}</S3CanonicalUserId>",
1328        esc(&oai.s3_canonical_user_id)
1329    ));
1330    out.push_str("<CloudFrontOriginAccessIdentityConfig>");
1331    out.push_str(&format!(
1332        "<CallerReference>{}</CallerReference>",
1333        esc(&oai.config.caller_reference)
1334    ));
1335    out.push_str(&format!("<Comment>{}</Comment>", esc(&oai.config.comment)));
1336    out.push_str("</CloudFrontOriginAccessIdentityConfig>");
1337    out.push_str(&format!("</{root}>"));
1338    out
1339}
1340
1341fn render_monitoring(m: &StoredMonitoringSubscription) -> String {
1342    let mut out = String::with_capacity(256);
1343    out.push_str(XML_DECL);
1344    out.push_str(&format!("<MonitoringSubscription xmlns=\"{NS}\">"));
1345    out.push_str("<RealtimeMetricsSubscriptionConfig>");
1346    out.push_str(&format!(
1347        "<RealtimeMetricsSubscriptionStatus>{}</RealtimeMetricsSubscriptionStatus>",
1348        esc(&m.config.realtime_metrics_subscription_status)
1349    ));
1350    out.push_str("</RealtimeMetricsSubscriptionConfig>");
1351    out.push_str("</MonitoringSubscription>");
1352    out
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357    use super::*;
1358    use crate::service::CloudFrontService;
1359    use crate::state::CloudFrontAccounts;
1360    use bytes::Bytes;
1361    use fakecloud_core::service::AwsService;
1362    use http::HeaderValue;
1363    use parking_lot::RwLock;
1364    use std::sync::Arc;
1365
1366    fn svc() -> CloudFrontService {
1367        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
1368    }
1369
1370    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
1371        let mut headers = HeaderMap::new();
1372        if let Some(v) = if_match {
1373            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
1374        }
1375        AwsRequest {
1376            service: "cloudfront".into(),
1377            action: String::new(),
1378            region: "us-east-1".into(),
1379            account_id: DEFAULT_ACCOUNT.into(),
1380            request_id: uuid::Uuid::new_v4().to_string(),
1381            headers,
1382            query_params: std::collections::HashMap::new(),
1383            body_stream: parking_lot::Mutex::new(None),
1384            body: Bytes::from(body.to_string()),
1385            path_segments: path
1386                .split('/')
1387                .filter(|s| !s.is_empty())
1388                .map(String::from)
1389                .collect(),
1390            raw_path: path.into(),
1391            raw_query: String::new(),
1392            method,
1393            is_query_protocol: false,
1394            access_key_id: None,
1395            principal: None,
1396        }
1397    }
1398
1399    async fn create_function(svc: &CloudFrontService, name: &str, code: &str) -> String {
1400        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
1401        let body = format!(
1402            r#"<?xml version="1.0"?>
1403<CreateFunctionRequest xmlns="{NS}">
1404  <Name>{name}</Name>
1405  <FunctionConfig>
1406    <Comment>t</Comment>
1407    <Runtime>cloudfront-js-2.0</Runtime>
1408  </FunctionConfig>
1409  <FunctionCode>{code_b64}</FunctionCode>
1410</CreateFunctionRequest>"#
1411        );
1412        let resp = svc
1413            .handle(req(http::Method::POST, "/2020-05-31/function", &body, None))
1414            .await
1415            .unwrap();
1416        assert_eq!(resp.status, StatusCode::CREATED);
1417        resp.headers
1418            .get(http::header::ETAG)
1419            .unwrap()
1420            .to_str()
1421            .unwrap()
1422            .to_string()
1423    }
1424
1425    fn test_function_request_xml(event_json: &str) -> String {
1426        test_function_request_xml_with_stage(event_json, "DEVELOPMENT")
1427    }
1428
1429    fn test_function_request_xml_with_stage(event_json: &str, stage: &str) -> String {
1430        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
1431        format!(
1432            r#"<?xml version="1.0"?>
1433<TestFunctionRequest xmlns="{NS}">
1434  <Stage>{stage}</Stage>
1435  <EventObject>{event_b64}</EventObject>
1436</TestFunctionRequest>"#
1437        )
1438    }
1439
1440    async fn update_function(
1441        svc: &CloudFrontService,
1442        name: &str,
1443        code: &str,
1444        if_match: &str,
1445    ) -> String {
1446        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
1447        let body = format!(
1448            r#"<?xml version="1.0"?>
1449<UpdateFunctionRequest xmlns="{NS}">
1450  <FunctionConfig>
1451    <Comment>t</Comment>
1452    <Runtime>cloudfront-js-2.0</Runtime>
1453  </FunctionConfig>
1454  <FunctionCode>{code_b64}</FunctionCode>
1455</UpdateFunctionRequest>"#
1456        );
1457        let resp = svc
1458            .handle(req(
1459                http::Method::PUT,
1460                &format!("/2020-05-31/function/{name}"),
1461                &body,
1462                Some(if_match),
1463            ))
1464            .await
1465            .unwrap();
1466        assert_eq!(resp.status, StatusCode::OK);
1467        resp.headers
1468            .get(http::header::ETAG)
1469            .unwrap()
1470            .to_str()
1471            .unwrap()
1472            .to_string()
1473    }
1474
1475    async fn publish_function(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
1476        let resp = svc
1477            .handle(req(
1478                http::Method::POST,
1479                &format!("/2020-05-31/function/{name}/publish"),
1480                "",
1481                Some(if_match),
1482            ))
1483            .await
1484            .unwrap();
1485        assert_eq!(resp.status, StatusCode::OK);
1486        resp.headers
1487            .get(http::header::ETAG)
1488            .unwrap()
1489            .to_str()
1490            .unwrap()
1491            .to_string()
1492    }
1493
1494    #[tokio::test]
1495    async fn test_function_executes_handler_and_returns_result() {
1496        let svc = svc();
1497        let etag = create_function(
1498            &svc,
1499            "fn-ok",
1500            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
1501        )
1502        .await;
1503        let body = test_function_request_xml(r#"{"headers":{}}"#);
1504        let resp = svc
1505            .handle(req(
1506                http::Method::POST,
1507                "/2020-05-31/function/fn-ok/test",
1508                &body,
1509                Some(&etag),
1510            ))
1511            .await
1512            .unwrap();
1513        assert_eq!(resp.status, StatusCode::OK);
1514        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1515        assert!(
1516            xml.contains("&quot;x&quot;:&quot;y&quot;"),
1517            "expected x:y in FunctionOutput, got {xml}"
1518        );
1519        assert!(xml.contains("<FunctionErrorMessage></FunctionErrorMessage>"));
1520    }
1521
1522    #[tokio::test]
1523    async fn test_function_propagates_js_error_into_message() {
1524        let svc = svc();
1525        let etag = create_function(
1526            &svc,
1527            "fn-err",
1528            r#"function handler() { throw new Error("boom"); }"#,
1529        )
1530        .await;
1531        let body = test_function_request_xml("{}");
1532        let resp = svc
1533            .handle(req(
1534                http::Method::POST,
1535                "/2020-05-31/function/fn-err/test",
1536                &body,
1537                Some(&etag),
1538            ))
1539            .await
1540            .unwrap();
1541        assert_eq!(resp.status, StatusCode::OK);
1542        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1543        assert!(
1544            xml.contains("boom"),
1545            "expected boom in error msg, got {xml}"
1546        );
1547        assert!(xml.contains("<FunctionOutput></FunctionOutput>"));
1548    }
1549
1550    #[tokio::test]
1551    async fn test_function_unknown_name_returns_error() {
1552        let svc = svc();
1553        let body = test_function_request_xml("{}");
1554        let err = match svc
1555            .handle(req(
1556                http::Method::POST,
1557                "/2020-05-31/function/missing/test",
1558                &body,
1559                Some("E0"),
1560            ))
1561            .await
1562        {
1563            Err(e) => e,
1564            Ok(_) => panic!("expected NoSuchFunctionExists, got Ok"),
1565        };
1566        assert_eq!(err.status(), StatusCode::NOT_FOUND);
1567        assert_eq!(err.code(), "NoSuchFunctionExists");
1568    }
1569
1570    #[tokio::test]
1571    async fn test_function_modifies_aws_request_shape() {
1572        // Mirrors the canonical CloudFront Functions example from the
1573        // AWS docs: handler rewrites a request header and returns the
1574        // request, fakecloud passes the JSON shape straight through.
1575        let svc = svc();
1576        let etag = create_function(
1577            &svc,
1578            "fn-aws-shape",
1579            r#"function handler(event) { event.request.headers["x-foo"] = {value: "bar"}; return event.request; }"#,
1580        )
1581        .await;
1582        let body = test_function_request_xml(
1583            r#"{"version":"1.0","context":{},"viewer":{},"request":{"method":"GET","uri":"/","querystring":{},"headers":{},"cookies":{}}}"#,
1584        );
1585        let resp = svc
1586            .handle(req(
1587                http::Method::POST,
1588                "/2020-05-31/function/fn-aws-shape/test",
1589                &body,
1590                Some(&etag),
1591            ))
1592            .await
1593            .unwrap();
1594        assert_eq!(resp.status, StatusCode::OK);
1595        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1596        assert!(
1597            xml.contains("x-foo"),
1598            "expected request header rewrite in output, got {xml}"
1599        );
1600        assert!(
1601            xml.contains("bar"),
1602            "expected header value in output, got {xml}"
1603        );
1604        assert!(
1605            xml.contains("<FunctionErrorMessage></FunctionErrorMessage>"),
1606            "expected empty error, got {xml}"
1607        );
1608    }
1609
1610    #[tokio::test]
1611    async fn test_function_logs_error_and_marks_compute_over_100() {
1612        let svc = svc();
1613        let etag = create_function(
1614            &svc,
1615            "fn-throws",
1616            r#"function handler() { throw new Error("kaboom"); }"#,
1617        )
1618        .await;
1619        let body = test_function_request_xml("{}");
1620        let resp = svc
1621            .handle(req(
1622                http::Method::POST,
1623                "/2020-05-31/function/fn-throws/test",
1624                &body,
1625                Some(&etag),
1626            ))
1627            .await
1628            .unwrap();
1629        assert_eq!(resp.status, StatusCode::OK);
1630        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1631        // Error appears in both the dedicated message and in the logs
1632        assert!(xml.contains("kaboom"), "expected kaboom in body, got {xml}");
1633        assert!(
1634            xml.contains("<FunctionExecutionLogs>")
1635                && xml.contains("<member>ERROR: ")
1636                && xml.contains("kaboom"),
1637            "expected error log line, got {xml}"
1638        );
1639        // ComputeUtilization is rendered as a plain integer; on failure
1640        // we saturate past 100.
1641        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
1642        let cu_close = xml.find("</ComputeUtilization>").unwrap();
1643        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
1644        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
1645    }
1646
1647    #[tokio::test]
1648    async fn test_function_stage_selects_published_or_development_code() {
1649        // Publish freezes a copy of the code; subsequent UpdateFunction
1650        // mutates DEVELOPMENT but leaves LIVE pinned to the published
1651        // snapshot. Each stage's TestFunction must run the matching
1652        // version.
1653        let svc = svc();
1654        let etag =
1655            create_function(&svc, "fn-stage", r#"function handler() { return "v1"; }"#).await;
1656        let pub_etag = publish_function(&svc, "fn-stage", &etag).await;
1657        let _new_etag = update_function(
1658            &svc,
1659            "fn-stage",
1660            r#"function handler() { return "v2"; }"#,
1661            &pub_etag,
1662        )
1663        .await;
1664
1665        // DEVELOPMENT runs v2 (the latest update body).
1666        let dev_body = test_function_request_xml_with_stage("{}", "DEVELOPMENT");
1667        let resp = svc
1668            .handle(req(
1669                http::Method::POST,
1670                "/2020-05-31/function/fn-stage/test",
1671                &dev_body,
1672                Some("E_NOT_MATCHING"),
1673            ))
1674            .await;
1675        // We deliberately allow stale If-Match here so we exercise the
1676        // stage path; the precondition fires before we get to JS, so
1677        // grab the latest etag and retry.
1678        assert!(resp.is_err(), "stale If-Match must be rejected");
1679        let described = svc
1680            .handle(req(
1681                http::Method::GET,
1682                "/2020-05-31/function/fn-stage",
1683                "",
1684                None,
1685            ))
1686            .await
1687            .unwrap();
1688        let live_etag = described
1689            .headers
1690            .get(http::header::ETAG)
1691            .unwrap()
1692            .to_str()
1693            .unwrap()
1694            .to_string();
1695
1696        let dev_resp = svc
1697            .handle(req(
1698                http::Method::POST,
1699                "/2020-05-31/function/fn-stage/test",
1700                &dev_body,
1701                Some(&live_etag),
1702            ))
1703            .await
1704            .unwrap();
1705        let dev_xml = std::str::from_utf8(dev_resp.body.expect_bytes()).unwrap();
1706        assert!(
1707            dev_xml.contains("&quot;v2&quot;"),
1708            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
1709        );
1710
1711        // LIVE runs v1 (the published snapshot, not affected by the
1712        // post-publish update).
1713        let live_body = test_function_request_xml_with_stage("{}", "LIVE");
1714        let live_resp = svc
1715            .handle(req(
1716                http::Method::POST,
1717                "/2020-05-31/function/fn-stage/test",
1718                &live_body,
1719                Some(&live_etag),
1720            ))
1721            .await
1722            .unwrap();
1723        let live_xml = std::str::from_utf8(live_resp.body.expect_bytes()).unwrap();
1724        assert!(
1725            live_xml.contains("&quot;v1&quot;"),
1726            "LIVE should run published snapshot (v1), got {live_xml}"
1727        );
1728    }
1729
1730    #[tokio::test]
1731    async fn test_function_infinite_loop_is_killed() {
1732        let svc = svc();
1733        let etag = create_function(&svc, "fn-loop", r#"function handler() { while(1){} }"#).await;
1734        let body = test_function_request_xml("{}");
1735        let resp = svc
1736            .handle(req(
1737                http::Method::POST,
1738                "/2020-05-31/function/fn-loop/test",
1739                &body,
1740                Some(&etag),
1741            ))
1742            .await
1743            .unwrap();
1744        assert_eq!(resp.status, StatusCode::OK);
1745        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
1746        assert!(
1747            xml.contains("<FunctionOutput></FunctionOutput>"),
1748            "expected empty output, got {xml}"
1749        );
1750        assert!(
1751            xml.contains("ERROR:") && xml.contains("limit"),
1752            "expected timeout/limit error in logs, got {xml}"
1753        );
1754        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
1755        let cu_close = xml.find("</ComputeUtilization>").unwrap();
1756        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
1757        assert!(pct > 100, "expected pct > 100 after kill, got {pct}");
1758    }
1759
1760    fn req_q(method: http::Method, path: &str, query: &str) -> AwsRequest {
1761        let mut r = req(method, path, "", None);
1762        r.raw_query = query.to_string();
1763        r
1764    }
1765
1766    #[tokio::test]
1767    async fn get_function_live_stage_returns_published_code_not_dev() {
1768        // Finding #3: after PublishFunction freezes the LIVE snapshot, a
1769        // subsequent UpdateFunction mutates DEVELOPMENT only. GetFunction
1770        // must serve DEV working copy for DEVELOPMENT and the frozen
1771        // snapshot for LIVE.
1772        let svc = svc();
1773        let e1 = create_function(&svc, "stage-fn", "CODE_V1").await;
1774        let e2 = publish_function(&svc, "stage-fn", &e1).await;
1775        update_function(&svc, "stage-fn", "CODE_V2", &e2).await;
1776
1777        let live = svc
1778            .handle(req_q(
1779                http::Method::GET,
1780                "/2020-05-31/function/stage-fn",
1781                "Stage=LIVE",
1782            ))
1783            .await
1784            .unwrap();
1785        let live_code = String::from_utf8(live.body.expect_bytes().to_vec()).unwrap();
1786        assert_eq!(live_code, "CODE_V1", "LIVE stage must serve published code");
1787
1788        let dev = svc
1789            .handle(req_q(
1790                http::Method::GET,
1791                "/2020-05-31/function/stage-fn",
1792                "Stage=DEVELOPMENT",
1793            ))
1794            .await
1795            .unwrap();
1796        let dev_code = String::from_utf8(dev.body.expect_bytes().to_vec()).unwrap();
1797        assert_eq!(dev_code, "CODE_V2", "DEVELOPMENT stage serves working copy");
1798    }
1799
1800    #[tokio::test]
1801    async fn function_config_round_trips_key_value_store_associations() {
1802        // Finding #4: KeyValueStoreAssociations survive Create and are echoed
1803        // on Describe.
1804        let svc = svc();
1805        let arn = "arn:aws:cloudfront::123456789012:key-value-store/kvs-1";
1806        let code_b64 = base64::engine::general_purpose::STANDARD.encode(b"CODE");
1807        let body = format!(
1808            r#"<?xml version="1.0"?>
1809<CreateFunctionRequest xmlns="{NS}">
1810  <Name>kvs-fn</Name>
1811  <FunctionConfig>
1812    <Comment>t</Comment>
1813    <Runtime>cloudfront-js-2.0</Runtime>
1814    <KeyValueStoreAssociations>
1815      <Quantity>1</Quantity>
1816      <Items>
1817        <KeyValueStoreAssociation>
1818          <KeyValueStoreARN>{arn}</KeyValueStoreARN>
1819        </KeyValueStoreAssociation>
1820      </Items>
1821    </KeyValueStoreAssociations>
1822  </FunctionConfig>
1823  <FunctionCode>{code_b64}</FunctionCode>
1824</CreateFunctionRequest>"#
1825        );
1826        let create = svc
1827            .handle(req(http::Method::POST, "/2020-05-31/function", &body, None))
1828            .await
1829            .unwrap();
1830        assert_eq!(create.status, StatusCode::CREATED);
1831        let create_xml = std::str::from_utf8(create.body.expect_bytes()).unwrap();
1832        assert!(
1833            create_xml.contains(&format!("<KeyValueStoreARN>{arn}</KeyValueStoreARN>")),
1834            "create response dropped KVS association: {create_xml}"
1835        );
1836
1837        let describe = svc
1838            .handle(req(
1839                http::Method::GET,
1840                "/2020-05-31/function/kvs-fn/describe",
1841                "",
1842                None,
1843            ))
1844            .await
1845            .unwrap();
1846        let describe_xml = std::str::from_utf8(describe.body.expect_bytes()).unwrap();
1847        assert!(
1848            describe_xml.contains(&format!("<KeyValueStoreARN>{arn}</KeyValueStoreARN>")),
1849            "describe response dropped KVS association: {describe_xml}"
1850        );
1851        assert!(describe_xml.contains("<Quantity>1</Quantity>"));
1852    }
1853}