1use std::thread;
2use std::rc::Rc;
3use std::cell::RefCell;
4use tokio::sync::{mpsc, oneshot};
5use relay_core_api::flow::{Flow, WebSocketMessage};
6use crate::engine_trait::ScriptEngineTrait;
7use async_trait::async_trait;
8use deno_core::{JsRuntime, RuntimeOptions, Extension, op2, Op, ResourceId, OpState, error::AnyError};
9use relay_core_lib::interceptor::{HttpBody, RequestAction, ResponseAction, WebSocketMessageAction, BoxError};
10use http_body_util::{BodyExt, Full};
11use bytes::Bytes;
12use base64::Engine as _;
13use crate::streams::HttpBodyResource;
14
15#[op2(fast)]
16fn op_log(#[string] msg: String) {
17 println!("[Deno] {}", msg);
18}
19
20#[op2(async)]
21#[serde]
22async fn op_read_body(
23 state: Rc<RefCell<OpState>>,
24 #[smi] rid: ResourceId,
25 #[smi] limit: usize,
26) -> Result<Vec<u8>, AnyError> {
27 let resource = {
28 let state = state.borrow();
29 state.resource_table.get_any(rid)?
30 };
31 let view = resource.read(limit).await?;
32 Ok(view.to_vec())
33}
34
35#[op2(fast)]
36fn op_close_body(
37 state: &mut OpState,
38 #[smi] rid: ResourceId,
39) {
40 state.resource_table.take_any(rid).ok();
41}
42
43enum DenoCommand {
44 LoadScript(String, oneshot::Sender<Result<(), String>>),
45 OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
46 OnRequest(Flow, HttpBody, oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>),
47 OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
48 OnResponse(Flow, HttpBody, oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>),
49 OnWebSocketMessage(Flow, WebSocketMessage, oneshot::Sender<Result<WebSocketMessageAction, String>>),
50}
51
52#[derive(Clone)]
53pub struct DenoScriptEngine {
54 tx: mpsc::Sender<DenoCommand>,
55}
56
57impl Default for DenoScriptEngine {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl DenoScriptEngine {
64 pub fn new() -> Self {
65 let (tx, mut rx) = mpsc::channel(32);
66
67 thread::spawn(move || {
68 let rt = tokio::runtime::Builder::new_current_thread()
69 .enable_all()
70 .build()
71 .unwrap();
72
73 rt.block_on(async move {
74 let ext = Extension {
75 name: "relay_core",
76 ops: std::borrow::Cow::Borrowed(&[
77 op_log::DECL,
78 op_read_body::DECL,
79 op_close_body::DECL,
80 ]),
81 ..Default::default()
82 };
83
84 let mut js_runtime = JsRuntime::new(RuntimeOptions {
85 extensions: vec![ext],
86 ..Default::default()
87 });
88
89 let bootstrap = r#"
91 globalThis.console = {
92 log: (...args) => {
93 const msg = args.map(arg => {
94 if (typeof arg === 'object') {
95 try {
96 return JSON.stringify(arg);
97 } catch {
98 return String(arg);
99 }
100 }
101 return String(arg);
102 }).join(" ");
103 Deno.core.ops.op_log(msg);
104 }
105 };
106
107 class RelayBody {
108 constructor(rid) {
109 this.rid = rid;
110 }
111
112 async read(limit) {
113 return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
114 }
115
116 close() {
117 Deno.core.ops.op_close_body(this.rid);
118 }
119
120 async text() {
121 const bytes = await this.read(10 * 1024 * 1024); // 10MB limit
122 return new TextDecoder().decode(bytes);
123 }
124
125 async json() {
126 const txt = await this.text();
127 return JSON.parse(txt);
128 }
129 }
130 globalThis.RelayBody = RelayBody;
131
132 globalThis.relay = {
133 log: globalThis.console.log,
134 // Future: add more helpers like base64, etc.
135 };
136 "#;
137 js_runtime.execute_script("bootstrap", bootstrap).unwrap();
138
139 while let Some(cmd) = rx.recv().await {
140 match cmd {
141 DenoCommand::LoadScript(script, resp) => {
142 let res = js_runtime.execute_script("<anon>", script);
143 let res = if let Err(e) = res {
144 Err(e.to_string())
145 } else {
146 js_runtime.run_event_loop(Default::default()).await
147 .map(|_| ())
148 .map_err(|e| e.to_string())
149 };
150 let _ = resp.send(res);
151 }
152 DenoCommand::OnRequestHeaders(flow, resp) => {
153 let res = Self::handle_on_request_headers(&mut js_runtime, flow);
154 let _ = resp.send(res);
155 }
156 DenoCommand::OnRequest(flow, body, resp) => {
157 let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
158 let _ = resp.send(res);
159 }
160 DenoCommand::OnResponseHeaders(flow, resp) => {
161 let res = Self::handle_on_response_headers(&mut js_runtime, flow);
162 let _ = resp.send(res);
163 }
164 DenoCommand::OnResponse(flow, body, resp) => {
165 let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
166 let _ = resp.send(res);
167 }
168 DenoCommand::OnWebSocketMessage(flow, message, resp) => {
169 let res = Self::handle_on_websocket_message(&mut js_runtime, flow, message);
170 let _ = resp.send(res);
171 }
172 }
173 }
174 });
175 });
176
177 Self { tx }
178 }
179
180 fn handle_on_request_headers(runtime: &mut JsRuntime, flow: Flow) -> Result<Option<Flow>, String> {
181 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
182 let check_code = "typeof globalThis.onRequestHeaders === 'function'";
183 let exists = runtime.execute_script("check_onRequestHeaders", check_code).map_err(|e| e.to_string())?;
184 {
185 let scope = &mut runtime.handle_scope();
186 let exists_val = deno_core::v8::Local::new(scope, exists);
187 if !exists_val.is_true() { return Ok(None); }
188 }
189 let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
190 let result = runtime.execute_script("call_onRequestHeaders", code).map_err(|e| e.to_string())?;
191 let scope = &mut runtime.handle_scope();
192 let result_val = deno_core::v8::Local::new(scope, result);
193 if result_val.is_undefined() || result_val.is_null() { return Ok(None); }
194 let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
195 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
196 Ok(Some(modified_flow))
197 }
198
199 fn handle_on_response_headers(runtime: &mut JsRuntime, flow: Flow) -> Result<Option<Flow>, String> {
200 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
201 let check_code = "typeof globalThis.onResponseHeaders === 'function'";
202 let exists = runtime.execute_script("check_onResponseHeaders", check_code).map_err(|e| e.to_string())?;
203 {
204 let scope = &mut runtime.handle_scope();
205 let exists_val = deno_core::v8::Local::new(scope, exists);
206 if !exists_val.is_true() { return Ok(None); }
207 }
208 let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
209 let result = runtime.execute_script("call_onResponseHeaders", code).map_err(|e| e.to_string())?;
210 let scope = &mut runtime.handle_scope();
211 let result_val = deno_core::v8::Local::new(scope, result);
212 if result_val.is_undefined() || result_val.is_null() { return Ok(None); }
213 let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
214 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
215 Ok(Some(modified_flow))
216 }
217
218 async fn handle_on_request(runtime: &mut JsRuntime, flow: Flow, body: HttpBody) -> Result<(Option<Flow>, RequestAction), String> {
219 let resource = HttpBodyResource::new(body);
220 let rid = {
221 let op_state_rc = runtime.op_state();
222 let mut state = op_state_rc.borrow_mut();
223 state.resource_table.add(resource)
224 };
225
226 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
227
228 let check_code = "typeof globalThis.onRequest === 'function'";
229 let exists = runtime.execute_script("check_onRequest", check_code).map_err(|e| e.to_string())?;
230
231 let exists_bool = {
232 let scope = &mut runtime.handle_scope();
233 let exists_val = deno_core::v8::Local::new(scope, exists);
234 exists_val.is_true()
235 };
236
237 if !exists_bool {
238 let resource = {
240 let op_state_rc = runtime.op_state();
241 let mut state = op_state_rc.borrow_mut();
242 state.resource_table.take::<HttpBodyResource>(rid).ok()
243 };
244 if let Some(res) = resource {
245 let body = crate::streams::create_body_from_resource(&res);
246 return Ok((None, RequestAction::Continue(body)));
247 } else {
248 return Ok((None, RequestAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
249 }
250 }
251
252 let code = format!("globalThis.onRequest(new RelayBody({}), {})", rid, flow_json);
253 let result = runtime.execute_script("call_onRequest", code).map_err(|e| e.to_string())?;
254
255 let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
256
257 let (is_empty, modified_flow) = {
258 let scope = &mut runtime.handle_scope();
259 let result_val = deno_core::v8::Local::new(scope, result);
260
261 if result_val.is_undefined() || result_val.is_null() {
262 (true, None)
263 } else {
264 let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
265 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
266 (false, Some(flow))
267 }
268 };
269
270 if is_empty {
271 let resource = {
273 let op_state_rc = runtime.op_state();
274 let mut state = op_state_rc.borrow_mut();
275 state.resource_table.take::<HttpBodyResource>(rid).ok()
276 };
277 if let Some(res) = resource {
278 let body = crate::streams::create_body_from_resource(&res);
279 return Ok((None, RequestAction::Continue(body)));
280 } else {
281 return Ok((None, RequestAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
282 }
283 }
284
285 let modified_flow = modified_flow.unwrap();
286
287 let resource = {
288 let op_state_rc = runtime.op_state();
289 let mut state = op_state_rc.borrow_mut();
290 state.resource_table.take::<HttpBodyResource>(rid).ok()
291 };
292
293 let new_body: HttpBody = if let Some(res) = resource {
294 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
297 http.request.body.as_ref().map(|b| !b.content.is_empty()).unwrap_or(false)
298 } else { false };
299
300 if has_new_body {
301 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
304 if let Some(b) = &http.request.body {
305 let bytes: Bytes = if b.encoding == "base64" {
306 base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
307 } else {
308 Bytes::from(b.content.clone())
309 };
310 Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
311 } else {
312 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
313 }
314 } else {
315 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
316 }
317 } else {
318 crate::streams::create_body_from_resource(&res)
320 }
321 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
322 if let Some(b) = &http.request.body {
325 let bytes: Bytes = if b.encoding == "base64" {
326 base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
327 } else {
328 Bytes::from(b.content.clone())
329 };
330 Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
331 } else {
332 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
333 }
334 } else {
335 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
336 };
337
338 Ok((Some(modified_flow), RequestAction::Continue(new_body)))
339 }
340
341 async fn handle_on_response(runtime: &mut JsRuntime, flow: Flow, body: HttpBody) -> Result<(Option<Flow>, ResponseAction), String> {
342 let resource = HttpBodyResource::new(body);
343 let rid = {
344 let op_state_rc = runtime.op_state();
345 let mut state = op_state_rc.borrow_mut();
346 state.resource_table.add(resource)
347 };
348
349 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
350
351 let check_code = "typeof globalThis.onResponse === 'function'";
352 let exists = runtime.execute_script("check_onResponse", check_code).map_err(|e| e.to_string())?;
353
354 let exists_bool = {
355 let scope = &mut runtime.handle_scope();
356 let exists_val = deno_core::v8::Local::new(scope, exists);
357 exists_val.is_true()
358 };
359
360 if !exists_bool {
361 let resource = {
362 let op_state_rc = runtime.op_state();
363 let mut state = op_state_rc.borrow_mut();
364 state.resource_table.take::<HttpBodyResource>(rid).ok()
365 };
366 if let Some(res) = resource {
367 let body = crate::streams::create_body_from_resource(&res);
368 return Ok((None, ResponseAction::Continue(body)));
369 } else {
370 return Ok((None, ResponseAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
371 }
372 }
373
374 let code = format!("globalThis.onResponse(new RelayBody({}), {})", rid, flow_json);
375 let result = runtime.execute_script("call_onResponse", code).map_err(|e| e.to_string())?;
376 let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
377
378 let (is_empty, modified_flow) = {
379 let scope = &mut runtime.handle_scope();
380 let result_val = deno_core::v8::Local::new(scope, result);
381
382 if result_val.is_undefined() || result_val.is_null() {
383 (true, None)
384 } else {
385 let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
386 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
387 (false, Some(flow))
388 }
389 };
390
391 if is_empty {
392 let resource = {
393 let op_state_rc = runtime.op_state();
394 let mut state = op_state_rc.borrow_mut();
395 state.resource_table.take::<HttpBodyResource>(rid).ok()
396 };
397 if let Some(res) = resource {
398 let body = crate::streams::create_body_from_resource(&res);
399 return Ok((None, ResponseAction::Continue(body)));
400 } else {
401 return Ok((None, ResponseAction::Continue(http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed())));
402 }
403 }
404
405 let modified_flow = modified_flow.unwrap();
406
407 let resource = {
408 let op_state_rc = runtime.op_state();
409 let mut state = op_state_rc.borrow_mut();
410 state.resource_table.take::<HttpBodyResource>(rid).ok()
411 };
412
413 let new_body: HttpBody = if let Some(res) = resource {
414 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
415 http.response.as_ref().and_then(|r| r.body.as_ref()).map(|b| !b.content.is_empty()).unwrap_or(false)
416 } else { false };
417
418 if has_new_body {
419 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
420 if let Some(resp) = &http.response {
421 if let Some(b) = &resp.body {
422 let bytes: Bytes = if b.encoding == "base64" {
423 base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
424 } else {
425 Bytes::from(b.content.clone())
426 };
427 Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
428 } else {
429 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
430 }
431 } else {
432 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
433 }
434 } else {
435 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
436 }
437 } else {
438 crate::streams::create_body_from_resource(&res)
439 }
440 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
441 if let Some(resp) = &http.response {
442 if let Some(b) = &resp.body {
443 let bytes: Bytes = if b.encoding == "base64" {
444 base64::engine::general_purpose::STANDARD.decode(&b.content).unwrap_or_default().into()
445 } else {
446 Bytes::from(b.content.clone())
447 };
448 Full::new(bytes).map_err(|e| -> BoxError { e.into() }).boxed()
449 } else {
450 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
451 }
452 } else {
453 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
454 }
455 } else {
456 http_body_util::Empty::new().map_err(|_| -> BoxError { unreachable!() }).boxed()
457 };
458
459 Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
460 }
461
462 fn handle_on_websocket_message(runtime: &mut JsRuntime, flow: Flow, message: WebSocketMessage) -> Result<WebSocketMessageAction, String> {
463 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
464 let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
465
466 let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
467 let exists = runtime.execute_script("check_onWebSocketMessage", check_code).map_err(|e| e.to_string())?;
468 {
469 let scope = &mut runtime.handle_scope();
470 let exists_val = deno_core::v8::Local::new(scope, exists);
471 if !exists_val.is_true() { return Ok(WebSocketMessageAction::Continue(message)); }
472 }
473
474 let code = format!("globalThis.onWebSocketMessage({{}}, {}, {})", flow_json, message_json);
475 let result = runtime.execute_script("call_onWebSocketMessage", code).map_err(|e| e.to_string())?;
476
477 let scope = &mut runtime.handle_scope();
478 let result_val = deno_core::v8::Local::new(scope, result);
479
480 if result_val.is_undefined() || result_val.is_null() {
481 return Ok(WebSocketMessageAction::Continue(message));
482 }
483
484 if result_val.is_string() {
486 let s = result_val.to_rust_string_lossy(scope);
487 if s == "DROP" {
488 return Ok(WebSocketMessageAction::Drop);
489 }
490 }
491
492 let modified_message: WebSocketMessage = deno_core::serde_v8::from_v8(scope, result_val)
493 .map_err(|e| format!("Failed to deserialize message: {}", e))?;
494
495 Ok(WebSocketMessageAction::Continue(modified_message))
496 }
497}
498
499#[async_trait]
500impl ScriptEngineTrait for DenoScriptEngine {
501 async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
502 let (tx, rx) = oneshot::channel();
503 self.tx.send(DenoCommand::LoadScript(script.to_string(), tx)).await.map_err(|e| Box::new(e) as BoxError)?;
504 rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
505 }
506
507 async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
508 let (tx, rx) = oneshot::channel();
509 let flow_clone = flow.clone();
510 self.tx.send(DenoCommand::OnRequestHeaders(flow_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
511 let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
512
513 if let Some(new_flow) = &res {
514 *flow = new_flow.clone();
515 }
516 Ok(res)
517 }
518
519 async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
520 let (tx, rx) = oneshot::channel();
521 let flow_clone = flow.clone();
522 self.tx.send(DenoCommand::OnRequest(flow_clone, body, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
523 let (new_flow, action) = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
524
525 if let Some(f) = new_flow {
526 *flow = f;
527 }
528 Ok(action)
529 }
530
531 async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
532 let (tx, rx) = oneshot::channel();
533 let flow_clone = flow.clone();
534 self.tx.send(DenoCommand::OnResponseHeaders(flow_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
535 let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
536
537 if let Some(new_flow) = &res {
538 *flow = new_flow.clone();
539 }
540 Ok(res)
541 }
542
543 async fn on_response(&self, flow: &mut Flow, body: HttpBody) -> Result<ResponseAction, BoxError> {
544 let (tx, rx) = oneshot::channel();
545 let flow_clone = flow.clone();
546 self.tx.send(DenoCommand::OnResponse(flow_clone, body, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
547 let (new_flow, action) = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
548
549 if let Some(f) = new_flow {
550 *flow = f;
551 }
552 Ok(action)
553 }
554
555 async fn on_websocket_message(&self, _flow: &mut Flow, message: &mut WebSocketMessage) -> Result<WebSocketMessageAction, BoxError> {
556 let (tx, rx) = oneshot::channel();
557 let flow_clone = _flow.clone();
558 let message_clone = message.clone();
559 self.tx.send(DenoCommand::OnWebSocketMessage(flow_clone, message_clone, tx)).await.map_err(|e| Box::new(e) as BoxError)?;
560 let res = rx.await.map_err(|e| Box::new(e) as BoxError)?.map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
561
562 Ok(res)
563 }
564}