Skip to main content

relay_core_script/
deno_engine.rs

1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7    Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12    BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::rc::Rc;
17use std::thread;
18use tokio::sync::{mpsc, oneshot};
19
20#[op2(fast)]
21fn op_log_level(#[string] level: String, #[string] msg: String) {
22    match level.as_str() {
23        "error" => tracing::error!("[User Script] {}", msg),
24        "warn" => tracing::warn!("[User Script] {}", msg),
25        "info" => tracing::info!("[User Script] {}", msg),
26        "debug" => tracing::debug!("[User Script] {}", msg),
27        _ => tracing::info!("[User Script] {}", msg),
28    }
29}
30
31#[op2(async)]
32#[serde]
33async fn op_read_body(
34    state: Rc<RefCell<OpState>>,
35    #[smi] rid: ResourceId,
36    #[smi] limit: usize,
37) -> Result<Vec<u8>, AnyError> {
38    let resource = {
39        let state = state.borrow();
40        state.resource_table.get_any(rid)?
41    };
42    let view = resource.read(limit).await?;
43    Ok(view.to_vec())
44}
45
46#[op2(fast)]
47fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
48    state.resource_table.take_any(rid).ok();
49}
50
51// ── S1: sharedState ops ────────────────────────────────────────
52
53fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
54    state.borrow_mut::<HashMap<String, serde_json::Value>>()
55}
56
57#[op2]
58#[serde]
59fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
60    shared_state(state).get(&key).cloned()
61}
62
63#[op2]
64fn op_shared_state_set(
65    state: &mut OpState,
66    #[string] key: String,
67    #[serde] value: serde_json::Value,
68) {
69    shared_state(state).insert(key, value);
70}
71
72#[op2(fast)]
73fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
74    shared_state(state).remove(&key).is_some()
75}
76
77#[op2(fast)]
78fn op_shared_state_clear(state: &mut OpState) {
79    shared_state(state).clear();
80}
81
82#[op2]
83#[serde]
84fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
85    shared_state(state).keys().cloned().collect()
86}
87
88enum DenoCommand {
89    LoadScript(String, oneshot::Sender<Result<(), String>>),
90    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
91    OnRequest(
92        Flow,
93        HttpBody,
94        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
95    ),
96    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
97    OnResponse(
98        Flow,
99        HttpBody,
100        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
101    ),
102    OnWebSocketMessage(
103        Flow,
104        WebSocketMessage,
105        oneshot::Sender<Result<WebSocketMessageAction, String>>,
106    ),
107}
108
109#[derive(Clone)]
110pub struct DenoScriptEngine {
111    tx: mpsc::Sender<DenoCommand>,
112}
113
114impl Default for DenoScriptEngine {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl DenoScriptEngine {
121    pub fn new() -> Self {
122        let (tx, mut rx) = mpsc::channel(32);
123
124        thread::spawn(move || {
125            let rt = tokio::runtime::Builder::new_current_thread()
126                .enable_all()
127                .build()
128                .unwrap();
129
130            rt.block_on(async move {
131                let ext = Extension {
132                    name: "relay_core",
133                    ops: std::borrow::Cow::Borrowed(&[
134                        op_log_level::DECL,
135                        op_read_body::DECL,
136                        op_close_body::DECL,
137                        op_shared_state_get::DECL,
138                        op_shared_state_set::DECL,
139                        op_shared_state_delete::DECL,
140                        op_shared_state_clear::DECL,
141                        op_shared_state_keys::DECL,
142                    ]),
143                    op_state_fn: Some(Box::new(|state| {
144                        state.put(HashMap::<String, serde_json::Value>::new());
145                    })),
146                    ..Default::default()
147                };
148
149                let mut js_runtime = JsRuntime::new(RuntimeOptions {
150                    extensions: vec![ext],
151                    ..Default::default()
152                });
153
154                // Bootstrap JS — S1/S3: sharedState + console levels
155                let bootstrap = r#"
156                    globalThis.console = {
157                        log: (...args) => {
158                            Deno.core.ops.op_log_level("log", _format(args));
159                        },
160                        info: (...args) => {
161                            Deno.core.ops.op_log_level("info", _format(args));
162                        },
163                        warn: (...args) => {
164                            Deno.core.ops.op_log_level("warn", _format(args));
165                        },
166                        error: (...args) => {
167                            Deno.core.ops.op_log_level("error", _format(args));
168                        },
169                        debug: (...args) => {
170                            Deno.core.ops.op_log_level("debug", _format(args));
171                        },
172                    };
173
174                    function _format(args) {
175                        return args.map(arg => {
176                            if (typeof arg === 'object') {
177                                try { return JSON.stringify(arg); }
178                                catch { return String(arg); }
179                            }
180                            return String(arg);
181                        }).join(" ");
182                    }
183
184                    class RelayBody {
185                        constructor(rid) { this.rid = rid; }
186                        async read(limit) {
187                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
188                        }
189                        close() {
190                            Deno.core.ops.op_close_body(this.rid);
191                        }
192                        async text() {
193                            const bytes = await this.read(10 * 1024 * 1024);
194                            return new TextDecoder().decode(bytes);
195                        }
196                        async json() {
197                            const txt = await this.text();
198                            return JSON.parse(txt);
199                        }
200                    }
201                    globalThis.RelayBody = RelayBody;
202
203                    globalThis.relay = {
204                        log: globalThis.console.log,
205                    };
206
207                    // S1: sharedState — cross-hook shared map per isolate
208                    globalThis.sharedState = {
209                        get(key) {
210                            return Deno.core.ops.op_shared_state_get(key);
211                        },
212                        set(key, value) {
213                            Deno.core.ops.op_shared_state_set(key, value);
214                        },
215                        delete(key) {
216                            return Deno.core.ops.op_shared_state_delete(key);
217                        },
218                        clear() {
219                            Deno.core.ops.op_shared_state_clear();
220                        },
221                        keys() {
222                            return Deno.core.ops.op_shared_state_keys();
223                        },
224                    };
225                "#;
226                js_runtime.execute_script("bootstrap", bootstrap).unwrap();
227
228                while let Some(cmd) = rx.recv().await {
229                    match cmd {
230                        DenoCommand::LoadScript(script, resp) => {
231                            let res = js_runtime.execute_script("<anon>", script);
232                            let res = if let Err(e) = res {
233                                Err(e.to_string())
234                            } else {
235                                js_runtime
236                                    .run_event_loop(Default::default())
237                                    .await
238                                    .map(|_| ())
239                                    .map_err(|e| e.to_string())
240                            };
241                            let _ = resp.send(res);
242                        }
243                        DenoCommand::OnRequestHeaders(flow, resp) => {
244                            let res = Self::handle_on_request_headers(&mut js_runtime, flow);
245                            let _ = resp.send(res);
246                        }
247                        DenoCommand::OnRequest(flow, body, resp) => {
248                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
249                            let _ = resp.send(res);
250                        }
251                        DenoCommand::OnResponseHeaders(flow, resp) => {
252                            let res = Self::handle_on_response_headers(&mut js_runtime, flow);
253                            let _ = resp.send(res);
254                        }
255                        DenoCommand::OnResponse(flow, body, resp) => {
256                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
257                            let _ = resp.send(res);
258                        }
259                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
260                            let res =
261                                Self::handle_on_websocket_message(&mut js_runtime, flow, message);
262                            let _ = resp.send(res);
263                        }
264                    }
265                }
266            });
267        });
268
269        Self { tx }
270    }
271
272    // ── S2: onError dispatch ─────────────────────────────────
273
274    fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
275        let check_code = "typeof globalThis.onError === 'function'";
276        let exists = runtime
277            .execute_script("check_onError_v2", check_code)
278            .ok()
279            .map(|v| {
280                let mut scope = runtime.handle_scope();
281                let val = deno_core::v8::Local::new(&mut scope, v);
282                val.is_true()
283            })
284            .unwrap_or(false);
285
286        if !exists {
287            return;
288        }
289
290        let flow_json = match serde_json::to_string(flow) {
291            Ok(j) => j,
292            Err(_) => return,
293        };
294        let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
295        let code = format!(
296            "globalThis.onError({{}}, {}, '{}', '{}')",
297            flow_json, error_escaped, stage
298        );
299
300        let _ = runtime.execute_script("call_onError_v2", code);
301    }
302
303    fn handle_on_request_headers(
304        runtime: &mut JsRuntime,
305        flow: Flow,
306    ) -> Result<Option<Flow>, String> {
307        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
308        let check_code = "typeof globalThis.onRequestHeaders === 'function'";
309        let exists = runtime
310            .execute_script("check_onRequestHeaders", check_code)
311            .map_err(|e| {
312                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
313                e.to_string()
314            })?;
315        {
316            let mut scope = runtime.handle_scope();
317            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
318            if !exists_val.is_true() {
319                return Ok(None);
320            }
321        }
322        let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
323        let result = runtime
324            .execute_script("call_onRequestHeaders", code)
325            .map_err(|e| {
326                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
327                e.to_string()
328            })?;
329        let mut scope = runtime.handle_scope();
330        let result_val = deno_core::v8::Local::new(&mut scope, result);
331        if result_val.is_undefined() || result_val.is_null() {
332            return Ok(None);
333        }
334        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
335        drop(scope);
336        let modified_flow = match deser {
337            Ok(f) => f,
338            Err(e) => {
339                let err_str = format!("Failed to deserialize flow: {}", e);
340                Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
341                return Err(err_str);
342            }
343        };
344        Ok(Some(modified_flow))
345    }
346
347    fn handle_on_response_headers(
348        runtime: &mut JsRuntime,
349        flow: Flow,
350    ) -> Result<Option<Flow>, String> {
351        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
352        let check_code = "typeof globalThis.onResponseHeaders === 'function'";
353        let exists = runtime
354            .execute_script("check_onResponseHeaders", check_code)
355            .map_err(|e| {
356                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
357                e.to_string()
358            })?;
359        {
360            let mut scope = runtime.handle_scope();
361            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
362            if !exists_val.is_true() {
363                return Ok(None);
364            }
365        }
366        let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
367        let result = runtime
368            .execute_script("call_onResponseHeaders", code)
369            .map_err(|e| {
370                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
371                e.to_string()
372            })?;
373        let mut scope = runtime.handle_scope();
374        let result_val = deno_core::v8::Local::new(&mut scope, result);
375        if result_val.is_undefined() || result_val.is_null() {
376            return Ok(None);
377        }
378        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
379        drop(scope);
380        let modified_flow = match deser {
381            Ok(f) => f,
382            Err(e) => {
383                let err_str = format!("Failed to deserialize flow: {}", e);
384                Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
385                return Err(err_str);
386            }
387        };
388        Ok(Some(modified_flow))
389    }
390
391    async fn handle_on_request(
392        runtime: &mut JsRuntime,
393        flow: Flow,
394        body: HttpBody,
395    ) -> Result<(Option<Flow>, RequestAction), String> {
396        let resource = HttpBodyResource::new(body);
397        let rid = {
398            let op_state_rc = runtime.op_state();
399            let mut state = op_state_rc.borrow_mut();
400            state.resource_table.add(resource)
401        };
402
403        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
404
405        let check_code = "typeof globalThis.onRequest === 'function'";
406        let exists = runtime
407            .execute_script("check_onRequest", check_code)
408            .map_err(|e| {
409                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
410                e.to_string()
411            })?;
412
413        let exists_bool = {
414            let scope = &mut runtime.handle_scope();
415            let exists_val = deno_core::v8::Local::new(scope, exists);
416            exists_val.is_true()
417        };
418
419        if !exists_bool {
420            let resource = {
421                let op_state_rc = runtime.op_state();
422                let mut state = op_state_rc.borrow_mut();
423                state.resource_table.take::<HttpBodyResource>(rid).ok()
424            };
425            if let Some(res) = resource {
426                let body = crate::streams::create_body_from_resource(&res);
427                return Ok((None, RequestAction::Continue(body)));
428            } else {
429                return Ok((
430                    None,
431                    RequestAction::Continue(
432                        http_body_util::Empty::new()
433                            .map_err(|_| -> BoxError { unreachable!() })
434                            .boxed(),
435                    ),
436                ));
437            }
438        }
439
440        let code = format!(
441            "globalThis.onRequest(new RelayBody({}), {})",
442            rid, flow_json
443        );
444        let result = runtime
445            .execute_script("call_onRequest", code)
446            .map_err(|e| {
447                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
448                e.to_string()
449            })?;
450
451        let result = runtime.resolve(result).await.map_err(|e| {
452            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
453            e.to_string()
454        })?;
455
456        let (is_empty, modified_flow) = {
457            let mut scope = runtime.handle_scope();
458            let result_val = deno_core::v8::Local::new(&mut scope, result);
459
460            if result_val.is_undefined() || result_val.is_null() {
461                (true, None)
462            } else {
463                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
464                drop(scope);
465                match deser {
466                    Ok(f) => (false, Some(f)),
467                    Err(e) => {
468                        let err_str = format!("Failed to deserialize flow: {}", e);
469                        Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
470                        return Err(err_str);
471                    }
472                }
473            }
474        };
475
476        if is_empty {
477            let resource = {
478                let op_state_rc = runtime.op_state();
479                let mut state = op_state_rc.borrow_mut();
480                state.resource_table.take::<HttpBodyResource>(rid).ok()
481            };
482            if let Some(res) = resource {
483                let body = crate::streams::create_body_from_resource(&res);
484                return Ok((None, RequestAction::Continue(body)));
485            } else {
486                return Ok((
487                    None,
488                    RequestAction::Continue(
489                        http_body_util::Empty::new()
490                            .map_err(|_| -> BoxError { unreachable!() })
491                            .boxed(),
492                    ),
493                ));
494            }
495        }
496
497        let modified_flow = modified_flow.unwrap();
498
499        let resource = {
500            let op_state_rc = runtime.op_state();
501            let mut state = op_state_rc.borrow_mut();
502            state.resource_table.take::<HttpBodyResource>(rid).ok()
503        };
504
505        let new_body: HttpBody = if let Some(res) = resource {
506            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
507            {
508                http.request
509                    .body
510                    .as_ref()
511                    .map(|b| !b.content.is_empty())
512                    .unwrap_or(false)
513            } else {
514                false
515            };
516
517            if has_new_body {
518                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
519                    if let Some(b) = &http.request.body {
520                        let bytes: Bytes = if b.encoding == "base64" {
521                            base64::engine::general_purpose::STANDARD
522                                .decode(&b.content)
523                                .unwrap_or_default()
524                                .into()
525                        } else {
526                            Bytes::from(b.content.clone())
527                        };
528                        Full::new(bytes)
529                            .map_err(|e| -> BoxError { e.into() })
530                            .boxed()
531                    } else {
532                        http_body_util::Empty::new()
533                            .map_err(|_| -> BoxError { unreachable!() })
534                            .boxed()
535                    }
536                } else {
537                    http_body_util::Empty::new()
538                        .map_err(|_| -> BoxError { unreachable!() })
539                        .boxed()
540                }
541            } else {
542                crate::streams::create_body_from_resource(&res)
543            }
544        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
545            if let Some(b) = &http.request.body {
546                let bytes: Bytes = if b.encoding == "base64" {
547                    base64::engine::general_purpose::STANDARD
548                        .decode(&b.content)
549                        .unwrap_or_default()
550                        .into()
551                } else {
552                    Bytes::from(b.content.clone())
553                };
554                Full::new(bytes)
555                    .map_err(|e| -> BoxError { e.into() })
556                    .boxed()
557            } else {
558                http_body_util::Empty::new()
559                    .map_err(|_| -> BoxError { unreachable!() })
560                    .boxed()
561            }
562        } else {
563            http_body_util::Empty::new()
564                .map_err(|_| -> BoxError { unreachable!() })
565                .boxed()
566        };
567
568        Ok((Some(modified_flow), RequestAction::Continue(new_body)))
569    }
570
571    async fn handle_on_response(
572        runtime: &mut JsRuntime,
573        flow: Flow,
574        body: HttpBody,
575    ) -> Result<(Option<Flow>, ResponseAction), String> {
576        let resource = HttpBodyResource::new(body);
577        let rid = {
578            let op_state_rc = runtime.op_state();
579            let mut state = op_state_rc.borrow_mut();
580            state.resource_table.add(resource)
581        };
582
583        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
584
585        let check_code = "typeof globalThis.onResponse === 'function'";
586        let exists = runtime
587            .execute_script("check_onResponse", check_code)
588            .map_err(|e| {
589                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
590                e.to_string()
591            })?;
592
593        let exists_bool = {
594            let scope = &mut runtime.handle_scope();
595            let exists_val = deno_core::v8::Local::new(scope, exists);
596            exists_val.is_true()
597        };
598
599        if !exists_bool {
600            let resource = {
601                let op_state_rc = runtime.op_state();
602                let mut state = op_state_rc.borrow_mut();
603                state.resource_table.take::<HttpBodyResource>(rid).ok()
604            };
605            if let Some(res) = resource {
606                let body = crate::streams::create_body_from_resource(&res);
607                return Ok((None, ResponseAction::Continue(body)));
608            } else {
609                return Ok((
610                    None,
611                    ResponseAction::Continue(
612                        http_body_util::Empty::new()
613                            .map_err(|_| -> BoxError { unreachable!() })
614                            .boxed(),
615                    ),
616                ));
617            }
618        }
619
620        let code = format!(
621            "globalThis.onResponse(new RelayBody({}), {})",
622            rid, flow_json
623        );
624        let result = runtime
625            .execute_script("call_onResponse", code)
626            .map_err(|e| {
627                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
628                e.to_string()
629            })?;
630        let result = runtime.resolve(result).await.map_err(|e| {
631            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
632            e.to_string()
633        })?;
634
635        let (is_empty, modified_flow) = {
636            let mut scope = runtime.handle_scope();
637            let result_val = deno_core::v8::Local::new(&mut scope, result);
638
639            if result_val.is_undefined() || result_val.is_null() {
640                (true, None)
641            } else {
642                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
643                drop(scope);
644                match deser {
645                    Ok(f) => (false, Some(f)),
646                    Err(e) => {
647                        let err_str = format!("Failed to deserialize flow: {}", e);
648                        Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
649                        return Err(err_str);
650                    }
651                }
652            }
653        };
654
655        if is_empty {
656            let resource = {
657                let op_state_rc = runtime.op_state();
658                let mut state = op_state_rc.borrow_mut();
659                state.resource_table.take::<HttpBodyResource>(rid).ok()
660            };
661            if let Some(res) = resource {
662                let body = crate::streams::create_body_from_resource(&res);
663                return Ok((None, ResponseAction::Continue(body)));
664            } else {
665                return Ok((
666                    None,
667                    ResponseAction::Continue(
668                        http_body_util::Empty::new()
669                            .map_err(|_| -> BoxError { unreachable!() })
670                            .boxed(),
671                    ),
672                ));
673            }
674        }
675
676        let modified_flow = modified_flow.unwrap();
677
678        let resource = {
679            let op_state_rc = runtime.op_state();
680            let mut state = op_state_rc.borrow_mut();
681            state.resource_table.take::<HttpBodyResource>(rid).ok()
682        };
683
684        let new_body: HttpBody = if let Some(res) = resource {
685            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
686            {
687                http.response
688                    .as_ref()
689                    .and_then(|r| r.body.as_ref())
690                    .map(|b| !b.content.is_empty())
691                    .unwrap_or(false)
692            } else {
693                false
694            };
695
696            if has_new_body {
697                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
698                    if let Some(resp) = &http.response {
699                        if let Some(b) = &resp.body {
700                            let bytes: Bytes = if b.encoding == "base64" {
701                                base64::engine::general_purpose::STANDARD
702                                    .decode(&b.content)
703                                    .unwrap_or_default()
704                                    .into()
705                            } else {
706                                Bytes::from(b.content.clone())
707                            };
708                            Full::new(bytes)
709                                .map_err(|e| -> BoxError { e.into() })
710                                .boxed()
711                        } else {
712                            http_body_util::Empty::new()
713                                .map_err(|_| -> BoxError { unreachable!() })
714                                .boxed()
715                        }
716                    } else {
717                        http_body_util::Empty::new()
718                            .map_err(|_| -> BoxError { unreachable!() })
719                            .boxed()
720                    }
721                } else {
722                    http_body_util::Empty::new()
723                        .map_err(|_| -> BoxError { unreachable!() })
724                        .boxed()
725                }
726            } else {
727                crate::streams::create_body_from_resource(&res)
728            }
729        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
730            if let Some(resp) = &http.response {
731                if let Some(b) = &resp.body {
732                    let bytes: Bytes = if b.encoding == "base64" {
733                        base64::engine::general_purpose::STANDARD
734                            .decode(&b.content)
735                            .unwrap_or_default()
736                            .into()
737                    } else {
738                        Bytes::from(b.content.clone())
739                    };
740                    Full::new(bytes)
741                        .map_err(|e| -> BoxError { e.into() })
742                        .boxed()
743                } else {
744                    http_body_util::Empty::new()
745                        .map_err(|_| -> BoxError { unreachable!() })
746                        .boxed()
747                }
748            } else {
749                http_body_util::Empty::new()
750                    .map_err(|_| -> BoxError { unreachable!() })
751                    .boxed()
752            }
753        } else {
754            http_body_util::Empty::new()
755                .map_err(|_| -> BoxError { unreachable!() })
756                .boxed()
757        };
758
759        Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
760    }
761
762    fn handle_on_websocket_message(
763        runtime: &mut JsRuntime,
764        flow: Flow,
765        message: WebSocketMessage,
766    ) -> Result<WebSocketMessageAction, String> {
767        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
768        let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
769
770        let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
771        let exists = runtime
772            .execute_script("check_onWebSocketMessage", check_code)
773            .map_err(|e| {
774                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
775                e.to_string()
776            })?;
777        {
778            let mut scope = runtime.handle_scope();
779            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
780            if !exists_val.is_true() {
781                return Ok(WebSocketMessageAction::Continue(message));
782            }
783        }
784
785        let code = format!(
786            "globalThis.onWebSocketMessage({{}}, {}, {})",
787            flow_json, message_json
788        );
789        let result = runtime
790            .execute_script("call_onWebSocketMessage", code)
791            .map_err(|e| {
792                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
793                e.to_string()
794            })?;
795
796        let mut scope = runtime.handle_scope();
797        let result_val = deno_core::v8::Local::new(&mut scope, result);
798
799        if result_val.is_undefined() || result_val.is_null() {
800            return Ok(WebSocketMessageAction::Continue(message));
801        }
802
803        if result_val.is_string() {
804            let s = result_val.to_rust_string_lossy(&mut scope);
805            if s == "DROP" {
806                return Ok(WebSocketMessageAction::Drop);
807            }
808        }
809
810        let deser: Result<WebSocketMessage, _> =
811            deno_core::serde_v8::from_v8(&mut scope, result_val);
812        drop(scope);
813        let modified_message = match deser {
814            Ok(m) => m,
815            Err(e) => {
816                let err_str = format!("Failed to deserialize message: {}", e);
817                Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
818                return Err(err_str);
819            }
820        };
821
822        Ok(WebSocketMessageAction::Continue(modified_message))
823    }
824}
825
826#[async_trait]
827impl ScriptEngineTrait for DenoScriptEngine {
828    async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
829        let (tx, rx) = oneshot::channel();
830        self.tx
831            .send(DenoCommand::LoadScript(script.to_string(), tx))
832            .await
833            .map_err(|e| Box::new(e) as BoxError)?;
834        rx.await
835            .map_err(|e| Box::new(e) as BoxError)?
836            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
837    }
838
839    async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
840        let (tx, rx) = oneshot::channel();
841        let flow_clone = flow.clone();
842        self.tx
843            .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
844            .await
845            .map_err(|e| Box::new(e) as BoxError)?;
846        let res = rx
847            .await
848            .map_err(|e| Box::new(e) as BoxError)?
849            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
850
851        if let Some(new_flow) = &res {
852            *flow = new_flow.clone();
853        }
854        Ok(res)
855    }
856
857    async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
858        let (tx, rx) = oneshot::channel();
859        let flow_clone = flow.clone();
860        self.tx
861            .send(DenoCommand::OnRequest(flow_clone, body, tx))
862            .await
863            .map_err(|e| Box::new(e) as BoxError)?;
864        let (new_flow, action) = rx
865            .await
866            .map_err(|e| Box::new(e) as BoxError)?
867            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
868
869        if let Some(f) = new_flow {
870            *flow = f;
871        }
872        Ok(action)
873    }
874
875    async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
876        let (tx, rx) = oneshot::channel();
877        let flow_clone = flow.clone();
878        self.tx
879            .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
880            .await
881            .map_err(|e| Box::new(e) as BoxError)?;
882        let res = rx
883            .await
884            .map_err(|e| Box::new(e) as BoxError)?
885            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
886
887        if let Some(new_flow) = &res {
888            *flow = new_flow.clone();
889        }
890        Ok(res)
891    }
892
893    async fn on_response(
894        &self,
895        flow: &mut Flow,
896        body: HttpBody,
897    ) -> Result<ResponseAction, BoxError> {
898        let (tx, rx) = oneshot::channel();
899        let flow_clone = flow.clone();
900        self.tx
901            .send(DenoCommand::OnResponse(flow_clone, body, tx))
902            .await
903            .map_err(|e| Box::new(e) as BoxError)?;
904        let (new_flow, action) = rx
905            .await
906            .map_err(|e| Box::new(e) as BoxError)?
907            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
908
909        if let Some(f) = new_flow {
910            *flow = f;
911        }
912        Ok(action)
913    }
914
915    async fn on_websocket_message(
916        &self,
917        _flow: &mut Flow,
918        message: &mut WebSocketMessage,
919    ) -> Result<WebSocketMessageAction, BoxError> {
920        let (tx, rx) = oneshot::channel();
921        let flow_clone = _flow.clone();
922        let message_clone = message.clone();
923        self.tx
924            .send(DenoCommand::OnWebSocketMessage(
925                flow_clone,
926                message_clone,
927                tx,
928            ))
929            .await
930            .map_err(|e| Box::new(e) as BoxError)?;
931        let res = rx
932            .await
933            .map_err(|e| Box::new(e) as BoxError)?
934            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
935
936        Ok(res)
937    }
938}