1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7 Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12 BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::rc::Rc;
17use std::thread;
18use tokio::sync::{mpsc, oneshot};
19
20#[op2(fast)]
21fn op_log_level(#[string] level: String, #[string] msg: String) {
22 match level.as_str() {
23 "error" => tracing::error!("[User Script] {}", msg),
24 "warn" => tracing::warn!("[User Script] {}", msg),
25 "info" => tracing::info!("[User Script] {}", msg),
26 "debug" => tracing::debug!("[User Script] {}", msg),
27 _ => tracing::info!("[User Script] {}", msg),
28 }
29}
30
31#[op2(async)]
32#[serde]
33async fn op_read_body(
34 state: Rc<RefCell<OpState>>,
35 #[smi] rid: ResourceId,
36 #[smi] limit: usize,
37) -> Result<Vec<u8>, AnyError> {
38 let resource = {
39 let state = state.borrow();
40 state.resource_table.get_any(rid)?
41 };
42 let view = resource.read(limit).await?;
43 Ok(view.to_vec())
44}
45
46#[op2(fast)]
47fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
48 state.resource_table.take_any(rid).ok();
49}
50
51fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
54 state.borrow_mut::<HashMap<String, serde_json::Value>>()
55}
56
57#[op2]
58#[serde]
59fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
60 shared_state(state).get(&key).cloned()
61}
62
63#[op2]
64fn op_shared_state_set(
65 state: &mut OpState,
66 #[string] key: String,
67 #[serde] value: serde_json::Value,
68) {
69 shared_state(state).insert(key, value);
70}
71
72#[op2(fast)]
73fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
74 shared_state(state).remove(&key).is_some()
75}
76
77#[op2(fast)]
78fn op_shared_state_clear(state: &mut OpState) {
79 shared_state(state).clear();
80}
81
82#[op2]
83#[serde]
84fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
85 shared_state(state).keys().cloned().collect()
86}
87
/// Commands sent from `DenoScriptEngine` handles to the thread that owns the `JsRuntime`.
///
/// Every variant carries, as its last field, a `oneshot::Sender` on which the
/// result of the command is sent back; failures are reported as `String` messages.
enum DenoCommand {
    /// Execute the given script source in the runtime.
    LoadScript(String, oneshot::Sender<Result<(), String>>),
    /// Run the `onRequestHeaders` hook for a flow; the reply is the replacement flow, if any.
    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
    /// Run the `onRequest` hook for a flow and its request body; the reply is the
    /// replacement flow (if any) together with the action to take.
    OnRequest(
        Flow,
        HttpBody,
        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
    ),
    /// Run the `onResponseHeaders` hook for a flow; the reply is the replacement flow, if any.
    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
    /// Run the `onResponse` hook for a flow and its response body; the reply is the
    /// replacement flow (if any) together with the action to take.
    OnResponse(
        Flow,
        HttpBody,
        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
    ),
    /// Run the `onWebSocketMessage` hook for a flow and one WebSocket message.
    OnWebSocketMessage(
        Flow,
        WebSocketMessage,
        oneshot::Sender<Result<WebSocketMessageAction, String>>,
    ),
}
108
/// Cloneable handle to a script engine.
///
/// The handle holds only the sending half of a `DenoCommand` channel; clones
/// share that channel and therefore talk to the same receiver.
#[derive(Clone)]
pub struct DenoScriptEngine {
    /// Sender used to submit commands for execution.
    tx: mpsc::Sender<DenoCommand>,
}
113
114impl Default for DenoScriptEngine {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl DenoScriptEngine {
    /// Creates an engine handle and starts the thread that executes scripts.
    ///
    /// The spawned thread builds a current-thread Tokio runtime and a `JsRuntime`
    /// carrying the `relay_core` extension, executes the bootstrap script (which
    /// defines `console`, `RelayBody`, `relay` and `sharedState` on `globalThis`),
    /// and then processes `DenoCommand`s one at a time until `rx.recv()` yields `None`.
    ///
    /// # Panics
    /// The spawned thread (not the caller) panics via `unwrap` if the Tokio
    /// runtime cannot be built or the bootstrap script fails to execute.
    pub fn new() -> Self {
        // Bounded command queue between handles and the script thread.
        let (tx, mut rx) = mpsc::channel(32);

        thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            rt.block_on(async move {
                // Native ops exposed to scripts through `Deno.core.ops`.
                let ext = Extension {
                    name: "relay_core",
                    ops: std::borrow::Cow::Borrowed(&[
                        op_log_level::DECL,
                        op_read_body::DECL,
                        op_close_body::DECL,
                        op_shared_state_get::DECL,
                        op_shared_state_set::DECL,
                        op_shared_state_delete::DECL,
                        op_shared_state_clear::DECL,
                        op_shared_state_keys::DECL,
                    ]),
                    // Seed the op state with the map backing `sharedState`.
                    op_state_fn: Some(Box::new(|state| {
                        state.put(HashMap::<String, serde_json::Value>::new());
                    })),
                    ..Default::default()
                };

                let mut js_runtime = JsRuntime::new(RuntimeOptions {
                    extensions: vec![ext],
                    ..Default::default()
                });

                // JavaScript prelude. Note that `console.log` passes the level "log",
                // which `op_log_level` handles in its catch-all arm (info).
                let bootstrap = r#"
                    globalThis.console = {
                        log: (...args) => {
                            Deno.core.ops.op_log_level("log", _format(args));
                        },
                        info: (...args) => {
                            Deno.core.ops.op_log_level("info", _format(args));
                        },
                        warn: (...args) => {
                            Deno.core.ops.op_log_level("warn", _format(args));
                        },
                        error: (...args) => {
                            Deno.core.ops.op_log_level("error", _format(args));
                        },
                        debug: (...args) => {
                            Deno.core.ops.op_log_level("debug", _format(args));
                        },
                    };

                    function _format(args) {
                        return args.map(arg => {
                            if (typeof arg === 'object') {
                                try { return JSON.stringify(arg); }
                                catch { return String(arg); }
                            }
                            return String(arg);
                        }).join(" ");
                    }

                    class RelayBody {
                        constructor(rid) { this.rid = rid; }
                        async read(limit) {
                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
                        }
                        close() {
                            Deno.core.ops.op_close_body(this.rid);
                        }
                        async text() {
                            const bytes = await this.read(10 * 1024 * 1024);
                            return new TextDecoder().decode(bytes);
                        }
                        async json() {
                            const txt = await this.text();
                            return JSON.parse(txt);
                        }
                    }
                    globalThis.RelayBody = RelayBody;

                    globalThis.relay = {
                        log: globalThis.console.log,
                    };

                    // S1: sharedState — cross-hook shared map per isolate
                    globalThis.sharedState = {
                        get(key) {
                            return Deno.core.ops.op_shared_state_get(key);
                        },
                        set(key, value) {
                            Deno.core.ops.op_shared_state_set(key, value);
                        },
                        delete(key) {
                            return Deno.core.ops.op_shared_state_delete(key);
                        },
                        clear() {
                            Deno.core.ops.op_shared_state_clear();
                        },
                        keys() {
                            return Deno.core.ops.op_shared_state_keys();
                        },
                    };
                "#;
                js_runtime.execute_script("bootstrap", bootstrap).unwrap();

                // Command loop: each command is fully handled before the next is received.
                // A failed `resp.send` (the requester went away) is ignored.
                while let Some(cmd) = rx.recv().await {
                    match cmd {
                        DenoCommand::LoadScript(script, resp) => {
                            let res = js_runtime.execute_script("<anon>", script);
                            let res = if let Err(e) = res {
                                Err(e.to_string())
                            } else {
                                // Drive the event loop after a successful top-level evaluation.
                                js_runtime
                                    .run_event_loop(Default::default())
                                    .await
                                    .map(|_| ())
                                    .map_err(|e| e.to_string())
                            };
                            let _ = resp.send(res);
                        }
                        DenoCommand::OnRequestHeaders(flow, resp) => {
                            let res = Self::handle_on_request_headers(&mut js_runtime, flow);
                            let _ = resp.send(res);
                        }
                        DenoCommand::OnRequest(flow, body, resp) => {
                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
                            let _ = resp.send(res);
                        }
                        DenoCommand::OnResponseHeaders(flow, resp) => {
                            let res = Self::handle_on_response_headers(&mut js_runtime, flow);
                            let _ = resp.send(res);
                        }
                        DenoCommand::OnResponse(flow, body, resp) => {
                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
                            let _ = resp.send(res);
                        }
                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
                            let res =
                                Self::handle_on_websocket_message(&mut js_runtime, flow, message);
                            let _ = resp.send(res);
                        }
                    }
                }
            });
        });

        Self { tx }
    }
