Skip to main content

fakecloud_cloudfront/
cfunctions_service.rs

1// Handlers for CloudFront ConnectionFunction ops (8 ops). Mirrors the
2// regular CloudFront Functions lifecycle: create -> describe/get ->
3// update -> publish -> attach. Code blob is base64-encoded on the
4// wire, returned raw for GetConnectionFunction.
5
6use base64::Engine;
7use chrono::Utc;
8use http::header::ETAG;
9use http::{HeaderMap, StatusCode};
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError, ResponseBody};
12
13use crate::cfunctions::StoredConnectionFunction;
14use crate::policies::{
15    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
16};
17use crate::router::Route;
18use crate::service::{
19    aws_error, esc, generate_id_with_prefix, invalid_argument, xml_response, CloudFrontService,
20    DEFAULT_ACCOUNT,
21};
22use crate::xml_io;
23
/// XML namespace written into the `xmlns` attribute of response root
/// elements rendered in this file.
const NS: &str = crate::NAMESPACE;
/// XML declaration prepended to standalone response documents.
const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
/// Request body accepted by `create_connection_function`, deserialized from
/// XML using PascalCase element names.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CreateConnectionFunctionRequest {
    /// Function name; the create handler rejects an empty value and uses the
    /// name as the storage key.
    pub name: String,
    /// Comment and runtime copied onto the stored function.
    pub connection_function_config: ConnectionFunctionConfigInput,
    /// Function source. The create handler attempts a base64 decode and
    /// keeps the raw text bytes when decoding fails.
    pub connection_function_code: String,
}
34
/// Request body accepted by `update_connection_function`, deserialized from
/// XML using PascalCase element names. The function name is taken from the
/// route rather than the body.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct UpdateConnectionFunctionRequest {
    /// Replacement comment and runtime for the stored function.
    pub connection_function_config: ConnectionFunctionConfigInput,
    /// Replacement function source. The update handler attempts a base64
    /// decode and keeps the raw text bytes when decoding fails.
    pub connection_function_code: String,
}
41
/// `ConnectionFunctionConfig` element shared by the create and update
/// request bodies.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ConnectionFunctionConfigInput {
    /// Free-form comment, stored verbatim and echoed in summaries.
    pub comment: String,
    /// Runtime identifier, stored verbatim and echoed in summaries.
    pub runtime: String,
}
48
49impl CloudFrontService {
50    pub(crate) fn create_connection_function(
51        &self,
52        req: &AwsRequest,
53    ) -> Result<AwsResponse, AwsServiceError> {
54        let parsed: CreateConnectionFunctionRequest =
55            xml_io::from_xml_root(&req.body).map_err(|e| {
56                invalid_argument(format!("invalid CreateConnectionFunctionRequest XML: {e}"))
57            })?;
58        if parsed.name.is_empty() {
59            return Err(invalid_argument("Name is required"));
60        }
61        let mut state = self.state.write();
62        let account = state
63            .accounts
64            .entry(DEFAULT_ACCOUNT.to_string())
65            .or_default();
66        if account.connection_functions.contains_key(&parsed.name) {
67            return Err(aws_error(
68                StatusCode::CONFLICT,
69                "EntityAlreadyExists",
70                format!("ConnectionFunction {} already exists", parsed.name),
71            ));
72        }
73        let now = Utc::now();
74        let etag = generate_id_with_prefix("E");
75        let id = generate_id_with_prefix("CF");
76        let arn = format!(
77            "arn:aws:cloudfront::{}:connection-function/{}",
78            DEFAULT_ACCOUNT, parsed.name
79        );
80        let code = base64::engine::general_purpose::STANDARD
81            .decode(parsed.connection_function_code.trim())
82            .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
83        let stored = StoredConnectionFunction {
84            id,
85            name: parsed.name.clone(),
86            arn,
87            stage: "DEVELOPMENT".to_string(),
88            status: "UNPUBLISHED".to_string(),
89            runtime: parsed.connection_function_config.runtime,
90            comment: parsed.connection_function_config.comment,
91            code,
92            live_code: None,
93            etag: etag.clone(),
94            created_time: now,
95            last_modified_time: now,
96        };
97        account
98            .connection_functions
99            .insert(parsed.name.clone(), stored.clone());
100        drop(state);
101        let body = render_connection_function_summary(&stored, true);
102        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
103    }
104
105    pub(crate) fn describe_connection_function(
106        &self,
107        route: &Route,
108    ) -> Result<AwsResponse, AwsServiceError> {
109        let name = route_id(route, "ConnectionFunction")?;
110        let state = self.state.read();
111        let f = state
112            .accounts
113            .get(DEFAULT_ACCOUNT)
114            .and_then(|a| a.connection_functions.get(&name).cloned())
115            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
116        drop(state);
117        let body = render_connection_function_summary(&f, true);
118        Ok(xml_with_etag(StatusCode::OK, body, &f.etag, None))
119    }
120
121    pub(crate) fn get_connection_function(
122        &self,
123        route: &Route,
124    ) -> Result<AwsResponse, AwsServiceError> {
125        let name = route_id(route, "ConnectionFunction")?;
126        let state = self.state.read();
127        let f = state
128            .accounts
129            .get(DEFAULT_ACCOUNT)
130            .and_then(|a| a.connection_functions.get(&name).cloned())
131            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
132        drop(state);
133        let mut headers = HeaderMap::new();
134        if let Ok(v) = http::HeaderValue::from_str(&f.etag) {
135            headers.insert(ETAG, v);
136        }
137        Ok(AwsResponse {
138            status: StatusCode::OK,
139            headers,
140            content_type: "application/octet-stream".to_string(),
141            body: ResponseBody::Bytes(bytes::Bytes::from(f.code.clone())),
142        })
143    }
144
145    pub(crate) fn update_connection_function(
146        &self,
147        req: &AwsRequest,
148        route: &Route,
149    ) -> Result<AwsResponse, AwsServiceError> {
150        let name = route_id(route, "ConnectionFunction")?;
151        let if_match = require_if_match(req)?;
152        let parsed: UpdateConnectionFunctionRequest =
153            xml_io::from_xml_root(&req.body).map_err(|e| {
154                invalid_argument(format!("invalid UpdateConnectionFunctionRequest XML: {e}"))
155            })?;
156        let mut state = self.state.write();
157        let account = state
158            .accounts
159            .get_mut(DEFAULT_ACCOUNT)
160            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
161        let f = account
162            .connection_functions
163            .get_mut(&name)
164            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
165        if f.etag != if_match {
166            return Err(precondition_failed());
167        }
168        f.runtime = parsed.connection_function_config.runtime;
169        f.comment = parsed.connection_function_config.comment;
170        f.code = base64::engine::general_purpose::STANDARD
171            .decode(parsed.connection_function_code.trim())
172            .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
173        f.etag = generate_id_with_prefix("E");
174        f.last_modified_time = Utc::now();
175        f.status = "UNPUBLISHED".to_string();
176        f.stage = "DEVELOPMENT".to_string();
177        let snap = f.clone();
178        drop(state);
179        let body = render_connection_function_summary(&snap, true);
180        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
181    }
182
183    pub(crate) fn delete_connection_function(
184        &self,
185        req: &AwsRequest,
186        route: &Route,
187    ) -> Result<AwsResponse, AwsServiceError> {
188        let name = route_id(route, "ConnectionFunction")?;
189        let if_match = require_if_match(req)?;
190        let mut state = self.state.write();
191        let account = state
192            .accounts
193            .get_mut(DEFAULT_ACCOUNT)
194            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
195        let f = account
196            .connection_functions
197            .get(&name)
198            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
199        if f.etag != if_match {
200            return Err(precondition_failed());
201        }
202        account.connection_functions.remove(&name);
203        drop(state);
204        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
205    }
206
207    pub(crate) fn list_connection_functions(
208        &self,
209        _req: &AwsRequest,
210    ) -> Result<AwsResponse, AwsServiceError> {
211        let state = self.state.read();
212        let mut items: Vec<StoredConnectionFunction> = state
213            .accounts
214            .get(DEFAULT_ACCOUNT)
215            .map(|a| a.connection_functions.values().cloned().collect())
216            .unwrap_or_default();
217        drop(state);
218        items.sort_by(|a, b| a.name.cmp(&b.name));
219        let mut body = String::with_capacity(512);
220        body.push_str(XML_DECL);
221        body.push_str(&format!("<ListConnectionFunctionsResult xmlns=\"{NS}\">"));
222        body.push_str("<NextMarker></NextMarker>");
223        body.push_str("<ConnectionFunctions>");
224        for f in &items {
225            body.push_str(&render_connection_function_summary_inner(f));
226        }
227        body.push_str("</ConnectionFunctions>");
228        body.push_str("</ListConnectionFunctionsResult>");
229        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
230    }
231
232    pub(crate) fn publish_connection_function(
233        &self,
234        req: &AwsRequest,
235        route: &Route,
236    ) -> Result<AwsResponse, AwsServiceError> {
237        let name = route_id(route, "ConnectionFunction")?;
238        let if_match = require_if_match(req)?;
239        let mut state = self.state.write();
240        let account = state
241            .accounts
242            .get_mut(DEFAULT_ACCOUNT)
243            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
244        let f = account
245            .connection_functions
246            .get_mut(&name)
247            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
248        if f.etag != if_match {
249            return Err(precondition_failed());
250        }
251        f.status = "DEPLOYED".to_string();
252        f.stage = "LIVE".to_string();
253        f.last_modified_time = Utc::now();
254        // Freeze the current development bytes as the LIVE snapshot so
255        // TestConnectionFunction(Stage=LIVE) keeps the published
256        // behaviour stable across subsequent UpdateConnectionFunction
257        // calls.
258        f.live_code = Some(f.code.clone());
259        let snap = f.clone();
260        drop(state);
261        let body = render_connection_function_summary(&snap, true);
262        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
263    }
264
265    pub(crate) fn test_connection_function(
266        &self,
267        req: &AwsRequest,
268        route: &Route,
269    ) -> Result<AwsResponse, AwsServiceError> {
270        let name = route_id(route, "ConnectionFunction")?;
271        let if_match = require_if_match(req)?;
272        let parsed: TestConnectionFunctionRequest =
273            xml_io::from_xml_root(&req.body).map_err(|e| {
274                invalid_argument(format!("invalid TestConnectionFunctionRequest XML: {e}"))
275            })?;
276        let event_bytes = base64::engine::general_purpose::STANDARD
277            .decode(parsed.connection_object.trim().as_bytes())
278            .map_err(|e| invalid_argument(format!("ConnectionObject is not valid base64: {e}")))?;
279        let state = self.state.read();
280        let f = state
281            .accounts
282            .get(DEFAULT_ACCOUNT)
283            .and_then(|a| a.connection_functions.get(&name).cloned())
284            .ok_or_else(|| {
285                aws_error(
286                    StatusCode::NOT_FOUND,
287                    "EntityNotFound",
288                    format!("ConnectionFunction {name} does not exist"),
289                )
290            })?;
291        drop(state);
292        if f.etag != if_match {
293            return Err(precondition_failed());
294        }
295        // Pick the stored bytes that match the requested stage.
296        // DEVELOPMENT (the default) is the latest body; LIVE reads the
297        // snapshot taken at PublishConnectionFunction. We fall back to
298        // `f.code` when no LIVE snapshot exists so unpublished
299        // functions are still testable against Stage=LIVE.
300        let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
301        let source: &[u8] = if stage.eq_ignore_ascii_case("LIVE") {
302            f.live_code.as_deref().unwrap_or(&f.code)
303        } else {
304            &f.code
305        };
306        let code = std::str::from_utf8(source)
307            .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
308        let exec = crate::js_runtime::run_handler(code, &event_bytes);
309
310        let mut body = String::with_capacity(1024);
311        body.push_str(XML_DECL);
312        body.push_str(&format!("<ConnectionFunctionTestResult xmlns=\"{NS}\">"));
313        body.push_str(&render_connection_function_summary_inner(&f));
314        body.push_str(&format!(
315            "<ComputeUtilization>{}</ComputeUtilization>",
316            exec.compute_utilization
317        ));
318        body.push_str("<ConnectionFunctionExecutionLogs>");
319        for line in &exec.logs {
320            body.push_str(&format!("<member>{}</member>", esc(line)));
321        }
322        body.push_str("</ConnectionFunctionExecutionLogs>");
323        body.push_str(&format!(
324            "<ConnectionFunctionErrorMessage>{}</ConnectionFunctionErrorMessage>",
325            esc(exec.error.as_deref().unwrap_or(""))
326        ));
327        body.push_str(&format!(
328            "<ConnectionFunctionOutput>{}</ConnectionFunctionOutput>",
329            esc(exec.output.as_deref().unwrap_or(""))
330        ));
331        body.push_str("</ConnectionFunctionTestResult>");
332        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
333    }
334}
335
/// Request body accepted by `test_connection_function`, deserialized from
/// XML using PascalCase element names. Both fields fall back to their
/// defaults when the element is absent.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TestConnectionFunctionRequest {
    /// Base64-encoded event handed to the function; the handler rejects
    /// values that do not decode.
    #[serde(default)]
    connection_object: String,
    /// Requested stage; the handler treats a missing value as `DEVELOPMENT`.
    #[serde(default)]
    stage: Option<String>,
}
344
345fn render_connection_function_summary(f: &StoredConnectionFunction, with_decl: bool) -> String {
346    let mut out = String::with_capacity(512);
347    if with_decl {
348        out.push_str(XML_DECL);
349    }
350    out.push_str(&format!("<ConnectionFunctionSummary xmlns=\"{NS}\">"));
351    push_summary_body(&mut out, f);
352    out.push_str("</ConnectionFunctionSummary>");
353    out
354}
355
356fn render_connection_function_summary_inner(f: &StoredConnectionFunction) -> String {
357    let mut out = String::with_capacity(512);
358    out.push_str("<ConnectionFunctionSummary>");
359    push_summary_body(&mut out, f);
360    out.push_str("</ConnectionFunctionSummary>");
361    out
362}
363
364fn push_summary_body(out: &mut String, f: &StoredConnectionFunction) {
365    out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
366    out.push_str(&format!("<Id>{}</Id>", esc(&f.id)));
367    out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
368    out.push_str(&format!(
369        "<ConnectionFunctionArn>{}</ConnectionFunctionArn>",
370        esc(&f.arn)
371    ));
372    out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
373    out.push_str(&format!(
374        "<CreatedTime>{}</CreatedTime>",
375        rfc3339(&f.created_time)
376    ));
377    out.push_str(&format!(
378        "<LastModifiedTime>{}</LastModifiedTime>",
379        rfc3339(&f.last_modified_time)
380    ));
381    out.push_str("<ConnectionFunctionConfig>");
382    out.push_str(&format!("<Comment>{}</Comment>", esc(&f.comment)));
383    out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.runtime)));
384    out.push_str("</ConnectionFunctionConfig>");
385}
386
#[cfg(test)]
mod tests {
    //! End-to-end tests that drive the ConnectionFunction handlers through
    //! `CloudFrontService::handle`.

