Skip to main content

relay_core_script/
deno_engine.rs

1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7    Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12    BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::collections::{HashMap, HashSet};
16use std::rc::Rc;
17use std::thread;
18use tokio::sync::{mpsc, oneshot};
19
20#[op2(fast)]
21fn op_log_level(#[string] level: String, #[string] msg: String) {
22    match level.as_str() {
23        "error" => tracing::error!("[User Script] {}", msg),
24        "warn" => tracing::warn!("[User Script] {}", msg),
25        "info" => tracing::info!("[User Script] {}", msg),
26        "debug" => tracing::debug!("[User Script] {}", msg),
27        _ => tracing::info!("[User Script] {}", msg),
28    }
29}
30
31#[op2(async)]
32#[serde]
33async fn op_read_body(
34    state: Rc<RefCell<OpState>>,
35    #[smi] rid: ResourceId,
36    #[smi] limit: usize,
37) -> Result<Vec<u8>, AnyError> {
38    let resource = {
39        let state = state.borrow();
40        state.resource_table.get_any(rid)?
41    };
42    let view = resource.read(limit).await?;
43    Ok(view.to_vec())
44}
45
46#[op2(fast)]
47fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
48    state.resource_table.take_any(rid).ok();
49}
50
51// ── S1: sharedState ops ────────────────────────────────────────
52
53fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
54    state.borrow_mut::<HashMap<String, serde_json::Value>>()
55}
56
57#[op2]
58#[serde]
59fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
60    shared_state(state).get(&key).cloned()
61}
62
63#[op2]
64fn op_shared_state_set(
65    state: &mut OpState,
66    #[string] key: String,
67    #[serde] value: serde_json::Value,
68) {
69    shared_state(state).insert(key, value);
70}
71
72#[op2(fast)]
73fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
74    shared_state(state).remove(&key).is_some()
75}
76
77#[op2(fast)]
78fn op_shared_state_clear(state: &mut OpState) {
79    shared_state(state).clear();
80}
81
82#[op2]
83#[serde]
84fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
85    shared_state(state).keys().cloned().collect()
86}
87
88// ── S5: relay.env — whitelisted environment variable access ────
89
90fn env_allow(state: &OpState) -> &HashSet<String> {
91    state.borrow::<HashSet<String>>()
92}
93
94#[op2]
95#[string]
96fn op_env_get(state: &OpState, #[string] key: String) -> Option<String> {
97    let allowed = env_allow(state);
98    if allowed.is_empty() || !allowed.contains(&key) {
99        return None;
100    }
101    std::env::var(&key).ok()
102}
103
104// ── S6: relay utility ops — uuid, hash, base64, json ──────────
105
106#[op2]
107#[string]
108fn op_uuid_v4() -> String {
109    uuid::Uuid::new_v4().to_string()
110}
111
112#[op2]
113#[string]
114fn op_hash(#[string] algorithm: String, #[string] data: String) -> Result<String, AnyError> {
115    use sha1::Sha1;
116    use sha2::{Sha256, Sha512};
117    let bytes = data.as_bytes();
118    let hex = match algorithm.as_str() {
119        "sha1" => {
120            use sha1::Digest;
121            data_encoding::HEXLOWER.encode(&Sha1::digest(bytes))
122        }
123        "sha256" => {
124            use sha2::Digest;
125            data_encoding::HEXLOWER.encode(&Sha256::digest(bytes))
126        }
127        "sha512" => {
128            use sha2::Digest;
129            data_encoding::HEXLOWER.encode(&Sha512::digest(bytes))
130        }
131        "md5" => {
132            use md5::Digest;
133            data_encoding::HEXLOWER.encode(&md5::Md5::digest(bytes))
134        }
135        other => {
136            return Err(AnyError::msg(format!(
137                "unsupported hash algorithm: {}. Supported: sha1, sha256, sha512, md5",
138                other
139            )));
140        }
141    };
142    Ok(hex)
143}
144
145#[op2]
146#[string]
147fn op_base64_encode(#[string] data: String) -> String {
148    use base64::Engine as _;
149    base64::engine::general_purpose::STANDARD.encode(data.as_bytes())
150}
151
152#[op2]
153#[string]
154fn op_base64_decode(#[string] data: String) -> Result<String, AnyError> {
155    use base64::Engine as _;
156    let bytes = base64::engine::general_purpose::STANDARD
157        .decode(data.as_bytes())
158        .map_err(|e| AnyError::msg(format!("base64 decode error: {}", e)))?;
159    String::from_utf8(bytes).map_err(|e| AnyError::msg(format!("utf-8 error: {}", e)))
160}
161
162#[op2]
163#[serde]
164fn op_json_parse_safe(#[string] data: String) -> serde_json::Value {
165    serde_json::from_str(&data).unwrap_or(serde_json::Value::Null)
166}
167
168#[op2]
169#[string]
170fn op_json_stringify_pretty(#[serde] value: serde_json::Value) -> String {
171    serde_json::to_string_pretty(&value).unwrap_or_else(|_| String::new())
172}
173
174enum DenoCommand {
175    LoadScript(String, oneshot::Sender<Result<(), String>>),
176    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
177    OnRequest(
178        Flow,
179        HttpBody,
180        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
181    ),
182    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
183    OnResponse(
184        Flow,
185        HttpBody,
186        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
187    ),
188    OnWebSocketMessage(
189        Flow,
190        WebSocketMessage,
191        oneshot::Sender<Result<WebSocketMessageAction, String>>,
192    ),
193}
194
/// Handle to a script engine. It holds only the sending half of the command
/// channel, so cloning the handle yields another sender for the same channel.
#[derive(Clone)]
pub struct DenoScriptEngine {
    /// Sender used to submit `DenoCommand`s to the engine.
    tx: mpsc::Sender<DenoCommand>,
}
199
200impl Default for DenoScriptEngine {
201    fn default() -> Self {
202        Self::new(HashSet::new())
203    }
204}
205
206impl DenoScriptEngine {
207    pub fn new(env_allow: HashSet<String>) -> Self {
208        let (tx, mut rx) = mpsc::channel(32);
209
210        thread::spawn(move || {
211            let rt = tokio::runtime::Builder::new_current_thread()
212                .enable_all()
213                .build()
214                .unwrap();
215
216            rt.block_on(async move {
217                let env_allow = env_allow; // capture into async block
218                let ext = Extension {
219                    name: "relay_core",
220                    ops: std::borrow::Cow::Borrowed(&[
221                        op_log_level::DECL,
222                        op_read_body::DECL,
223                        op_close_body::DECL,
224                        op_shared_state_get::DECL,
225                        op_shared_state_set::DECL,
226                        op_shared_state_delete::DECL,
227                        op_shared_state_clear::DECL,
228                        op_shared_state_keys::DECL,
229                        op_env_get::DECL,
230                        op_uuid_v4::DECL,
231                        op_hash::DECL,
232                        op_base64_encode::DECL,
233                        op_base64_decode::DECL,
234                        op_json_parse_safe::DECL,
235                        op_json_stringify_pretty::DECL,
236                    ]),
237                    op_state_fn: Some(Box::new({
238                        let env_allow = env_allow.clone();
239                        move |state| {
240                            state.put(HashMap::<String, serde_json::Value>::new());
241                            state.put(env_allow.clone());
242                        }
243                    })),
244                    ..Default::default()
245                };
246
247                let mut js_runtime = JsRuntime::new(RuntimeOptions {
248                    extensions: vec![ext],
249                    ..Default::default()
250                });
251
252                // Bootstrap JS — S1/S3: sharedState + console levels
253                let bootstrap = r#"
254                    globalThis.console = {
255                        log: (...args) => {
256                            Deno.core.ops.op_log_level("log", _format(args));
257                        },
258                        info: (...args) => {
259                            Deno.core.ops.op_log_level("info", _format(args));
260                        },
261                        warn: (...args) => {
262                            Deno.core.ops.op_log_level("warn", _format(args));
263                        },
264                        error: (...args) => {
265                            Deno.core.ops.op_log_level("error", _format(args));
266                        },
267                        debug: (...args) => {
268                            Deno.core.ops.op_log_level("debug", _format(args));
269                        },
270                    };
271
272                    function _format(args) {
273                        return args.map(arg => {
274                            if (typeof arg === 'object') {
275                                try { return JSON.stringify(arg); }
276                                catch { return String(arg); }
277                            }
278                            return String(arg);
279                        }).join(" ");
280                    }
281
282                    class RelayBody {
283                        constructor(rid) { this.rid = rid; }
284                        async read(limit) {
285                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
286                        }
287                        close() {
288                            Deno.core.ops.op_close_body(this.rid);
289                        }
290                        async text() {
291                            const bytes = await this.read(10 * 1024 * 1024);
292                            return new TextDecoder().decode(bytes);
293                        }
294                        async json() {
295                            const txt = await this.text();
296                            return JSON.parse(txt);
297                        }
298                    }
299                    globalThis.RelayBody = RelayBody;
300
301                    globalThis.relay = {
302                        log: globalThis.console.log,
303                        env: function(name) {
304                            return Deno.core.ops.op_env_get(name);
305                        },
306                        uuid: function() {
307                            return Deno.core.ops.op_uuid_v4();
308                        },
309                        hash: function(alg, data) {
310                            return Deno.core.ops.op_hash(alg, data);
311                        },
312                        base64: {
313                            encode: function(data) {
314                                return Deno.core.ops.op_base64_encode(data);
315                            },
316                            decode: function(data) {
317                                return Deno.core.ops.op_base64_decode(data);
318                            },
319                        },
320                        json: {
321                            parseSafe: function(str) {
322                                return Deno.core.ops.op_json_parse_safe(str);
323                            },
324                            stringifyPretty: function(obj) {
325                                return Deno.core.ops.op_json_stringify_pretty(obj);
326                            },
327                        },
328                    };
329
330                    // S1: sharedState — cross-hook shared map per isolate
331                    globalThis.sharedState = {
332                        get(key) {
333                            return Deno.core.ops.op_shared_state_get(key);
334                        },
335                        set(key, value) {
336                            Deno.core.ops.op_shared_state_set(key, value);
337                        },
338                        delete(key) {
339                            return Deno.core.ops.op_shared_state_delete(key);
340                        },
341                        clear() {
342                            Deno.core.ops.op_shared_state_clear();
343                        },
344                        keys() {
345                            return Deno.core.ops.op_shared_state_keys();
346                        },
347                    };
348                "#;
349                js_runtime.execute_script("bootstrap", bootstrap).unwrap();
350
351                while let Some(cmd) = rx.recv().await {
352                    match cmd {
353                        DenoCommand::LoadScript(script, resp) => {
354                            let res = js_runtime.execute_script("<anon>", script);
355                            let res = if let Err(e) = res {
356                                Err(e.to_string())
357                            } else {
358                                js_runtime
359                                    .run_event_loop(Default::default())
360                                    .await
361                                    .map(|_| ())
362                                    .map_err(|e| e.to_string())
363                            };
364                            let _ = resp.send(res);
365                        }
366                        DenoCommand::OnRequestHeaders(flow, resp) => {
367                            let res = Self::handle_on_request_headers(&mut js_runtime, flow);
368                            let _ = resp.send(res);
369                        }
370                        DenoCommand::OnRequest(flow, body, resp) => {
371                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
372                            let _ = resp.send(res);
373                        }
374                        DenoCommand::OnResponseHeaders(flow, resp) => {
375                            let res = Self::handle_on_response_headers(&mut js_runtime, flow);
376                            let _ = resp.send(res);
377                        }
378                        DenoCommand::OnResponse(flow, body, resp) => {
379                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
380                            let _ = resp.send(res);
381                        }
382                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
383                            let res =
384                                Self::handle_on_websocket_message(&mut js_runtime, flow, message);
385                            let _ = resp.send(res);
386                        }
387                    }
388                }
389            });
390        });
391
392        Self { tx }
393    }
394
395    // ── S2: onError dispatch ─────────────────────────────────
396
397    fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
398        let check_code = "typeof globalThis.onError === 'function'";
399        let exists = runtime
400            .execute_script("check_onError_v2", check_code)
401            .ok()
402            .map(|v| {
403                let mut scope = runtime.handle_scope();
404                let val = deno_core::v8::Local::new(&mut scope, v);
405                val.is_true()
406            })
407            .unwrap_or(false);
408
409        if !exists {
410            return;
411        }
412
413        let flow_json = match serde_json::to_string(flow) {
414            Ok(j) => j,
415            Err(_) => return,
416        };
417        let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
418        let code = format!(
419            "globalThis.onError({{}}, {}, '{}', '{}')",
420            flow_json, error_escaped, stage
421        );
422
423        let _ = runtime.execute_script("call_onError_v2", code);
424    }
425
426    fn handle_on_request_headers(
427        runtime: &mut JsRuntime,
428        flow: Flow,
429    ) -> Result<Option<Flow>, String> {
430        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
431        let check_code = "typeof globalThis.onRequestHeaders === 'function'";
432        let exists = runtime
433            .execute_script("check_onRequestHeaders", check_code)
434            .map_err(|e| {
435                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
436                e.to_string()
437            })?;
438        {
439            let mut scope = runtime.handle_scope();
440            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
441            if !exists_val.is_true() {
442                return Ok(None);
443            }
444        }
445        let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
446        let result = runtime
447            .execute_script("call_onRequestHeaders", code)
448            .map_err(|e| {
449                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
450                e.to_string()
451            })?;
452        let mut scope = runtime.handle_scope();
453        let result_val = deno_core::v8::Local::new(&mut scope, result);
454        if result_val.is_undefined() || result_val.is_null() {
455            return Ok(None);
456        }
457        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
458        drop(scope);
459        let modified_flow = match deser {
460            Ok(f) => f,
461            Err(e) => {
462                let err_str = format!("Failed to deserialize flow: {}", e);
463                Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
464                return Err(err_str);
465            }
466        };
467        Ok(Some(modified_flow))
468    }
469
470    fn handle_on_response_headers(
471        runtime: &mut JsRuntime,
472        flow: Flow,
473    ) -> Result<Option<Flow>, String> {
474        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
475        let check_code = "typeof globalThis.onResponseHeaders === 'function'";
476        let exists = runtime
477            .execute_script("check_onResponseHeaders", check_code)
478            .map_err(|e| {
479                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
480                e.to_string()
481            })?;
482        {
483            let mut scope = runtime.handle_scope();
484            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
485            if !exists_val.is_true() {
486                return Ok(None);
487            }
488        }
489        let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
490        let result = runtime
491            .execute_script("call_onResponseHeaders", code)
492            .map_err(|e| {
493                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
494                e.to_string()
495            })?;
496        let mut scope = runtime.handle_scope();
497        let result_val = deno_core::v8::Local::new(&mut scope, result);
498        if result_val.is_undefined() || result_val.is_null() {
499            return Ok(None);
500        }
501        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
502        drop(scope);
503        let modified_flow = match deser {
504            Ok(f) => f,
505            Err(e) => {
506                let err_str = format!("Failed to deserialize flow: {}", e);
507                Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
508                return Err(err_str);
509            }
510        };
511        Ok(Some(modified_flow))
512    }
513
514    async fn handle_on_request(
515        runtime: &mut JsRuntime,
516        flow: Flow,
517        body: HttpBody,
518    ) -> Result<(Option<Flow>, RequestAction), String> {
519        let resource = HttpBodyResource::new(body);
520        let rid = {
521            let op_state_rc = runtime.op_state();
522            let mut state = op_state_rc.borrow_mut();
523            state.resource_table.add(resource)
524        };
525
526        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
527
528        let check_code = "typeof globalThis.onRequest === 'function'";
529        let exists = runtime
530            .execute_script("check_onRequest", check_code)
531            .map_err(|e| {
532                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
533                e.to_string()
534            })?;
535
536        let exists_bool = {
537            let scope = &mut runtime.handle_scope();
538            let exists_val = deno_core::v8::Local::new(scope, exists);
539            exists_val.is_true()
540        };
541
542        if !exists_bool {
543            let resource = {
544                let op_state_rc = runtime.op_state();
545                let mut state = op_state_rc.borrow_mut();
546                state.resource_table.take::<HttpBodyResource>(rid).ok()
547            };
548            if let Some(res) = resource {
549                let body = crate::streams::create_body_from_resource(&res);
550                return Ok((None, RequestAction::Continue(body)));
551            } else {
552                return Ok((
553                    None,
554                    RequestAction::Continue(
555                        http_body_util::Empty::new()
556                            .map_err(|_| -> BoxError { unreachable!() })
557                            .boxed(),
558                    ),
559                ));
560            }
561        }
562
563        let code = format!(
564            "globalThis.onRequest(new RelayBody({}), {})",
565            rid, flow_json
566        );
567        let result = runtime
568            .execute_script("call_onRequest", code)
569            .map_err(|e| {
570                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
571                e.to_string()
572            })?;
573
574        let result = runtime.resolve(result).await.map_err(|e| {
575            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
576            e.to_string()
577        })?;
578
579        let (is_empty, modified_flow) = {
580            let mut scope = runtime.handle_scope();
581            let result_val = deno_core::v8::Local::new(&mut scope, result);
582
583            if result_val.is_undefined() || result_val.is_null() {
584                (true, None)
585            } else {
586                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
587                drop(scope);
588                match deser {
589                    Ok(f) => (false, Some(f)),
590                    Err(e) => {
591                        let err_str = format!("Failed to deserialize flow: {}", e);
592                        Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
593                        return Err(err_str);
594                    }
595                }
596            }
597        };
598
599        if is_empty {
600            let resource = {
601                let op_state_rc = runtime.op_state();
602                let mut state = op_state_rc.borrow_mut();
603                state.resource_table.take::<HttpBodyResource>(rid).ok()
604            };
605            if let Some(res) = resource {
606                let body = crate::streams::create_body_from_resource(&res);
607                return Ok((None, RequestAction::Continue(body)));
608            } else {
609                return Ok((
610                    None,
611                    RequestAction::Continue(
612                        http_body_util::Empty::new()
613                            .map_err(|_| -> BoxError { unreachable!() })
614                            .boxed(),
615                    ),
616                ));
617            }
618        }
619
620        let modified_flow = modified_flow.unwrap();
621
622        let resource = {
623            let op_state_rc = runtime.op_state();
624            let mut state = op_state_rc.borrow_mut();
625            state.resource_table.take::<HttpBodyResource>(rid).ok()
626        };
627
628        let new_body: HttpBody = if let Some(res) = resource {
629            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
630            {
631                http.request
632                    .body
633                    .as_ref()
634                    .map(|b| !b.content.is_empty())
635                    .unwrap_or(false)
636            } else {
637                false
638            };
639
640            if has_new_body {
641                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
642                    if let Some(b) = &http.request.body {
643                        let bytes: Bytes = if b.encoding == "base64" {
644                            base64::engine::general_purpose::STANDARD
645                                .decode(&b.content)
646                                .unwrap_or_default()
647                                .into()
648                        } else {
649                            Bytes::from(b.content.clone())
650                        };
651                        Full::new(bytes)
652                            .map_err(|e| -> BoxError { e.into() })
653                            .boxed()
654                    } else {
655                        http_body_util::Empty::new()
656                            .map_err(|_| -> BoxError { unreachable!() })
657                            .boxed()
658                    }
659                } else {
660                    http_body_util::Empty::new()
661                        .map_err(|_| -> BoxError { unreachable!() })
662                        .boxed()
663                }
664            } else {
665                crate::streams::create_body_from_resource(&res)
666            }
667        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
668            if let Some(b) = &http.request.body {
669                let bytes: Bytes = if b.encoding == "base64" {
670                    base64::engine::general_purpose::STANDARD
671                        .decode(&b.content)
672                        .unwrap_or_default()
673                        .into()
674                } else {
675                    Bytes::from(b.content.clone())
676                };
677                Full::new(bytes)
678                    .map_err(|e| -> BoxError { e.into() })
679                    .boxed()
680            } else {
681                http_body_util::Empty::new()
682                    .map_err(|_| -> BoxError { unreachable!() })
683                    .boxed()
684            }
685        } else {
686            http_body_util::Empty::new()
687                .map_err(|_| -> BoxError { unreachable!() })
688                .boxed()
689        };
690
691        Ok((Some(modified_flow), RequestAction::Continue(new_body)))
692    }
693
694    async fn handle_on_response(
695        runtime: &mut JsRuntime,
696        flow: Flow,
697        body: HttpBody,
698    ) -> Result<(Option<Flow>, ResponseAction), String> {
699        let resource = HttpBodyResource::new(body);
700        let rid = {
701            let op_state_rc = runtime.op_state();
702            let mut state = op_state_rc.borrow_mut();
703            state.resource_table.add(resource)
704        };
705
706        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
707
708        let check_code = "typeof globalThis.onResponse === 'function'";
709        let exists = runtime
710            .execute_script("check_onResponse", check_code)
711            .map_err(|e| {
712                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
713                e.to_string()
714            })?;
715
716        let exists_bool = {
717            let scope = &mut runtime.handle_scope();
718            let exists_val = deno_core::v8::Local::new(scope, exists);
719            exists_val.is_true()
720        };
721
722        if !exists_bool {
723            let resource = {
724                let op_state_rc = runtime.op_state();
725                let mut state = op_state_rc.borrow_mut();
726                state.resource_table.take::<HttpBodyResource>(rid).ok()
727            };
728            if let Some(res) = resource {
729                let body = crate::streams::create_body_from_resource(&res);
730                return Ok((None, ResponseAction::Continue(body)));
731            } else {
732                return Ok((
733                    None,
734                    ResponseAction::Continue(
735                        http_body_util::Empty::new()
736                            .map_err(|_| -> BoxError { unreachable!() })
737                            .boxed(),
738                    ),
739                ));
740            }
741        }
742
743        let code = format!(
744            "globalThis.onResponse(new RelayBody({}), {})",
745            rid, flow_json
746        );
747        let result = runtime
748            .execute_script("call_onResponse", code)
749            .map_err(|e| {
750                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
751                e.to_string()
752            })?;
753        let result = runtime.resolve(result).await.map_err(|e| {
754            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
755            e.to_string()
756        })?;
757
758        let (is_empty, modified_flow) = {
759            let mut scope = runtime.handle_scope();
760            let result_val = deno_core::v8::Local::new(&mut scope, result);
761
762            if result_val.is_undefined() || result_val.is_null() {
763                (true, None)
764            } else {
765                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
766                drop(scope);
767                match deser {
768                    Ok(f) => (false, Some(f)),
769                    Err(e) => {
770                        let err_str = format!("Failed to deserialize flow: {}", e);
771                        Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
772                        return Err(err_str);
773                    }
774                }
775            }
776        };
777
778        if is_empty {
779            let resource = {
780                let op_state_rc = runtime.op_state();
781                let mut state = op_state_rc.borrow_mut();
782                state.resource_table.take::<HttpBodyResource>(rid).ok()
783            };
784            if let Some(res) = resource {
785                let body = crate::streams::create_body_from_resource(&res);
786                return Ok((None, ResponseAction::Continue(body)));
787            } else {
788                return Ok((
789                    None,
790                    ResponseAction::Continue(
791                        http_body_util::Empty::new()
792                            .map_err(|_| -> BoxError { unreachable!() })
793                            .boxed(),
794                    ),
795                ));
796            }
797        }
798
799        let modified_flow = modified_flow.unwrap();
800
801        let resource = {
802            let op_state_rc = runtime.op_state();
803            let mut state = op_state_rc.borrow_mut();
804            state.resource_table.take::<HttpBodyResource>(rid).ok()
805        };
806
807        let new_body: HttpBody = if let Some(res) = resource {
808            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
809            {
810                http.response
811                    .as_ref()
812                    .and_then(|r| r.body.as_ref())
813                    .map(|b| !b.content.is_empty())
814                    .unwrap_or(false)
815            } else {
816                false
817            };
818
819            if has_new_body {
820                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
821                    if let Some(resp) = &http.response {
822                        if let Some(b) = &resp.body {
823                            let bytes: Bytes = if b.encoding == "base64" {
824                                base64::engine::general_purpose::STANDARD
825                                    .decode(&b.content)
826                                    .unwrap_or_default()
827                                    .into()
828                            } else {
829                                Bytes::from(b.content.clone())
830                            };
831                            Full::new(bytes)
832                                .map_err(|e| -> BoxError { e.into() })
833                                .boxed()
834                        } else {
835                            http_body_util::Empty::new()
836                                .map_err(|_| -> BoxError { unreachable!() })
837                                .boxed()
838                        }
839                    } else {
840                        http_body_util::Empty::new()
841                            .map_err(|_| -> BoxError { unreachable!() })
842                            .boxed()
843                    }
844                } else {
845                    http_body_util::Empty::new()
846                        .map_err(|_| -> BoxError { unreachable!() })
847                        .boxed()
848                }
849            } else {
850                crate::streams::create_body_from_resource(&res)
851            }
852        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
853            if let Some(resp) = &http.response {
854                if let Some(b) = &resp.body {
855                    let bytes: Bytes = if b.encoding == "base64" {
856                        base64::engine::general_purpose::STANDARD
857                            .decode(&b.content)
858                            .unwrap_or_default()
859                            .into()
860                    } else {
861                        Bytes::from(b.content.clone())
862                    };
863                    Full::new(bytes)
864                        .map_err(|e| -> BoxError { e.into() })
865                        .boxed()
866                } else {
867                    http_body_util::Empty::new()
868                        .map_err(|_| -> BoxError { unreachable!() })
869                        .boxed()
870                }
871            } else {
872                http_body_util::Empty::new()
873                    .map_err(|_| -> BoxError { unreachable!() })
874                    .boxed()
875            }
876        } else {
877            http_body_util::Empty::new()
878                .map_err(|_| -> BoxError { unreachable!() })
879                .boxed()
880        };
881
882        Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
883    }
884
885    fn handle_on_websocket_message(
886        runtime: &mut JsRuntime,
887        flow: Flow,
888        message: WebSocketMessage,
889    ) -> Result<WebSocketMessageAction, String> {
890        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
891        let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
892
893        let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
894        let exists = runtime
895            .execute_script("check_onWebSocketMessage", check_code)
896            .map_err(|e| {
897                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
898                e.to_string()
899            })?;
900        {
901            let mut scope = runtime.handle_scope();
902            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
903            if !exists_val.is_true() {
904                return Ok(WebSocketMessageAction::Continue(message));
905            }
906        }
907
908        let code = format!(
909            "globalThis.onWebSocketMessage({{}}, {}, {})",
910            flow_json, message_json
911        );
912        let result = runtime
913            .execute_script("call_onWebSocketMessage", code)
914            .map_err(|e| {
915                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
916                e.to_string()
917            })?;
918
919        let mut scope = runtime.handle_scope();
920        let result_val = deno_core::v8::Local::new(&mut scope, result);
921
922        if result_val.is_undefined() || result_val.is_null() {
923            return Ok(WebSocketMessageAction::Continue(message));
924        }
925
926        if result_val.is_string() {
927            let s = result_val.to_rust_string_lossy(&mut scope);
928            if s == "DROP" {
929                return Ok(WebSocketMessageAction::Drop);
930            }
931        }
932
933        let deser: Result<WebSocketMessage, _> =
934            deno_core::serde_v8::from_v8(&mut scope, result_val);
935        drop(scope);
936        let modified_message = match deser {
937            Ok(m) => m,
938            Err(e) => {
939                let err_str = format!("Failed to deserialize message: {}", e);
940                Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
941                return Err(err_str);
942            }
943        };
944
945        Ok(WebSocketMessageAction::Continue(modified_message))
946    }
947}
948
949#[async_trait]
950impl ScriptEngineTrait for DenoScriptEngine {
951    async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
952        let (tx, rx) = oneshot::channel();
953        self.tx
954            .send(DenoCommand::LoadScript(script.to_string(), tx))
955            .await
956            .map_err(|e| Box::new(e) as BoxError)?;
957        rx.await
958            .map_err(|e| Box::new(e) as BoxError)?
959            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
960    }
961
962    async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
963        let (tx, rx) = oneshot::channel();
964        let flow_clone = flow.clone();
965        self.tx
966            .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
967            .await
968            .map_err(|e| Box::new(e) as BoxError)?;
969        let res = rx
970            .await
971            .map_err(|e| Box::new(e) as BoxError)?
972            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
973
974        if let Some(new_flow) = &res {
975            *flow = new_flow.clone();
976        }
977        Ok(res)
978    }
979
980    async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
981        let (tx, rx) = oneshot::channel();
982        let flow_clone = flow.clone();
983        self.tx
984            .send(DenoCommand::OnRequest(flow_clone, body, tx))
985            .await
986            .map_err(|e| Box::new(e) as BoxError)?;
987        let (new_flow, action) = rx
988            .await
989            .map_err(|e| Box::new(e) as BoxError)?
990            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
991
992        if let Some(f) = new_flow {
993            *flow = f;
994        }
995        Ok(action)
996    }
997
998    async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
999        let (tx, rx) = oneshot::channel();
1000        let flow_clone = flow.clone();
1001        self.tx
1002            .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
1003            .await
1004            .map_err(|e| Box::new(e) as BoxError)?;
1005        let res = rx
1006            .await
1007            .map_err(|e| Box::new(e) as BoxError)?
1008            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1009
1010        if let Some(new_flow) = &res {
1011            *flow = new_flow.clone();
1012        }
1013        Ok(res)
1014    }
1015
1016    async fn on_response(
1017        &self,
1018        flow: &mut Flow,
1019        body: HttpBody,
1020    ) -> Result<ResponseAction, BoxError> {
1021        let (tx, rx) = oneshot::channel();
1022        let flow_clone = flow.clone();
1023        self.tx
1024            .send(DenoCommand::OnResponse(flow_clone, body, tx))
1025            .await
1026            .map_err(|e| Box::new(e) as BoxError)?;
1027        let (new_flow, action) = rx
1028            .await
1029            .map_err(|e| Box::new(e) as BoxError)?
1030            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1031
1032        if let Some(f) = new_flow {
1033            *flow = f;
1034        }
1035        Ok(action)
1036    }
1037
1038    async fn on_websocket_message(
1039        &self,
1040        _flow: &mut Flow,
1041        message: &mut WebSocketMessage,
1042    ) -> Result<WebSocketMessageAction, BoxError> {
1043        let (tx, rx) = oneshot::channel();
1044        let flow_clone = _flow.clone();
1045        let message_clone = message.clone();
1046        self.tx
1047            .send(DenoCommand::OnWebSocketMessage(
1048                flow_clone,
1049                message_clone,
1050                tx,
1051            ))
1052            .await
1053            .map_err(|e| Box::new(e) as BoxError)?;
1054        let res = rx
1055            .await
1056            .map_err(|e| Box::new(e) as BoxError)?
1057            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1058
1059        Ok(res)
1060    }
1061}