1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7 Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12 BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::collections::{HashMap, HashSet};
16use std::rc::Rc;
17use std::thread;
18use tokio::sync::{mpsc, oneshot};
19
20#[op2(fast)]
21fn op_log_level(#[string] level: String, #[string] msg: String) {
22 match level.as_str() {
23 "error" => tracing::error!("[User Script] {}", msg),
24 "warn" => tracing::warn!("[User Script] {}", msg),
25 "info" => tracing::info!("[User Script] {}", msg),
26 "debug" => tracing::debug!("[User Script] {}", msg),
27 _ => tracing::info!("[User Script] {}", msg),
28 }
29}
30
31#[op2(async)]
32#[serde]
33async fn op_read_body(
34 state: Rc<RefCell<OpState>>,
35 #[smi] rid: ResourceId,
36 #[smi] limit: usize,
37) -> Result<Vec<u8>, AnyError> {
38 let resource = {
39 let state = state.borrow();
40 state.resource_table.get_any(rid)?
41 };
42 let view = resource.read(limit).await?;
43 Ok(view.to_vec())
44}
45
46#[op2(fast)]
47fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
48 state.resource_table.take_any(rid).ok();
49}
50
51fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
54 state.borrow_mut::<HashMap<String, serde_json::Value>>()
55}
56
57#[op2]
58#[serde]
59fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
60 shared_state(state).get(&key).cloned()
61}
62
63#[op2]
64fn op_shared_state_set(
65 state: &mut OpState,
66 #[string] key: String,
67 #[serde] value: serde_json::Value,
68) {
69 shared_state(state).insert(key, value);
70}
71
72#[op2(fast)]
73fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
74 shared_state(state).remove(&key).is_some()
75}
76
77#[op2(fast)]
78fn op_shared_state_clear(state: &mut OpState) {
79 shared_state(state).clear();
80}
81
82#[op2]
83#[serde]
84fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
85 shared_state(state).keys().cloned().collect()
86}
87
88fn env_allow(state: &OpState) -> &HashSet<String> {
91 state.borrow::<HashSet<String>>()
92}
93
94#[op2]
95#[string]
96fn op_env_get(state: &OpState, #[string] key: String) -> Option<String> {
97 let allowed = env_allow(state);
98 if allowed.is_empty() || !allowed.contains(&key) {
99 return None;
100 }
101 std::env::var(&key).ok()
102}
103
104#[op2]
107#[string]
108fn op_uuid_v4() -> String {
109 uuid::Uuid::new_v4().to_string()
110}
111
112#[op2]
113#[string]
114fn op_hash(#[string] algorithm: String, #[string] data: String) -> Result<String, AnyError> {
115 use sha1::Sha1;
116 use sha2::{Sha256, Sha512};
117 let bytes = data.as_bytes();
118 let hex = match algorithm.as_str() {
119 "sha1" => {
120 use sha1::Digest;
121 data_encoding::HEXLOWER.encode(&Sha1::digest(bytes))
122 }
123 "sha256" => {
124 use sha2::Digest;
125 data_encoding::HEXLOWER.encode(&Sha256::digest(bytes))
126 }
127 "sha512" => {
128 use sha2::Digest;
129 data_encoding::HEXLOWER.encode(&Sha512::digest(bytes))
130 }
131 "md5" => {
132 use md5::Digest;
133 data_encoding::HEXLOWER.encode(&md5::Md5::digest(bytes))
134 }
135 other => {
136 return Err(AnyError::msg(format!(
137 "unsupported hash algorithm: {}. Supported: sha1, sha256, sha512, md5",
138 other
139 )));
140 }
141 };
142 Ok(hex)
143}
144
145#[op2]
146#[string]
147fn op_base64_encode(#[string] data: String) -> String {
148 use base64::Engine as _;
149 base64::engine::general_purpose::STANDARD.encode(data.as_bytes())
150}
151
152#[op2]
153#[string]
154fn op_base64_decode(#[string] data: String) -> Result<String, AnyError> {
155 use base64::Engine as _;
156 let bytes = base64::engine::general_purpose::STANDARD
157 .decode(data.as_bytes())
158 .map_err(|e| AnyError::msg(format!("base64 decode error: {}", e)))?;
159 String::from_utf8(bytes).map_err(|e| AnyError::msg(format!("utf-8 error: {}", e)))
160}
161
162#[op2]
163#[serde]
164fn op_json_parse_safe(#[string] data: String) -> serde_json::Value {
165 serde_json::from_str(&data).unwrap_or(serde_json::Value::Null)
166}
167
168#[op2]
169#[string]
170fn op_json_stringify_pretty(#[serde] value: serde_json::Value) -> String {
171 serde_json::to_string_pretty(&value).unwrap_or_else(|_| String::new())
172}
173
174enum DenoCommand {
175 LoadScript(String, oneshot::Sender<Result<(), String>>),
176 OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
177 OnRequest(
178 Flow,
179 HttpBody,
180 oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
181 ),
182 OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
183 OnResponse(
184 Flow,
185 HttpBody,
186 oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
187 ),
188 OnWebSocketMessage(
189 Flow,
190 WebSocketMessage,
191 oneshot::Sender<Result<WebSocketMessageAction, String>>,
192 ),
193}
194
195#[derive(Clone)]
196pub struct DenoScriptEngine {
197 tx: mpsc::Sender<DenoCommand>,
198}
199
200impl Default for DenoScriptEngine {
201 fn default() -> Self {
202 Self::new(HashSet::new())
203 }
204}
205
206impl DenoScriptEngine {
207 pub fn new(env_allow: HashSet<String>) -> Self {
208 let (tx, mut rx) = mpsc::channel(32);
209
210 thread::spawn(move || {
211 let rt = tokio::runtime::Builder::new_current_thread()
212 .enable_all()
213 .build()
214 .unwrap();
215
216 rt.block_on(async move {
217 let env_allow = env_allow; let ext = Extension {
219 name: "relay_core",
220 ops: std::borrow::Cow::Borrowed(&[
221 op_log_level::DECL,
222 op_read_body::DECL,
223 op_close_body::DECL,
224 op_shared_state_get::DECL,
225 op_shared_state_set::DECL,
226 op_shared_state_delete::DECL,
227 op_shared_state_clear::DECL,
228 op_shared_state_keys::DECL,
229 op_env_get::DECL,
230 op_uuid_v4::DECL,
231 op_hash::DECL,
232 op_base64_encode::DECL,
233 op_base64_decode::DECL,
234 op_json_parse_safe::DECL,
235 op_json_stringify_pretty::DECL,
236 ]),
237 op_state_fn: Some(Box::new({
238 let env_allow = env_allow.clone();
239 move |state| {
240 state.put(HashMap::<String, serde_json::Value>::new());
241 state.put(env_allow.clone());
242 }
243 })),
244 ..Default::default()
245 };
246
247 let mut js_runtime = JsRuntime::new(RuntimeOptions {
248 extensions: vec![ext],
249 ..Default::default()
250 });
251
252 let bootstrap = r#"
254 globalThis.console = {
255 log: (...args) => {
256 Deno.core.ops.op_log_level("log", _format(args));
257 },
258 info: (...args) => {
259 Deno.core.ops.op_log_level("info", _format(args));
260 },
261 warn: (...args) => {
262 Deno.core.ops.op_log_level("warn", _format(args));
263 },
264 error: (...args) => {
265 Deno.core.ops.op_log_level("error", _format(args));
266 },
267 debug: (...args) => {
268 Deno.core.ops.op_log_level("debug", _format(args));
269 },
270 };
271
272 function _format(args) {
273 return args.map(arg => {
274 if (typeof arg === 'object') {
275 try { return JSON.stringify(arg); }
276 catch { return String(arg); }
277 }
278 return String(arg);
279 }).join(" ");
280 }
281
282 class RelayBody {
283 constructor(rid) { this.rid = rid; }
284 async read(limit) {
285 return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
286 }
287 close() {
288 Deno.core.ops.op_close_body(this.rid);
289 }
290 async text() {
291 const bytes = await this.read(10 * 1024 * 1024);
292 return new TextDecoder().decode(bytes);
293 }
294 async json() {
295 const txt = await this.text();
296 return JSON.parse(txt);
297 }
298 }
299 globalThis.RelayBody = RelayBody;
300
301 globalThis.relay = {
302 log: globalThis.console.log,
303 env: function(name) {
304 return Deno.core.ops.op_env_get(name);
305 },
306 uuid: function() {
307 return Deno.core.ops.op_uuid_v4();
308 },
309 hash: function(alg, data) {
310 return Deno.core.ops.op_hash(alg, data);
311 },
312 base64: {
313 encode: function(data) {
314 return Deno.core.ops.op_base64_encode(data);
315 },
316 decode: function(data) {
317 return Deno.core.ops.op_base64_decode(data);
318 },
319 },
320 json: {
321 parseSafe: function(str) {
322 return Deno.core.ops.op_json_parse_safe(str);
323 },
324 stringifyPretty: function(obj) {
325 return Deno.core.ops.op_json_stringify_pretty(obj);
326 },
327 },
328 };
329
330 // S1: sharedState — cross-hook shared map per isolate
331 globalThis.sharedState = {
332 get(key) {
333 return Deno.core.ops.op_shared_state_get(key);
334 },
335 set(key, value) {
336 Deno.core.ops.op_shared_state_set(key, value);
337 },
338 delete(key) {
339 return Deno.core.ops.op_shared_state_delete(key);
340 },
341 clear() {
342 Deno.core.ops.op_shared_state_clear();
343 },
344 keys() {
345 return Deno.core.ops.op_shared_state_keys();
346 },
347 };
348 "#;
349 js_runtime.execute_script("bootstrap", bootstrap).unwrap();
350
351 while let Some(cmd) = rx.recv().await {
352 match cmd {
353 DenoCommand::LoadScript(script, resp) => {
354 let res = js_runtime.execute_script("<anon>", script);
355 let res = if let Err(e) = res {
356 Err(e.to_string())
357 } else {
358 js_runtime
359 .run_event_loop(Default::default())
360 .await
361 .map(|_| ())
362 .map_err(|e| e.to_string())
363 };
364 let _ = resp.send(res);
365 }
366 DenoCommand::OnRequestHeaders(flow, resp) => {
367 let res = Self::handle_on_request_headers(&mut js_runtime, flow);
368 let _ = resp.send(res);
369 }
370 DenoCommand::OnRequest(flow, body, resp) => {
371 let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
372 let _ = resp.send(res);
373 }
374 DenoCommand::OnResponseHeaders(flow, resp) => {
375 let res = Self::handle_on_response_headers(&mut js_runtime, flow);
376 let _ = resp.send(res);
377 }
378 DenoCommand::OnResponse(flow, body, resp) => {
379 let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
380 let _ = resp.send(res);
381 }
382 DenoCommand::OnWebSocketMessage(flow, message, resp) => {
383 let res =
384 Self::handle_on_websocket_message(&mut js_runtime, flow, message);
385 let _ = resp.send(res);
386 }
387 }
388 }
389 });
390 });
391
392 Self { tx }
393 }
394
395 fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
398 let check_code = "typeof globalThis.onError === 'function'";
399 let exists = runtime
400 .execute_script("check_onError_v2", check_code)
401 .ok()
402 .map(|v| {
403 let mut scope = runtime.handle_scope();
404 let val = deno_core::v8::Local::new(&mut scope, v);
405 val.is_true()
406 })
407 .unwrap_or(false);
408
409 if !exists {
410 return;
411 }
412
413 let flow_json = match serde_json::to_string(flow) {
414 Ok(j) => j,
415 Err(_) => return,
416 };
417 let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
418 let code = format!(
419 "globalThis.onError({{}}, {}, '{}', '{}')",
420 flow_json, error_escaped, stage
421 );
422
423 let _ = runtime.execute_script("call_onError_v2", code);
424 }
425
426 fn handle_on_request_headers(
427 runtime: &mut JsRuntime,
428 flow: Flow,
429 ) -> Result<Option<Flow>, String> {
430 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
431 let check_code = "typeof globalThis.onRequestHeaders === 'function'";
432 let exists = runtime
433 .execute_script("check_onRequestHeaders", check_code)
434 .map_err(|e| {
435 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
436 e.to_string()
437 })?;
438 {
439 let mut scope = runtime.handle_scope();
440 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
441 if !exists_val.is_true() {
442 return Ok(None);
443 }
444 }
445 let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
446 let result = runtime
447 .execute_script("call_onRequestHeaders", code)
448 .map_err(|e| {
449 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
450 e.to_string()
451 })?;
452 let mut scope = runtime.handle_scope();
453 let result_val = deno_core::v8::Local::new(&mut scope, result);
454 if result_val.is_undefined() || result_val.is_null() {
455 return Ok(None);
456 }
457 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
458 drop(scope);
459 let modified_flow = match deser {
460 Ok(f) => f,
461 Err(e) => {
462 let err_str = format!("Failed to deserialize flow: {}", e);
463 Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
464 return Err(err_str);
465 }
466 };
467 Ok(Some(modified_flow))
468 }
469
470 fn handle_on_response_headers(
471 runtime: &mut JsRuntime,
472 flow: Flow,
473 ) -> Result<Option<Flow>, String> {
474 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
475 let check_code = "typeof globalThis.onResponseHeaders === 'function'";
476 let exists = runtime
477 .execute_script("check_onResponseHeaders", check_code)
478 .map_err(|e| {
479 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
480 e.to_string()
481 })?;
482 {
483 let mut scope = runtime.handle_scope();
484 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
485 if !exists_val.is_true() {
486 return Ok(None);
487 }
488 }
489 let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
490 let result = runtime
491 .execute_script("call_onResponseHeaders", code)
492 .map_err(|e| {
493 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
494 e.to_string()
495 })?;
496 let mut scope = runtime.handle_scope();
497 let result_val = deno_core::v8::Local::new(&mut scope, result);
498 if result_val.is_undefined() || result_val.is_null() {
499 return Ok(None);
500 }
501 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
502 drop(scope);
503 let modified_flow = match deser {
504 Ok(f) => f,
505 Err(e) => {
506 let err_str = format!("Failed to deserialize flow: {}", e);
507 Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
508 return Err(err_str);
509 }
510 };
511 Ok(Some(modified_flow))
512 }
513
514 async fn handle_on_request(
515 runtime: &mut JsRuntime,
516 flow: Flow,
517 body: HttpBody,
518 ) -> Result<(Option<Flow>, RequestAction), String> {
519 let resource = HttpBodyResource::new(body);
520 let rid = {
521 let op_state_rc = runtime.op_state();
522 let mut state = op_state_rc.borrow_mut();
523 state.resource_table.add(resource)
524 };
525
526 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
527
528 let check_code = "typeof globalThis.onRequest === 'function'";
529 let exists = runtime
530 .execute_script("check_onRequest", check_code)
531 .map_err(|e| {
532 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
533 e.to_string()
534 })?;
535
536 let exists_bool = {
537 let scope = &mut runtime.handle_scope();
538 let exists_val = deno_core::v8::Local::new(scope, exists);
539 exists_val.is_true()
540 };
541
542 if !exists_bool {
543 let resource = {
544 let op_state_rc = runtime.op_state();
545 let mut state = op_state_rc.borrow_mut();
546 state.resource_table.take::<HttpBodyResource>(rid).ok()
547 };
548 if let Some(res) = resource {
549 let body = crate::streams::create_body_from_resource(&res);
550 return Ok((None, RequestAction::Continue(body)));
551 } else {
552 return Ok((
553 None,
554 RequestAction::Continue(
555 http_body_util::Empty::new()
556 .map_err(|_| -> BoxError { unreachable!() })
557 .boxed(),
558 ),
559 ));
560 }
561 }
562
563 let code = format!(
564 "globalThis.onRequest(new RelayBody({}), {})",
565 rid, flow_json
566 );
567 let result = runtime
568 .execute_script("call_onRequest", code)
569 .map_err(|e| {
570 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
571 e.to_string()
572 })?;
573
574 let result = runtime.resolve(result).await.map_err(|e| {
575 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
576 e.to_string()
577 })?;
578
579 let (is_empty, modified_flow) = {
580 let mut scope = runtime.handle_scope();
581 let result_val = deno_core::v8::Local::new(&mut scope, result);
582
583 if result_val.is_undefined() || result_val.is_null() {
584 (true, None)
585 } else {
586 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
587 drop(scope);
588 match deser {
589 Ok(f) => (false, Some(f)),
590 Err(e) => {
591 let err_str = format!("Failed to deserialize flow: {}", e);
592 Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
593 return Err(err_str);
594 }
595 }
596 }
597 };
598
599 if is_empty {
600 let resource = {
601 let op_state_rc = runtime.op_state();
602 let mut state = op_state_rc.borrow_mut();
603 state.resource_table.take::<HttpBodyResource>(rid).ok()
604 };
605 if let Some(res) = resource {
606 let body = crate::streams::create_body_from_resource(&res);
607 return Ok((None, RequestAction::Continue(body)));
608 } else {
609 return Ok((
610 None,
611 RequestAction::Continue(
612 http_body_util::Empty::new()
613 .map_err(|_| -> BoxError { unreachable!() })
614 .boxed(),
615 ),
616 ));
617 }
618 }
619
620 let modified_flow = modified_flow.unwrap();
621
622 let resource = {
623 let op_state_rc = runtime.op_state();
624 let mut state = op_state_rc.borrow_mut();
625 state.resource_table.take::<HttpBodyResource>(rid).ok()
626 };
627
628 let new_body: HttpBody = if let Some(res) = resource {
629 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
630 {
631 http.request
632 .body
633 .as_ref()
634 .map(|b| !b.content.is_empty())
635 .unwrap_or(false)
636 } else {
637 false
638 };
639
640 if has_new_body {
641 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
642 if let Some(b) = &http.request.body {
643 let bytes: Bytes = if b.encoding == "base64" {
644 base64::engine::general_purpose::STANDARD
645 .decode(&b.content)
646 .unwrap_or_default()
647 .into()
648 } else {
649 Bytes::from(b.content.clone())
650 };
651 Full::new(bytes)
652 .map_err(|e| -> BoxError { e.into() })
653 .boxed()
654 } else {
655 http_body_util::Empty::new()
656 .map_err(|_| -> BoxError { unreachable!() })
657 .boxed()
658 }
659 } else {
660 http_body_util::Empty::new()
661 .map_err(|_| -> BoxError { unreachable!() })
662 .boxed()
663 }
664 } else {
665 crate::streams::create_body_from_resource(&res)
666 }
667 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
668 if let Some(b) = &http.request.body {
669 let bytes: Bytes = if b.encoding == "base64" {
670 base64::engine::general_purpose::STANDARD
671 .decode(&b.content)
672 .unwrap_or_default()
673 .into()
674 } else {
675 Bytes::from(b.content.clone())
676 };
677 Full::new(bytes)
678 .map_err(|e| -> BoxError { e.into() })
679 .boxed()
680 } else {
681 http_body_util::Empty::new()
682 .map_err(|_| -> BoxError { unreachable!() })
683 .boxed()
684 }
685 } else {
686 http_body_util::Empty::new()
687 .map_err(|_| -> BoxError { unreachable!() })
688 .boxed()
689 };
690
691 Ok((Some(modified_flow), RequestAction::Continue(new_body)))
692 }
693
694 async fn handle_on_response(
695 runtime: &mut JsRuntime,
696 flow: Flow,
697 body: HttpBody,
698 ) -> Result<(Option<Flow>, ResponseAction), String> {
699 let resource = HttpBodyResource::new(body);
700 let rid = {
701 let op_state_rc = runtime.op_state();
702 let mut state = op_state_rc.borrow_mut();
703 state.resource_table.add(resource)
704 };
705
706 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
707
708 let check_code = "typeof globalThis.onResponse === 'function'";
709 let exists = runtime
710 .execute_script("check_onResponse", check_code)
711 .map_err(|e| {
712 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
713 e.to_string()
714 })?;
715
716 let exists_bool = {
717 let scope = &mut runtime.handle_scope();
718 let exists_val = deno_core::v8::Local::new(scope, exists);
719 exists_val.is_true()
720 };
721
722 if !exists_bool {
723 let resource = {
724 let op_state_rc = runtime.op_state();
725 let mut state = op_state_rc.borrow_mut();
726 state.resource_table.take::<HttpBodyResource>(rid).ok()
727 };
728 if let Some(res) = resource {
729 let body = crate::streams::create_body_from_resource(&res);
730 return Ok((None, ResponseAction::Continue(body)));
731 } else {
732 return Ok((
733 None,
734 ResponseAction::Continue(
735 http_body_util::Empty::new()
736 .map_err(|_| -> BoxError { unreachable!() })
737 .boxed(),
738 ),
739 ));
740 }
741 }
742
743 let code = format!(
744 "globalThis.onResponse(new RelayBody({}), {})",
745 rid, flow_json
746 );
747 let result = runtime
748 .execute_script("call_onResponse", code)
749 .map_err(|e| {
750 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
751 e.to_string()
752 })?;
753 let result = runtime.resolve(result).await.map_err(|e| {
754 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
755 e.to_string()
756 })?;
757
758 let (is_empty, modified_flow) = {
759 let mut scope = runtime.handle_scope();
760 let result_val = deno_core::v8::Local::new(&mut scope, result);
761
762 if result_val.is_undefined() || result_val.is_null() {
763 (true, None)
764 } else {
765 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
766 drop(scope);
767 match deser {
768 Ok(f) => (false, Some(f)),
769 Err(e) => {
770 let err_str = format!("Failed to deserialize flow: {}", e);
771 Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
772 return Err(err_str);
773 }
774 }
775 }
776 };
777
778 if is_empty {
779 let resource = {
780 let op_state_rc = runtime.op_state();
781 let mut state = op_state_rc.borrow_mut();
782 state.resource_table.take::<HttpBodyResource>(rid).ok()
783 };
784 if let Some(res) = resource {
785 let body = crate::streams::create_body_from_resource(&res);
786 return Ok((None, ResponseAction::Continue(body)));
787 } else {
788 return Ok((
789 None,
790 ResponseAction::Continue(
791 http_body_util::Empty::new()
792 .map_err(|_| -> BoxError { unreachable!() })
793 .boxed(),
794 ),
795 ));
796 }
797 }
798
799 let modified_flow = modified_flow.unwrap();
800
801 let resource = {
802 let op_state_rc = runtime.op_state();
803 let mut state = op_state_rc.borrow_mut();
804 state.resource_table.take::<HttpBodyResource>(rid).ok()
805 };
806
807 let new_body: HttpBody = if let Some(res) = resource {
808 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
809 {
810 http.response
811 .as_ref()
812 .and_then(|r| r.body.as_ref())
813 .map(|b| !b.content.is_empty())
814 .unwrap_or(false)
815 } else {
816 false
817 };
818
819 if has_new_body {
820 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
821 if let Some(resp) = &http.response {
822 if let Some(b) = &resp.body {
823 let bytes: Bytes = if b.encoding == "base64" {
824 base64::engine::general_purpose::STANDARD
825 .decode(&b.content)
826 .unwrap_or_default()
827 .into()
828 } else {
829 Bytes::from(b.content.clone())
830 };
831 Full::new(bytes)
832 .map_err(|e| -> BoxError { e.into() })
833 .boxed()
834 } else {
835 http_body_util::Empty::new()
836 .map_err(|_| -> BoxError { unreachable!() })
837 .boxed()
838 }
839 } else {
840 http_body_util::Empty::new()
841 .map_err(|_| -> BoxError { unreachable!() })
842 .boxed()
843 }
844 } else {
845 http_body_util::Empty::new()
846 .map_err(|_| -> BoxError { unreachable!() })
847 .boxed()
848 }
849 } else {
850 crate::streams::create_body_from_resource(&res)
851 }
852 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
853 if let Some(resp) = &http.response {
854 if let Some(b) = &resp.body {
855 let bytes: Bytes = if b.encoding == "base64" {
856 base64::engine::general_purpose::STANDARD
857 .decode(&b.content)
858 .unwrap_or_default()
859 .into()
860 } else {
861 Bytes::from(b.content.clone())
862 };
863 Full::new(bytes)
864 .map_err(|e| -> BoxError { e.into() })
865 .boxed()
866 } else {
867 http_body_util::Empty::new()
868 .map_err(|_| -> BoxError { unreachable!() })
869 .boxed()
870 }
871 } else {
872 http_body_util::Empty::new()
873 .map_err(|_| -> BoxError { unreachable!() })
874 .boxed()
875 }
876 } else {
877 http_body_util::Empty::new()
878 .map_err(|_| -> BoxError { unreachable!() })
879 .boxed()
880 };
881
882 Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
883 }
884
885 fn handle_on_websocket_message(
886 runtime: &mut JsRuntime,
887 flow: Flow,
888 message: WebSocketMessage,
889 ) -> Result<WebSocketMessageAction, String> {
890 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
891 let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
892
893 let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
894 let exists = runtime
895 .execute_script("check_onWebSocketMessage", check_code)
896 .map_err(|e| {
897 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
898 e.to_string()
899 })?;
900 {
901 let mut scope = runtime.handle_scope();
902 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
903 if !exists_val.is_true() {
904 return Ok(WebSocketMessageAction::Continue(message));
905 }
906 }
907
908 let code = format!(
909 "globalThis.onWebSocketMessage({{}}, {}, {})",
910 flow_json, message_json
911 );
912 let result = runtime
913 .execute_script("call_onWebSocketMessage", code)
914 .map_err(|e| {
915 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
916 e.to_string()
917 })?;
918
919 let mut scope = runtime.handle_scope();
920 let result_val = deno_core::v8::Local::new(&mut scope, result);
921
922 if result_val.is_undefined() || result_val.is_null() {
923 return Ok(WebSocketMessageAction::Continue(message));
924 }
925
926 if result_val.is_string() {
927 let s = result_val.to_rust_string_lossy(&mut scope);
928 if s == "DROP" {
929 return Ok(WebSocketMessageAction::Drop);
930 }
931 }
932
933 let deser: Result<WebSocketMessage, _> =
934 deno_core::serde_v8::from_v8(&mut scope, result_val);
935 drop(scope);
936 let modified_message = match deser {
937 Ok(m) => m,
938 Err(e) => {
939 let err_str = format!("Failed to deserialize message: {}", e);
940 Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
941 return Err(err_str);
942 }
943 };
944
945 Ok(WebSocketMessageAction::Continue(modified_message))
946 }
947}
948
949#[async_trait]
950impl ScriptEngineTrait for DenoScriptEngine {
951 async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
952 let (tx, rx) = oneshot::channel();
953 self.tx
954 .send(DenoCommand::LoadScript(script.to_string(), tx))
955 .await
956 .map_err(|e| Box::new(e) as BoxError)?;
957 rx.await
958 .map_err(|e| Box::new(e) as BoxError)?
959 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
960 }
961
962 async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
963 let (tx, rx) = oneshot::channel();
964 let flow_clone = flow.clone();
965 self.tx
966 .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
967 .await
968 .map_err(|e| Box::new(e) as BoxError)?;
969 let res = rx
970 .await
971 .map_err(|e| Box::new(e) as BoxError)?
972 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
973
974 if let Some(new_flow) = &res {
975 *flow = new_flow.clone();
976 }
977 Ok(res)
978 }
979
980 async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
981 let (tx, rx) = oneshot::channel();
982 let flow_clone = flow.clone();
983 self.tx
984 .send(DenoCommand::OnRequest(flow_clone, body, tx))
985 .await
986 .map_err(|e| Box::new(e) as BoxError)?;
987 let (new_flow, action) = rx
988 .await
989 .map_err(|e| Box::new(e) as BoxError)?
990 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
991
992 if let Some(f) = new_flow {
993 *flow = f;
994 }
995 Ok(action)
996 }
997
998 async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
999 let (tx, rx) = oneshot::channel();
1000 let flow_clone = flow.clone();
1001 self.tx
1002 .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
1003 .await
1004 .map_err(|e| Box::new(e) as BoxError)?;
1005 let res = rx
1006 .await
1007 .map_err(|e| Box::new(e) as BoxError)?
1008 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1009
1010 if let Some(new_flow) = &res {
1011 *flow = new_flow.clone();
1012 }
1013 Ok(res)
1014 }
1015
1016 async fn on_response(
1017 &self,
1018 flow: &mut Flow,
1019 body: HttpBody,
1020 ) -> Result<ResponseAction, BoxError> {
1021 let (tx, rx) = oneshot::channel();
1022 let flow_clone = flow.clone();
1023 self.tx
1024 .send(DenoCommand::OnResponse(flow_clone, body, tx))
1025 .await
1026 .map_err(|e| Box::new(e) as BoxError)?;
1027 let (new_flow, action) = rx
1028 .await
1029 .map_err(|e| Box::new(e) as BoxError)?
1030 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1031
1032 if let Some(f) = new_flow {
1033 *flow = f;
1034 }
1035 Ok(action)
1036 }
1037
1038 async fn on_websocket_message(
1039 &self,
1040 _flow: &mut Flow,
1041 message: &mut WebSocketMessage,
1042 ) -> Result<WebSocketMessageAction, BoxError> {
1043 let (tx, rx) = oneshot::channel();
1044 let flow_clone = _flow.clone();
1045 let message_clone = message.clone();
1046 self.tx
1047 .send(DenoCommand::OnWebSocketMessage(
1048 flow_clone,
1049 message_clone,
1050 tx,
1051 ))
1052 .await
1053 .map_err(|e| Box::new(e) as BoxError)?;
1054 let res = rx
1055 .await
1056 .map_err(|e| Box::new(e) as BoxError)?
1057 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1058
1059 Ok(res)
1060 }
1061}