1use base64::Engine;
7use chrono::Utc;
8use http::header::ETAG;
9use http::{HeaderMap, StatusCode};
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError, ResponseBody};
12
13use crate::cfunctions::StoredConnectionFunction;
14use crate::policies::{
15 not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
16};
17use crate::router::Route;
18use crate::service::{
19 aws_error, esc, generate_id_with_prefix, invalid_argument, xml_response, CloudFrontService,
20 DEFAULT_ACCOUNT,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27#[derive(Debug, Default, serde::Deserialize)]
28#[serde(rename_all = "PascalCase")]
29struct CreateConnectionFunctionRequest {
30 pub name: String,
31 pub connection_function_config: ConnectionFunctionConfigInput,
32 pub connection_function_code: String,
33}
34
35#[derive(Debug, Default, serde::Deserialize)]
36#[serde(rename_all = "PascalCase")]
37struct UpdateConnectionFunctionRequest {
38 pub connection_function_config: ConnectionFunctionConfigInput,
39 pub connection_function_code: String,
40}
41
42#[derive(Debug, Default, serde::Deserialize)]
43#[serde(rename_all = "PascalCase")]
44struct ConnectionFunctionConfigInput {
45 pub comment: String,
46 pub runtime: String,
47}
48
49impl CloudFrontService {
50 pub(crate) fn create_connection_function(
51 &self,
52 req: &AwsRequest,
53 ) -> Result<AwsResponse, AwsServiceError> {
54 let parsed: CreateConnectionFunctionRequest =
55 xml_io::from_xml_root(&req.body).map_err(|e| {
56 invalid_argument(format!("invalid CreateConnectionFunctionRequest XML: {e}"))
57 })?;
58 if parsed.name.is_empty() {
59 return Err(invalid_argument("Name is required"));
60 }
61 let mut state = self.state.write();
62 let account = state
63 .accounts
64 .entry(DEFAULT_ACCOUNT.to_string())
65 .or_default();
66 if account.connection_functions.contains_key(&parsed.name) {
67 return Err(aws_error(
68 StatusCode::CONFLICT,
69 "EntityAlreadyExists",
70 format!("ConnectionFunction {} already exists", parsed.name),
71 ));
72 }
73 let now = Utc::now();
74 let etag = generate_id_with_prefix("E");
75 let id = generate_id_with_prefix("CF");
76 let arn = format!(
77 "arn:aws:cloudfront::{}:connection-function/{}",
78 DEFAULT_ACCOUNT, parsed.name
79 );
80 let code = base64::engine::general_purpose::STANDARD
81 .decode(parsed.connection_function_code.trim())
82 .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
83 let stored = StoredConnectionFunction {
84 id,
85 name: parsed.name.clone(),
86 arn,
87 stage: "DEVELOPMENT".to_string(),
88 status: "UNPUBLISHED".to_string(),
89 runtime: parsed.connection_function_config.runtime,
90 comment: parsed.connection_function_config.comment,
91 code,
92 live_code: None,
93 etag: etag.clone(),
94 created_time: now,
95 last_modified_time: now,
96 };
97 account
98 .connection_functions
99 .insert(parsed.name.clone(), stored.clone());
100 drop(state);
101 let body = render_connection_function_summary(&stored, true);
102 Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
103 }
104
105 pub(crate) fn describe_connection_function(
106 &self,
107 route: &Route,
108 ) -> Result<AwsResponse, AwsServiceError> {
109 let name = route_id(route, "ConnectionFunction")?;
110 let state = self.state.read();
111 let f = state
112 .accounts
113 .get(DEFAULT_ACCOUNT)
114 .and_then(|a| a.connection_functions.get(&name).cloned())
115 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
116 drop(state);
117 let body = render_connection_function_summary(&f, true);
118 Ok(xml_with_etag(StatusCode::OK, body, &f.etag, None))
119 }
120
121 pub(crate) fn get_connection_function(
122 &self,
123 route: &Route,
124 ) -> Result<AwsResponse, AwsServiceError> {
125 let name = route_id(route, "ConnectionFunction")?;
126 let state = self.state.read();
127 let f = state
128 .accounts
129 .get(DEFAULT_ACCOUNT)
130 .and_then(|a| a.connection_functions.get(&name).cloned())
131 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
132 drop(state);
133 let mut headers = HeaderMap::new();
134 if let Ok(v) = http::HeaderValue::from_str(&f.etag) {
135 headers.insert(ETAG, v);
136 }
137 Ok(AwsResponse {
138 status: StatusCode::OK,
139 headers,
140 content_type: "application/octet-stream".to_string(),
141 body: ResponseBody::Bytes(bytes::Bytes::from(f.code.clone())),
142 })
143 }
144
145 pub(crate) fn update_connection_function(
146 &self,
147 req: &AwsRequest,
148 route: &Route,
149 ) -> Result<AwsResponse, AwsServiceError> {
150 let name = route_id(route, "ConnectionFunction")?;
151 let if_match = require_if_match(req)?;
152 let parsed: UpdateConnectionFunctionRequest =
153 xml_io::from_xml_root(&req.body).map_err(|e| {
154 invalid_argument(format!("invalid UpdateConnectionFunctionRequest XML: {e}"))
155 })?;
156 let mut state = self.state.write();
157 let account = state
158 .accounts
159 .get_mut(DEFAULT_ACCOUNT)
160 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
161 let f = account
162 .connection_functions
163 .get_mut(&name)
164 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
165 if f.etag != if_match {
166 return Err(precondition_failed());
167 }
168 f.runtime = parsed.connection_function_config.runtime;
169 f.comment = parsed.connection_function_config.comment;
170 f.code = base64::engine::general_purpose::STANDARD
171 .decode(parsed.connection_function_code.trim())
172 .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
173 f.etag = generate_id_with_prefix("E");
174 f.last_modified_time = Utc::now();
175 f.status = "UNPUBLISHED".to_string();
176 f.stage = "DEVELOPMENT".to_string();
177 let snap = f.clone();
178 drop(state);
179 let body = render_connection_function_summary(&snap, true);
180 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
181 }
182
183 pub(crate) fn delete_connection_function(
184 &self,
185 req: &AwsRequest,
186 route: &Route,
187 ) -> Result<AwsResponse, AwsServiceError> {
188 let name = route_id(route, "ConnectionFunction")?;
189 let if_match = require_if_match(req)?;
190 let mut state = self.state.write();
191 let account = state
192 .accounts
193 .get_mut(DEFAULT_ACCOUNT)
194 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
195 let f = account
196 .connection_functions
197 .get(&name)
198 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
199 if f.etag != if_match {
200 return Err(precondition_failed());
201 }
202 account.connection_functions.remove(&name);
203 drop(state);
204 Ok(crate::policies::empty(StatusCode::NO_CONTENT))
205 }
206
207 pub(crate) fn list_connection_functions(
208 &self,
209 _req: &AwsRequest,
210 ) -> Result<AwsResponse, AwsServiceError> {
211 let state = self.state.read();
212 let mut items: Vec<StoredConnectionFunction> = state
213 .accounts
214 .get(DEFAULT_ACCOUNT)
215 .map(|a| a.connection_functions.values().cloned().collect())
216 .unwrap_or_default();
217 drop(state);
218 items.sort_by(|a, b| a.name.cmp(&b.name));
219 let mut body = String::with_capacity(512);
220 body.push_str(XML_DECL);
221 body.push_str(&format!("<ListConnectionFunctionsResult xmlns=\"{NS}\">"));
222 body.push_str("<NextMarker></NextMarker>");
223 body.push_str("<ConnectionFunctions>");
224 for f in &items {
225 body.push_str(&render_connection_function_summary_inner(f));
226 }
227 body.push_str("</ConnectionFunctions>");
228 body.push_str("</ListConnectionFunctionsResult>");
229 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
230 }
231
232 pub(crate) fn publish_connection_function(
233 &self,
234 req: &AwsRequest,
235 route: &Route,
236 ) -> Result<AwsResponse, AwsServiceError> {
237 let name = route_id(route, "ConnectionFunction")?;
238 let if_match = require_if_match(req)?;
239 let mut state = self.state.write();
240 let account = state
241 .accounts
242 .get_mut(DEFAULT_ACCOUNT)
243 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
244 let f = account
245 .connection_functions
246 .get_mut(&name)
247 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
248 if f.etag != if_match {
249 return Err(precondition_failed());
250 }
251 f.status = "DEPLOYED".to_string();
252 f.stage = "LIVE".to_string();
253 f.last_modified_time = Utc::now();
254 f.live_code = Some(f.code.clone());
259 let snap = f.clone();
260 drop(state);
261 let body = render_connection_function_summary(&snap, true);
262 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
263 }
264
265 pub(crate) fn test_connection_function(
266 &self,
267 req: &AwsRequest,
268 route: &Route,
269 ) -> Result<AwsResponse, AwsServiceError> {
270 let name = route_id(route, "ConnectionFunction")?;
271 let if_match = require_if_match(req)?;
272 let parsed: TestConnectionFunctionRequest =
273 xml_io::from_xml_root(&req.body).map_err(|e| {
274 invalid_argument(format!("invalid TestConnectionFunctionRequest XML: {e}"))
275 })?;
276 let event_bytes = base64::engine::general_purpose::STANDARD
277 .decode(parsed.connection_object.trim().as_bytes())
278 .map_err(|e| invalid_argument(format!("ConnectionObject is not valid base64: {e}")))?;
279 let state = self.state.read();
280 let f = state
281 .accounts
282 .get(DEFAULT_ACCOUNT)
283 .and_then(|a| a.connection_functions.get(&name).cloned())
284 .ok_or_else(|| {
285 aws_error(
286 StatusCode::NOT_FOUND,
287 "EntityNotFound",
288 format!("ConnectionFunction {name} does not exist"),
289 )
290 })?;
291 drop(state);
292 if f.etag != if_match {
293 return Err(precondition_failed());
294 }
295 let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
301 let source: &[u8] = if stage.eq_ignore_ascii_case("LIVE") {
302 f.live_code.as_deref().unwrap_or(&f.code)
303 } else {
304 &f.code
305 };
306 let code = std::str::from_utf8(source)
307 .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
308 let exec = crate::js_runtime::run_handler(code, &event_bytes);
309
310 let mut body = String::with_capacity(1024);
311 body.push_str(XML_DECL);
312 body.push_str(&format!("<ConnectionFunctionTestResult xmlns=\"{NS}\">"));
313 body.push_str(&render_connection_function_summary_inner(&f));
314 body.push_str(&format!(
315 "<ComputeUtilization>{}</ComputeUtilization>",
316 exec.compute_utilization
317 ));
318 body.push_str("<ConnectionFunctionExecutionLogs>");
319 for line in &exec.logs {
320 body.push_str(&format!("<member>{}</member>", esc(line)));
321 }
322 body.push_str("</ConnectionFunctionExecutionLogs>");
323 body.push_str(&format!(
324 "<ConnectionFunctionErrorMessage>{}</ConnectionFunctionErrorMessage>",
325 esc(exec.error.as_deref().unwrap_or(""))
326 ));
327 body.push_str(&format!(
328 "<ConnectionFunctionOutput>{}</ConnectionFunctionOutput>",
329 esc(exec.output.as_deref().unwrap_or(""))
330 ));
331 body.push_str("</ConnectionFunctionTestResult>");
332 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
333 }
334}
335
336#[derive(Debug, Default, serde::Deserialize)]
337#[serde(rename_all = "PascalCase")]
338struct TestConnectionFunctionRequest {
339 #[serde(default)]
340 connection_object: String,
341 #[serde(default)]
342 stage: Option<String>,
343}
344
345fn render_connection_function_summary(f: &StoredConnectionFunction, with_decl: bool) -> String {
346 let mut out = String::with_capacity(512);
347 if with_decl {
348 out.push_str(XML_DECL);
349 }
350 out.push_str(&format!("<ConnectionFunctionSummary xmlns=\"{NS}\">"));
351 push_summary_body(&mut out, f);
352 out.push_str("</ConnectionFunctionSummary>");
353 out
354}
355
356fn render_connection_function_summary_inner(f: &StoredConnectionFunction) -> String {
357 let mut out = String::with_capacity(512);
358 out.push_str("<ConnectionFunctionSummary>");
359 push_summary_body(&mut out, f);
360 out.push_str("</ConnectionFunctionSummary>");
361 out
362}
363
364fn push_summary_body(out: &mut String, f: &StoredConnectionFunction) {
365 out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
366 out.push_str(&format!("<Id>{}</Id>", esc(&f.id)));
367 out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
368 out.push_str(&format!(
369 "<ConnectionFunctionArn>{}</ConnectionFunctionArn>",
370 esc(&f.arn)
371 ));
372 out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
373 out.push_str(&format!(
374 "<CreatedTime>{}</CreatedTime>",
375 rfc3339(&f.created_time)
376 ));
377 out.push_str(&format!(
378 "<LastModifiedTime>{}</LastModifiedTime>",
379 rfc3339(&f.last_modified_time)
380 ));
381 out.push_str("<ConnectionFunctionConfig>");
382 out.push_str(&format!("<Comment>{}</Comment>", esc(&f.comment)));
383 out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.runtime)));
384 out.push_str("</ConnectionFunctionConfig>");
385}
386
#[cfg(test)]
mod tests {
    //! End-to-end tests for the connection-function endpoints, driven through
    //! `AwsService::handle` against a fresh in-memory service.