    use super::*;
    use crate::service::CloudFrontService;
    use crate::state::CloudFrontAccounts;
    use bytes::Bytes;
    use fakecloud_core::service::AwsService;
    use http::HeaderValue;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Collection path shared by every ConnectionFunction route used here.
    const BASE_PATH: &str = "/2020-05-31/connection-function";

    /// Builds a service backed by fresh, empty state.
    fn svc() -> CloudFrontService {
        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
    }

    /// Builds a CloudFront request for `path`, optionally carrying an
    /// `If-Match` header.
    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
        let mut headers = HeaderMap::new();
        if let Some(v) = if_match {
            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
        }
        let path_segments = path
            .split('/')
            .filter(|segment| !segment.is_empty())
            .map(str::to_owned)
            .collect();
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: uuid::Uuid::new_v4().to_string(),
            headers,
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_owned()),
            path_segments,
            raw_path: path.into(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Returns the `ETag` response header as an owned string.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string()
    }

    /// Views the response body as UTF-8 text.
    fn body_xml(resp: &AwsResponse) -> &str {
        std::str::from_utf8(resp.body.expect_bytes()).unwrap()
    }

    /// Parses the integer inside `<ComputeUtilization>`.
    fn compute_utilization(xml: &str) -> u32 {
        let open_tag = "<ComputeUtilization>";
        let cu_open = xml.find(open_tag).unwrap() + open_tag.len();
        let cu_close = xml.find("</ComputeUtilization>").unwrap();
        xml[cu_open..cu_close].parse().unwrap()
    }

    /// POSTs `body` to the `/test` route of `name` and unwraps the response.
    async fn invoke_test(
        svc: &CloudFrontService,
        name: &str,
        body: &str,
        if_match: &str,
    ) -> AwsResponse {
        svc.handle(req(
            http::Method::POST,
            &format!("{BASE_PATH}/{name}/test"),
            body,
            Some(if_match),
        ))
        .await
        .unwrap()
    }

    /// Creates a function with base64-encoded `code` and returns its ETag.
    async fn create_cfn(svc: &CloudFrontService, name: &str, code: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<CreateConnectionFunctionRequest xmlns="{NS}">
  <Name>{name}</Name>
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</CreateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(http::Method::POST, BASE_PATH, &body, None))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::CREATED);
        etag_of(&resp)
    }

    /// Test request body targeting the DEVELOPMENT stage.
    fn test_cfn_request_xml(event_json: &str) -> String {
        test_cfn_request_xml_with_stage(event_json, "DEVELOPMENT")
    }

    /// Test request body with a base64-encoded event and explicit stage.
    fn test_cfn_request_xml_with_stage(event_json: &str, stage: &str) -> String {
        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
        format!(
            r#"<?xml version="1.0"?>
<TestConnectionFunctionRequest xmlns="{NS}">
  <Stage>{stage}</Stage>
  <ConnectionObject>{event_b64}</ConnectionObject>
</TestConnectionFunctionRequest>"#
        )
    }

    /// Replaces the function's code and returns the new ETag.
    async fn update_cfn(svc: &CloudFrontService, name: &str, code: &str, if_match: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<UpdateConnectionFunctionRequest xmlns="{NS}">
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</UpdateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(
                http::Method::PUT,
                &format!("{BASE_PATH}/{name}"),
                &body,
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    /// Publishes the function and returns the ETag from the response.
    async fn publish_cfn(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
        let resp = svc
            .handle(req(
                http::Method::POST,
                &format!("{BASE_PATH}/{name}/publish"),
                "",
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    #[tokio::test]
    async fn test_connection_function_executes_handler_and_returns_result() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-ok",
            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
        )
        .await;
        let body = test_cfn_request_xml(r#"{"headers":{}}"#);
        let resp = invoke_test(&svc, "cfn-ok", &body, &etag).await;
        assert_eq!(resp.status, StatusCode::OK);
        let xml = body_xml(&resp);
        assert!(
            xml.contains("&quot;x&quot;:&quot;y&quot;"),
            "expected x:y in output, got {xml}"
        );
        assert!(
            xml.contains("<ConnectionFunctionErrorMessage></ConnectionFunctionErrorMessage>"),
            "expected empty error, got {xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_propagates_js_error_into_message() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-err",
            r#"function handler() { throw new Error("boom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let resp = invoke_test(&svc, "cfn-err", &body, &etag).await;
        assert_eq!(resp.status, StatusCode::OK);
        let xml = body_xml(&resp);
        assert!(xml.contains("boom"), "expected boom, got {xml}");
        assert!(xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"));
    }

    #[tokio::test]
    async fn test_connection_function_unknown_name_returns_error() {
        let svc = svc();
        let body = test_cfn_request_xml("{}");
        let outcome = svc
            .handle(req(
                http::Method::POST,
                &format!("{BASE_PATH}/missing/test"),
                &body,
                Some("E0"),
            ))
            .await;
        let err = match outcome {
            Err(e) => e,
            Ok(_) => panic!("expected EntityNotFound, got Ok"),
        };
        assert_eq!(err.status(), StatusCode::NOT_FOUND);
        assert_eq!(err.code(), "EntityNotFound");
    }

    #[tokio::test]
    async fn test_connection_function_logs_error_and_marks_compute_over_100() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-throws",
            r#"function handler() { throw new Error("kaboom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let resp = invoke_test(&svc, "cfn-throws", &body, &etag).await;
        assert_eq!(resp.status, StatusCode::OK);
        let xml = body_xml(&resp);
        assert!(xml.contains("kaboom"), "expected kaboom, got {xml}");
        assert!(
            xml.contains("<ConnectionFunctionExecutionLogs>") && xml.contains("ERROR: "),
            "expected error log, got {xml}"
        );
        let pct = compute_utilization(xml);
        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
    }

    #[tokio::test]
    async fn test_connection_function_stage_selects_published_or_development_code() {
        // Publish v1, then update to v2: a DEVELOPMENT test must run the
        // updated code while a LIVE test must still run the code that was
        // current when the function was published.
        let svc = svc();
        let etag = create_cfn(&svc, "cfn-stage", r#"function handler() { return "v1"; }"#).await;
        let pub_etag = publish_cfn(&svc, "cfn-stage", &etag).await;
        let new_etag = update_cfn(
            &svc,
            "cfn-stage",
            r#"function handler() { return "v2"; }"#,
            &pub_etag,
        )
        .await;

        let dev_body = test_cfn_request_xml_with_stage("{}", "DEVELOPMENT");
        let dev_resp = invoke_test(&svc, "cfn-stage", &dev_body, &new_etag).await;
        let dev_xml = body_xml(&dev_resp);
        assert!(
            dev_xml.contains("&quot;v2&quot;"),
            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
        );

        let live_body = test_cfn_request_xml_with_stage("{}", "LIVE");
        let live_resp = invoke_test(&svc, "cfn-stage", &live_body, &new_etag).await;
        let live_xml = body_xml(&live_resp);
        assert!(
            live_xml.contains("&quot;v1&quot;"),
            "LIVE should run published snapshot (v1), got {live_xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_infinite_loop_is_killed() {
        let svc = svc();
        let etag = create_cfn(&svc, "cfn-loop", r#"function handler() { while(1){} }"#).await;
        let body = test_cfn_request_xml("{}");
        let resp = invoke_test(&svc, "cfn-loop", &body, &etag).await;
        assert_eq!(resp.status, StatusCode::OK);
        let xml = body_xml(&resp);
        assert!(
            xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"),
            "expected empty output after kill, got {xml}"
        );
        assert!(
            xml.contains("ERROR:") && xml.contains("limit"),
            "expected timeout/limit log, got {xml}"
        );
        let pct = compute_utilization(xml);
        assert!(pct > 100, "expected pct > 100, got {pct}");
    }
}
713}