Skip to main content

fakecloud_cloudfront/
cfunctions_service.rs

1// Handlers for CloudFront ConnectionFunction ops (8 ops). Mirrors the
2// regular CloudFront Functions lifecycle: create -> describe/get ->
3// update -> publish -> attach. Code blob is base64-encoded on the
4// wire, returned raw for GetConnectionFunction.
5
6use base64::Engine;
7use chrono::Utc;
8use http::header::ETAG;
9use http::{HeaderMap, StatusCode};
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError, ResponseBody};
12
13use crate::cfunctions::StoredConnectionFunction;
14use crate::policies::{
15    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
16};
17use crate::router::Route;
18use crate::service::{
19    aws_error, esc, extract_body_field, generate_id_with_prefix, invalid_argument, xml_response,
20    CloudFrontService, DEFAULT_ACCOUNT,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27#[derive(Debug, Default, serde::Deserialize)]
28#[serde(rename_all = "PascalCase")]
29struct CreateConnectionFunctionRequest {
30    pub name: String,
31    pub connection_function_config: ConnectionFunctionConfigInput,
32    pub connection_function_code: String,
33}
34
35#[derive(Debug, Default, serde::Deserialize)]
36#[serde(rename_all = "PascalCase")]
37struct UpdateConnectionFunctionRequest {
38    pub connection_function_config: ConnectionFunctionConfigInput,
39    pub connection_function_code: String,
40}
41
42#[derive(Debug, Default, serde::Deserialize)]
43#[serde(rename_all = "PascalCase")]
44struct ConnectionFunctionConfigInput {
45    pub comment: String,
46    pub runtime: String,
47}
48
49impl CloudFrontService {
50    pub(crate) fn create_connection_function(
51        &self,
52        req: &AwsRequest,
53    ) -> Result<AwsResponse, AwsServiceError> {
54        let parsed: CreateConnectionFunctionRequest =
55            xml_io::from_xml_root(&req.body).map_err(|e| {
56                invalid_argument(format!("invalid CreateConnectionFunctionRequest XML: {e}"))
57            })?;
58        if parsed.name.is_empty() {
59            return Err(invalid_argument("Name is required"));
60        }
61        let mut state = self.state.write();
62        let account = state
63            .accounts
64            .entry(DEFAULT_ACCOUNT.to_string())
65            .or_default();
66        if account.connection_functions.contains_key(&parsed.name) {
67            return Err(aws_error(
68                StatusCode::CONFLICT,
69                "EntityAlreadyExists",
70                format!("ConnectionFunction {} already exists", parsed.name),
71            ));
72        }
73        let now = Utc::now();
74        let etag = generate_id_with_prefix("E");
75        let id = generate_id_with_prefix("CF");
76        let arn = format!(
77            "arn:aws:cloudfront::{}:connection-function/{}",
78            DEFAULT_ACCOUNT, parsed.name
79        );
80        let code = base64::engine::general_purpose::STANDARD
81            .decode(parsed.connection_function_code.trim())
82            .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
83        let stored = StoredConnectionFunction {
84            id,
85            name: parsed.name.clone(),
86            arn,
87            stage: "DEVELOPMENT".to_string(),
88            status: "UNPUBLISHED".to_string(),
89            runtime: parsed.connection_function_config.runtime,
90            comment: parsed.connection_function_config.comment,
91            code,
92            live_code: None,
93            etag: etag.clone(),
94            created_time: now,
95            last_modified_time: now,
96        };
97        account
98            .connection_functions
99            .insert(parsed.name.clone(), stored.clone());
100        drop(state);
101        let body = render_connection_function_summary(&stored, true);
102        Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
103    }
104
105    pub(crate) fn describe_connection_function(
106        &self,
107        route: &Route,
108    ) -> Result<AwsResponse, AwsServiceError> {
109        let name = route_id(route, "ConnectionFunction")?;
110        let state = self.state.read();
111        let f = state
112            .accounts
113            .get(DEFAULT_ACCOUNT)
114            .and_then(|a| a.connection_functions.get(&name).cloned())
115            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
116        drop(state);
117        let body = render_connection_function_summary(&f, true);
118        Ok(xml_with_etag(StatusCode::OK, body, &f.etag, None))
119    }
120
121    pub(crate) fn get_connection_function(
122        &self,
123        route: &Route,
124    ) -> Result<AwsResponse, AwsServiceError> {
125        let name = route_id(route, "ConnectionFunction")?;
126        let state = self.state.read();
127        let f = state
128            .accounts
129            .get(DEFAULT_ACCOUNT)
130            .and_then(|a| a.connection_functions.get(&name).cloned())
131            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
132        drop(state);
133        let mut headers = HeaderMap::new();
134        if let Ok(v) = http::HeaderValue::from_str(&f.etag) {
135            headers.insert(ETAG, v);
136        }
137        Ok(AwsResponse {
138            status: StatusCode::OK,
139            headers,
140            content_type: "application/octet-stream".to_string(),
141            body: ResponseBody::Bytes(bytes::Bytes::from(f.code.clone())),
142        })
143    }
144
145    pub(crate) fn update_connection_function(
146        &self,
147        req: &AwsRequest,
148        route: &Route,
149    ) -> Result<AwsResponse, AwsServiceError> {
150        let name = route_id(route, "ConnectionFunction")?;
151        let if_match = require_if_match(req)?;
152        let parsed: UpdateConnectionFunctionRequest =
153            xml_io::from_xml_root(&req.body).map_err(|e| {
154                invalid_argument(format!("invalid UpdateConnectionFunctionRequest XML: {e}"))
155            })?;
156        let mut state = self.state.write();
157        let account = state
158            .accounts
159            .get_mut(DEFAULT_ACCOUNT)
160            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
161        let f = account
162            .connection_functions
163            .get_mut(&name)
164            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
165        if f.etag != if_match {
166            return Err(precondition_failed());
167        }
168        f.runtime = parsed.connection_function_config.runtime;
169        f.comment = parsed.connection_function_config.comment;
170        f.code = base64::engine::general_purpose::STANDARD
171            .decode(parsed.connection_function_code.trim())
172            .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
173        f.etag = generate_id_with_prefix("E");
174        f.last_modified_time = Utc::now();
175        f.status = "UNPUBLISHED".to_string();
176        f.stage = "DEVELOPMENT".to_string();
177        let snap = f.clone();
178        drop(state);
179        let body = render_connection_function_summary(&snap, true);
180        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
181    }
182
183    pub(crate) fn delete_connection_function(
184        &self,
185        req: &AwsRequest,
186        route: &Route,
187    ) -> Result<AwsResponse, AwsServiceError> {
188        let name = route_id(route, "ConnectionFunction")?;
189        let if_match = require_if_match(req)?;
190        let mut state = self.state.write();
191        let account = state
192            .accounts
193            .get_mut(DEFAULT_ACCOUNT)
194            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
195        let f = account
196            .connection_functions
197            .get(&name)
198            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
199        if f.etag != if_match {
200            return Err(precondition_failed());
201        }
202        account.connection_functions.remove(&name);
203        drop(state);
204        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
205    }
206
207    pub(crate) fn list_connection_functions(
208        &self,
209        req: &AwsRequest,
210    ) -> Result<AwsResponse, AwsServiceError> {
211        // ListConnectionFunctions accepts an optional Stage filter constrained
212        // to the FunctionStage enum (DEVELOPMENT | LIVE). Reject unknown
213        // values up-front so conformance negative variants surface an
214        // InvalidArgument (declared in the op's Smithy error list) instead of
215        // silently returning the full unfiltered list.
216        if let Some(stage) = extract_body_field(&req.body, "Stage") {
217            if stage != "DEVELOPMENT" && stage != "LIVE" {
218                return Err(crate::service::invalid_argument(format!(
219                    "Stage must be one of 'DEVELOPMENT' or 'LIVE', got '{stage}'"
220                )));
221            }
222        }
223        let state = self.state.read();
224        let mut items: Vec<StoredConnectionFunction> = state
225            .accounts
226            .get(DEFAULT_ACCOUNT)
227            .map(|a| a.connection_functions.values().cloned().collect())
228            .unwrap_or_default();
229        drop(state);
230        items.sort_by(|a, b| a.name.cmp(&b.name));
231        let mut body = String::with_capacity(512);
232        body.push_str(XML_DECL);
233        body.push_str(&format!("<ListConnectionFunctionsResult xmlns=\"{NS}\">"));
234        body.push_str("<NextMarker></NextMarker>");
235        body.push_str("<ConnectionFunctions>");
236        for f in &items {
237            body.push_str(&render_connection_function_summary_inner(f));
238        }
239        body.push_str("</ConnectionFunctions>");
240        body.push_str("</ListConnectionFunctionsResult>");
241        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
242    }
243
244    pub(crate) fn publish_connection_function(
245        &self,
246        req: &AwsRequest,
247        route: &Route,
248    ) -> Result<AwsResponse, AwsServiceError> {
249        let name = route_id(route, "ConnectionFunction")?;
250        let if_match = require_if_match(req)?;
251        let mut state = self.state.write();
252        let account = state
253            .accounts
254            .get_mut(DEFAULT_ACCOUNT)
255            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
256        let f = account
257            .connection_functions
258            .get_mut(&name)
259            .ok_or_else(|| not_found("ConnectionFunction", &name))?;
260        if f.etag != if_match {
261            return Err(precondition_failed());
262        }
263        f.status = "DEPLOYED".to_string();
264        f.stage = "LIVE".to_string();
265        f.last_modified_time = Utc::now();
266        // Freeze the current development bytes as the LIVE snapshot so
267        // TestConnectionFunction(Stage=LIVE) keeps the published
268        // behaviour stable across subsequent UpdateConnectionFunction
269        // calls.
270        f.live_code = Some(f.code.clone());
271        let snap = f.clone();
272        drop(state);
273        let body = render_connection_function_summary(&snap, true);
274        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
275    }
276
277    pub(crate) fn test_connection_function(
278        &self,
279        req: &AwsRequest,
280        route: &Route,
281    ) -> Result<AwsResponse, AwsServiceError> {
282        let name = route_id(route, "ConnectionFunction")?;
283        let if_match = require_if_match(req)?;
284        let parsed: TestConnectionFunctionRequest =
285            xml_io::from_xml_root(&req.body).map_err(|e| {
286                invalid_argument(format!("invalid TestConnectionFunctionRequest XML: {e}"))
287            })?;
288        let event_bytes = base64::engine::general_purpose::STANDARD
289            .decode(parsed.connection_object.trim().as_bytes())
290            .map_err(|e| invalid_argument(format!("ConnectionObject is not valid base64: {e}")))?;
291        let state = self.state.read();
292        let f = state
293            .accounts
294            .get(DEFAULT_ACCOUNT)
295            .and_then(|a| a.connection_functions.get(&name).cloned())
296            .ok_or_else(|| {
297                aws_error(
298                    StatusCode::NOT_FOUND,
299                    "EntityNotFound",
300                    format!("ConnectionFunction {name} does not exist"),
301                )
302            })?;
303        drop(state);
304        if f.etag != if_match {
305            return Err(precondition_failed());
306        }
307        // Pick the stored bytes that match the requested stage.
308        // DEVELOPMENT (the default) is the latest body; LIVE reads the
309        // snapshot taken at PublishConnectionFunction. We fall back to
310        // `f.code` when no LIVE snapshot exists so unpublished
311        // functions are still testable against Stage=LIVE.
312        let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
313        let source: &[u8] = if stage.eq_ignore_ascii_case("LIVE") {
314            f.live_code.as_deref().unwrap_or(&f.code)
315        } else {
316            &f.code
317        };
318        let code = std::str::from_utf8(source)
319            .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
320        let exec = crate::js_runtime::run_handler(code, &event_bytes);
321
322        let mut body = String::with_capacity(1024);
323        body.push_str(XML_DECL);
324        body.push_str(&format!("<ConnectionFunctionTestResult xmlns=\"{NS}\">"));
325        body.push_str(&render_connection_function_summary_inner(&f));
326        body.push_str(&format!(
327            "<ComputeUtilization>{}</ComputeUtilization>",
328            exec.compute_utilization
329        ));
330        body.push_str("<ConnectionFunctionExecutionLogs>");
331        for line in &exec.logs {
332            body.push_str(&format!("<member>{}</member>", esc(line)));
333        }
334        body.push_str("</ConnectionFunctionExecutionLogs>");
335        body.push_str(&format!(
336            "<ConnectionFunctionErrorMessage>{}</ConnectionFunctionErrorMessage>",
337            esc(exec.error.as_deref().unwrap_or(""))
338        ));
339        body.push_str(&format!(
340            "<ConnectionFunctionOutput>{}</ConnectionFunctionOutput>",
341            esc(exec.output.as_deref().unwrap_or(""))
342        ));
343        body.push_str("</ConnectionFunctionTestResult>");
344        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
345    }
346}
347
348#[derive(Debug, Default, serde::Deserialize)]
349#[serde(rename_all = "PascalCase")]
350struct TestConnectionFunctionRequest {
351    #[serde(default)]
352    connection_object: String,
353    #[serde(default)]
354    stage: Option<String>,
355}
356
357fn render_connection_function_summary(f: &StoredConnectionFunction, with_decl: bool) -> String {
358    let mut out = String::with_capacity(512);
359    if with_decl {
360        out.push_str(XML_DECL);
361    }
362    out.push_str(&format!("<ConnectionFunctionSummary xmlns=\"{NS}\">"));
363    push_summary_body(&mut out, f);
364    out.push_str("</ConnectionFunctionSummary>");
365    out
366}
367
368fn render_connection_function_summary_inner(f: &StoredConnectionFunction) -> String {
369    let mut out = String::with_capacity(512);
370    out.push_str("<ConnectionFunctionSummary>");
371    push_summary_body(&mut out, f);
372    out.push_str("</ConnectionFunctionSummary>");
373    out
374}
375
376fn push_summary_body(out: &mut String, f: &StoredConnectionFunction) {
377    out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
378    out.push_str(&format!("<Id>{}</Id>", esc(&f.id)));
379    out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
380    out.push_str(&format!(
381        "<ConnectionFunctionArn>{}</ConnectionFunctionArn>",
382        esc(&f.arn)
383    ));
384    out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
385    out.push_str(&format!(
386        "<CreatedTime>{}</CreatedTime>",
387        rfc3339(&f.created_time)
388    ));
389    out.push_str(&format!(
390        "<LastModifiedTime>{}</LastModifiedTime>",
391        rfc3339(&f.last_modified_time)
392    ));
393    out.push_str("<ConnectionFunctionConfig>");
394    out.push_str(&format!("<Comment>{}</Comment>", esc(&f.comment)));
395    out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.runtime)));
396    out.push_str("</ConnectionFunctionConfig>");
397}
398
399#[cfg(test)]
400mod tests {
401    use super::*;
402    use crate::service::CloudFrontService;
403    use crate::state::CloudFrontAccounts;
404    use bytes::Bytes;
405    use fakecloud_core::service::AwsService;
406    use http::HeaderValue;
407    use parking_lot::RwLock;
408    use std::sync::Arc;
409
410    fn svc() -> CloudFrontService {
411        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
412    }
413
414    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
415        let mut headers = HeaderMap::new();
416        if let Some(v) = if_match {
417            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
418        }
419        AwsRequest {
420            service: "cloudfront".into(),
421            action: String::new(),
422            region: "us-east-1".into(),
423            account_id: DEFAULT_ACCOUNT.into(),
424            request_id: uuid::Uuid::new_v4().to_string(),
425            headers,
426            query_params: std::collections::HashMap::new(),
427            body_stream: parking_lot::Mutex::new(None),
428            body: Bytes::from(body.to_string()),
429            path_segments: path
430                .split('/')
431                .filter(|s| !s.is_empty())
432                .map(String::from)
433                .collect(),
434            raw_path: path.into(),
435            raw_query: String::new(),
436            method,
437            is_query_protocol: false,
438            access_key_id: None,
439            principal: None,
440        }
441    }
442
443    async fn create_cfn(svc: &CloudFrontService, name: &str, code: &str) -> String {
444        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
445        let body = format!(
446            r#"<?xml version="1.0"?>
447<CreateConnectionFunctionRequest xmlns="{NS}">
448  <Name>{name}</Name>
449  <ConnectionFunctionConfig>
450    <Comment>t</Comment>
451    <Runtime>cloudfront-js-2.0</Runtime>
452  </ConnectionFunctionConfig>
453  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
454</CreateConnectionFunctionRequest>"#
455        );
456        let resp = svc
457            .handle(req(
458                http::Method::POST,
459                "/2020-05-31/connection-function",
460                &body,
461                None,
462            ))
463            .await
464            .unwrap();
465        assert_eq!(resp.status, StatusCode::CREATED);
466        resp.headers
467            .get(ETAG)
468            .unwrap()
469            .to_str()
470            .unwrap()
471            .to_string()
472    }
473
474    fn test_cfn_request_xml(event_json: &str) -> String {
475        test_cfn_request_xml_with_stage(event_json, "DEVELOPMENT")
476    }
477
478    fn test_cfn_request_xml_with_stage(event_json: &str, stage: &str) -> String {
479        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
480        format!(
481            r#"<?xml version="1.0"?>
482<TestConnectionFunctionRequest xmlns="{NS}">
483  <Stage>{stage}</Stage>
484  <ConnectionObject>{event_b64}</ConnectionObject>
485</TestConnectionFunctionRequest>"#
486        )
487    }
488
489    async fn update_cfn(svc: &CloudFrontService, name: &str, code: &str, if_match: &str) -> String {
490        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
491        let body = format!(
492            r#"<?xml version="1.0"?>
493<UpdateConnectionFunctionRequest xmlns="{NS}">
494  <ConnectionFunctionConfig>
495    <Comment>t</Comment>
496    <Runtime>cloudfront-js-2.0</Runtime>
497  </ConnectionFunctionConfig>
498  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
499</UpdateConnectionFunctionRequest>"#
500        );
501        let resp = svc
502            .handle(req(
503                http::Method::PUT,
504                &format!("/2020-05-31/connection-function/{name}"),
505                &body,
506                Some(if_match),
507            ))
508            .await
509            .unwrap();
510        assert_eq!(resp.status, StatusCode::OK);
511        resp.headers
512            .get(ETAG)
513            .unwrap()
514            .to_str()
515            .unwrap()
516            .to_string()
517    }
518
519    async fn publish_cfn(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
520        let resp = svc
521            .handle(req(
522                http::Method::POST,
523                &format!("/2020-05-31/connection-function/{name}/publish"),
524                "",
525                Some(if_match),
526            ))
527            .await
528            .unwrap();
529        assert_eq!(resp.status, StatusCode::OK);
530        resp.headers
531            .get(ETAG)
532            .unwrap()
533            .to_str()
534            .unwrap()
535            .to_string()
536    }
537
538    #[tokio::test]
539    async fn test_connection_function_executes_handler_and_returns_result() {
540        let svc = svc();
541        let etag = create_cfn(
542            &svc,
543            "cfn-ok",
544            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
545        )
546        .await;
547        let body = test_cfn_request_xml(r#"{"headers":{}}"#);
548        let resp = svc
549            .handle(req(
550                http::Method::POST,
551                "/2020-05-31/connection-function/cfn-ok/test",
552                &body,
553                Some(&etag),
554            ))
555            .await
556            .unwrap();
557        assert_eq!(resp.status, StatusCode::OK);
558        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
559        assert!(
560            xml.contains("&quot;x&quot;:&quot;y&quot;"),
561            "expected x:y in output, got {xml}"
562        );
563        assert!(
564            xml.contains("<ConnectionFunctionErrorMessage></ConnectionFunctionErrorMessage>"),
565            "expected empty error, got {xml}"
566        );
567    }
568
569    #[tokio::test]
570    async fn test_connection_function_propagates_js_error_into_message() {
571        let svc = svc();
572        let etag = create_cfn(
573            &svc,
574            "cfn-err",
575            r#"function handler() { throw new Error("boom"); }"#,
576        )
577        .await;
578        let body = test_cfn_request_xml("{}");
579        let resp = svc
580            .handle(req(
581                http::Method::POST,
582                "/2020-05-31/connection-function/cfn-err/test",
583                &body,
584                Some(&etag),
585            ))
586            .await
587            .unwrap();
588        assert_eq!(resp.status, StatusCode::OK);
589        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
590        assert!(xml.contains("boom"), "expected boom, got {xml}");
591        assert!(xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"));
592    }
593
594    #[tokio::test]
595    async fn test_connection_function_unknown_name_returns_error() {
596        let svc = svc();
597        let body = test_cfn_request_xml("{}");
598        let err = match svc
599            .handle(req(
600                http::Method::POST,
601                "/2020-05-31/connection-function/missing/test",
602                &body,
603                Some("E0"),
604            ))
605            .await
606        {
607            Err(e) => e,
608            Ok(_) => panic!("expected EntityNotFound, got Ok"),
609        };
610        assert_eq!(err.status(), StatusCode::NOT_FOUND);
611        assert_eq!(err.code(), "EntityNotFound");
612    }
613
614    #[tokio::test]
615    async fn test_connection_function_logs_error_and_marks_compute_over_100() {
616        let svc = svc();
617        let etag = create_cfn(
618            &svc,
619            "cfn-throws",
620            r#"function handler() { throw new Error("kaboom"); }"#,
621        )
622        .await;
623        let body = test_cfn_request_xml("{}");
624        let resp = svc
625            .handle(req(
626                http::Method::POST,
627                "/2020-05-31/connection-function/cfn-throws/test",
628                &body,
629                Some(&etag),
630            ))
631            .await
632            .unwrap();
633        assert_eq!(resp.status, StatusCode::OK);
634        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
635        assert!(xml.contains("kaboom"), "expected kaboom, got {xml}");
636        assert!(
637            xml.contains("<ConnectionFunctionExecutionLogs>") && xml.contains("ERROR: "),
638            "expected error log, got {xml}"
639        );
640        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
641        let cu_close = xml.find("</ComputeUtilization>").unwrap();
642        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
643        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
644    }
645
646    #[tokio::test]
647    async fn test_connection_function_stage_selects_published_or_development_code() {
648        // ConnectionFunctions mirror Functions: PublishConnectionFunction
649        // freezes a LIVE snapshot, UpdateConnectionFunction mutates only
650        // DEVELOPMENT, and TestConnectionFunction picks the matching
651        // version per Stage.
652        let svc = svc();
653        let etag = create_cfn(&svc, "cfn-stage", r#"function handler() { return "v1"; }"#).await;
654        let pub_etag = publish_cfn(&svc, "cfn-stage", &etag).await;
655        let new_etag = update_cfn(
656            &svc,
657            "cfn-stage",
658            r#"function handler() { return "v2"; }"#,
659            &pub_etag,
660        )
661        .await;
662
663        let dev_body = test_cfn_request_xml_with_stage("{}", "DEVELOPMENT");
664        let dev_resp = svc
665            .handle(req(
666                http::Method::POST,
667                "/2020-05-31/connection-function/cfn-stage/test",
668                &dev_body,
669                Some(&new_etag),
670            ))
671            .await
672            .unwrap();
673        let dev_xml = std::str::from_utf8(dev_resp.body.expect_bytes()).unwrap();
674        assert!(
675            dev_xml.contains("&quot;v2&quot;"),
676            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
677        );
678
679        let live_body = test_cfn_request_xml_with_stage("{}", "LIVE");
680        let live_resp = svc
681            .handle(req(
682                http::Method::POST,
683                "/2020-05-31/connection-function/cfn-stage/test",
684                &live_body,
685                Some(&new_etag),
686            ))
687            .await
688            .unwrap();
689        let live_xml = std::str::from_utf8(live_resp.body.expect_bytes()).unwrap();
690        assert!(
691            live_xml.contains("&quot;v1&quot;"),
692            "LIVE should run published snapshot (v1), got {live_xml}"
693        );
694    }
695
696    #[tokio::test]
697    async fn test_connection_function_infinite_loop_is_killed() {
698        let svc = svc();
699        let etag = create_cfn(&svc, "cfn-loop", r#"function handler() { while(1){} }"#).await;
700        let body = test_cfn_request_xml("{}");
701        let resp = svc
702            .handle(req(
703                http::Method::POST,
704                "/2020-05-31/connection-function/cfn-loop/test",
705                &body,
706                Some(&etag),
707            ))
708            .await
709            .unwrap();
710        assert_eq!(resp.status, StatusCode::OK);
711        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
712        assert!(
713            xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"),
714            "expected empty output after kill, got {xml}"
715        );
716        assert!(
717            xml.contains("ERROR:") && xml.contains("limit"),
718            "expected timeout/limit log, got {xml}"
719        );
720        let cu_open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
721        let cu_close = xml.find("</ComputeUtilization>").unwrap();
722        let pct: u32 = xml[cu_open..cu_close].parse().unwrap();
723        assert!(pct > 100, "expected pct > 100, got {pct}");
724    }
725}