// relay_core_script/deno_engine.rs

1use std::thread;
2use std::rc::Rc;
3use std::cell::RefCell;
4use tokio::sync::{mpsc, oneshot};
5use relay_core_api::flow::{Flow, WebSocketMessage};
6use crate::engine_trait::ScriptEngineTrait;
7use async_trait::async_trait;
8use deno_core::{JsRuntime, RuntimeOptions, Extension, op2, Op, ResourceId, OpState, error::AnyError};
9use relay_core_lib::interceptor::{HttpBody, RequestAction, ResponseAction, WebSocketMessageAction, BoxError};
10use http_body_util::{BodyExt, Full};
11use bytes::Bytes;
12use base64::Engine as _;
13use crate::streams::HttpBodyResource;
14
15#[op2(fast)]
16fn op_log(#[string] msg: String) {
17    println!("[Deno] {}", msg);
18}
19
20#[op2(async)]
21#[serde]
22async fn op_read_body(
23    state: Rc<RefCell<OpState>>,
24    #[smi] rid: ResourceId,
25    #[smi] limit: usize,
26) -> Result<Vec<u8>, AnyError> {
27    let resource = {
28        let state = state.borrow();
29        state.resource_table.get_any(rid)?
30    };
31    let view = resource.read(limit).await?;
32    Ok(view.to_vec())
33}
34
35#[op2(fast)]
36fn op_close_body(
37    state: &mut OpState,
38    #[smi] rid: ResourceId,
39) {
40    state.resource_table.take_any(rid).ok();
41}
42
43enum DenoCommand {
44    LoadScript(String, oneshot::Sender<Result<(), String>>),
45    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
46    OnRequest(Flow, HttpBody, oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>),
47    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
48    OnResponse(Flow, HttpBody, oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>),
49    OnWebSocketMessage(Flow, WebSocketMessage, oneshot::Sender<Result<WebSocketMessageAction, String>>),
50}
51
/// Handle to the Deno-backed script engine.
///
/// The handle only stores the sending half of the command channel created in
/// `DenoScriptEngine::new`; the `JsRuntime` itself lives on the thread spawned
/// there. Clones of the handle send to that same channel.
#[derive(Clone)]
pub struct DenoScriptEngine {
    /// Sender used to submit `DenoCommand`s to the engine thread.
    tx: mpsc::Sender<DenoCommand>,
}
56
57impl Default for DenoScriptEngine {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl DenoScriptEngine {
64    pub fn new() -> Self {
65        let (tx, mut rx) = mpsc::channel(32);
66
67        thread::spawn(move || {
68            let rt = tokio::runtime::Builder::new_current_thread()
69                .enable_all()
70                .build()
71                .unwrap();
72
73            rt.block_on(async move {
74                let ext = Extension {
75                    name: "relay_core",
76                    ops: std::borrow::Cow::Borrowed(&[
77                        op_log::DECL,
78                        op_read_body::DECL,
79                        op_close_body::DECL,
80                    ]),
81                    ..Default::default()
82                };
83
84                let mut js_runtime = JsRuntime::new(RuntimeOptions {
85                    extensions: vec![ext],
86                    ..Default::default()
87                });
88
89                // Polyfill console.log and Bootstrap API
90                let bootstrap = r#"
91                    globalThis.console = {
92                        log: (...args) => {
93                            const msg = args.map(arg => {
94                                if (typeof arg === 'object') {
95                                    try {
96                                        return JSON.stringify(arg);
97                                    } catch {
98                                        return String(arg);
99                                    }
100                                }
101                                return String(arg);
102                            }).join(" ");
103                            Deno.core.ops.op_log(msg);
104                        }
105                    };
106
107                    class RelayBody {
108                        constructor(rid) {
109                            this.rid = rid;
110                        }
111                        
112                        async read(limit) {
113                            return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
114                        }
115                        
116                        close() {
117                            Deno.core.ops.op_close_body(this.rid);
118                        }
119                        
120                        async text() {
121                            const bytes = await this.read(10 * 1024 * 1024); // 10MB limit
122                            return new TextDecoder().decode(bytes);
123                        }
124                        
125                        async json() {
126                            const txt = await this.text();
127                            return JSON.parse(txt);
128                        }
129                    }
130                    globalThis.RelayBody = RelayBody;
131
132                    globalThis.relay = {
133                        log: globalThis.console.log,
134                        // Future: add more helpers like base64, etc.
135                    };
136                "#;
137                js_runtime.execute_script("bootstrap", bootstrap).unwrap();
138
139                while let Some(cmd) = rx.recv().await {
140                    match cmd {
141                        DenoCommand::LoadScript(script, resp) => {
142                            let res = js_runtime.execute_script("<anon>", script);
143                            let res = if let Err(e) = res {
144                                Err(e.to_string())
145                            } else {
146                                js_runtime.run_event_loop(Default::default()).await
147                                    .map(|_| ())
148                                    .map_err(|e| e.to_string())
149                            };
150                            let _ = resp.send(res);
151                        }
152                        DenoCommand::OnRequestHeaders(flow, resp) => {
153                             let res = Self::handle_on_request_headers(&mut js_runtime, flow);
154                             let _ = resp.send(res);
155                        }
156                        DenoCommand::OnRequest(flow, body, resp) => {
157                            let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
158                            let _ = resp.send(res);
159                        }
160                        DenoCommand::OnResponseHeaders(flow, resp) => {
161                             let res = Self::handle_on_response_headers(&mut js_runtime, flow);
162                             let _ = resp.send(res);
163                        }
164                        DenoCommand::OnResponse(flow, body, resp) => {
165                            let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
166                            let _ = resp.send(res);
167                        }
168                        DenoCommand::OnWebSocketMessage(flow, message, resp) => {
169                            let res = Self::handle_on_websocket_message(&mut js_runtime, flow, message);
170                            let _ = resp.send(res);
171                        }
172                    }
173                }
174            });
175        });
176
177        Self { tx }
178    }
179
180    fn handle_on_request_headers(runtime: &mut JsRuntime, flow: Flow) -> Result<Option<Flow>, String> {
181        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
182        let check_code = "typeof globalThis.onRequestHeaders === 'function'";
183        let exists = runtime.execute_script("check_onRequestHeaders", check_code).map_err(|e| e.to_string())?;
184        {
185            let scope = &mut runtime.handle_scope();
186            let exists_val = deno_core::v8::Local::new(scope, exists);
187            if !exists_val.is_true() { return Ok(None); }
188        }
189        let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
190        let result = runtime.execute_script("call_onRequestHeaders", code).map_err(|e| e.to_string())?;
191        let scope = &mut runtime.handle_scope();
192        let result_val = deno_core::v8::Local::new(scope, result);
193        if result_val.is_undefined() || result_val.is_null() { return Ok(None); }
194        let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
195            .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
196        Ok(Some(modified_flow))
197    }
198
199    fn handle_on_response_headers(runtime: &mut JsRuntime, flow: Flow) -> Result<Option<Flow>, String> {
200        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
201        let check_code = "typeof globalThis.onResponseHeaders === 'function'";
202        let exists = runtime.execute_script("check_onResponseHeaders", check_code).map_err(|e| e.to_string())?;
203        {
204            let scope = &mut runtime.handle_scope();
205            let exists_val = deno_core::v8::Local::new(scope, exists);
206            if !exists_val.is_true() { return Ok(None); }
207        }
208        let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
209        let result = runtime.execute_script("call_onResponseHeaders", code).map_err(|e| e.to_string())?;
210        let scope = &mut runtime.handle_scope();
211        let result_val = deno_core::v8::Local::new(scope, result);
212        if result_val.is_undefined() || result_val.is_null() { return Ok(None); }
213        let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
214            .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
215        Ok(Some(modified_flow))
216    }
217
218    async fn handle_on_request(runtime: &mut JsRuntime, flow: Flow, body: HttpBody) -> Result<(Option<Flow>, RequestAction), String> {
219        let resource = HttpBodyResource::new(body);
220        let rid = {
221            let op_state_rc = runtime.op_state();
222            let mut state = op_state_rc.borrow_mut();
223            state.resource_table.add(resource)
224        };
225
226        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
227        
228        let check_code = "typeof globalThis.onRequest === 'function'";
229        let exists = runtime.execute_script("check_onRequest", check_code).map_err(|e| e.to_string())?;
230        
231        let exists_bool = {
232            let scope = &mut runtime.handle_scope();
233            let exists_val = deno_core::v8::Local::new(scope, exists);
234            exists_val.is_true()
235        };
236
237        if !exists_bool {
238            // Not found, return original body via resource
239            let resource = {
240                let op_state_rc = runtime.op_state();
241                let mut state = op_state_rc.borrow_mut();
242                state.resource_table.take::<HttpBodyResource>(rid).ok()
243            };
244            if let Some(res) = resource {
245                 let body = crate::streams::create_body_from_resource(&res);
246                 return Ok((None, RequestAction::Continue(body)));
247            } else {
248                 return Ok((None, RequestAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
249            }
250        }
251
252        let code = format!("globalThis.onRequest(new RelayBody({}), {})", rid, flow_json);
253        let result = runtime.execute_script("call_onRequest", code).map_err(|e| e.to_string())?;
254        
255        let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
256
257        let (is_empty, modified_flow) = {
258            let scope = &mut runtime.handle_scope();
259            let result_val = deno_core::v8::Local::new(scope, result);
260
261            if result_val.is_undefined() || result_val.is_null() {
262                (true, None)
263            } else {
264                let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
265                    .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
266                (false, Some(flow))
267            }
268        };
269
270        if is_empty {
271            // JS returned nothing, continue with original body
272            let resource = {
273                let op_state_rc = runtime.op_state();
274                let mut state = op_state_rc.borrow_mut();
275                state.resource_table.take::<HttpBodyResource>(rid).ok()
276            };
277            if let Some(res) = resource {
278                 let body = crate::streams::create_body_from_resource(&res);
279                 return Ok((None, RequestAction::Continue(body)));
280            } else {
281                 return Ok((None, RequestAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
282            }
283        }
284
285        let modified_flow = modified_flow.unwrap();
286
287        let resource = {
288            let op_state_rc = runtime.op_state();
289            let mut state = op_state_rc.borrow_mut();
290            state.resource_table.take::<HttpBodyResource>(rid).ok()
291        };
292        
293        let new_body: HttpBody = if let Some(res) = resource {
294            // Original body stream is available.
295            // Check if JS provided a NEW body in `modified_flow`.
296            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
297                http.request.body.as_ref().map(|b| !b.content.is_empty()).unwrap_or(false)
298            } else { false };
299            
300            if has_new_body {
301                // JS provided new body content. Use it.
302                // And we drop the original resource (and its reader).
303                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
304                    if let Some(b) = &http.request.body {
305                         let bytes: Bytes = if b.encoding == "base64" {
306                             base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
307                         } else {
308                             Bytes::from(b.content.clone())
309                         };
310                         Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
311                    } else {
312                        http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
313                    }
314                } else {
315                    http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
316                }
317            } else {
318                // JS did not provide new body content. Use original stream.
319                crate::streams::create_body_from_resource(&res)
320            }
321        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
322            // Resource is gone (JS consumed it or closed it).
323            // We must use whatever is in `modified_flow`.
324             if let Some(b) = &http.request.body {
325                 let bytes: Bytes = if b.encoding == "base64" {
326                     base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
327                 } else {
328                     Bytes::from(b.content.clone())
329                 };
330                 Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
331            } else {
332                http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
333            }
334        } else {
335            http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
336        };
337
338        Ok((Some(modified_flow), RequestAction::Continue(new_body)))
339    }
340
341    async fn handle_on_response(runtime: &mut JsRuntime, flow: Flow, body: HttpBody) -> Result<(Option<Flow>, ResponseAction), String> {
342        let resource = HttpBodyResource::new(body);
343        let rid = {
344            let op_state_rc = runtime.op_state();
345            let mut state = op_state_rc.borrow_mut();
346            state.resource_table.add(resource)
347        };
348
349        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
350        
351        let check_code = "typeof globalThis.onResponse === 'function'";
352        let exists = runtime.execute_script("check_onResponse", check_code).map_err(|e| e.to_string())?;
353        
354        let exists_bool = {
355            let scope = &mut runtime.handle_scope();
356            let exists_val = deno_core::v8::Local::new(scope, exists);
357            exists_val.is_true()
358        };
359
360        if !exists_bool {
361             let resource = {
362                let op_state_rc = runtime.op_state();
363                let mut state = op_state_rc.borrow_mut();
364                state.resource_table.take::<HttpBodyResource>(rid).ok()
365            };
366            if let Some(res) = resource {
367                 let body = crate::streams::create_body_from_resource(&res);
368                 return Ok((None, ResponseAction::Continue(body)));
369            } else {
370                 return Ok((None, ResponseAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
371            }
372        }
373
374        let code = format!("globalThis.onResponse(new RelayBody({}), {})", rid, flow_json);
375        let result = runtime.execute_script("call_onResponse", code).map_err(|e| e.to_string())?;
376        let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
377
378        let (is_empty, modified_flow) = {
379            let scope = &mut runtime.handle_scope();
380            let result_val = deno_core::v8::Local::new(scope, result);
381
382            if result_val.is_undefined() || result_val.is_null() {
383                (true, None)
384            } else {
385                let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
386                    .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
387                (false, Some(flow))
388            }
389        };
390
391        if is_empty {
392             let resource = {
393                let op_state_rc = runtime.op_state();
394                let mut state = op_state_rc.borrow_mut();
395                state.resource_table.take::<HttpBodyResource>(rid).ok()
396            };
397            if let Some(res) = resource {
398                 let body = crate::streams::create_body_from_resource(&res);
399                 return Ok((None, ResponseAction::Continue(body)));
400            } else {
401                 return Ok((None, ResponseAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
402            }
403        }
404
405        let modified_flow = modified_flow.unwrap();
406
407        let resource = {
408            let op_state_rc = runtime.op_state();
409            let mut state = op_state_rc.borrow_mut();
410            state.resource_table.take::<HttpBodyResource>(rid).ok()
411        };
412        
413        let new_body: HttpBody = if let Some(res) = resource {
414            let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
415                http.response.as_ref().and_then(|r| r.body.as_ref()).map(|b| !b.content.is_empty()).unwrap_or(false)
416            } else { false };
417            
418            if has_new_body {
419                if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
420                     if let Some(resp) = &http.response {
421                         if let Some(b) = &resp.body {
422                             let bytes: Bytes = if b.encoding == "base64" {
423                                 base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
424                             } else {
425                                 Bytes::from(b.content.clone())
426                             };
427                             Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
428                         } else {
429                             http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
430                         }
431                     } else {
432                         http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
433                     }
434                } else {
435                    http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
436                }
437            } else {
438                crate::streams::create_body_from_resource(&res)
439            }
440        } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
441             if let Some(resp) = &http.response {
442                 if let Some(b) = &resp.body {
443                     let bytes: Bytes = if b.encoding == "base64" {
444                         base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
445                     } else {
446                         Bytes::from(b.content.clone())
447                     };
448                     Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
449                 } else {
450                     http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
451                 }
452             } else {
453                 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
454             }
455        } else {
456            http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
457        };
458
459        Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
460    }
461
462    fn handle_on_websocket_message(runtime: &mut JsRuntime, flow: Flow, message: WebSocketMessage) -> Result<WebSocketMessageAction, String> {
463        let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
464        let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
465
466        let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
467        let exists = runtime.execute_script("check_onWebSocketMessage", check_code).map_err(|e| e.to_string())?;
468        {
469            let scope = &mut runtime.handle_scope();
470            let exists_val = deno_core::v8::Local::new(scope, exists);
471            if !exists_val.is_true() { return Ok(WebSocketMessageAction::Continue(message)); }
472        }
473
474        let code = format!("globalThis.onWebSocketMessage({{}}, {}, {})", flow_json, message_json);
475        let result = runtime.execute_script("call_onWebSocketMessage", code).map_err(|e| e.to_string())?;
476        
477        let scope = &mut runtime.handle_scope();
478        let result_val = deno_core::v8::Local::new(scope, result);
479
480        if result_val.is_undefined() || result_val.is_null() {
481             return Ok(WebSocketMessageAction::Continue(message));
482        }
483
484        // Check if user returned "DROP" (string) or a modified WebSocketMessage
485        if result_val.is_string() {
486            let s = result_val.to_rust_string_lossy(scope);
487            if s == "DROP" {
488                 return Ok(WebSocketMessageAction::Drop);
489            }
490        }
491
492        let modified_message: WebSocketMessage = deno_core::serde_v8::from_v8(scope, result_val)
493            .map_err(|e| format!("Failed to deserialize message: {}", e))?;
494        
495        Ok(WebSocketMessageAction::Continue(modified_message))
496    }
497}
498
499#[async_trait]
500impl ScriptEngineTrait for DenoScriptEngine {
501    async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
502        let (tx, rx) = oneshot::channel();
503        self.tx.send(DenoCommand::LoadScript(script.to_string(), tx)).await.map_err(|e| Box::new(e) as BoxError)?;
504        rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
505    }
506
507    async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
508        let (tx, rx) = oneshot::channel();
509        let flow_clone = flow.clone();
510        self.tx.send(DenoCommand::OnRequestHeaders(flow_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
511        let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
512        
513        if let Some(new_flow) = &res {
514            *flow = new_flow.clone();
515        }
516        Ok(res)
517    }
518
519    async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
520        let (tx, rx) = oneshot::channel();
521        let flow_clone = flow.clone();
522        self.tx.send(DenoCommand::OnRequest(flow_clone, body, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
523        let (new_flow, action) = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
524        
525        if let Some(f) = new_flow {
526            *flow = f;
527        }
528        Ok(action)
529    }
530
531    async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
532        let (tx, rx) = oneshot::channel();
533        let flow_clone = flow.clone();
534        self.tx.send(DenoCommand::OnResponseHeaders(flow_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
535        let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
536        
537        if let Some(new_flow) = &res {
538            *flow = new_flow.clone();
539        }
540        Ok(res)
541    }
542
543    async fn on_response(&self, flow: &mut Flow, body: HttpBody) -> Result<ResponseAction, BoxError> {
544        let (tx, rx) = oneshot::channel();
545        let flow_clone = flow.clone();
546        self.tx.send(DenoCommand::OnResponse(flow_clone, body, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
547        let (new_flow, action) = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
548        
549        if let Some(f) = new_flow {
550            *flow = f;
551        }
552        Ok(action)
553    }
554
555    async fn on_websocket_message(&self, _flow: &mut Flow, message: &mut WebSocketMessage) -> Result<WebSocketMessageAction, BoxError> {
556        let (tx, rx) = oneshot::channel();
557        let flow_clone = _flow.clone();
558        let message_clone = message.clone();
559        self.tx.send(DenoCommand::OnWebSocketMessage(flow_clone, message_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
560        let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
561        
562        Ok(res)
563    }
564}