1use base64::Engine;
7use chrono::Utc;
8use http::header::ETAG;
9use http::{HeaderMap, StatusCode};
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError, ResponseBody};
12
13use crate::cfunctions::StoredConnectionFunction;
14use crate::policies::{
15 not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
16};
17use crate::router::Route;
18use crate::service::{
19 aws_error, esc, extract_body_field, generate_id_with_prefix, invalid_argument, xml_response,
20 CloudFrontService, DEFAULT_ACCOUNT,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27#[derive(Debug, Default, serde::Deserialize)]
28#[serde(rename_all = "PascalCase")]
29struct CreateConnectionFunctionRequest {
30 pub name: String,
31 pub connection_function_config: ConnectionFunctionConfigInput,
32 pub connection_function_code: String,
33}
34
35#[derive(Debug, Default, serde::Deserialize)]
36#[serde(rename_all = "PascalCase")]
37struct UpdateConnectionFunctionRequest {
38 pub connection_function_config: ConnectionFunctionConfigInput,
39 pub connection_function_code: String,
40}
41
42#[derive(Debug, Default, serde::Deserialize)]
43#[serde(rename_all = "PascalCase")]
44struct ConnectionFunctionConfigInput {
45 pub comment: String,
46 pub runtime: String,
47}
48
49impl CloudFrontService {
50 pub(crate) fn create_connection_function(
51 &self,
52 req: &AwsRequest,
53 ) -> Result<AwsResponse, AwsServiceError> {
54 let parsed: CreateConnectionFunctionRequest =
55 xml_io::from_xml_root(&req.body).map_err(|e| {
56 invalid_argument(format!("invalid CreateConnectionFunctionRequest XML: {e}"))
57 })?;
58 if parsed.name.is_empty() {
59 return Err(invalid_argument("Name is required"));
60 }
61 let mut state = self.state.write();
62 let account = state
63 .accounts
64 .entry(DEFAULT_ACCOUNT.to_string())
65 .or_default();
66 if account.connection_functions.contains_key(&parsed.name) {
67 return Err(aws_error(
68 StatusCode::CONFLICT,
69 "EntityAlreadyExists",
70 format!("ConnectionFunction {} already exists", parsed.name),
71 ));
72 }
73 let now = Utc::now();
74 let etag = generate_id_with_prefix("E");
75 let id = generate_id_with_prefix("CF");
76 let arn = format!(
77 "arn:aws:cloudfront::{}:connection-function/{}",
78 DEFAULT_ACCOUNT, parsed.name
79 );
80 let code = base64::engine::general_purpose::STANDARD
81 .decode(parsed.connection_function_code.trim())
82 .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
83 let stored = StoredConnectionFunction {
84 id,
85 name: parsed.name.clone(),
86 arn,
87 stage: "DEVELOPMENT".to_string(),
88 status: "UNPUBLISHED".to_string(),
89 runtime: parsed.connection_function_config.runtime,
90 comment: parsed.connection_function_config.comment,
91 code,
92 live_code: None,
93 etag: etag.clone(),
94 created_time: now,
95 last_modified_time: now,
96 };
97 account
98 .connection_functions
99 .insert(parsed.name.clone(), stored.clone());
100 drop(state);
101 let body = render_connection_function_summary(&stored, true);
102 Ok(xml_with_etag(StatusCode::CREATED, body, &etag, None))
103 }
104
105 pub(crate) fn describe_connection_function(
106 &self,
107 route: &Route,
108 ) -> Result<AwsResponse, AwsServiceError> {
109 let name = route_id(route, "ConnectionFunction")?;
110 let state = self.state.read();
111 let f = state
112 .accounts
113 .get(DEFAULT_ACCOUNT)
114 .and_then(|a| a.connection_functions.get(&name).cloned())
115 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
116 drop(state);
117 let body = render_connection_function_summary(&f, true);
118 Ok(xml_with_etag(StatusCode::OK, body, &f.etag, None))
119 }
120
121 pub(crate) fn get_connection_function(
122 &self,
123 route: &Route,
124 ) -> Result<AwsResponse, AwsServiceError> {
125 let name = route_id(route, "ConnectionFunction")?;
126 let state = self.state.read();
127 let f = state
128 .accounts
129 .get(DEFAULT_ACCOUNT)
130 .and_then(|a| a.connection_functions.get(&name).cloned())
131 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
132 drop(state);
133 let mut headers = HeaderMap::new();
134 if let Ok(v) = http::HeaderValue::from_str(&f.etag) {
135 headers.insert(ETAG, v);
136 }
137 Ok(AwsResponse {
138 status: StatusCode::OK,
139 headers,
140 content_type: "application/octet-stream".to_string(),
141 body: ResponseBody::Bytes(bytes::Bytes::from(f.code.clone())),
142 })
143 }
144
145 pub(crate) fn update_connection_function(
146 &self,
147 req: &AwsRequest,
148 route: &Route,
149 ) -> Result<AwsResponse, AwsServiceError> {
150 let name = route_id(route, "ConnectionFunction")?;
151 let if_match = require_if_match(req)?;
152 let parsed: UpdateConnectionFunctionRequest =
153 xml_io::from_xml_root(&req.body).map_err(|e| {
154 invalid_argument(format!("invalid UpdateConnectionFunctionRequest XML: {e}"))
155 })?;
156 let mut state = self.state.write();
157 let account = state
158 .accounts
159 .get_mut(DEFAULT_ACCOUNT)
160 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
161 let f = account
162 .connection_functions
163 .get_mut(&name)
164 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
165 if f.etag != if_match {
166 return Err(precondition_failed());
167 }
168 f.runtime = parsed.connection_function_config.runtime;
169 f.comment = parsed.connection_function_config.comment;
170 f.code = base64::engine::general_purpose::STANDARD
171 .decode(parsed.connection_function_code.trim())
172 .unwrap_or_else(|_| parsed.connection_function_code.into_bytes());
173 f.etag = generate_id_with_prefix("E");
174 f.last_modified_time = Utc::now();
175 f.status = "UNPUBLISHED".to_string();
176 f.stage = "DEVELOPMENT".to_string();
177 let snap = f.clone();
178 drop(state);
179 let body = render_connection_function_summary(&snap, true);
180 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
181 }
182
183 pub(crate) fn delete_connection_function(
184 &self,
185 req: &AwsRequest,
186 route: &Route,
187 ) -> Result<AwsResponse, AwsServiceError> {
188 let name = route_id(route, "ConnectionFunction")?;
189 let if_match = require_if_match(req)?;
190 let mut state = self.state.write();
191 let account = state
192 .accounts
193 .get_mut(DEFAULT_ACCOUNT)
194 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
195 let f = account
196 .connection_functions
197 .get(&name)
198 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
199 if f.etag != if_match {
200 return Err(precondition_failed());
201 }
202 account.connection_functions.remove(&name);
203 drop(state);
204 Ok(crate::policies::empty(StatusCode::NO_CONTENT))
205 }
206
207 pub(crate) fn list_connection_functions(
208 &self,
209 req: &AwsRequest,
210 ) -> Result<AwsResponse, AwsServiceError> {
211 if let Some(stage) = extract_body_field(&req.body, "Stage") {
217 if stage != "DEVELOPMENT" && stage != "LIVE" {
218 return Err(crate::service::invalid_argument(format!(
219 "Stage must be one of 'DEVELOPMENT' or 'LIVE', got '{stage}'"
220 )));
221 }
222 }
223 let state = self.state.read();
224 let mut items: Vec<StoredConnectionFunction> = state
225 .accounts
226 .get(DEFAULT_ACCOUNT)
227 .map(|a| a.connection_functions.values().cloned().collect())
228 .unwrap_or_default();
229 drop(state);
230 items.sort_by(|a, b| a.name.cmp(&b.name));
231 let mut body = String::with_capacity(512);
232 body.push_str(XML_DECL);
233 body.push_str(&format!("<ListConnectionFunctionsResult xmlns=\"{NS}\">"));
234 body.push_str("<NextMarker></NextMarker>");
235 body.push_str("<ConnectionFunctions>");
236 for f in &items {
237 body.push_str(&render_connection_function_summary_inner(f));
238 }
239 body.push_str("</ConnectionFunctions>");
240 body.push_str("</ListConnectionFunctionsResult>");
241 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
242 }
243
244 pub(crate) fn publish_connection_function(
245 &self,
246 req: &AwsRequest,
247 route: &Route,
248 ) -> Result<AwsResponse, AwsServiceError> {
249 let name = route_id(route, "ConnectionFunction")?;
250 let if_match = require_if_match(req)?;
251 let mut state = self.state.write();
252 let account = state
253 .accounts
254 .get_mut(DEFAULT_ACCOUNT)
255 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
256 let f = account
257 .connection_functions
258 .get_mut(&name)
259 .ok_or_else(|| not_found("ConnectionFunction", &name))?;
260 if f.etag != if_match {
261 return Err(precondition_failed());
262 }
263 f.status = "DEPLOYED".to_string();
264 f.stage = "LIVE".to_string();
265 f.last_modified_time = Utc::now();
266 f.live_code = Some(f.code.clone());
271 let snap = f.clone();
272 drop(state);
273 let body = render_connection_function_summary(&snap, true);
274 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
275 }
276
277 pub(crate) fn test_connection_function(
278 &self,
279 req: &AwsRequest,
280 route: &Route,
281 ) -> Result<AwsResponse, AwsServiceError> {
282 let name = route_id(route, "ConnectionFunction")?;
283 let if_match = require_if_match(req)?;
284 let parsed: TestConnectionFunctionRequest =
285 xml_io::from_xml_root(&req.body).map_err(|e| {
286 invalid_argument(format!("invalid TestConnectionFunctionRequest XML: {e}"))
287 })?;
288 let event_bytes = base64::engine::general_purpose::STANDARD
289 .decode(parsed.connection_object.trim().as_bytes())
290 .map_err(|e| invalid_argument(format!("ConnectionObject is not valid base64: {e}")))?;
291 let state = self.state.read();
292 let f = state
293 .accounts
294 .get(DEFAULT_ACCOUNT)
295 .and_then(|a| a.connection_functions.get(&name).cloned())
296 .ok_or_else(|| {
297 aws_error(
298 StatusCode::NOT_FOUND,
299 "EntityNotFound",
300 format!("ConnectionFunction {name} does not exist"),
301 )
302 })?;
303 drop(state);
304 if f.etag != if_match {
305 return Err(precondition_failed());
306 }
307 let stage = parsed.stage.as_deref().unwrap_or("DEVELOPMENT");
313 let source: &[u8] = if stage.eq_ignore_ascii_case("LIVE") {
314 f.live_code.as_deref().unwrap_or(&f.code)
315 } else {
316 &f.code
317 };
318 let code = std::str::from_utf8(source)
319 .map_err(|e| invalid_argument(format!("function code is not valid UTF-8: {e}")))?;
320 let exec = crate::js_runtime::run_handler(code, &event_bytes);
321
322 let mut body = String::with_capacity(1024);
323 body.push_str(XML_DECL);
324 body.push_str(&format!("<ConnectionFunctionTestResult xmlns=\"{NS}\">"));
325 body.push_str(&render_connection_function_summary_inner(&f));
326 body.push_str(&format!(
327 "<ComputeUtilization>{}</ComputeUtilization>",
328 exec.compute_utilization
329 ));
330 body.push_str("<ConnectionFunctionExecutionLogs>");
331 for line in &exec.logs {
332 body.push_str(&format!("<member>{}</member>", esc(line)));
333 }
334 body.push_str("</ConnectionFunctionExecutionLogs>");
335 body.push_str(&format!(
336 "<ConnectionFunctionErrorMessage>{}</ConnectionFunctionErrorMessage>",
337 esc(exec.error.as_deref().unwrap_or(""))
338 ));
339 body.push_str(&format!(
340 "<ConnectionFunctionOutput>{}</ConnectionFunctionOutput>",
341 esc(exec.output.as_deref().unwrap_or(""))
342 ));
343 body.push_str("</ConnectionFunctionTestResult>");
344 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
345 }
346}
347
348#[derive(Debug, Default, serde::Deserialize)]
349#[serde(rename_all = "PascalCase")]
350struct TestConnectionFunctionRequest {
351 #[serde(default)]
352 connection_object: String,
353 #[serde(default)]
354 stage: Option<String>,
355}
356
357fn render_connection_function_summary(f: &StoredConnectionFunction, with_decl: bool) -> String {
358 let mut out = String::with_capacity(512);
359 if with_decl {
360 out.push_str(XML_DECL);
361 }
362 out.push_str(&format!("<ConnectionFunctionSummary xmlns=\"{NS}\">"));
363 push_summary_body(&mut out, f);
364 out.push_str("</ConnectionFunctionSummary>");
365 out
366}
367
368fn render_connection_function_summary_inner(f: &StoredConnectionFunction) -> String {
369 let mut out = String::with_capacity(512);
370 out.push_str("<ConnectionFunctionSummary>");
371 push_summary_body(&mut out, f);
372 out.push_str("</ConnectionFunctionSummary>");
373 out
374}
375
376fn push_summary_body(out: &mut String, f: &StoredConnectionFunction) {
377 out.push_str(&format!("<Name>{}</Name>", esc(&f.name)));
378 out.push_str(&format!("<Id>{}</Id>", esc(&f.id)));
379 out.push_str(&format!("<Status>{}</Status>", esc(&f.status)));
380 out.push_str(&format!(
381 "<ConnectionFunctionArn>{}</ConnectionFunctionArn>",
382 esc(&f.arn)
383 ));
384 out.push_str(&format!("<Stage>{}</Stage>", esc(&f.stage)));
385 out.push_str(&format!(
386 "<CreatedTime>{}</CreatedTime>",
387 rfc3339(&f.created_time)
388 ));
389 out.push_str(&format!(
390 "<LastModifiedTime>{}</LastModifiedTime>",
391 rfc3339(&f.last_modified_time)
392 ));
393 out.push_str("<ConnectionFunctionConfig>");
394 out.push_str(&format!("<Comment>{}</Comment>", esc(&f.comment)));
395 out.push_str(&format!("<Runtime>{}</Runtime>", esc(&f.runtime)));
396 out.push_str("</ConnectionFunctionConfig>");
397}
398
#[cfg(test)]
mod tests {
    //! End-to-end tests that drive the connection-function endpoints through
    //! `CloudFrontService::handle`.

    use super::*;
    use crate::service::CloudFrontService;
    use crate::state::CloudFrontAccounts;
    use bytes::Bytes;
    use fakecloud_core::service::AwsService;
    use http::HeaderValue;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Builds a service backed by fresh, empty state.
    fn svc() -> CloudFrontService {
        CloudFrontService::new(Arc::new(RwLock::new(CloudFrontAccounts::new())))
    }

    /// Builds a request for `path` with the given body and optional `If-Match` header.
    fn req(method: http::Method, path: &str, body: &str, if_match: Option<&str>) -> AwsRequest {
        let mut headers = HeaderMap::new();
        if let Some(v) = if_match {
            headers.insert(http::header::IF_MATCH, HeaderValue::from_str(v).unwrap());
        }
        AwsRequest {
            service: "cloudfront".into(),
            action: String::new(),
            region: "us-east-1".into(),
            account_id: DEFAULT_ACCOUNT.into(),
            request_id: uuid::Uuid::new_v4().to_string(),
            headers,
            query_params: std::collections::HashMap::new(),
            body_stream: parking_lot::Mutex::new(None),
            body: Bytes::from(body.to_string()),
            path_segments: path
                .split('/')
                .filter(|s| !s.is_empty())
                .map(String::from)
                .collect(),
            raw_path: path.into(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Extracts the ETag header of a response as an owned string.
    fn etag_of(resp: &AwsResponse) -> String {
        resp.headers
            .get(ETAG)
            .expect("response should carry an ETag header")
            .to_str()
            .expect("ETag header should be a readable string")
            .to_string()
    }

    /// Parses the integer inside the `ComputeUtilization` element of `xml`.
    fn compute_utilization(xml: &str) -> u32 {
        let (_, rest) = xml
            .split_once("<ComputeUtilization>")
            .expect("missing <ComputeUtilization>");
        let (value, _) = rest
            .split_once("</ComputeUtilization>")
            .expect("missing </ComputeUtilization>");
        value
            .parse()
            .expect("ComputeUtilization should be an integer")
    }

    /// Returns true when `xml` contains `fragment` either verbatim or with its
    /// double quotes replaced by the XML quot entity, so the assertion holds
    /// whichever way the service escapes quotes in element text.
    fn contains_fragment(xml: &str, fragment: &str) -> bool {
        // The entity is assembled with `concat!` so that tooling which decodes
        // HTML entities in source text cannot corrupt this literal.
        let escaped = fragment.replace('"', concat!("&", "quot;"));
        xml.contains(fragment) || xml.contains(&escaped)
    }

    /// Creates a function with base64-encoded `code` and returns its ETag.
    async fn create_cfn(svc: &CloudFrontService, name: &str, code: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<CreateConnectionFunctionRequest xmlns="{NS}">
  <Name>{name}</Name>
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</CreateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(
                http::Method::POST,
                "/2020-05-31/connection-function",
                &body,
                None,
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::CREATED);
        etag_of(&resp)
    }

    /// Builds a test-request body targeting the `DEVELOPMENT` stage.
    fn test_cfn_request_xml(event_json: &str) -> String {
        test_cfn_request_xml_with_stage(event_json, "DEVELOPMENT")
    }

    /// Builds a test-request body with a base64-encoded event and the given stage.
    fn test_cfn_request_xml_with_stage(event_json: &str, stage: &str) -> String {
        let event_b64 = base64::engine::general_purpose::STANDARD.encode(event_json.as_bytes());
        format!(
            r#"<?xml version="1.0"?>
<TestConnectionFunctionRequest xmlns="{NS}">
  <Stage>{stage}</Stage>
  <ConnectionObject>{event_b64}</ConnectionObject>
</TestConnectionFunctionRequest>"#
        )
    }

    /// Updates a function's code and returns the new ETag.
    async fn update_cfn(svc: &CloudFrontService, name: &str, code: &str, if_match: &str) -> String {
        let code_b64 = base64::engine::general_purpose::STANDARD.encode(code.as_bytes());
        let body = format!(
            r#"<?xml version="1.0"?>
<UpdateConnectionFunctionRequest xmlns="{NS}">
  <ConnectionFunctionConfig>
    <Comment>t</Comment>
    <Runtime>cloudfront-js-2.0</Runtime>
  </ConnectionFunctionConfig>
  <ConnectionFunctionCode>{code_b64}</ConnectionFunctionCode>
</UpdateConnectionFunctionRequest>"#
        );
        let resp = svc
            .handle(req(
                http::Method::PUT,
                &format!("/2020-05-31/connection-function/{name}"),
                &body,
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    /// Publishes a function and returns the ETag reported by the response.
    async fn publish_cfn(svc: &CloudFrontService, name: &str, if_match: &str) -> String {
        let resp = svc
            .handle(req(
                http::Method::POST,
                &format!("/2020-05-31/connection-function/{name}/publish"),
                "",
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        etag_of(&resp)
    }

    /// Calls the test endpoint for `name`, asserts `200 OK`, and returns the
    /// response body as text.
    async fn run_test(svc: &CloudFrontService, name: &str, body: &str, if_match: &str) -> String {
        let resp = svc
            .handle(req(
                http::Method::POST,
                &format!("/2020-05-31/connection-function/{name}/test"),
                body,
                Some(if_match),
            ))
            .await
            .unwrap();
        assert_eq!(resp.status, StatusCode::OK);
        std::str::from_utf8(resp.body.expect_bytes())
            .unwrap()
            .to_string()
    }

    #[tokio::test]
    async fn test_connection_function_executes_handler_and_returns_result() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-ok",
            r#"function handler(event) { event.headers.x = "y"; return event; }"#,
        )
        .await;
        let body = test_cfn_request_xml(r#"{"headers":{}}"#);
        let xml = run_test(&svc, "cfn-ok", &body, &etag).await;
        assert!(
            contains_fragment(&xml, r#""x":"y""#),
            "expected x:y in output, got {xml}"
        );
        assert!(
            xml.contains("<ConnectionFunctionErrorMessage></ConnectionFunctionErrorMessage>"),
            "expected empty error, got {xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_propagates_js_error_into_message() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-err",
            r#"function handler() { throw new Error("boom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let xml = run_test(&svc, "cfn-err", &body, &etag).await;
        assert!(xml.contains("boom"), "expected boom, got {xml}");
        assert!(xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"));
    }

    #[tokio::test]
    async fn test_connection_function_unknown_name_returns_error() {
        let svc = svc();
        let body = test_cfn_request_xml("{}");
        let err = match svc
            .handle(req(
                http::Method::POST,
                "/2020-05-31/connection-function/missing/test",
                &body,
                Some("E0"),
            ))
            .await
        {
            Err(e) => e,
            Ok(_) => panic!("expected EntityNotFound, got Ok"),
        };
        assert_eq!(err.status(), StatusCode::NOT_FOUND);
        assert_eq!(err.code(), "EntityNotFound");
    }

    #[tokio::test]
    async fn test_connection_function_logs_error_and_marks_compute_over_100() {
        let svc = svc();
        let etag = create_cfn(
            &svc,
            "cfn-throws",
            r#"function handler() { throw new Error("kaboom"); }"#,
        )
        .await;
        let body = test_cfn_request_xml("{}");
        let xml = run_test(&svc, "cfn-throws", &body, &etag).await;
        assert!(xml.contains("kaboom"), "expected kaboom, got {xml}");
        assert!(
            xml.contains("<ConnectionFunctionExecutionLogs>") && xml.contains("ERROR: "),
            "expected error log, got {xml}"
        );
        let pct = compute_utilization(&xml);
        assert!(pct > 100, "expected pct > 100 on error, got {pct}");
    }

    #[tokio::test]
    async fn test_connection_function_stage_selects_published_or_development_code() {
        let svc = svc();
        // Publish v1, then update to v2: LIVE must keep running the published
        // snapshot while DEVELOPMENT runs the latest update.
        let etag = create_cfn(&svc, "cfn-stage", r#"function handler() { return "v1"; }"#).await;
        let pub_etag = publish_cfn(&svc, "cfn-stage", &etag).await;
        let new_etag = update_cfn(
            &svc,
            "cfn-stage",
            r#"function handler() { return "v2"; }"#,
            &pub_etag,
        )
        .await;

        let dev_body = test_cfn_request_xml_with_stage("{}", "DEVELOPMENT");
        let dev_xml = run_test(&svc, "cfn-stage", &dev_body, &new_etag).await;
        assert!(
            contains_fragment(&dev_xml, r#""v2""#),
            "DEVELOPMENT should run latest update (v2), got {dev_xml}"
        );

        let live_body = test_cfn_request_xml_with_stage("{}", "LIVE");
        let live_xml = run_test(&svc, "cfn-stage", &live_body, &new_etag).await;
        assert!(
            contains_fragment(&live_xml, r#""v1""#),
            "LIVE should run published snapshot (v1), got {live_xml}"
        );
    }

    #[tokio::test]
    async fn test_connection_function_infinite_loop_is_killed() {
        let svc = svc();
        let etag = create_cfn(&svc, "cfn-loop", r#"function handler() { while(1){} }"#).await;
        let body = test_cfn_request_xml("{}");
        let xml = run_test(&svc, "cfn-loop", &body, &etag).await;
        assert!(
            xml.contains("<ConnectionFunctionOutput></ConnectionFunctionOutput>"),
            "expected empty output after kill, got {xml}"
        );
        assert!(
            xml.contains("ERROR:") && xml.contains("limit"),
            "expected timeout/limit log, got {xml}"
        );
        let pct = compute_utilization(&xml);
        assert!(pct > 100, "expected pct > 100, got {pct}");
    }
}