Skip to main content

relay_core_script/
deno_engine.rs

1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7    Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12    BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::thread;
17use tokio::sync::{mpsc, oneshot};
18
19#[op2(fast)]
20fn op_log(#[string] msg: String) {
21    println!("[Deno] {}", msg);
22}
23
24#[op2(async)]
25#[serde]
26async fn op_read_body(
27    state: Rc<RefCell<OpState>>,
28    #[smi] rid: ResourceId,
29    #[smi] limit: usize,
30) -> Result<Vec<u8>, AnyError> {
31    let resource = {
32        let state = state.borrow();
33        state.resource_table.get_any(rid)?
34    };
35    let view = resource.read(limit).await?;
36    Ok(view.to_vec())
37}
38
39#[op2(fast)]
40fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
41    state.resource_table.take_any(rid).ok();
42}
43
44enum DenoCommand {
45    LoadScript(String, oneshot::Sender<Result<(), String>>),
46    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
47    OnRequest(
48        Flow,
49        HttpBody,
50        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
51    ),
52    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
53    OnResponse(
54        Flow,
55        HttpBody,
56        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
57    ),
58    OnWebSocketMessage(
59        Flow,
60        WebSocketMessage,
61        oneshot::Sender<Result<WebSocketMessageAction, String>>,
62    ),
63}
64
65#[derive(Clone)]
66pub struct DenoScriptEngine {
67    tx: mpsc::Sender<DenoCommand>,
68}
69
70impl Default for DenoScriptEngine {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl DenoScriptEngine {
77    pub fn new() -> Self {
78        let (tx, mut rx) = mpsc::channel(32);
79
80        thread::spawn(move || {
81            let rt = tokio::runtime::Builder::new_current_thread()
82                .enable_all()
83                .build()
84                .unwrap();
85
86            rt.block_on(async move {
87                let ext = Extension {
88                    name: "relay_core",
89                    ops: std::borrow::Cow::Borrowed(&[
90                        op_log::DECL,
91                        op_read_body::DECL,
92                        op_close_body::DECL,
93                    ]),
94                    ..Default::default()
95                };
96
97                let mut js_runtime = JsRuntime::new(RuntimeOptions {
98                    extensions: vec![ext],
99                    ..Default::default()
100                });
101
102                // Polyfill console.log and Bootstrap API
103                let bootstrap = r#"
104                    globalThis.console = {
105                        log: (...args) => {
106                            const msg = args.map(arg => {
107                                if (typeof arg === 'object') {
108                                    try {
109                                        return JSON.stringify(arg);
110                                    } catch {
111                                        return String(arg);
112                                    }
113                                }
114                                return String(arg);
115                            }).join(" ");
116                            Deno.core.ops.op_log(msg);
117                        }
118                    };
119
120                    class RelayBody {
121                        constructor(rid) {
122                            this.rid = rid;
123                        }
124                        
125                        async read(limit) {
126                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
127                        }
128                        
129                        close() {
130                            Deno.core.ops.op_close_body(this.rid);
131                        }
132                        
133                        async text() {
134                            const bytes = await this.read(10 * 1024 * 1024); // 10MB limit
135                            return new TextDecoder().decode(bytes);
136                        }
137                        
138                        async json() {
139                            const txt = await this.text();
140                            return JSON.parse(txt);
141                        }
142                    }
143                    globalThis.RelayBody = RelayBody;
144
145                    globalThis.relay = {
146                        log: globalThis.console.log,
147                        // Future: add more helpers like base64, etc.
148                    };
149                "#;
150                js_runtime.execute_script("bootstrap", bootstrap).unwrap();
151
152                while let Some(cmd) = rx.recv().await {
153                    match cmd {
154                        DenoCommand::LoadScript(script, resp) => {
155                            let res = js_runtime.execute_script("<anon>", script);
156                            let res = if let Err(e) = res {
157                                Err(e.to_string())
158                            } else {
159                                js_runtime
160                                    .run_event_loop(Default::default())
161                                    .await
162                                    .map(|_| ())
163                                    .map_err(|e| e.to_string())
164                            };
165                            let _ = resp.send(res);
166                        }
167                        DenoCommand::OnRequestHeaders(flow, resp) => {
168                            let res = Self::handle_on_request_headers(&mut js_runtime, flow);
169                            let _ = resp.send(res);
170                        }
171                        DenoCommand::OnRequest(flow, body, resp) => {
172                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
173                            let _ = resp.send(res);
174                        }
175                        DenoCommand::OnResponseHeaders(flow, resp) => {
176                            let res = Self::handle_on_response_headers(&mut js_runtime, flow);
177                            let _ = resp.send(res);
178                        }
179                        DenoCommand::OnResponse(flow, body, resp) => {
180                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
181                            let _ = resp.send(res);
182                        }
183                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
184                            let res =
185                                Self::handle_on_websocket_message(&mut js_runtime, flow, message);
186                            let _ = resp.send(res);
187                        }
188                    }
189                }
190            });
191        });
192
193        Self { tx }
194    }
195
196    fn handle_on_request_headers(
197        runtime: &mut JsRuntime,
198        flow: Flow,
199    ) -> Result<Option<Flow>, String> {
200        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
201        let check_code = "typeof globalThis.onRequestHeaders === 'function'";
202        let exists = runtime
203            .execute_script("check_onRequestHeaders", check_code)
204            .map_err(|e| e.to_string())?;
205        {
206            let scope = &mut runtime.handle_scope();
207            let exists_val = deno_core::v8::Local::new(scope, exists);
208            if !exists_val.is_true() {
209                return Ok(None);
210            }
211        }
212        let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
213        let result = runtime
214            .execute_script("call_onRequestHeaders", code)
215            .map_err(|e| e.to_string())?;
216        let scope = &mut runtime.handle_scope();
217        let result_val = deno_core::v8::Local::new(scope, result);
218        if result_val.is_undefined() || result_val.is_null() {
219            return Ok(None);
220        }
221        let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
222            .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
223        Ok(Some(modified_flow))
224    }
225
226    fn handle_on_response_headers(
227        runtime: &mut JsRuntime,
228        flow: Flow,
229    ) -> Result<Option<Flow>, String> {
230        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
231        let check_code = "typeof globalThis.onResponseHeaders === 'function'";
232        let exists = runtime
233            .execute_script("check_onResponseHeaders", check_code)
234            .map_err(|e| e.to_string())?;
235        {
236            let scope = &mut runtime.handle_scope();
237            let exists_val = deno_core::v8::Local::new(scope, exists);
238            if !exists_val.is_true() {
239                return Ok(None);
240            }
241        }
242        let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
243        let result = runtime
244            .execute_script("call_onResponseHeaders", code)
245            .map_err(|e| e.to_string())?;
246        let scope = &mut runtime.handle_scope();
247        let result_val = deno_core::v8::Local::new(scope, result);
248        if result_val.is_undefined() || result_val.is_null() {
249            return Ok(None);
250        }
251        let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
252            .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
253        Ok(Some(modified_flow))
254    }
255
256    async fn handle_on_request(
257        runtime: &mut JsRuntime,
258        flow: Flow,
259        body: HttpBody,
260    ) -> Result<(Option<Flow>, RequestAction), String> {
261        let resource = HttpBodyResource::new(body);
262        let rid = {
263            let op_state_rc = runtime.op_state();
264            let mut state = op_state_rc.borrow_mut();
265            state.resource_table.add(resource)
266        };
267
268        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
269
270        let check_code = "typeof globalThis.onRequest === 'function'";
271        let exists = runtime
272            .execute_script("check_onRequest", check_code)
273            .map_err(|e| e.to_string())?;
274
275        let exists_bool = {
276            let scope = &mut runtime.handle_scope();
277            let exists_val = deno_core::v8::Local::new(scope, exists);
278            exists_val.is_true()
279        };
280
281        if !exists_bool {
282            // Not found, return original body via resource
283            let resource = {
284                let op_state_rc = runtime.op_state();
285                let mut state = op_state_rc.borrow_mut();
286                state.resource_table.take::<HttpBodyResource>(rid).ok()
287            };
288            if let Some(res) = resource {
289                let body = crate::streams::create_body_from_resource(&res);
290                return Ok((None, RequestAction::Continue(body)));
291            } else {
292                return Ok((
293                    None,
294                    RequestAction::Continue(
295                        http_body_util::Empty::new()
296                            .map_err(|_| -> BoxError { unreachable!() })
297                            .boxed(),
298                    ),
299                ));
300            }
301        }
302
303        let code = format!(
304            "globalThis.onRequest(new RelayBody({}), {})",
305            rid, flow_json
306        );
307        let result = runtime
308            .execute_script("call_onRequest", code)
309            .map_err(|e| e.to_string())?;
310
311        let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
312
313        let (is_empty, modified_flow) = {
314            let scope = &mut runtime.handle_scope();
315            let result_val = deno_core::v8::Local::new(scope, result);
316
317            if result_val.is_undefined() || result_val.is_null() {
318                (true, None)
319            } else {
320                let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
321                    .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
322                (false, Some(flow))
323            }
324        };
325
326        if is_empty {
327            // JS returned nothing, continue with original body
328            let resource = {
329                let op_state_rc = runtime.op_state();
330                let mut state = op_state_rc.borrow_mut();
331                state.resource_table.take::<HttpBodyResource>(rid).ok()
332            };
333            if let Some(res) = resource {
334                let body = crate::streams::create_body_from_resource(&res);
335                return Ok((None, RequestAction::Continue(body)));
336            } else {
337                return Ok((
338                    None,
339                    RequestAction::Continue(
340                        http_body_util::Empty::new()
341                            .map_err(|_| -> BoxError { unreachable!() })
342                            .boxed(),
343                    ),
344                ));
345            }
346        }
347
348        let modified_flow = modified_flow.unwrap();
349
350        let resource = {
351            let op_state_rc = runtime.op_state();
352            let mut state = op_state_rc.borrow_mut();
353            state.resource_table.take::<HttpBodyResource>(rid).ok()
354        };
355
356        let new_body: HttpBody = if let Some(res) = resource {
357            // Original body stream is available.
358            // Check if JS provided a NEW body in `modified_flow`.
359            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
360            {
361                http.request
362                    .body
363                    .as_ref()
364                    .map(|b| !b.content.is_empty())
365                    .unwrap_or(false)
366            } else {
367                false
368            };
369
370            if has_new_body {
371                // JS provided new body content. Use it.
372                // And we drop the original resource (and its reader).
373                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
374                    if let Some(b) = &http.request.body {
375                        let bytes: Bytes = if b.encoding == "base64" {
376                            base64::engine::general_purpose::STANDARD
377                                .decode(&b.content)
378                                .unwrap_or_default()
379                                .into()
380                        } else {
381                            Bytes::from(b.content.clone())
382                        };
383                        Full::new(bytes)
384                            .map_err(|e| -> BoxError { e.into() })
385                            .boxed()
386                    } else {
387                        http_body_util::Empty::new()
388                            .map_err(|_| -> BoxError { unreachable!() })
389                            .boxed()
390                    }
391                } else {
392                    http_body_util::Empty::new()
393                        .map_err(|_| -> BoxError { unreachable!() })
394                        .boxed()
395                }
396            } else {
397                // JS did not provide new body content. Use original stream.
398                crate::streams::create_body_from_resource(&res)
399            }
400        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
401            // Resource is gone (JS consumed it or closed it).
402            // We must use whatever is in `modified_flow`.
403            if let Some(b) = &http.request.body {
404                let bytes: Bytes = if b.encoding == "base64" {
405                    base64::engine::general_purpose::STANDARD
406                        .decode(&b.content)
407                        .unwrap_or_default()
408                        .into()
409                } else {
410                    Bytes::from(b.content.clone())
411                };
412                Full::new(bytes)
413                    .map_err(|e| -> BoxError { e.into() })
414                    .boxed()
415            } else {
416                http_body_util::Empty::new()
417                    .map_err(|_| -> BoxError { unreachable!() })
418                    .boxed()
419            }
420        } else {
421            http_body_util::Empty::new()
422                .map_err(|_| -> BoxError { unreachable!() })
423                .boxed()
424        };
425
426        Ok((Some(modified_flow), RequestAction::Continue(new_body)))
427    }
428
429    async fn handle_on_response(
430        runtime: &mut JsRuntime,
431        flow: Flow,
432        body: HttpBody,
433    ) -> Result<(Option<Flow>, ResponseAction), String> {
434        let resource = HttpBodyResource::new(body);
435        let rid = {
436            let op_state_rc = runtime.op_state();
437            let mut state = op_state_rc.borrow_mut();
438            state.resource_table.add(resource)
439        };
440
441        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
442
443        let check_code = "typeof globalThis.onResponse === 'function'";
444        let exists = runtime
445            .execute_script("check_onResponse", check_code)
446            .map_err(|e| e.to_string())?;
447
448        let exists_bool = {
449            let scope = &mut runtime.handle_scope();
450            let exists_val = deno_core::v8::Local::new(scope, exists);
451            exists_val.is_true()
452        };
453
454        if !exists_bool {
455            let resource = {
456                let op_state_rc = runtime.op_state();
457                let mut state = op_state_rc.borrow_mut();
458                state.resource_table.take::<HttpBodyResource>(rid).ok()
459            };
460            if let Some(res) = resource {
461                let body = crate::streams::create_body_from_resource(&res);
462                return Ok((None, ResponseAction::Continue(body)));
463            } else {
464                return Ok((
465                    None,
466                    ResponseAction::Continue(
467                        http_body_util::Empty::new()
468                            .map_err(|_| -> BoxError { unreachable!() })
469                            .boxed(),
470                    ),
471                ));
472            }
473        }
474
475        let code = format!(
476            "globalThis.onResponse(new RelayBody({}), {})",
477            rid, flow_json
478        );
479        let result = runtime
480            .execute_script("call_onResponse", code)
481            .map_err(|e| e.to_string())?;
482        let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
483
484        let (is_empty, modified_flow) = {
485            let scope = &mut runtime.handle_scope();
486            let result_val = deno_core::v8::Local::new(scope, result);
487
488            if result_val.is_undefined() || result_val.is_null() {
489                (true, None)
490            } else {
491                let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
492                    .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
493                (false, Some(flow))
494            }
495        };
496
497        if is_empty {
498            let resource = {
499                let op_state_rc = runtime.op_state();
500                let mut state = op_state_rc.borrow_mut();
501                state.resource_table.take::<HttpBodyResource>(rid).ok()
502            };
503            if let Some(res) = resource {
504                let body = crate::streams::create_body_from_resource(&res);
505                return Ok((None, ResponseAction::Continue(body)));
506            } else {
507                return Ok((
508                    None,
509                    ResponseAction::Continue(
510                        http_body_util::Empty::new()
511                            .map_err(|_| -> BoxError { unreachable!() })
512                            .boxed(),
513                    ),
514                ));
515            }
516        }
517
518        let modified_flow = modified_flow.unwrap();
519
520        let resource = {
521            let op_state_rc = runtime.op_state();
522            let mut state = op_state_rc.borrow_mut();
523            state.resource_table.take::<HttpBodyResource>(rid).ok()
524        };
525
526        let new_body: HttpBody = if let Some(res) = resource {
527            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
528            {
529                http.response
530                    .as_ref()
531                    .and_then(|r| r.body.as_ref())
532                    .map(|b| !b.content.is_empty())
533                    .unwrap_or(false)
534            } else {
535                false
536            };
537
538            if has_new_body {
539                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
540                    if let Some(resp) = &http.response {
541                        if let Some(b) = &resp.body {
542                            let bytes: Bytes = if b.encoding == "base64" {
543                                base64::engine::general_purpose::STANDARD
544                                    .decode(&b.content)
545                                    .unwrap_or_default()
546                                    .into()
547                            } else {
548                                Bytes::from(b.content.clone())
549                            };
550                            Full::new(bytes)
551                                .map_err(|e| -> BoxError { e.into() })
552                                .boxed()
553                        } else {
554                            http_body_util::Empty::new()
555                                .map_err(|_| -> BoxError { unreachable!() })
556                                .boxed()
557                        }
558                    } else {
559                        http_body_util::Empty::new()
560                            .map_err(|_| -> BoxError { unreachable!() })
561                            .boxed()
562                    }
563                } else {
564                    http_body_util::Empty::new()
565                        .map_err(|_| -> BoxError { unreachable!() })
566                        .boxed()
567                }
568            } else {
569                crate::streams::create_body_from_resource(&res)
570            }
571        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
572            if let Some(resp) = &http.response {
573                if let Some(b) = &resp.body {
574                    let bytes: Bytes = if b.encoding == "base64" {
575                        base64::engine::general_purpose::STANDARD
576                            .decode(&b.content)
577                            .unwrap_or_default()
578                            .into()
579                    } else {
580                        Bytes::from(b.content.clone())
581                    };
582                    Full::new(bytes)
583                        .map_err(|e| -> BoxError { e.into() })
584                        .boxed()
585                } else {
586                    http_body_util::Empty::new()
587                        .map_err(|_| -> BoxError { unreachable!() })
588                        .boxed()
589                }
590            } else {
591                http_body_util::Empty::new()
592                    .map_err(|_| -> BoxError { unreachable!() })
593                    .boxed()
594            }
595        } else {
596            http_body_util::Empty::new()
597                .map_err(|_| -> BoxError { unreachable!() })
598                .boxed()
599        };
600
601        Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
602    }
603
604    fn handle_on_websocket_message(
605        runtime: &mut JsRuntime,
606        flow: Flow,
607        message: WebSocketMessage,
608    ) -> Result<WebSocketMessageAction, String> {
609        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
610        let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
611
612        let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
613        let exists = runtime
614            .execute_script("check_onWebSocketMessage", check_code)
615            .map_err(|e| e.to_string())?;
616        {
617            let scope = &mut runtime.handle_scope();
618            let exists_val = deno_core::v8::Local::new(scope, exists);
619            if !exists_val.is_true() {
620                return Ok(WebSocketMessageAction::Continue(message));
621            }
622        }
623
624        let code = format!(
625            "globalThis.onWebSocketMessage({{}}, {}, {})",
626            flow_json, message_json
627        );
628        let result = runtime
629            .execute_script("call_onWebSocketMessage", code)
630            .map_err(|e| e.to_string())?;
631
632        let scope = &mut runtime.handle_scope();
633        let result_val = deno_core::v8::Local::new(scope, result);
634
635        if result_val.is_undefined() || result_val.is_null() {
636            return Ok(WebSocketMessageAction::Continue(message));
637        }
638
639        // Check if user returned "DROP" (string) or a modified WebSocketMessage
640        if result_val.is_string() {
641            let s = result_val.to_rust_string_lossy(scope);
642            if s == "DROP" {
643                return Ok(WebSocketMessageAction::Drop);
644            }
645        }
646
647        let modified_message: WebSocketMessage = deno_core::serde_v8::from_v8(scope, result_val)
648            .map_err(|e| format!("Failed to deserialize message: {}", e))?;
649
650        Ok(WebSocketMessageAction::Continue(modified_message))
651    }
652}
653
654#[async_trait]
655impl ScriptEngineTrait for DenoScriptEngine {
656    async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
657        let (tx, rx) = oneshot::channel();
658        self.tx
659            .send(DenoCommand::LoadScript(script.to_string(), tx))
660            .await
661            .map_err(|e| Box::new(e) as BoxError)?;
662        rx.await
663            .map_err(|e| Box::new(e) as BoxError)?
664            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
665    }
666
667    async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
668        let (tx, rx) = oneshot::channel();
669        let flow_clone = flow.clone();
670        self.tx
671            .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
672            .await
673            .map_err(|e| Box::new(e) as BoxError)?;
674        let res = rx
675            .await
676            .map_err(|e| Box::new(e) as BoxError)?
677            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
678
679        if let Some(new_flow) = &res {
680            *flow = new_flow.clone();
681        }
682        Ok(res)
683    }
684
685    async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
686        let (tx, rx) = oneshot::channel();
687        let flow_clone = flow.clone();
688        self.tx
689            .send(DenoCommand::OnRequest(flow_clone, body, tx))
690            .await
691            .map_err(|e| Box::new(e) as BoxError)?;
692        let (new_flow, action) = rx
693            .await
694            .map_err(|e| Box::new(e) as BoxError)?
695            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
696
697        if let Some(f) = new_flow {
698            *flow = f;
699        }
700        Ok(action)
701    }
702
703    async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
704        let (tx, rx) = oneshot::channel();
705        let flow_clone = flow.clone();
706        self.tx
707            .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
708            .await
709            .map_err(|e| Box::new(e) as BoxError)?;
710        let res = rx
711            .await
712            .map_err(|e| Box::new(e) as BoxError)?
713            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
714
715        if let Some(new_flow) = &res {
716            *flow = new_flow.clone();
717        }
718        Ok(res)
719    }
720
721    async fn on_response(
722        &self,
723        flow: &mut Flow,
724        body: HttpBody,
725    ) -> Result<ResponseAction, BoxError> {
726        let (tx, rx) = oneshot::channel();
727        let flow_clone = flow.clone();
728        self.tx
729            .send(DenoCommand::OnResponse(flow_clone, body, tx))
730            .await
731            .map_err(|e| Box::new(e) as BoxError)?;
732        let (new_flow, action) = rx
733            .await
734            .map_err(|e| Box::new(e) as BoxError)?
735            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
736
737        if let Some(f) = new_flow {
738            *flow = f;
739        }
740        Ok(action)
741    }
742
743    async fn on_websocket_message(
744        &self,
745        _flow: &mut Flow,
746        message: &mut WebSocketMessage,
747    ) -> Result<WebSocketMessageAction, BoxError> {
748        let (tx, rx) = oneshot::channel();
749        let flow_clone = _flow.clone();
750        let message_clone = message.clone();
751        self.tx
752            .send(DenoCommand::OnWebSocketMessage(
753                flow_clone,
754                message_clone,
755                tx,
756            ))
757            .await
758            .map_err(|e| Box::new(e) as BoxError)?;
759        let res = rx
760            .await
761            .map_err(|e| Box::new(e) as BoxError)?
762            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
763
764        Ok(res)
765    }
766}