    use super::*;
    use crate::service::CloudFrontService;
    use crate::state::CloudFrontAccounts;
    use bytes::Bytes;
    use fakecloud_core::service::AwsService;
    use http::HeaderValue;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Builds a service backed by empty state.
    fn svc() -> CloudFrontService {
        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
    }

    /// Builds a request for `path` with the given body and optional
    /// `If-Match` header.
    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
        let mut headers = HeaderMap::new();
        if let Some(v) = if_match {
            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
        }
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: uuid::Uuid::new_v4().to_string(),
            headers,
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Returns the `ETag` response header as an owned string; panics when it
    /// is missing or not valid text.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .unwrap()
            .to_str()
            .unwrap()
            .to_string()
    }

    /// Extracts the integer inside `<ComputeUtilization>` from a test result.
    fn compute_utilization(xml: &str) -> u32 {
        let open = xml.find("<ComputeUtilization>").unwrap() + "<ComputeUtilization>".len();
        let close = xml.find("</ComputeUtilization>").unwrap();
        xml[open..close].parse().unwrap()
    }

    /// Creates connection function `name` with `code` and returns its ETag.
    async fn create_cfn(svc: &CloudFrontService, name: &str, code: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<CreateConnectionFunctionRequest xmlns="{NS}">
  <Name>{name}</Name>
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</CreateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(
                http::Method::POST,
                "/2020-05-31/connection-function",
                &body,
                None,
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::CREATED);
        etag_of(&resp)
    }

    /// Builds a test request body for the `DEVELOPMENT` stage.
    fn test_cfn_request_xml(event_json: &str) -> String {
        test_cfn_request_xml_with_stage(event_json, "DEVELOPMENT")
    }

    /// Builds a test request body for `stage`, base64-encoding `event_json`.
    fn test_cfn_request_xml_with_stage(event_json: &str, stage: &str) -> String {
        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
        format!(
            r#"<?xml version="1.0"?>
<TestConnectionFunctionRequest xmlns="{NS}">
  <Stage>{stage}</Stage>
  <ConnectionObject>{event_b64}</ConnectionObject>
</TestConnectionFunctionRequest>"#
        )
    }

    /// Replaces the code of `name` and returns the new ETag.
    async fn update_cfn(svc: &CloudFrontService, name: &str, code: &str, if_match: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<UpdateConnectionFunctionRequest xmlns="{NS}">
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</UpdateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(
                http::Method::PUT,
                &format!("/2020-05-31/connection-function/{name}"),
                &body,
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    /// Publishes `name` and returns the ETag from the response.
    async fn publish_cfn(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
        let resp = svc
            .handle(req(
                http::Method::POST,
                &format!("/2020-05-31/connection-function/{name}/publish"),
                "",
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    /// Posts `body` to the `test` endpoint of connection function `name`.
    async fn run_test(
        svc: &CloudFrontService,
        name: &str,
        body: &str,
        if_match: &str,
    ) -> Result<AwsResponse, AwsServiceError> {
        svc.handle(req(
            http::Method::POST,
            &format!("/2020-05-31/connection-function/{name}/test"),
            body,
            Some(if_match),
        ))
        .await
    }

    #[tokio::test]
    async fn test_connection_function_executes_handler_and_returns_result() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-ok",
            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
        )
        .await;
        let body = test_cfn_request_xml(r#"{"headers":{}}"#);
        let resp = run_test(&svc, "cfn-ok", &body, &etag).await.unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
        // The handler output is embedded through `esc`; this assumes `esc`
        // renders `"` as `&quot;`.
        assert!(
            xml.contains("&quot;x&quot;:&quot;y&quot;"),
            "expected x:y in output, got {xml}"
        );
        assert!(
            xml.contains("<ConnectionFunctionErrorMessage></ConnectionFunctionErrorMessage>"),
            "expected empty error, got {xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_propagates_js_error_into_message() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-err",
            r#"function handler() { throw new Error("boom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let resp = run_test(&svc, "cfn-err", &body, &etag).await.unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
        assert!(xml.contains("boom"), "expected boom, got {xml}");
        assert!(xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"));
    }

    #[tokio::test]
    async fn test_connection_function_unknown_name_returns_error() {
        let svc = svc();
        let body = test_cfn_request_xml("{}");
        let err = match run_test(&svc, "missing", &body, "E0").await {
            Err(e) => e,
            Ok(_) => panic!("expected EntityNotFound, got Ok"),
        };
        assert_eq!(err.status(), StatusCode::NOT_FOUND);
        assert_eq!(err.code(), "EntityNotFound");
    }

    #[tokio::test]
    async fn test_connection_function_logs_error_and_marks_compute_over_100() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-throws",
            r#"function handler() { throw new Error("kaboom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let resp = run_test(&svc, "cfn-throws", &body, &etag).await.unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
        assert!(xml.contains("kaboom"), "expected kaboom, got {xml}");
        assert!(
            xml.contains("<ConnectionFunctionExecutionLogs>") && xml.contains("ERROR: "),
            "expected error log, got {xml}"
        );
        let pct = compute_utilization(xml);
        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
    }

    #[tokio::test]
    async fn test_connection_function_stage_selects_published_or_development_code() {
        let svc = svc();
        // Publish v1, then update to v2: DEVELOPMENT must see v2 while LIVE
        // keeps running the published v1 snapshot.
        let etag = create_cfn(&svc, "cfn-stage", r#"function handler() { return "v1"; }"#).await;
        let pub_etag = publish_cfn(&svc, "cfn-stage", &etag).await;
        let new_etag = update_cfn(
            &svc,
            "cfn-stage",
            r#"function handler() { return "v2"; }"#,
            &pub_etag,
        )
        .await;

        let dev_body = test_cfn_request_xml_with_stage("{}", "DEVELOPMENT");
        let dev_resp = run_test(&svc, "cfn-stage", &dev_body, &new_etag)
            .await
            .unwrap();
        let dev_xml = std::str::from_utf8(dev_resp.body.expect_bytes()).unwrap();
        // Assumes `esc` renders `"` as `&quot;`.
        assert!(
            dev_xml.contains("&quot;v2&quot;"),
            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
        );

        let live_body = test_cfn_request_xml_with_stage("{}", "LIVE");
        let live_resp = run_test(&svc, "cfn-stage", &live_body, &new_etag)
            .await
            .unwrap();
        let live_xml = std::str::from_utf8(live_resp.body.expect_bytes()).unwrap();
        assert!(
            live_xml.contains("&quot;v1&quot;"),
            "LIVE should run published snapshot (v1), got {live_xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_infinite_loop_is_killed() {
        let svc = svc();
        let etag = create_cfn(&svc, "cfn-loop", r#"function handler() { while(1){} }"#).await;
        let body = test_cfn_request_xml("{}");
        let resp = run_test(&svc, "cfn-loop", &body, &etag).await.unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        let xml = std::str::from_utf8(resp.body.expect_bytes()).unwrap();
        assert!(
            xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"),
            "expected empty output after kill, got {xml}"
        );
        assert!(
            xml.contains("ERROR:") && xml.contains("limit"),
            "expected timeout/limit log, got {xml}"
        );
        let pct = compute_utilization(xml);
        assert!(pct > 100, "expected pct > 100, got {pct}");
    }
}