271
272 fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
275 let check_code = "typeof globalThis.onError === 'function'";
276 let exists = runtime
277 .execute_script("check_onError_v2", check_code)
278 .ok()
279 .map(|v| {
280 let mut scope = runtime.handle_scope();
281 let val = deno_core::v8::Local::new(&mut scope, v);
282 val.is_true()
283 })
284 .unwrap_or(false);
285
286 if !exists {
287 return;
288 }
289
290 let flow_json = match serde_json::to_string(flow) {
291 Ok(j) => j,
292 Err(_) => return,
293 };
294 let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
295 let code = format!(
296 "globalThis.onError({{}}, {}, '{}', '{}')",
297 flow_json, error_escaped, stage
298 );
299
300 let _ = runtime.execute_script("call_onError_v2", code);
301 }
302
303 fn handle_on_request_headers(
304 runtime: &mut JsRuntime,
305 flow: Flow,
306 ) -> Result<Option<Flow>, String> {
307 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
308 let check_code = "typeof globalThis.onRequestHeaders === 'function'";
309 let exists = runtime
310 .execute_script("check_onRequestHeaders", check_code)
311 .map_err(|e| {
312 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
313 e.to_string()
314 })?;
315 {
316 let mut scope = runtime.handle_scope();
317 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
318 if !exists_val.is_true() {
319 return Ok(None);
320 }
321 }
322 let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
323 let result = runtime
324 .execute_script("call_onRequestHeaders", code)
325 .map_err(|e| {
326 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
327 e.to_string()
328 })?;
329 let mut scope = runtime.handle_scope();
330 let result_val = deno_core::v8::Local::new(&mut scope, result);
331 if result_val.is_undefined() || result_val.is_null() {
332 return Ok(None);
333 }
334 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
335 drop(scope);
336 let modified_flow = match deser {
337 Ok(f) => f,
338 Err(e) => {
339 let err_str = format!("Failed to deserialize flow: {}", e);
340 Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
341 return Err(err_str);
342 }
343 };
344 Ok(Some(modified_flow))
345 }
346
347 fn handle_on_response_headers(
348 runtime: &mut JsRuntime,
349 flow: Flow,
350 ) -> Result<Option<Flow>, String> {
351 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
352 let check_code = "typeof globalThis.onResponseHeaders === 'function'";
353 let exists = runtime
354 .execute_script("check_onResponseHeaders", check_code)
355 .map_err(|e| {
356 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
357 e.to_string()
358 })?;
359 {
360 let mut scope = runtime.handle_scope();
361 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
362 if !exists_val.is_true() {
363 return Ok(None);
364 }
365 }
366 let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
367 let result = runtime
368 .execute_script("call_onResponseHeaders", code)
369 .map_err(|e| {
370 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
371 e.to_string()
372 })?;
373 let mut scope = runtime.handle_scope();
374 let result_val = deno_core::v8::Local::new(&mut scope, result);
375 if result_val.is_undefined() || result_val.is_null() {
376 return Ok(None);
377 }
378 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
379 drop(scope);
380 let modified_flow = match deser {
381 Ok(f) => f,
382 Err(e) => {
383 let err_str = format!("Failed to deserialize flow: {}", e);
384 Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
385 return Err(err_str);
386 }
387 };
388 Ok(Some(modified_flow))
389 }
390
391 async fn handle_on_request(
392 runtime: &mut JsRuntime,
393 flow: Flow,
394 body: HttpBody,
395 ) -> Result<(Option<Flow>, RequestAction), String> {
396 let resource = HttpBodyResource::new(body);
397 let rid = {
398 let op_state_rc = runtime.op_state();
399 let mut state = op_state_rc.borrow_mut();
400 state.resource_table.add(resource)
401 };
402
403 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
404
405 let check_code = "typeof globalThis.onRequest === 'function'";
406 let exists = runtime
407 .execute_script("check_onRequest", check_code)
408 .map_err(|e| {
409 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
410 e.to_string()
411 })?;
412
413 let exists_bool = {
414 let scope = &mut runtime.handle_scope();
415 let exists_val = deno_core::v8::Local::new(scope, exists);
416 exists_val.is_true()
417 };
418
419 if !exists_bool {
420 let resource = {
421 let op_state_rc = runtime.op_state();
422 let mut state = op_state_rc.borrow_mut();
423 state.resource_table.take::<HttpBodyResource>(rid).ok()
424 };
425 if let Some(res) = resource {
426 let body = crate::streams::create_body_from_resource(&res);
427 return Ok((None, RequestAction::Continue(body)));
428 } else {
429 return Ok((
430 None,
431 RequestAction::Continue(
432 http_body_util::Empty::new()
433 .map_err(|_| -> BoxError { unreachable!() })
434 .boxed(),
435 ),
436 ));
437 }
438 }
439
440 let code = format!(
441 "globalThis.onRequest(new RelayBody({}), {})",
442 rid, flow_json
443 );
444 let result = runtime
445 .execute_script("call_onRequest", code)
446 .map_err(|e| {
447 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
448 e.to_string()
449 })?;
450
451 let result = runtime.resolve(result).await.map_err(|e| {
452 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
453 e.to_string()
454 })?;
455
456 let (is_empty, modified_flow) = {
457 let mut scope = runtime.handle_scope();
458 let result_val = deno_core::v8::Local::new(&mut scope, result);
459
460 if result_val.is_undefined() || result_val.is_null() {
461 (true, None)
462 } else {
463 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
464 drop(scope);
465 match deser {
466 Ok(f) => (false, Some(f)),
467 Err(e) => {
468 let err_str = format!("Failed to deserialize flow: {}", e);
469 Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
470 return Err(err_str);
471 }
472 }
473 }
474 };
475
476 if is_empty {
477 let resource = {
478 let op_state_rc = runtime.op_state();
479 let mut state = op_state_rc.borrow_mut();
480 state.resource_table.take::<HttpBodyResource>(rid).ok()
481 };
482 if let Some(res) = resource {
483 let body = crate::streams::create_body_from_resource(&res);
484 return Ok((None, RequestAction::Continue(body)));
485 } else {
486 return Ok((
487 None,
488 RequestAction::Continue(
489 http_body_util::Empty::new()
490 .map_err(|_| -> BoxError { unreachable!() })
491 .boxed(),
492 ),
493 ));
494 }
495 }
496
497 let modified_flow = modified_flow.unwrap();
498
499 let resource = {
500 let op_state_rc = runtime.op_state();
501 let mut state = op_state_rc.borrow_mut();
502 state.resource_table.take::<HttpBodyResource>(rid).ok()
503 };
504
505 let new_body: HttpBody = if let Some(res) = resource {
506 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
507 {
508 http.request
509 .body
510 .as_ref()
511 .map(|b| !b.content.is_empty())
512 .unwrap_or(false)
513 } else {
514 false
515 };
516
517 if has_new_body {
518 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
519 if let Some(b) = &http.request.body {
520 let bytes: Bytes = if b.encoding == "base64" {
521 base64::engine::general_purpose::STANDARD
522 .decode(&b.content)
523 .unwrap_or_default()
524 .into()
525 } else {
526 Bytes::from(b.content.clone())
527 };
528 Full::new(bytes)
529 .map_err(|e| -> BoxError { e.into() })
530 .boxed()
531 } else {
532 http_body_util::Empty::new()
533 .map_err(|_| -> BoxError { unreachable!() })
534 .boxed()
535 }
536 } else {
537 http_body_util::Empty::new()
538 .map_err(|_| -> BoxError { unreachable!() })
539 .boxed()
540 }
541 } else {
542 crate::streams::create_body_from_resource(&res)
543 }
544 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
545 if let Some(b) = &http.request.body {
546 let bytes: Bytes = if b.encoding == "base64" {
547 base64::engine::general_purpose::STANDARD
548 .decode(&b.content)
549 .unwrap_or_default()
550 .into()
551 } else {
552 Bytes::from(b.content.clone())
553 };
554 Full::new(bytes)
555 .map_err(|e| -> BoxError { e.into() })
556 .boxed()
557 } else {
558 http_body_util::Empty::new()
559 .map_err(|_| -> BoxError { unreachable!() })
560 .boxed()
561 }
562 } else {
563 http_body_util::Empty::new()
564 .map_err(|_| -> BoxError { unreachable!() })
565 .boxed()
566 };
567
568 Ok((Some(modified_flow), RequestAction::Continue(new_body)))
569 }
570
571 async fn handle_on_response(
572 runtime: &mut JsRuntime,
573 flow: Flow,
574 body: HttpBody,
575 ) -> Result<(Option<Flow>, ResponseAction), String> {
576 let resource = HttpBodyResource::new(body);
577 let rid = {
578 let op_state_rc = runtime.op_state();
579 let mut state = op_state_rc.borrow_mut();
580 state.resource_table.add(resource)
581 };
582
583 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
584
585 let check_code = "typeof globalThis.onResponse === 'function'";
586 let exists = runtime
587 .execute_script("check_onResponse", check_code)
588 .map_err(|e| {
589 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
590 e.to_string()
591 })?;
592
593 let exists_bool = {
594 let scope = &mut runtime.handle_scope();
595 let exists_val = deno_core::v8::Local::new(scope, exists);
596 exists_val.is_true()
597 };
598
599 if !exists_bool {
600 let resource = {
601 let op_state_rc = runtime.op_state();
602 let mut state = op_state_rc.borrow_mut();
603 state.resource_table.take::<HttpBodyResource>(rid).ok()
604 };
605 if let Some(res) = resource {
606 let body = crate::streams::create_body_from_resource(&res);
607 return Ok((None, ResponseAction::Continue(body)));
608 } else {
609 return Ok((
610 None,
611 ResponseAction::Continue(
612 http_body_util::Empty::new()
613 .map_err(|_| -> BoxError { unreachable!() })
614 .boxed(),
615 ),
616 ));
617 }
618 }
619
620 let code = format!(
621 "globalThis.onResponse(new RelayBody({}), {})",
622 rid, flow_json
623 );
624 let result = runtime
625 .execute_script("call_onResponse", code)
626 .map_err(|e| {
627 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
628 e.to_string()
629 })?;
630 let result = runtime.resolve(result).await.map_err(|e| {
631 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
632 e.to_string()
633 })?;
634
635 let (is_empty, modified_flow) = {
636 let mut scope = runtime.handle_scope();
637 let result_val = deno_core::v8::Local::new(&mut scope, result);
638
639 if result_val.is_undefined() || result_val.is_null() {
640 (true, None)
641 } else {
642 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
643 drop(scope);
644 match deser {
645 Ok(f) => (false, Some(f)),
646 Err(e) => {
647 let err_str = format!("Failed to deserialize flow: {}", e);
648 Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
649 return Err(err_str);
650 }
651 }
652 }
653 };
654
655 if is_empty {
656 let resource = {
657 let op_state_rc = runtime.op_state();
658 let mut state = op_state_rc.borrow_mut();
659 state.resource_table.take::<HttpBodyResource>(rid).ok()
660 };
661 if let Some(res) = resource {
662 let body = crate::streams::create_body_from_resource(&res);
663 return Ok((None, ResponseAction::Continue(body)));
664 } else {
665 return Ok((
666 None,
667 ResponseAction::Continue(
668 http_body_util::Empty::new()
669 .map_err(|_| -> BoxError { unreachable!() })
670 .boxed(),
671 ),
672 ));
673 }
674 }
675
676 let modified_flow = modified_flow.unwrap();
677
678 let resource = {
679 let op_state_rc = runtime.op_state();
680 let mut state = op_state_rc.borrow_mut();
681 state.resource_table.take::<HttpBodyResource>(rid).ok()
682 };
683
684 let new_body: HttpBody = if let Some(res) = resource {
685 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
686 {
687 http.response
688 .as_ref()
689 .and_then(|r| r.body.as_ref())
690 .map(|b| !b.content.is_empty())
691 .unwrap_or(false)
692 } else {
693 false
694 };
695
696 if has_new_body {
697 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
698 if let Some(resp) = &http.response {
699 if let Some(b) = &resp.body {
700 let bytes: Bytes = if b.encoding == "base64" {
701 base64::engine::general_purpose::STANDARD
702 .decode(&b.content)
703 .unwrap_or_default()
704 .into()
705 } else {
706 Bytes::from(b.content.clone())
707 };
708 Full::new(bytes)
709 .map_err(|e| -> BoxError { e.into() })
710 .boxed()
711 } else {
712 http_body_util::Empty::new()
713 .map_err(|_| -> BoxError { unreachable!() })
714 .boxed()
715 }
716 } else {
717 http_body_util::Empty::new()
718 .map_err(|_| -> BoxError { unreachable!() })
719 .boxed()
720 }
721 } else {
722 http_body_util::Empty::new()
723 .map_err(|_| -> BoxError { unreachable!() })
724 .boxed()
725 }
726 } else {
727 crate::streams::create_body_from_resource(&res)
728 }
729 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
730 if let Some(resp) = &http.response {
731 if let Some(b) = &resp.body {
732 let bytes: Bytes = if b.encoding == "base64" {
733 base64::engine::general_purpose::STANDARD
734 .decode(&b.content)
735 .unwrap_or_default()
736 .into()
737 } else {
738 Bytes::from(b.content.clone())
739 };
740 Full::new(bytes)
741 .map_err(|e| -> BoxError { e.into() })
742 .boxed()
743 } else {
744 http_body_util::Empty::new()
745 .map_err(|_| -> BoxError { unreachable!() })
746 .boxed()
747 }
748 } else {
749 http_body_util::Empty::new()
750 .map_err(|_| -> BoxError { unreachable!() })
751 .boxed()
752 }
753 } else {
754 http_body_util::Empty::new()
755 .map_err(|_| -> BoxError { unreachable!() })
756 .boxed()
757 };
758
759 Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
760 }
761
762 fn handle_on_websocket_message(
763 runtime: &mut JsRuntime,
764 flow: Flow,
765 message: WebSocketMessage,
766 ) -> Result<WebSocketMessageAction, String> {
767 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
768 let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
769
770 let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
771 let exists = runtime
772 .execute_script("check_onWebSocketMessage", check_code)
773 .map_err(|e| {
774 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
775 e.to_string()
776 })?;
777 {
778 let mut scope = runtime.handle_scope();
779 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
780 if !exists_val.is_true() {
781 return Ok(WebSocketMessageAction::Continue(message));
782 }
783 }
784
785 let code = format!(
786 "globalThis.onWebSocketMessage({{}}, {}, {})",
787 flow_json, message_json
788 );
789 let result = runtime
790 .execute_script("call_onWebSocketMessage", code)
791 .map_err(|e| {
792 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
793 e.to_string()
794 })?;
795
796 let mut scope = runtime.handle_scope();
797 let result_val = deno_core::v8::Local::new(&mut scope, result);
798
799 if result_val.is_undefined() || result_val.is_null() {
800 return Ok(WebSocketMessageAction::Continue(message));
801 }
802
803 if result_val.is_string() {
804 let s = result_val.to_rust_string_lossy(&mut scope);
805 if s == "DROP" {
806 return Ok(WebSocketMessageAction::Drop);
807 }
808 }
809
810 let deser: Result<WebSocketMessage, _> =
811 deno_core::serde_v8::from_v8(&mut scope, result_val);
812 drop(scope);
813 let modified_message = match deser {
814 Ok(m) => m,
815 Err(e) => {
816 let err_str = format!("Failed to deserialize message: {}", e);
817 Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
818 return Err(err_str);
819 }
820 };
821
822 Ok(WebSocketMessageAction::Continue(modified_message))
823 }
824}
825
826#[async_trait]
827impl ScriptEngineTrait for DenoScriptEngine {
828 async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
829 let (tx, rx) = oneshot::channel();
830 self.tx
831 .send(DenoCommand::LoadScript(script.to_string(), tx))
832 .await
833 .map_err(|e| Box::new(e) as BoxError)?;
834 rx.await
835 .map_err(|e| Box::new(e) as BoxError)?
836 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
837 }
838
839 async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
840 let (tx, rx) = oneshot::channel();
841 let flow_clone = flow.clone();
842 self.tx
843 .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
844 .await
845 .map_err(|e| Box::new(e) as BoxError)?;
846 let res = rx
847 .await
848 .map_err(|e| Box::new(e) as BoxError)?
849 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
850
851 if let Some(new_flow) = &res {
852 *flow = new_flow.clone();
853 }
854 Ok(res)
855 }
856
857 async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
858 let (tx, rx) = oneshot::channel();
859 let flow_clone = flow.clone();
860 self.tx
861 .send(DenoCommand::OnRequest(flow_clone, body, tx))
862 .await
863 .map_err(|e| Box::new(e) as BoxError)?;
864 let (new_flow, action) = rx
865 .await
866 .map_err(|e| Box::new(e) as BoxError)?
867 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
868
869 if let Some(f) = new_flow {
870 *flow = f;
871 }
872 Ok(action)
873 }
874
875 async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
876 let (tx, rx) = oneshot::channel();
877 let flow_clone = flow.clone();
878 self.tx
879 .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
880 .await
881 .map_err(|e| Box::new(e) as BoxError)?;
882 let res = rx
883 .await
884 .map_err(|e| Box::new(e) as BoxError)?
885 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
886
887 if let Some(new_flow) = &res {
888 *flow = new_flow.clone();
889 }
890 Ok(res)
891 }
892
893 async fn on_response(
894 &self,
895 flow: &mut Flow,
896 body: HttpBody,
897 ) -> Result<ResponseAction, BoxError> {
898 let (tx, rx) = oneshot::channel();
899 let flow_clone = flow.clone();
900 self.tx
901 .send(DenoCommand::OnResponse(flow_clone, body, tx))
902 .await
903 .map_err(|e| Box::new(e) as BoxError)?;
904 let (new_flow, action) = rx
905 .await
906 .map_err(|e| Box::new(e) as BoxError)?
907 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
908
909 if let Some(f) = new_flow {
910 *flow = f;
911 }
912 Ok(action)
913 }
914
915 async fn on_websocket_message(
916 &self,
917 _flow: &mut Flow,
918 message: &mut WebSocketMessage,
919 ) -> Result<WebSocketMessageAction, BoxError> {
920 let (tx, rx) = oneshot::channel();
921 let flow_clone = _flow.clone();
922 let message_clone = message.clone();
923 self.tx
924 .send(DenoCommand::OnWebSocketMessage(
925 flow_clone,
926 message_clone,
927 tx,
928 ))
929 .await
930 .map_err(|e| Box::new(e) as BoxError)?;
931 let res = rx
932 .await
933 .map_err(|e| Box::new(e) as BoxError)?
934 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
935
936 Ok(res)
937 }
938}