Skip to main content

relay_core_script/
deno_engine.rs

1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7    Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12    BoxError, ConnectAction, ConnectionInfo, ConnectionStats, HttpBody, RequestAction,
13    ResponseAction, WebSocketMessageAction,
14};
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17use std::rc::Rc;
18use std::thread;
19use tokio::sync::{mpsc, oneshot};
20
21#[op2(fast)]
22fn op_log_level(#[string] level: String, #[string] msg: String) {
23    match level.as_str() {
24        "error" => tracing::error!("[User Script] {}", msg),
25        "warn" => tracing::warn!("[User Script] {}", msg),
26        "info" => tracing::info!("[User Script] {}", msg),
27        "debug" => tracing::debug!("[User Script] {}", msg),
28        _ => tracing::info!("[User Script] {}", msg),
29    }
30}
31
32#[op2(async)]
33#[serde]
34async fn op_read_body(
35    state: Rc<RefCell<OpState>>,
36    #[smi] rid: ResourceId,
37    #[smi] limit: usize,
38) -> Result<Vec<u8>, AnyError> {
39    let resource = {
40        let state = state.borrow();
41        state.resource_table.get_any(rid)?
42    };
43    let view = resource.read(limit).await?;
44    Ok(view.to_vec())
45}
46
47#[op2(fast)]
48fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
49    state.resource_table.take_any(rid).ok();
50}
51
52// ── S1: sharedState ops ────────────────────────────────────────
53
54/// Soft cap for sharedState keys — exceeded triggers a warning but does not block.
55const SHARED_STATE_SOFT_CAP: usize = 10_000;
56
57fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
58    state.borrow_mut::<HashMap<String, serde_json::Value>>()
59}
60
61#[op2]
62#[serde]
63fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
64    shared_state(state).get(&key).cloned()
65}
66
67#[op2]
68fn op_shared_state_set(
69    state: &mut OpState,
70    #[string] key: String,
71    #[serde] value: serde_json::Value,
72) {
73    let map = shared_state(state);
74    // Soft cap warning — does not block, but alerts users to clean up
75    if !map.contains_key(&key) && map.len() >= SHARED_STATE_SOFT_CAP {
76        tracing::warn!(
77            "sharedState exceeds soft cap ({} keys): user scripts should delete unused keys. \
78             Use sharedState.size() to check.",
79            map.len()
80        );
81    }
82    map.insert(key, value);
83}
84
85#[op2(fast)]
86fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
87    shared_state(state).remove(&key).is_some()
88}
89
90#[op2(fast)]
91fn op_shared_state_clear(state: &mut OpState) {
92    shared_state(state).clear();
93}
94
95#[op2]
96#[serde]
97fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
98    shared_state(state).keys().cloned().collect()
99}
100
101#[op2(fast)]
102fn op_shared_state_size(state: &OpState) -> u32 {
103    let map = state.borrow::<HashMap<String, serde_json::Value>>();
104    map.len() as u32
105}
106
107// ── S5: relay.env — whitelisted environment variable access ────
108
109/// Counter for env access attempts — exposed via Prometheus
110static SCRIPT_ENV_ACCESS_TOTAL: std::sync::atomic::AtomicUsize =
111    std::sync::atomic::AtomicUsize::new(0);
112
113fn env_allow(state: &OpState) -> &HashSet<String> {
114    state.borrow::<HashSet<String>>()
115}
116
117#[op2]
118#[string]
119fn op_env_get(state: &OpState, #[string] key: String) -> Option<String> {
120    let allowed = env_allow(state);
121    // Increment access counter regardless of allow/deny
122    SCRIPT_ENV_ACCESS_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
123    if allowed.is_empty() || !allowed.contains(&key) {
124        return None;
125    }
126    std::env::var(&key).ok()
127}
128
129/// Get the total number of relay.env access attempts
130pub fn get_script_env_access_total() -> usize {
131    SCRIPT_ENV_ACCESS_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
132}
133
134// ── S7a: relay.fetch — async sub-requests (default disabled) ─────
135
136/// Configuration for relay.fetch sub-requests.
137#[derive(Clone)]
138pub struct ScriptFetchConfig {
139    /// Whether relay.fetch is enabled at all.
140    pub enabled: bool,
141    /// Allowed target hostnames. Empty means all allowed (if enabled).
142    pub allow_hosts: HashSet<String>,
143    /// Maximum concurrent fetch requests (semaphore permits).
144    pub max_concurrency: usize,
145    /// Timeout per request in milliseconds.
146    pub timeout_ms: u64,
147    /// Proxy listen port — used to prevent recursive fetch to self.
148    pub proxy_listen_port: u16,
149}
150
151impl Default for ScriptFetchConfig {
152    fn default() -> Self {
153        Self {
154            enabled: false,
155            allow_hosts: HashSet::new(),
156            max_concurrency: 8,
157            timeout_ms: 5000,
158            proxy_listen_port: 0,
159        }
160    }
161}
162
163fn fetch_config(state: &OpState) -> &ScriptFetchConfig {
164    state.borrow::<ScriptFetchConfig>()
165}
166
167/// Metrics for relay.fetch
168static SCRIPT_FETCH_TOTAL: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
169static SCRIPT_FETCH_REJECTED_TOTAL: std::sync::atomic::AtomicUsize =
170    std::sync::atomic::AtomicUsize::new(0);
171
172/// Fetch validation result — always returns a JSON string (never throws)
173/// so it works in synchronous onRequestHeaders context.
174#[op2]
175#[string]
176fn op_script_fetch(state: &OpState, #[string] url: String) -> String {
177    SCRIPT_FETCH_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
178    let config = fetch_config(state);
179
180    if !config.enabled {
181        SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
182        return serde_json::json!({"ok": false, "error": "script fetch disabled"}).to_string();
183    }
184
185    if !config.allow_hosts.is_empty()
186        && let Ok(parsed) = url::Url::parse(&url)
187        && let Some(host) = parsed.host_str()
188        && !config.allow_hosts.contains(host)
189    {
190        SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
191        return serde_json::json!({"ok": false, "error": "host not in allowlist"}).to_string();
192    }
193
194    if let Ok(parsed) = url::Url::parse(&url)
195        && let Some(port) = parsed.port_or_known_default()
196        && port == config.proxy_listen_port
197        && matches!(parsed.scheme(), "http" | "https")
198    {
199        let host = parsed.host_str().unwrap_or("");
200        if host == "localhost" || host == "127.0.0.1" || host == "::1" {
201            SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
202            return serde_json::json!({"ok": false, "error": "recursive fetch to self rejected"})
203                .to_string();
204        }
205    }
206
207    // Minimal HTTP GET client for relay.fetch. Uses ureq (blocking) for simplicity.
208    // The V8 isolate runs on its own dedicated thread, so blocking is acceptable.
209    // Async sub-requests with response streaming are deferred to 1.x.
210    let timeout = std::time::Duration::from_millis(config.timeout_ms);
211    let agent = ureq::AgentBuilder::new()
212        .timeout_read(timeout)
213        .timeout_connect(timeout)
214        .build();
215    match agent.get(&url).call() {
216        Ok(resp) => {
217            let status = resp.status();
218            let body = resp.into_string().unwrap_or_default();
219            serde_json::json!({"ok": true, "status": status, "body": body}).to_string()
220        }
221        Err(ureq::Error::Status(code, resp)) => {
222            let body = resp.into_string().unwrap_or_default();
223            serde_json::json!({"ok": false, "status": code, "body": body, "error": format!("HTTP {}", code)}).to_string()
224        }
225        Err(e) => {
226            SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
227            serde_json::json!({"ok": false, "error": e.to_string()}).to_string()
228        }
229    }
230}
231
232/// Get the total number of relay.fetch attempts
233pub fn get_script_fetch_total() -> usize {
234    SCRIPT_FETCH_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
235}
236
237/// Get the total number of rejected relay.fetch attempts
238pub fn get_script_fetch_rejected_total() -> usize {
239    SCRIPT_FETCH_REJECTED_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
240}
241
242// ── S6: relay utility ops — uuid, hash, base64, json ──────────
243
244#[op2]
245#[string]
246fn op_uuid_v4() -> String {
247    uuid::Uuid::new_v4().to_string()
248}
249
250#[op2]
251#[string]
252fn op_hash(#[string] algorithm: String, #[string] data: String) -> Result<String, AnyError> {
253    use sha1::Sha1;
254    use sha2::{Sha256, Sha512};
255    let bytes = data.as_bytes();
256    let hex = match algorithm.as_str() {
257        "sha1" => {
258            use sha1::Digest;
259            data_encoding::HEXLOWER.encode(&Sha1::digest(bytes))
260        }
261        "sha256" => {
262            use sha2::Digest;
263            data_encoding::HEXLOWER.encode(&Sha256::digest(bytes))
264        }
265        "sha512" => {
266            use sha2::Digest;
267            data_encoding::HEXLOWER.encode(&Sha512::digest(bytes))
268        }
269        "md5" => {
270            use md5::Digest;
271            data_encoding::HEXLOWER.encode(&md5::Md5::digest(bytes))
272        }
273        other => {
274            return Err(AnyError::msg(format!(
275                "unsupported hash algorithm: {}. Supported: sha1, sha256, sha512, md5",
276                other
277            )));
278        }
279    };
280    Ok(hex)
281}
282
283#[op2]
284#[string]
285fn op_base64_encode(#[string] data: String) -> String {
286    use base64::Engine as _;
287    base64::engine::general_purpose::STANDARD.encode(data.as_bytes())
288}
289
290#[op2]
291#[string]
292fn op_base64_decode(#[string] data: String) -> Result<String, AnyError> {
293    use base64::Engine as _;
294    let bytes = base64::engine::general_purpose::STANDARD
295        .decode(data.as_bytes())
296        .map_err(|e| AnyError::msg(format!("base64 decode error: {}", e)))?;
297    String::from_utf8(bytes).map_err(|e| AnyError::msg(format!("utf-8 error: {}", e)))
298}
299
300#[op2]
301#[serde]
302fn op_json_parse_safe(#[string] data: String) -> serde_json::Value {
303    serde_json::from_str(&data).unwrap_or(serde_json::Value::Null)
304}
305
306#[op2]
307#[string]
308fn op_json_stringify_pretty(#[serde] value: serde_json::Value) -> String {
309    serde_json::to_string_pretty(&value).unwrap_or_else(|_| String::new())
310}
311
312enum DenoCommand {
313    LoadScript(String, oneshot::Sender<Result<(), String>>),
314    OnConnect(
315        ConnectionInfo,
316        oneshot::Sender<Result<ConnectAction, String>>,
317    ),
318    OnDisconnect(
319        ConnectionInfo,
320        ConnectionStats,
321        oneshot::Sender<Result<(), String>>,
322    ),
323    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
324    OnRequest(
325        Flow,
326        HttpBody,
327        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
328    ),
329    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
330    OnResponse(
331        Flow,
332        HttpBody,
333        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
334    ),
335    OnWebSocketMessage(
336        Flow,
337        WebSocketMessage,
338        oneshot::Sender<Result<WebSocketMessageAction, String>>,
339    ),
340    OnWebSocketStart(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
341    OnWebSocketEnd(
342        Flow,
343        u16,
344        String,
345        oneshot::Sender<Result<Option<Flow>, String>>,
346    ),
347    OnWebSocketError(Flow, String, oneshot::Sender<Result<Option<Flow>, String>>),
348}
349
350#[derive(Clone)]
351pub struct DenoScriptEngine {
352    tx: mpsc::Sender<DenoCommand>,
353}
354
355impl Default for DenoScriptEngine {
356    fn default() -> Self {
357        Self::new(HashSet::new())
358    }
359}
360
361impl DenoScriptEngine {
362    pub fn new(env_allow: HashSet<String>) -> Self {
363        Self::new_with_fetch(env_allow, ScriptFetchConfig::default())
364    }
365
366    pub fn new_with_fetch(env_allow: HashSet<String>, fetch_config: ScriptFetchConfig) -> Self {
367        let (tx, mut rx) = mpsc::channel(32);
368
369        thread::spawn(move || {
370            let rt = tokio::runtime::Builder::new_current_thread()
371                .enable_all()
372                .build()
373                .unwrap();
374
375            rt.block_on(async move {
376                let env_allow = env_allow; // capture into async block
377                let ext = Extension {
378                    name: "relay_core",
379                    ops: std::borrow::Cow::Borrowed(&[
380                        op_log_level::DECL,
381                        op_read_body::DECL,
382                        op_close_body::DECL,
383                        op_shared_state_get::DECL,
384                        op_shared_state_set::DECL,
385                        op_shared_state_delete::DECL,
386                        op_shared_state_clear::DECL,
387                        op_shared_state_keys::DECL,
388                        op_shared_state_size::DECL,
389                        op_env_get::DECL,
390                        op_uuid_v4::DECL,
391                        op_hash::DECL,
392                        op_base64_encode::DECL,
393                        op_base64_decode::DECL,
394                        op_json_parse_safe::DECL,
395                        op_json_stringify_pretty::DECL,
396                        op_script_fetch::DECL,
397                    ]),
398                    op_state_fn: Some(Box::new({
399                        let env_allow = env_allow.clone();
400                        let fetch_config = fetch_config.clone();
401                        move |state| {
402                            state.put(HashMap::<String, serde_json::Value>::new());
403                            state.put(env_allow.clone());
404                            state.put(fetch_config.clone());
405                        }
406                    })),
407                    ..Default::default()
408                };
409
410                let mut js_runtime = JsRuntime::new(RuntimeOptions {
411                    extensions: vec![ext],
412                    ..Default::default()
413                });
414
415                // Bootstrap JS — S1/S3: sharedState + console levels
416                let bootstrap = r#"
417                    globalThis.console = {
418                        log: (...args) => {
419                            Deno.core.ops.op_log_level("log", _format(args));
420                        },
421                        info: (...args) => {
422                            Deno.core.ops.op_log_level("info", _format(args));
423                        },
424                        warn: (...args) => {
425                            Deno.core.ops.op_log_level("warn", _format(args));
426                        },
427                        error: (...args) => {
428                            Deno.core.ops.op_log_level("error", _format(args));
429                        },
430                        debug: (...args) => {
431                            Deno.core.ops.op_log_level("debug", _format(args));
432                        },
433                    };
434
435                    function _format(args) {
436                        return args.map(arg => {
437                            if (typeof arg === 'object') {
438                                try { return JSON.stringify(arg); }
439                                catch { return String(arg); }
440                            }
441                            return String(arg);
442                        }).join(" ");
443                    }
444
445                    class RelayBody {
446                        constructor(rid) { this.rid = rid; }
447                        async read(limit) {
448                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
449                        }
450                        close() {
451                            Deno.core.ops.op_close_body(this.rid);
452                        }
453                        async text() {
454                            const bytes = await this.read(10 * 1024 * 1024);
455                            return new TextDecoder().decode(bytes);
456                        }
457                        async json() {
458                            const txt = await this.text();
459                            return JSON.parse(txt);
460                        }
461                    }
462                    globalThis.RelayBody = RelayBody;
463
464                    globalThis.relay = {
465                        log: globalThis.console.log,
466                        env: function(name) {
467                            return Deno.core.ops.op_env_get(name) ?? undefined;
468                        },
469                        uuid: function() {
470                            return Deno.core.ops.op_uuid_v4();
471                        },
472                        hash: function(alg, data) {
473                            return Deno.core.ops.op_hash(alg, data);
474                        },
475                        base64: {
476                            encode: function(data) {
477                                return Deno.core.ops.op_base64_encode(data);
478                            },
479                            decode: function(data) {
480                                return Deno.core.ops.op_base64_decode(data);
481                            },
482                        },
483                        json: {
484                            parseSafe: function(str) {
485                                return Deno.core.ops.op_json_parse_safe(str);
486                            },
487                            stringifyPretty: function(obj) {
488                                return Deno.core.ops.op_json_stringify_pretty(obj);
489                            },
490                        },
491                        fetch: function(url) {
492                            return JSON.parse(Deno.core.ops.op_script_fetch(url));
493                        },
494                    };
495
496                    // S12a: ctx.setTag / ctx.setVariable (script→rule injection) deferred to 1.x.
497                    // Cross-thread synchronous V8 callback into the rule execution engine would
498                    // require architectural changes that risk rule engine atomicity.
499                    // Script-side rule context reading (flow.matched_rules, flow.rule_variables)
500                    // is fully supported (S10a/S11).
501
502                    // S1: sharedState — cross-hook shared map per isolate
503                    globalThis.sharedState = {
504                        get(key) {
505                            return Deno.core.ops.op_shared_state_get(key);
506                        },
507                        set(key, value) {
508                            Deno.core.ops.op_shared_state_set(key, value);
509                        },
510                        delete(key) {
511                            return Deno.core.ops.op_shared_state_delete(key);
512                        },
513                        clear() {
514                            Deno.core.ops.op_shared_state_clear();
515                        },
516                        keys() {
517                            return Deno.core.ops.op_shared_state_keys();
518                        },
519                        size() {
520                            return Deno.core.ops.op_shared_state_size();
521                        },
522                    };
523                "#;
524                js_runtime.execute_script("bootstrap", bootstrap).unwrap();
525
526                while let Some(cmd) = rx.recv().await {
527                    match cmd {
528                        DenoCommand::LoadScript(script, resp) => {
529                            let res = js_runtime.execute_script("<anon>", script);
530                            let res = if let Err(e) = res {
531                                Err(e.to_string())
532                            } else {
533                                js_runtime
534                                    .run_event_loop(Default::default())
535                                    .await
536                                    .map(|_| ())
537                                    .map_err(|e| e.to_string())
538                            };
539                            let _ = resp.send(res);
540                        }
541                        DenoCommand::OnRequestHeaders(flow, resp) => {
542                            let res = Self::handle_on_request_headers(&mut js_runtime, flow);
543                            let _ = resp.send(res);
544                        }
545                        DenoCommand::OnRequest(flow, body, resp) => {
546                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
547                            let _ = resp.send(res);
548                        }
549                        DenoCommand::OnResponseHeaders(flow, resp) => {
550                            let res = Self::handle_on_response_headers(&mut js_runtime, flow);
551                            let _ = resp.send(res);
552                        }
553                        DenoCommand::OnResponse(flow, body, resp) => {
554                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
555                            let _ = resp.send(res);
556                        }
557                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
558                            let res =
559                                Self::handle_on_websocket_message(&mut js_runtime, flow, message);
560                            let _ = resp.send(res);
561                        }
562                        DenoCommand::OnConnect(conn, resp) => {
563                            let res = Self::handle_on_connect(&mut js_runtime, conn);
564                            let _ = resp.send(res);
565                        }
566                        DenoCommand::OnDisconnect(conn, stats, resp) => {
567                            let res = Self::handle_on_disconnect(&mut js_runtime, conn, stats);
568                            let _ = resp.send(res);
569                        }
570                        DenoCommand::OnWebSocketStart(flow, resp) => {
571                            let res = Self::handle_on_websocket_start(&mut js_runtime, flow);
572                            let _ = resp.send(res);
573                        }
574                        DenoCommand::OnWebSocketEnd(flow, code, reason, resp) => {
575                            let res =
576                                Self::handle_on_websocket_end(&mut js_runtime, flow, code, reason);
577                            let _ = resp.send(res);
578                        }
579                        DenoCommand::OnWebSocketError(flow, error, resp) => {
580                            let res = Self::handle_on_websocket_error(&mut js_runtime, flow, error);
581                            let _ = resp.send(res);
582                        }
583                    }
584                }
585            });
586        });
587
588        Self { tx }
589    }
590
591    // ── S2: onError dispatch ─────────────────────────────────
592
593    fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
594        let check_code = "typeof globalThis.onError === 'function'";
595        let exists = runtime
596            .execute_script("check_onError_v2", check_code)
597            .ok()
598            .map(|v| {
599                let mut scope = runtime.handle_scope();
600                let val = deno_core::v8::Local::new(&mut scope, v);
601                val.is_true()
602            })
603            .unwrap_or(false);
604
605        if !exists {
606            return;
607        }
608
609        let flow_json = match serde_json::to_string(flow) {
610            Ok(j) => j,
611            Err(_) => return,
612        };
613        let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
614        let code = format!(
615            "globalThis.onError({{}}, {}, '{}', '{}')",
616            flow_json, error_escaped, stage
617        );
618
619        let _ = runtime.execute_script("call_onError_v2", code);
620    }
621
622    fn handle_on_request_headers(
623        runtime: &mut JsRuntime,
624        flow: Flow,
625    ) -> Result<Option<Flow>, String> {
626        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
627        let check_code = "typeof globalThis.onRequestHeaders === 'function'";
628        let exists = runtime
629            .execute_script("check_onRequestHeaders", check_code)
630            .map_err(|e| {
631                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
632                e.to_string()
633            })?;
634        {
635            let mut scope = runtime.handle_scope();
636            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
637            if !exists_val.is_true() {
638                return Ok(None);
639            }
640        }
641        let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
642        let result = runtime
643            .execute_script("call_onRequestHeaders", code)
644            .map_err(|e| {
645                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
646                e.to_string()
647            })?;
648        let mut scope = runtime.handle_scope();
649        let result_val = deno_core::v8::Local::new(&mut scope, result);
650        if result_val.is_undefined() || result_val.is_null() {
651            return Ok(None);
652        }
653        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
654        drop(scope);
655        let modified_flow = match deser {
656            Ok(f) => f,
657            Err(e) => {
658                let err_str = format!("Failed to deserialize flow: {}", e);
659                Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
660                return Err(err_str);
661            }
662        };
663        Ok(Some(modified_flow))
664    }
665
666    fn handle_on_response_headers(
667        runtime: &mut JsRuntime,
668        flow: Flow,
669    ) -> Result<Option<Flow>, String> {
670        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
671        let check_code = "typeof globalThis.onResponseHeaders === 'function'";
672        let exists = runtime
673            .execute_script("check_onResponseHeaders", check_code)
674            .map_err(|e| {
675                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
676                e.to_string()
677            })?;
678        {
679            let mut scope = runtime.handle_scope();
680            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
681            if !exists_val.is_true() {
682                return Ok(None);
683            }
684        }
685        let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
686        let result = runtime
687            .execute_script("call_onResponseHeaders", code)
688            .map_err(|e| {
689                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
690                e.to_string()
691            })?;
692        let mut scope = runtime.handle_scope();
693        let result_val = deno_core::v8::Local::new(&mut scope, result);
694        if result_val.is_undefined() || result_val.is_null() {
695            return Ok(None);
696        }
697        let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
698        drop(scope);
699        let modified_flow = match deser {
700            Ok(f) => f,
701            Err(e) => {
702                let err_str = format!("Failed to deserialize flow: {}", e);
703                Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
704                return Err(err_str);
705            }
706        };
707        Ok(Some(modified_flow))
708    }
709
710    async fn handle_on_request(
711        runtime: &mut JsRuntime,
712        flow: Flow,
713        body: HttpBody,
714    ) -> Result<(Option<Flow>, RequestAction), String> {
715        let resource = HttpBodyResource::new(body);
716        let rid = {
717            let op_state_rc = runtime.op_state();
718            let mut state = op_state_rc.borrow_mut();
719            state.resource_table.add(resource)
720        };
721
722        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
723
724        let check_code = "typeof globalThis.onRequest === 'function'";
725        let exists = runtime
726            .execute_script("check_onRequest", check_code)
727            .map_err(|e| {
728                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
729                e.to_string()
730            })?;
731
732        let exists_bool = {
733            let scope = &mut runtime.handle_scope();
734            let exists_val = deno_core::v8::Local::new(scope, exists);
735            exists_val.is_true()
736        };
737
738        if !exists_bool {
739            let resource = {
740                let op_state_rc = runtime.op_state();
741                let mut state = op_state_rc.borrow_mut();
742                state.resource_table.take::<HttpBodyResource>(rid).ok()
743            };
744            if let Some(res) = resource {
745                let body = crate::streams::create_body_from_resource(&res);
746                return Ok((None, RequestAction::Continue(body)));
747            } else {
748                return Ok((
749                    None,
750                    RequestAction::Continue(
751                        http_body_util::Empty::new()
752                            .map_err(|_| -> BoxError { unreachable!() })
753                            .boxed(),
754                    ),
755                ));
756            }
757        }
758
759        let code = format!(
760            "globalThis.onRequest(new RelayBody({}), {})",
761            rid, flow_json
762        );
763        let result = runtime
764            .execute_script("call_onRequest", code)
765            .map_err(|e| {
766                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
767                e.to_string()
768            })?;
769
770        let result = runtime.resolve(result).await.map_err(|e| {
771            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
772            e.to_string()
773        })?;
774
775        let (is_empty, modified_flow) = {
776            let mut scope = runtime.handle_scope();
777            let result_val = deno_core::v8::Local::new(&mut scope, result);
778
779            if result_val.is_undefined() || result_val.is_null() {
780                (true, None)
781            } else {
782                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
783                drop(scope);
784                match deser {
785                    Ok(f) => (false, Some(f)),
786                    Err(e) => {
787                        let err_str = format!("Failed to deserialize flow: {}", e);
788                        Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
789                        return Err(err_str);
790                    }
791                }
792            }
793        };
794
795        if is_empty {
796            let resource = {
797                let op_state_rc = runtime.op_state();
798                let mut state = op_state_rc.borrow_mut();
799                state.resource_table.take::<HttpBodyResource>(rid).ok()
800            };
801            if let Some(res) = resource {
802                let body = crate::streams::create_body_from_resource(&res);
803                return Ok((None, RequestAction::Continue(body)));
804            } else {
805                return Ok((
806                    None,
807                    RequestAction::Continue(
808                        http_body_util::Empty::new()
809                            .map_err(|_| -> BoxError { unreachable!() })
810                            .boxed(),
811                    ),
812                ));
813            }
814        }
815
816        let modified_flow = modified_flow.unwrap();
817
818        let resource = {
819            let op_state_rc = runtime.op_state();
820            let mut state = op_state_rc.borrow_mut();
821            state.resource_table.take::<HttpBodyResource>(rid).ok()
822        };
823
824        let new_body: HttpBody = if let Some(res) = resource {
825            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
826            {
827                http.request
828                    .body
829                    .as_ref()
830                    .map(|b| !b.content.is_empty())
831                    .unwrap_or(false)
832            } else {
833                false
834            };
835
836            if has_new_body {
837                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
838                    if let Some(b) = &http.request.body {
839                        let bytes: Bytes = if b.encoding == "base64" {
840                            base64::engine::general_purpose::STANDARD
841                                .decode(&b.content)
842                                .unwrap_or_default()
843                                .into()
844                        } else {
845                            Bytes::from(b.content.clone())
846                        };
847                        Full::new(bytes)
848                            .map_err(|e| -> BoxError { e.into() })
849                            .boxed()
850                    } else {
851                        http_body_util::Empty::new()
852                            .map_err(|_| -> BoxError { unreachable!() })
853                            .boxed()
854                    }
855                } else {
856                    http_body_util::Empty::new()
857                        .map_err(|_| -> BoxError { unreachable!() })
858                        .boxed()
859                }
860            } else {
861                crate::streams::create_body_from_resource(&res)
862            }
863        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
864            if let Some(b) = &http.request.body {
865                let bytes: Bytes = if b.encoding == "base64" {
866                    base64::engine::general_purpose::STANDARD
867                        .decode(&b.content)
868                        .unwrap_or_default()
869                        .into()
870                } else {
871                    Bytes::from(b.content.clone())
872                };
873                Full::new(bytes)
874                    .map_err(|e| -> BoxError { e.into() })
875                    .boxed()
876            } else {
877                http_body_util::Empty::new()
878                    .map_err(|_| -> BoxError { unreachable!() })
879                    .boxed()
880            }
881        } else {
882            http_body_util::Empty::new()
883                .map_err(|_| -> BoxError { unreachable!() })
884                .boxed()
885        };
886
887        Ok((Some(modified_flow), RequestAction::Continue(new_body)))
888    }
889
890    async fn handle_on_response(
891        runtime: &mut JsRuntime,
892        flow: Flow,
893        body: HttpBody,
894    ) -> Result<(Option<Flow>, ResponseAction), String> {
895        let resource = HttpBodyResource::new(body);
896        let rid = {
897            let op_state_rc = runtime.op_state();
898            let mut state = op_state_rc.borrow_mut();
899            state.resource_table.add(resource)
900        };
901
902        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
903
904        let check_code = "typeof globalThis.onResponse === 'function'";
905        let exists = runtime
906            .execute_script("check_onResponse", check_code)
907            .map_err(|e| {
908                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
909                e.to_string()
910            })?;
911
912        let exists_bool = {
913            let scope = &mut runtime.handle_scope();
914            let exists_val = deno_core::v8::Local::new(scope, exists);
915            exists_val.is_true()
916        };
917
918        if !exists_bool {
919            let resource = {
920                let op_state_rc = runtime.op_state();
921                let mut state = op_state_rc.borrow_mut();
922                state.resource_table.take::<HttpBodyResource>(rid).ok()
923            };
924            if let Some(res) = resource {
925                let body = crate::streams::create_body_from_resource(&res);
926                return Ok((None, ResponseAction::Continue(body)));
927            } else {
928                return Ok((
929                    None,
930                    ResponseAction::Continue(
931                        http_body_util::Empty::new()
932                            .map_err(|_| -> BoxError { unreachable!() })
933                            .boxed(),
934                    ),
935                ));
936            }
937        }
938
939        let code = format!(
940            "globalThis.onResponse(new RelayBody({}), {})",
941            rid, flow_json
942        );
943        let result = runtime
944            .execute_script("call_onResponse", code)
945            .map_err(|e| {
946                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
947                e.to_string()
948            })?;
949        let result = runtime.resolve(result).await.map_err(|e| {
950            Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
951            e.to_string()
952        })?;
953
954        let (is_empty, modified_flow) = {
955            let mut scope = runtime.handle_scope();
956            let result_val = deno_core::v8::Local::new(&mut scope, result);
957
958            if result_val.is_undefined() || result_val.is_null() {
959                (true, None)
960            } else {
961                let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
962                drop(scope);
963                match deser {
964                    Ok(f) => (false, Some(f)),
965                    Err(e) => {
966                        let err_str = format!("Failed to deserialize flow: {}", e);
967                        Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
968                        return Err(err_str);
969                    }
970                }
971            }
972        };
973
974        if is_empty {
975            let resource = {
976                let op_state_rc = runtime.op_state();
977                let mut state = op_state_rc.borrow_mut();
978                state.resource_table.take::<HttpBodyResource>(rid).ok()
979            };
980            if let Some(res) = resource {
981                let body = crate::streams::create_body_from_resource(&res);
982                return Ok((None, ResponseAction::Continue(body)));
983            } else {
984                return Ok((
985                    None,
986                    ResponseAction::Continue(
987                        http_body_util::Empty::new()
988                            .map_err(|_| -> BoxError { unreachable!() })
989                            .boxed(),
990                    ),
991                ));
992            }
993        }
994
995        let modified_flow = modified_flow.unwrap();
996
997        let resource = {
998            let op_state_rc = runtime.op_state();
999            let mut state = op_state_rc.borrow_mut();
1000            state.resource_table.take::<HttpBodyResource>(rid).ok()
1001        };
1002
1003        let new_body: HttpBody = if let Some(res) = resource {
1004            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
1005            {
1006                http.response
1007                    .as_ref()
1008                    .and_then(|r| r.body.as_ref())
1009                    .map(|b| !b.content.is_empty())
1010                    .unwrap_or(false)
1011            } else {
1012                false
1013            };
1014
1015            if has_new_body {
1016                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
1017                    if let Some(resp) = &http.response {
1018                        if let Some(b) = &resp.body {
1019                            let bytes: Bytes = if b.encoding == "base64" {
1020                                base64::engine::general_purpose::STANDARD
1021                                    .decode(&b.content)
1022                                    .unwrap_or_default()
1023                                    .into()
1024                            } else {
1025                                Bytes::from(b.content.clone())
1026                            };
1027                            Full::new(bytes)
1028                                .map_err(|e| -> BoxError { e.into() })
1029                                .boxed()
1030                        } else {
1031                            http_body_util::Empty::new()
1032                                .map_err(|_| -> BoxError { unreachable!() })
1033                                .boxed()
1034                        }
1035                    } else {
1036                        http_body_util::Empty::new()
1037                            .map_err(|_| -> BoxError { unreachable!() })
1038                            .boxed()
1039                    }
1040                } else {
1041                    http_body_util::Empty::new()
1042                        .map_err(|_| -> BoxError { unreachable!() })
1043                        .boxed()
1044                }
1045            } else {
1046                crate::streams::create_body_from_resource(&res)
1047            }
1048        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
1049            if let Some(resp) = &http.response {
1050                if let Some(b) = &resp.body {
1051                    let bytes: Bytes = if b.encoding == "base64" {
1052                        base64::engine::general_purpose::STANDARD
1053                            .decode(&b.content)
1054                            .unwrap_or_default()
1055                            .into()
1056                    } else {
1057                        Bytes::from(b.content.clone())
1058                    };
1059                    Full::new(bytes)
1060                        .map_err(|e| -> BoxError { e.into() })
1061                        .boxed()
1062                } else {
1063                    http_body_util::Empty::new()
1064                        .map_err(|_| -> BoxError { unreachable!() })
1065                        .boxed()
1066                }
1067            } else {
1068                http_body_util::Empty::new()
1069                    .map_err(|_| -> BoxError { unreachable!() })
1070                    .boxed()
1071            }
1072        } else {
1073            http_body_util::Empty::new()
1074                .map_err(|_| -> BoxError { unreachable!() })
1075                .boxed()
1076        };
1077
1078        Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
1079    }
1080
1081    fn handle_on_websocket_message(
1082        runtime: &mut JsRuntime,
1083        flow: Flow,
1084        message: WebSocketMessage,
1085    ) -> Result<WebSocketMessageAction, String> {
1086        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1087        let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
1088
1089        let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
1090        let exists = runtime
1091            .execute_script("check_onWebSocketMessage", check_code)
1092            .map_err(|e| {
1093                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
1094                e.to_string()
1095            })?;
1096        {
1097            let mut scope = runtime.handle_scope();
1098            let exists_val = deno_core::v8::Local::new(&mut scope, exists);
1099            if !exists_val.is_true() {
1100                return Ok(WebSocketMessageAction::Continue(message));
1101            }
1102        }
1103
1104        let code = format!(
1105            "globalThis.onWebSocketMessage({{}}, {}, {})",
1106            flow_json, message_json
1107        );
1108        let result = runtime
1109            .execute_script("call_onWebSocketMessage", code)
1110            .map_err(|e| {
1111                Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
1112                e.to_string()
1113            })?;
1114
1115        let mut scope = runtime.handle_scope();
1116        let result_val = deno_core::v8::Local::new(&mut scope, result);
1117
1118        if result_val.is_undefined() || result_val.is_null() {
1119            return Ok(WebSocketMessageAction::Continue(message));
1120        }
1121
1122        if result_val.is_string() {
1123            let s = result_val.to_rust_string_lossy(&mut scope);
1124            if s == "DROP" {
1125                return Ok(WebSocketMessageAction::Drop);
1126            }
1127        }
1128
1129        let deser: Result<WebSocketMessage, _> =
1130            deno_core::serde_v8::from_v8(&mut scope, result_val);
1131        drop(scope);
1132        let modified_message = match deser {
1133            Ok(m) => m,
1134            Err(e) => {
1135                let err_str = format!("Failed to deserialize message: {}", e);
1136                Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
1137                return Err(err_str);
1138            }
1139        };
1140
1141        Ok(WebSocketMessageAction::Continue(modified_message))
1142    }
1143
1144    fn handle_on_connect(
1145        runtime: &mut JsRuntime,
1146        conn: ConnectionInfo,
1147    ) -> Result<ConnectAction, String> {
1148        let check_code = "typeof globalThis.onConnect === 'function'";
1149        let exists = runtime
1150            .execute_script("check_onConnect", check_code)
1151            .ok()
1152            .map(|v| {
1153                let mut scope = runtime.handle_scope();
1154                deno_core::v8::Local::new(&mut scope, v).is_true()
1155            })
1156            .unwrap_or(false);
1157        if !exists {
1158            return Ok(ConnectAction::Allow);
1159        }
1160
1161        let conn_json = serde_json::json!({
1162            "id": conn.id.to_string(),
1163            "client_addr": conn.client_addr.to_string(),
1164            "server_addr": conn.server_addr.map(|a| a.to_string()),
1165            "tls_sni": conn.tls_sni,
1166        });
1167        let code = format!(
1168            "globalThis.onConnect({{}}, {})",
1169            serde_json::to_string(&conn_json).unwrap_or_default()
1170        );
1171        let result = runtime
1172            .execute_script("call_onConnect", code)
1173            .map_err(|e| {
1174                tracing::warn!("onConnect script error: {}", e);
1175                e.to_string()
1176            })?;
1177        let mut scope = runtime.handle_scope();
1178        let result_val = deno_core::v8::Local::new(&mut scope, result);
1179        if result_val.is_undefined() || result_val.is_null() {
1180            return Ok(ConnectAction::Allow);
1181        }
1182        if result_val.is_object() {
1183            let Some(obj) = result_val.to_object(&mut scope) else {
1184                return Ok(ConnectAction::Allow);
1185            };
1186            let drop_key: deno_core::v8::Local<deno_core::v8::Value> =
1187                deno_core::v8::String::new(&mut scope, "drop")
1188                    .expect("v8 string")
1189                    .into();
1190            if let Some(drop_val) = obj.get(&mut scope, drop_key)
1191                && drop_val.is_true()
1192            {
1193                let reason_key: deno_core::v8::Local<deno_core::v8::Value> =
1194                    deno_core::v8::String::new(&mut scope, "reason")
1195                        .expect("v8 string")
1196                        .into();
1197                let reason = obj
1198                    .get(&mut scope, reason_key)
1199                    .map(|v| v.to_rust_string_lossy(&mut scope))
1200                    .unwrap_or_else(|| "script onConnect drop".to_string());
1201                return Ok(ConnectAction::Drop { reason });
1202            }
1203        }
1204        Ok(ConnectAction::Allow)
1205    }
1206
1207    fn handle_on_disconnect(
1208        runtime: &mut JsRuntime,
1209        conn: ConnectionInfo,
1210        stats: ConnectionStats,
1211    ) -> Result<(), String> {
1212        let check_code = "typeof globalThis.onDisconnect === 'function'";
1213        let exists = runtime
1214            .execute_script("check_onDisconnect", check_code)
1215            .ok()
1216            .map(|v| {
1217                let mut scope = runtime.handle_scope();
1218                deno_core::v8::Local::new(&mut scope, v).is_true()
1219            })
1220            .unwrap_or(false);
1221        if !exists {
1222            return Ok(());
1223        }
1224
1225        let conn_json = serde_json::json!({
1226            "id": conn.id.to_string(),
1227            "client_addr": conn.client_addr.to_string(),
1228            "server_addr": conn.server_addr.map(|a| a.to_string()),
1229            "tls_sni": conn.tls_sni,
1230        });
1231        let stats_json = serde_json::json!({
1232            "duration_ms": stats.duration_ms,
1233            "bytes_sent": stats.bytes_sent,
1234            "bytes_received": stats.bytes_received,
1235            "flows_count": stats.flows_count,
1236        });
1237        let code = format!(
1238            "globalThis.onDisconnect({{}}, {}, {})",
1239            serde_json::to_string(&conn_json).unwrap_or_default(),
1240            serde_json::to_string(&stats_json).unwrap_or_default()
1241        );
1242        let _ = runtime.execute_script("call_onDisconnect", code);
1243        Ok(())
1244    }
1245
1246    fn handle_on_websocket_start(
1247        runtime: &mut JsRuntime,
1248        flow: Flow,
1249    ) -> Result<Option<Flow>, String> {
1250        let check_code = "typeof globalThis.onWebSocketStart === 'function'";
1251        let exists = runtime
1252            .execute_script("check_onWebSocketStart", check_code)
1253            .ok()
1254            .map(|v| {
1255                let mut scope = runtime.handle_scope();
1256                deno_core::v8::Local::new(&mut scope, v).is_true()
1257            })
1258            .unwrap_or(false);
1259        if !exists {
1260            return Ok(None);
1261        }
1262
1263        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1264        let code = format!("globalThis.onWebSocketStart({{}}, {})", flow_json);
1265        let result = runtime
1266            .execute_script("call_onWebSocketStart", code)
1267            .map_err(|e| e.to_string())?;
1268        let mut scope = runtime.handle_scope();
1269        let result_val = deno_core::v8::Local::new(&mut scope, result);
1270        if result_val.is_undefined() || result_val.is_null() {
1271            return Ok(None);
1272        }
1273        let deser: Flow =
1274            deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1275        Ok(Some(deser))
1276    }
1277
1278    fn handle_on_websocket_end(
1279        runtime: &mut JsRuntime,
1280        flow: Flow,
1281        close_code: u16,
1282        close_reason: String,
1283    ) -> Result<Option<Flow>, String> {
1284        let check_code = "typeof globalThis.onWebSocketEnd === 'function'";
1285        let exists = runtime
1286            .execute_script("check_onWebSocketEnd", check_code)
1287            .ok()
1288            .map(|v| {
1289                let mut scope = runtime.handle_scope();
1290                deno_core::v8::Local::new(&mut scope, v).is_true()
1291            })
1292            .unwrap_or(false);
1293        if !exists {
1294            return Ok(None);
1295        }
1296
1297        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1298        let reason_json =
1299            serde_json::to_string(&close_reason).unwrap_or_else(|_| "null".to_string());
1300        let code = format!(
1301            "globalThis.onWebSocketEnd({{}}, {}, {}, {})",
1302            flow_json, close_code, reason_json
1303        );
1304        let result = runtime
1305            .execute_script("call_onWebSocketEnd", code)
1306            .map_err(|e| e.to_string())?;
1307        let mut scope = runtime.handle_scope();
1308        let result_val = deno_core::v8::Local::new(&mut scope, result);
1309        if result_val.is_undefined() || result_val.is_null() {
1310            return Ok(None);
1311        }
1312        let deser: Flow =
1313            deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1314        Ok(Some(deser))
1315    }
1316
1317    fn handle_on_websocket_error(
1318        runtime: &mut JsRuntime,
1319        flow: Flow,
1320        error: String,
1321    ) -> Result<Option<Flow>, String> {
1322        let check_code = "typeof globalThis.onWebSocketError === 'function'";
1323        let exists = runtime
1324            .execute_script("check_onWebSocketError", check_code)
1325            .ok()
1326            .map(|v| {
1327                let mut scope = runtime.handle_scope();
1328                deno_core::v8::Local::new(&mut scope, v).is_true()
1329            })
1330            .unwrap_or(false);
1331        if !exists {
1332            return Ok(None);
1333        }
1334
1335        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1336        let error_json = serde_json::to_string(&error).unwrap_or_else(|_| "null".to_string());
1337        let code = format!(
1338            "globalThis.onWebSocketError({{}}, {}, {})",
1339            flow_json, error_json
1340        );
1341        let result = runtime
1342            .execute_script("call_onWebSocketError", code)
1343            .map_err(|e| e.to_string())?;
1344        let mut scope = runtime.handle_scope();
1345        let result_val = deno_core::v8::Local::new(&mut scope, result);
1346        if result_val.is_undefined() || result_val.is_null() {
1347            return Ok(None);
1348        }
1349        let deser: Flow =
1350            deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1351        Ok(Some(deser))
1352    }
1353}
1354
1355#[async_trait]
1356impl ScriptEngineTrait for DenoScriptEngine {
1357    async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
1358        let (tx, rx) = oneshot::channel();
1359        self.tx
1360            .send(DenoCommand::LoadScript(script.to_string(), tx))
1361            .await
1362            .map_err(|e| Box::new(e) as BoxError)?;
1363        rx.await
1364            .map_err(|e| Box::new(e) as BoxError)?
1365            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1366    }
1367
1368    async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
1369        let (tx, rx) = oneshot::channel();
1370        let flow_clone = flow.clone();
1371        self.tx
1372            .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
1373            .await
1374            .map_err(|e| Box::new(e) as BoxError)?;
1375        let res = rx
1376            .await
1377            .map_err(|e| Box::new(e) as BoxError)?
1378            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1379
1380        if let Some(new_flow) = &res {
1381            *flow = new_flow.clone();
1382        }
1383        Ok(res)
1384    }
1385
1386    async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
1387        let (tx, rx) = oneshot::channel();
1388        let flow_clone = flow.clone();
1389        self.tx
1390            .send(DenoCommand::OnRequest(flow_clone, body, tx))
1391            .await
1392            .map_err(|e| Box::new(e) as BoxError)?;
1393        let (new_flow, action) = rx
1394            .await
1395            .map_err(|e| Box::new(e) as BoxError)?
1396            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1397
1398        if let Some(f) = new_flow {
1399            *flow = f;
1400        }
1401        Ok(action)
1402    }
1403
1404    async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
1405        let (tx, rx) = oneshot::channel();
1406        let flow_clone = flow.clone();
1407        self.tx
1408            .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
1409            .await
1410            .map_err(|e| Box::new(e) as BoxError)?;
1411        let res = rx
1412            .await
1413            .map_err(|e| Box::new(e) as BoxError)?
1414            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1415
1416        if let Some(new_flow) = &res {
1417            *flow = new_flow.clone();
1418        }
1419        Ok(res)
1420    }
1421
1422    async fn on_response(
1423        &self,
1424        flow: &mut Flow,
1425        body: HttpBody,
1426    ) -> Result<ResponseAction, BoxError> {
1427        let (tx, rx) = oneshot::channel();
1428        let flow_clone = flow.clone();
1429        self.tx
1430            .send(DenoCommand::OnResponse(flow_clone, body, tx))
1431            .await
1432            .map_err(|e| Box::new(e) as BoxError)?;
1433        let (new_flow, action) = rx
1434            .await
1435            .map_err(|e| Box::new(e) as BoxError)?
1436            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1437
1438        if let Some(f) = new_flow {
1439            *flow = f;
1440        }
1441        Ok(action)
1442    }
1443
1444    async fn on_websocket_message(
1445        &self,
1446        _flow: &mut Flow,
1447        message: &mut WebSocketMessage,
1448    ) -> Result<WebSocketMessageAction, BoxError> {
1449        let (tx, rx) = oneshot::channel();
1450        let flow_clone = _flow.clone();
1451        let message_clone = message.clone();
1452        self.tx
1453            .send(DenoCommand::OnWebSocketMessage(
1454                flow_clone,
1455                message_clone,
1456                tx,
1457            ))
1458            .await
1459            .map_err(|e| Box::new(e) as BoxError)?;
1460        let res = rx
1461            .await
1462            .map_err(|e| Box::new(e) as BoxError)?
1463            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1464
1465        Ok(res)
1466    }
1467
1468    async fn on_connect(&self, conn: &ConnectionInfo) -> Result<ConnectAction, BoxError> {
1469        let (tx, rx) = oneshot::channel();
1470        let conn_clone = conn.clone();
1471        self.tx
1472            .send(DenoCommand::OnConnect(conn_clone, tx))
1473            .await
1474            .map_err(|e| Box::new(e) as BoxError)?;
1475        rx.await
1476            .map_err(|e| Box::new(e) as BoxError)?
1477            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1478    }
1479
1480    async fn on_disconnect(
1481        &self,
1482        conn: &ConnectionInfo,
1483        stats: &ConnectionStats,
1484    ) -> Result<(), BoxError> {
1485        let (tx, rx) = oneshot::channel();
1486        let conn_clone = conn.clone();
1487        let stats_clone = stats.clone();
1488        self.tx
1489            .send(DenoCommand::OnDisconnect(conn_clone, stats_clone, tx))
1490            .await
1491            .map_err(|e| Box::new(e) as BoxError)?;
1492        rx.await
1493            .map_err(|e| Box::new(e) as BoxError)?
1494            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1495    }
1496
1497    async fn on_websocket_start(&self, flow: &mut Flow) -> Result<(), BoxError> {
1498        let (tx, rx) = oneshot::channel();
1499        let flow_clone = flow.clone();
1500        self.tx
1501            .send(DenoCommand::OnWebSocketStart(flow_clone, tx))
1502            .await
1503            .map_err(|e| Box::new(e) as BoxError)?;
1504        let res = rx
1505            .await
1506            .map_err(|e| Box::new(e) as BoxError)?
1507            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1508
1509        if let Some(new_flow) = res {
1510            *flow = new_flow;
1511        }
1512        Ok(())
1513    }
1514
1515    async fn on_websocket_end(
1516        &self,
1517        flow: &mut Flow,
1518        close_code: u16,
1519        close_reason: &str,
1520    ) -> Result<(), BoxError> {
1521        let (tx, rx) = oneshot::channel();
1522        let flow_clone = flow.clone();
1523        let reason_owned = close_reason.to_string();
1524        self.tx
1525            .send(DenoCommand::OnWebSocketEnd(
1526                flow_clone,
1527                close_code,
1528                reason_owned,
1529                tx,
1530            ))
1531            .await
1532            .map_err(|e| Box::new(e) as BoxError)?;
1533        let res = rx
1534            .await
1535            .map_err(|e| Box::new(e) as BoxError)?
1536            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1537
1538        if let Some(new_flow) = res {
1539            *flow = new_flow;
1540        }
1541        Ok(())
1542    }
1543
1544    async fn on_websocket_error(&self, flow: &mut Flow, error: &str) -> Result<(), BoxError> {
1545        let (tx, rx) = oneshot::channel();
1546        let flow_clone = flow.clone();
1547        let error_owned = error.to_string();
1548        self.tx
1549            .send(DenoCommand::OnWebSocketError(flow_clone, error_owned, tx))
1550            .await
1551            .map_err(|e| Box::new(e) as BoxError)?;
1552        let res = rx
1553            .await
1554            .map_err(|e| Box::new(e) as BoxError)?
1555            .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1556
1557        if let Some(new_flow) = res {
1558            *flow = new_flow;
1559        }
1560        Ok(())
1561    }
1562}