1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7 Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12 BoxError, ConnectAction, ConnectionInfo, ConnectionStats, HttpBody, RequestAction,
13 ResponseAction, WebSocketMessageAction,
14};
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17use std::rc::Rc;
18use std::thread;
19use tokio::sync::{mpsc, oneshot};
20
21#[op2(fast)]
22fn op_log_level(#[string] level: String, #[string] msg: String) {
23 match level.as_str() {
24 "error" => tracing::error!("[User Script] {}", msg),
25 "warn" => tracing::warn!("[User Script] {}", msg),
26 "info" => tracing::info!("[User Script] {}", msg),
27 "debug" => tracing::debug!("[User Script] {}", msg),
28 _ => tracing::info!("[User Script] {}", msg),
29 }
30}
31
32#[op2(async)]
33#[serde]
34async fn op_read_body(
35 state: Rc<RefCell<OpState>>,
36 #[smi] rid: ResourceId,
37 #[smi] limit: usize,
38) -> Result<Vec<u8>, AnyError> {
39 let resource = {
40 let state = state.borrow();
41 state.resource_table.get_any(rid)?
42 };
43 let view = resource.read(limit).await?;
44 Ok(view.to_vec())
45}
46
47#[op2(fast)]
48fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
49 state.resource_table.take_any(rid).ok();
50}
51
/// Number of keys at which `op_shared_state_set` starts logging a warning when
/// a new key is added. The cap is advisory only: the insert still happens.
const SHARED_STATE_SOFT_CAP: usize = 10_000;
56
57fn shared_state(state: &mut OpState) -> &mut HashMap<String, serde_json::Value> {
58 state.borrow_mut::<HashMap<String, serde_json::Value>>()
59}
60
61#[op2]
62#[serde]
63fn op_shared_state_get(state: &mut OpState, #[string] key: String) -> Option<serde_json::Value> {
64 shared_state(state).get(&key).cloned()
65}
66
67#[op2]
68fn op_shared_state_set(
69 state: &mut OpState,
70 #[string] key: String,
71 #[serde] value: serde_json::Value,
72) {
73 let map = shared_state(state);
74 if !map.contains_key(&key) && map.len() >= SHARED_STATE_SOFT_CAP {
76 tracing::warn!(
77 "sharedState exceeds soft cap ({} keys): user scripts should delete unused keys. \
78 Use sharedState.size() to check.",
79 map.len()
80 );
81 }
82 map.insert(key, value);
83}
84
85#[op2(fast)]
86fn op_shared_state_delete(state: &mut OpState, #[string] key: String) -> bool {
87 shared_state(state).remove(&key).is_some()
88}
89
90#[op2(fast)]
91fn op_shared_state_clear(state: &mut OpState) {
92 shared_state(state).clear();
93}
94
95#[op2]
96#[serde]
97fn op_shared_state_keys(state: &mut OpState) -> Vec<String> {
98 shared_state(state).keys().cloned().collect()
99}
100
101#[op2(fast)]
102fn op_shared_state_size(state: &OpState) -> u32 {
103 let map = state.borrow::<HashMap<String, serde_json::Value>>();
104 map.len() as u32
105}
106
/// Process-wide count of `op_env_get` calls. It is incremented on every call,
/// whether or not the requested variable is on the allowlist.
static SCRIPT_ENV_ACCESS_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
112
113fn env_allow(state: &OpState) -> &HashSet<String> {
114 state.borrow::<HashSet<String>>()
115}
116
117#[op2]
118#[string]
119fn op_env_get(state: &OpState, #[string] key: String) -> Option<String> {
120 let allowed = env_allow(state);
121 SCRIPT_ENV_ACCESS_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
123 if allowed.is_empty() || !allowed.contains(&key) {
124 return None;
125 }
126 std::env::var(&key).ok()
127}
128
129pub fn get_script_env_access_total() -> usize {
131 SCRIPT_ENV_ACCESS_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
132}
133
/// Settings that govern the script-facing `relay.fetch` operation.
#[derive(Clone, Debug)]
pub struct ScriptFetchConfig {
    /// Master switch; when `false` every fetch is rejected.
    pub enabled: bool,
    /// Hosts scripts may contact. An empty set places no restriction on the
    /// host.
    pub allow_hosts: HashSet<String>,
    /// Intended limit on concurrent fetches.
    /// NOTE(review): not consulted by `op_script_fetch` in this file; confirm
    /// where (or whether) it is enforced.
    pub max_concurrency: usize,
    /// Timeout in milliseconds, applied to both connect and read.
    pub timeout_ms: u64,
    /// Port the proxy listens on; used to reject fetches that would loop back
    /// into the proxy itself.
    pub proxy_listen_port: u16,
}

impl Default for ScriptFetchConfig {
    /// Fetch disabled, no host restriction, 8 concurrent fetches, 5 s timeout,
    /// proxy port 0.
    fn default() -> Self {
        Self {
            enabled: false,
            allow_hosts: HashSet::new(),
            max_concurrency: 8,
            timeout_ms: 5000,
            proxy_listen_port: 0,
        }
    }
}
162
163fn fetch_config(state: &OpState) -> &ScriptFetchConfig {
164 state.borrow::<ScriptFetchConfig>()
165}
166
/// Process-wide count of `op_script_fetch` calls, whether accepted or not.
static SCRIPT_FETCH_TOTAL: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
/// Process-wide count of fetches that were refused by policy or failed before
/// an HTTP status was received.
static SCRIPT_FETCH_REJECTED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
171
172#[op2]
175#[string]
176fn op_script_fetch(state: &OpState, #[string] url: String) -> String {
177 SCRIPT_FETCH_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
178 let config = fetch_config(state);
179
180 if !config.enabled {
181 SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
182 return serde_json::json!({"ok": false, "error": "script fetch disabled"}).to_string();
183 }
184
185 if !config.allow_hosts.is_empty()
186 && let Ok(parsed) = url::Url::parse(&url)
187 && let Some(host) = parsed.host_str()
188 && !config.allow_hosts.contains(host)
189 {
190 SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
191 return serde_json::json!({"ok": false, "error": "host not in allowlist"}).to_string();
192 }
193
194 if let Ok(parsed) = url::Url::parse(&url)
195 && let Some(port) = parsed.port_or_known_default()
196 && port == config.proxy_listen_port
197 && matches!(parsed.scheme(), "http" | "https")
198 {
199 let host = parsed.host_str().unwrap_or("");
200 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
201 SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
202 return serde_json::json!({"ok": false, "error": "recursive fetch to self rejected"})
203 .to_string();
204 }
205 }
206
207 let timeout = std::time::Duration::from_millis(config.timeout_ms);
211 let agent = ureq::AgentBuilder::new()
212 .timeout_read(timeout)
213 .timeout_connect(timeout)
214 .build();
215 match agent.get(&url).call() {
216 Ok(resp) => {
217 let status = resp.status();
218 let body = resp.into_string().unwrap_or_default();
219 serde_json::json!({"ok": true, "status": status, "body": body}).to_string()
220 }
221 Err(ureq::Error::Status(code, resp)) => {
222 let body = resp.into_string().unwrap_or_default();
223 serde_json::json!({"ok": false, "status": code, "body": body, "error": format!("HTTP {}", code)}).to_string()
224 }
225 Err(e) => {
226 SCRIPT_FETCH_REJECTED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
227 serde_json::json!({"ok": false, "error": e.to_string()}).to_string()
228 }
229 }
230}
231
232pub fn get_script_fetch_total() -> usize {
234 SCRIPT_FETCH_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
235}
236
237pub fn get_script_fetch_rejected_total() -> usize {
239 SCRIPT_FETCH_REJECTED_TOTAL.load(std::sync::atomic::Ordering::Relaxed)
240}
241
242#[op2]
245#[string]
246fn op_uuid_v4() -> String {
247 uuid::Uuid::new_v4().to_string()
248}
249
250#[op2]
251#[string]
252fn op_hash(#[string] algorithm: String, #[string] data: String) -> Result<String, AnyError> {
253 use sha1::Sha1;
254 use sha2::{Sha256, Sha512};
255 let bytes = data.as_bytes();
256 let hex = match algorithm.as_str() {
257 "sha1" => {
258 use sha1::Digest;
259 data_encoding::HEXLOWER.encode(&Sha1::digest(bytes))
260 }
261 "sha256" => {
262 use sha2::Digest;
263 data_encoding::HEXLOWER.encode(&Sha256::digest(bytes))
264 }
265 "sha512" => {
266 use sha2::Digest;
267 data_encoding::HEXLOWER.encode(&Sha512::digest(bytes))
268 }
269 "md5" => {
270 use md5::Digest;
271 data_encoding::HEXLOWER.encode(&md5::Md5::digest(bytes))
272 }
273 other => {
274 return Err(AnyError::msg(format!(
275 "unsupported hash algorithm: {}. Supported: sha1, sha256, sha512, md5",
276 other
277 )));
278 }
279 };
280 Ok(hex)
281}
282
283#[op2]
284#[string]
285fn op_base64_encode(#[string] data: String) -> String {
286 use base64::Engine as _;
287 base64::engine::general_purpose::STANDARD.encode(data.as_bytes())
288}
289
290#[op2]
291#[string]
292fn op_base64_decode(#[string] data: String) -> Result<String, AnyError> {
293 use base64::Engine as _;
294 let bytes = base64::engine::general_purpose::STANDARD
295 .decode(data.as_bytes())
296 .map_err(|e| AnyError::msg(format!("base64 decode error: {}", e)))?;
297 String::from_utf8(bytes).map_err(|e| AnyError::msg(format!("utf-8 error: {}", e)))
298}
299
300#[op2]
301#[serde]
302fn op_json_parse_safe(#[string] data: String) -> serde_json::Value {
303 serde_json::from_str(&data).unwrap_or(serde_json::Value::Null)
304}
305
306#[op2]
307#[string]
308fn op_json_stringify_pretty(#[serde] value: serde_json::Value) -> String {
309 serde_json::to_string_pretty(&value).unwrap_or_else(|_| String::new())
310}
311
/// Work items sent over the mpsc channel to the thread that owns the
/// `JsRuntime`.
///
/// Every variant ends with a oneshot sender on which that thread reports the
/// outcome; failures are reported as a `String`.
enum DenoCommand {
    /// Execute the given script source in the runtime.
    LoadScript(String, oneshot::Sender<Result<(), String>>),
    /// Dispatch a new-connection event.
    OnConnect(
        ConnectionInfo,
        oneshot::Sender<Result<ConnectAction, String>>,
    ),
    /// Dispatch a connection-closed event together with its statistics.
    OnDisconnect(
        ConnectionInfo,
        ConnectionStats,
        oneshot::Sender<Result<(), String>>,
    ),
    /// Dispatch a request-headers event; the reply is the modified flow, if
    /// any.
    OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
    /// Dispatch a request event with its body; the reply is the modified flow
    /// (if any) and the action to take.
    OnRequest(
        Flow,
        HttpBody,
        oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
    ),
    /// Dispatch a response-headers event; the reply is the modified flow, if
    /// any.
    OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
    /// Dispatch a response event with its body; the reply is the modified flow
    /// (if any) and the action to take.
    OnResponse(
        Flow,
        HttpBody,
        oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
    ),
    /// Dispatch a single WebSocket message for the given flow.
    OnWebSocketMessage(
        Flow,
        WebSocketMessage,
        oneshot::Sender<Result<WebSocketMessageAction, String>>,
    ),
    /// Dispatch a WebSocket-start event.
    OnWebSocketStart(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
    /// Dispatch a WebSocket-end event; the `u16` and `String` are forwarded to
    /// the handler as the close code and reason.
    OnWebSocketEnd(
        Flow,
        u16,
        String,
        oneshot::Sender<Result<Option<Flow>, String>>,
    ),
    /// Dispatch a WebSocket-error event carrying the error text.
    OnWebSocketError(Flow, String, oneshot::Sender<Result<Option<Flow>, String>>),
}
349
/// Handle to a script engine whose `JsRuntime` lives on its own thread.
///
/// Cloning the handle clones only the channel sender, so all clones talk to
/// the same runtime.
#[derive(Clone)]
pub struct DenoScriptEngine {
    /// Sending half of the command channel consumed by the runtime thread.
    tx: mpsc::Sender<DenoCommand>,
}
354
355impl Default for DenoScriptEngine {
356 fn default() -> Self {
357 Self::new(HashSet::new())
358 }
359}
360
361impl DenoScriptEngine {
362 pub fn new(env_allow: HashSet<String>) -> Self {
363 Self::new_with_fetch(env_allow, ScriptFetchConfig::default())
364 }
365
366 pub fn new_with_fetch(env_allow: HashSet<String>, fetch_config: ScriptFetchConfig) -> Self {
367 let (tx, mut rx) = mpsc::channel(32);
368
369 thread::spawn(move || {
370 let rt = tokio::runtime::Builder::new_current_thread()
371 .enable_all()
372 .build()
373 .unwrap();
374
375 rt.block_on(async move {
376 let env_allow = env_allow; let ext = Extension {
378 name: "relay_core",
379 ops: std::borrow::Cow::Borrowed(&[
380 op_log_level::DECL,
381 op_read_body::DECL,
382 op_close_body::DECL,
383 op_shared_state_get::DECL,
384 op_shared_state_set::DECL,
385 op_shared_state_delete::DECL,
386 op_shared_state_clear::DECL,
387 op_shared_state_keys::DECL,
388 op_shared_state_size::DECL,
389 op_env_get::DECL,
390 op_uuid_v4::DECL,
391 op_hash::DECL,
392 op_base64_encode::DECL,
393 op_base64_decode::DECL,
394 op_json_parse_safe::DECL,
395 op_json_stringify_pretty::DECL,
396 op_script_fetch::DECL,
397 ]),
398 op_state_fn: Some(Box::new({
399 let env_allow = env_allow.clone();
400 let fetch_config = fetch_config.clone();
401 move |state| {
402 state.put(HashMap::<String, serde_json::Value>::new());
403 state.put(env_allow.clone());
404 state.put(fetch_config.clone());
405 }
406 })),
407 ..Default::default()
408 };
409
410 let mut js_runtime = JsRuntime::new(RuntimeOptions {
411 extensions: vec![ext],
412 ..Default::default()
413 });
414
415 let bootstrap = r#"
417 globalThis.console = {
418 log: (...args) => {
419 Deno.core.ops.op_log_level("log", _format(args));
420 },
421 info: (...args) => {
422 Deno.core.ops.op_log_level("info", _format(args));
423 },
424 warn: (...args) => {
425 Deno.core.ops.op_log_level("warn", _format(args));
426 },
427 error: (...args) => {
428 Deno.core.ops.op_log_level("error", _format(args));
429 },
430 debug: (...args) => {
431 Deno.core.ops.op_log_level("debug", _format(args));
432 },
433 };
434
435 function _format(args) {
436 return args.map(arg => {
437 if (typeof arg === 'object') {
438 try { return JSON.stringify(arg); }
439 catch { return String(arg); }
440 }
441 return String(arg);
442 }).join(" ");
443 }
444
445 class RelayBody {
446 constructor(rid) { this.rid = rid; }
447 async read(limit) {
448 return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
449 }
450 close() {
451 Deno.core.ops.op_close_body(this.rid);
452 }
453 async text() {
454 const bytes = await this.read(10 * 1024 * 1024);
455 return new TextDecoder().decode(bytes);
456 }
457 async json() {
458 const txt = await this.text();
459 return JSON.parse(txt);
460 }
461 }
462 globalThis.RelayBody = RelayBody;
463
464 globalThis.relay = {
465 log: globalThis.console.log,
466 env: function(name) {
467 return Deno.core.ops.op_env_get(name) ?? undefined;
468 },
469 uuid: function() {
470 return Deno.core.ops.op_uuid_v4();
471 },
472 hash: function(alg, data) {
473 return Deno.core.ops.op_hash(alg, data);
474 },
475 base64: {
476 encode: function(data) {
477 return Deno.core.ops.op_base64_encode(data);
478 },
479 decode: function(data) {
480 return Deno.core.ops.op_base64_decode(data);
481 },
482 },
483 json: {
484 parseSafe: function(str) {
485 return Deno.core.ops.op_json_parse_safe(str);
486 },
487 stringifyPretty: function(obj) {
488 return Deno.core.ops.op_json_stringify_pretty(obj);
489 },
490 },
491 fetch: function(url) {
492 return JSON.parse(Deno.core.ops.op_script_fetch(url));
493 },
494 };
495
496 // S12a: ctx.setTag / ctx.setVariable (script→rule injection) deferred to 1.x.
497 // Cross-thread synchronous V8 callback into the rule execution engine would
498 // require architectural changes that risk rule engine atomicity.
499 // Script-side rule context reading (flow.matched_rules, flow.rule_variables)
500 // is fully supported (S10a/S11).
501
502 // S1: sharedState — cross-hook shared map per isolate
503 globalThis.sharedState = {
504 get(key) {
505 return Deno.core.ops.op_shared_state_get(key);
506 },
507 set(key, value) {
508 Deno.core.ops.op_shared_state_set(key, value);
509 },
510 delete(key) {
511 return Deno.core.ops.op_shared_state_delete(key);
512 },
513 clear() {
514 Deno.core.ops.op_shared_state_clear();
515 },
516 keys() {
517 return Deno.core.ops.op_shared_state_keys();
518 },
519 size() {
520 return Deno.core.ops.op_shared_state_size();
521 },
522 };
523 "#;
524 js_runtime.execute_script("bootstrap", bootstrap).unwrap();
525
526 while let Some(cmd) = rx.recv().await {
527 match cmd {
528 DenoCommand::LoadScript(script, resp) => {
529 let res = js_runtime.execute_script("<anon>", script);
530 let res = if let Err(e) = res {
531 Err(e.to_string())
532 } else {
533 js_runtime
534 .run_event_loop(Default::default())
535 .await
536 .map(|_| ())
537 .map_err(|e| e.to_string())
538 };
539 let _ = resp.send(res);
540 }
541 DenoCommand::OnRequestHeaders(flow, resp) => {
542 let res = Self::handle_on_request_headers(&mut js_runtime, flow);
543 let _ = resp.send(res);
544 }
545 DenoCommand::OnRequest(flow, body, resp) => {
546 let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
547 let _ = resp.send(res);
548 }
549 DenoCommand::OnResponseHeaders(flow, resp) => {
550 let res = Self::handle_on_response_headers(&mut js_runtime, flow);
551 let _ = resp.send(res);
552 }
553 DenoCommand::OnResponse(flow, body, resp) => {
554 let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
555 let _ = resp.send(res);
556 }
557 DenoCommand::OnWebSocketMessage(flow, message, resp) => {
558 let res =
559 Self::handle_on_websocket_message(&mut js_runtime, flow, message);
560 let _ = resp.send(res);
561 }
562 DenoCommand::OnConnect(conn, resp) => {
563 let res = Self::handle_on_connect(&mut js_runtime, conn);
564 let _ = resp.send(res);
565 }
566 DenoCommand::OnDisconnect(conn, stats, resp) => {
567 let res = Self::handle_on_disconnect(&mut js_runtime, conn, stats);
568 let _ = resp.send(res);
569 }
570 DenoCommand::OnWebSocketStart(flow, resp) => {
571 let res = Self::handle_on_websocket_start(&mut js_runtime, flow);
572 let _ = resp.send(res);
573 }
574 DenoCommand::OnWebSocketEnd(flow, code, reason, resp) => {
575 let res =
576 Self::handle_on_websocket_end(&mut js_runtime, flow, code, reason);
577 let _ = resp.send(res);
578 }
579 DenoCommand::OnWebSocketError(flow, error, resp) => {
580 let res = Self::handle_on_websocket_error(&mut js_runtime, flow, error);
581 let _ = resp.send(res);
582 }
583 }
584 }
585 });
586 });
587
588 Self { tx }
589 }
590
591 fn try_call_on_error(runtime: &mut JsRuntime, flow: &Flow, error: &str, stage: &str) {
594 let check_code = "typeof globalThis.onError === 'function'";
595 let exists = runtime
596 .execute_script("check_onError_v2", check_code)
597 .ok()
598 .map(|v| {
599 let mut scope = runtime.handle_scope();
600 let val = deno_core::v8::Local::new(&mut scope, v);
601 val.is_true()
602 })
603 .unwrap_or(false);
604
605 if !exists {
606 return;
607 }
608
609 let flow_json = match serde_json::to_string(flow) {
610 Ok(j) => j,
611 Err(_) => return,
612 };
613 let error_escaped = error.replace('\\', "\\\\").replace('\'', "\\'");
614 let code = format!(
615 "globalThis.onError({{}}, {}, '{}', '{}')",
616 flow_json, error_escaped, stage
617 );
618
619 let _ = runtime.execute_script("call_onError_v2", code);
620 }
621
622 fn handle_on_request_headers(
623 runtime: &mut JsRuntime,
624 flow: Flow,
625 ) -> Result<Option<Flow>, String> {
626 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
627 let check_code = "typeof globalThis.onRequestHeaders === 'function'";
628 let exists = runtime
629 .execute_script("check_onRequestHeaders", check_code)
630 .map_err(|e| {
631 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
632 e.to_string()
633 })?;
634 {
635 let mut scope = runtime.handle_scope();
636 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
637 if !exists_val.is_true() {
638 return Ok(None);
639 }
640 }
641 let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
642 let result = runtime
643 .execute_script("call_onRequestHeaders", code)
644 .map_err(|e| {
645 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequestHeaders");
646 e.to_string()
647 })?;
648 let mut scope = runtime.handle_scope();
649 let result_val = deno_core::v8::Local::new(&mut scope, result);
650 if result_val.is_undefined() || result_val.is_null() {
651 return Ok(None);
652 }
653 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
654 drop(scope);
655 let modified_flow = match deser {
656 Ok(f) => f,
657 Err(e) => {
658 let err_str = format!("Failed to deserialize flow: {}", e);
659 Self::try_call_on_error(runtime, &flow, &err_str, "onRequestHeaders");
660 return Err(err_str);
661 }
662 };
663 Ok(Some(modified_flow))
664 }
665
666 fn handle_on_response_headers(
667 runtime: &mut JsRuntime,
668 flow: Flow,
669 ) -> Result<Option<Flow>, String> {
670 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
671 let check_code = "typeof globalThis.onResponseHeaders === 'function'";
672 let exists = runtime
673 .execute_script("check_onResponseHeaders", check_code)
674 .map_err(|e| {
675 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
676 e.to_string()
677 })?;
678 {
679 let mut scope = runtime.handle_scope();
680 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
681 if !exists_val.is_true() {
682 return Ok(None);
683 }
684 }
685 let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
686 let result = runtime
687 .execute_script("call_onResponseHeaders", code)
688 .map_err(|e| {
689 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponseHeaders");
690 e.to_string()
691 })?;
692 let mut scope = runtime.handle_scope();
693 let result_val = deno_core::v8::Local::new(&mut scope, result);
694 if result_val.is_undefined() || result_val.is_null() {
695 return Ok(None);
696 }
697 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
698 drop(scope);
699 let modified_flow = match deser {
700 Ok(f) => f,
701 Err(e) => {
702 let err_str = format!("Failed to deserialize flow: {}", e);
703 Self::try_call_on_error(runtime, &flow, &err_str, "onResponseHeaders");
704 return Err(err_str);
705 }
706 };
707 Ok(Some(modified_flow))
708 }
709
710 async fn handle_on_request(
711 runtime: &mut JsRuntime,
712 flow: Flow,
713 body: HttpBody,
714 ) -> Result<(Option<Flow>, RequestAction), String> {
715 let resource = HttpBodyResource::new(body);
716 let rid = {
717 let op_state_rc = runtime.op_state();
718 let mut state = op_state_rc.borrow_mut();
719 state.resource_table.add(resource)
720 };
721
722 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
723
724 let check_code = "typeof globalThis.onRequest === 'function'";
725 let exists = runtime
726 .execute_script("check_onRequest", check_code)
727 .map_err(|e| {
728 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
729 e.to_string()
730 })?;
731
732 let exists_bool = {
733 let scope = &mut runtime.handle_scope();
734 let exists_val = deno_core::v8::Local::new(scope, exists);
735 exists_val.is_true()
736 };
737
738 if !exists_bool {
739 let resource = {
740 let op_state_rc = runtime.op_state();
741 let mut state = op_state_rc.borrow_mut();
742 state.resource_table.take::<HttpBodyResource>(rid).ok()
743 };
744 if let Some(res) = resource {
745 let body = crate::streams::create_body_from_resource(&res);
746 return Ok((None, RequestAction::Continue(body)));
747 } else {
748 return Ok((
749 None,
750 RequestAction::Continue(
751 http_body_util::Empty::new()
752 .map_err(|_| -> BoxError { unreachable!() })
753 .boxed(),
754 ),
755 ));
756 }
757 }
758
759 let code = format!(
760 "globalThis.onRequest(new RelayBody({}), {})",
761 rid, flow_json
762 );
763 let result = runtime
764 .execute_script("call_onRequest", code)
765 .map_err(|e| {
766 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
767 e.to_string()
768 })?;
769
770 let result = runtime.resolve(result).await.map_err(|e| {
771 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onRequest");
772 e.to_string()
773 })?;
774
775 let (is_empty, modified_flow) = {
776 let mut scope = runtime.handle_scope();
777 let result_val = deno_core::v8::Local::new(&mut scope, result);
778
779 if result_val.is_undefined() || result_val.is_null() {
780 (true, None)
781 } else {
782 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
783 drop(scope);
784 match deser {
785 Ok(f) => (false, Some(f)),
786 Err(e) => {
787 let err_str = format!("Failed to deserialize flow: {}", e);
788 Self::try_call_on_error(runtime, &flow, &err_str, "onRequest");
789 return Err(err_str);
790 }
791 }
792 }
793 };
794
795 if is_empty {
796 let resource = {
797 let op_state_rc = runtime.op_state();
798 let mut state = op_state_rc.borrow_mut();
799 state.resource_table.take::<HttpBodyResource>(rid).ok()
800 };
801 if let Some(res) = resource {
802 let body = crate::streams::create_body_from_resource(&res);
803 return Ok((None, RequestAction::Continue(body)));
804 } else {
805 return Ok((
806 None,
807 RequestAction::Continue(
808 http_body_util::Empty::new()
809 .map_err(|_| -> BoxError { unreachable!() })
810 .boxed(),
811 ),
812 ));
813 }
814 }
815
816 let modified_flow = modified_flow.unwrap();
817
818 let resource = {
819 let op_state_rc = runtime.op_state();
820 let mut state = op_state_rc.borrow_mut();
821 state.resource_table.take::<HttpBodyResource>(rid).ok()
822 };
823
824 let new_body: HttpBody = if let Some(res) = resource {
825 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
826 {
827 http.request
828 .body
829 .as_ref()
830 .map(|b| !b.content.is_empty())
831 .unwrap_or(false)
832 } else {
833 false
834 };
835
836 if has_new_body {
837 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
838 if let Some(b) = &http.request.body {
839 let bytes: Bytes = if b.encoding == "base64" {
840 base64::engine::general_purpose::STANDARD
841 .decode(&b.content)
842 .unwrap_or_default()
843 .into()
844 } else {
845 Bytes::from(b.content.clone())
846 };
847 Full::new(bytes)
848 .map_err(|e| -> BoxError { e.into() })
849 .boxed()
850 } else {
851 http_body_util::Empty::new()
852 .map_err(|_| -> BoxError { unreachable!() })
853 .boxed()
854 }
855 } else {
856 http_body_util::Empty::new()
857 .map_err(|_| -> BoxError { unreachable!() })
858 .boxed()
859 }
860 } else {
861 crate::streams::create_body_from_resource(&res)
862 }
863 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
864 if let Some(b) = &http.request.body {
865 let bytes: Bytes = if b.encoding == "base64" {
866 base64::engine::general_purpose::STANDARD
867 .decode(&b.content)
868 .unwrap_or_default()
869 .into()
870 } else {
871 Bytes::from(b.content.clone())
872 };
873 Full::new(bytes)
874 .map_err(|e| -> BoxError { e.into() })
875 .boxed()
876 } else {
877 http_body_util::Empty::new()
878 .map_err(|_| -> BoxError { unreachable!() })
879 .boxed()
880 }
881 } else {
882 http_body_util::Empty::new()
883 .map_err(|_| -> BoxError { unreachable!() })
884 .boxed()
885 };
886
887 Ok((Some(modified_flow), RequestAction::Continue(new_body)))
888 }
889
890 async fn handle_on_response(
891 runtime: &mut JsRuntime,
892 flow: Flow,
893 body: HttpBody,
894 ) -> Result<(Option<Flow>, ResponseAction), String> {
895 let resource = HttpBodyResource::new(body);
896 let rid = {
897 let op_state_rc = runtime.op_state();
898 let mut state = op_state_rc.borrow_mut();
899 state.resource_table.add(resource)
900 };
901
902 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
903
904 let check_code = "typeof globalThis.onResponse === 'function'";
905 let exists = runtime
906 .execute_script("check_onResponse", check_code)
907 .map_err(|e| {
908 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
909 e.to_string()
910 })?;
911
912 let exists_bool = {
913 let scope = &mut runtime.handle_scope();
914 let exists_val = deno_core::v8::Local::new(scope, exists);
915 exists_val.is_true()
916 };
917
918 if !exists_bool {
919 let resource = {
920 let op_state_rc = runtime.op_state();
921 let mut state = op_state_rc.borrow_mut();
922 state.resource_table.take::<HttpBodyResource>(rid).ok()
923 };
924 if let Some(res) = resource {
925 let body = crate::streams::create_body_from_resource(&res);
926 return Ok((None, ResponseAction::Continue(body)));
927 } else {
928 return Ok((
929 None,
930 ResponseAction::Continue(
931 http_body_util::Empty::new()
932 .map_err(|_| -> BoxError { unreachable!() })
933 .boxed(),
934 ),
935 ));
936 }
937 }
938
939 let code = format!(
940 "globalThis.onResponse(new RelayBody({}), {})",
941 rid, flow_json
942 );
943 let result = runtime
944 .execute_script("call_onResponse", code)
945 .map_err(|e| {
946 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
947 e.to_string()
948 })?;
949 let result = runtime.resolve(result).await.map_err(|e| {
950 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onResponse");
951 e.to_string()
952 })?;
953
954 let (is_empty, modified_flow) = {
955 let mut scope = runtime.handle_scope();
956 let result_val = deno_core::v8::Local::new(&mut scope, result);
957
958 if result_val.is_undefined() || result_val.is_null() {
959 (true, None)
960 } else {
961 let deser: Result<Flow, _> = deno_core::serde_v8::from_v8(&mut scope, result_val);
962 drop(scope);
963 match deser {
964 Ok(f) => (false, Some(f)),
965 Err(e) => {
966 let err_str = format!("Failed to deserialize flow: {}", e);
967 Self::try_call_on_error(runtime, &flow, &err_str, "onResponse");
968 return Err(err_str);
969 }
970 }
971 }
972 };
973
974 if is_empty {
975 let resource = {
976 let op_state_rc = runtime.op_state();
977 let mut state = op_state_rc.borrow_mut();
978 state.resource_table.take::<HttpBodyResource>(rid).ok()
979 };
980 if let Some(res) = resource {
981 let body = crate::streams::create_body_from_resource(&res);
982 return Ok((None, ResponseAction::Continue(body)));
983 } else {
984 return Ok((
985 None,
986 ResponseAction::Continue(
987 http_body_util::Empty::new()
988 .map_err(|_| -> BoxError { unreachable!() })
989 .boxed(),
990 ),
991 ));
992 }
993 }
994
995 let modified_flow = modified_flow.unwrap();
996
997 let resource = {
998 let op_state_rc = runtime.op_state();
999 let mut state = op_state_rc.borrow_mut();
1000 state.resource_table.take::<HttpBodyResource>(rid).ok()
1001 };
1002
1003 let new_body: HttpBody = if let Some(res) = resource {
1004 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
1005 {
1006 http.response
1007 .as_ref()
1008 .and_then(|r| r.body.as_ref())
1009 .map(|b| !b.content.is_empty())
1010 .unwrap_or(false)
1011 } else {
1012 false
1013 };
1014
1015 if has_new_body {
1016 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
1017 if let Some(resp) = &http.response {
1018 if let Some(b) = &resp.body {
1019 let bytes: Bytes = if b.encoding == "base64" {
1020 base64::engine::general_purpose::STANDARD
1021 .decode(&b.content)
1022 .unwrap_or_default()
1023 .into()
1024 } else {
1025 Bytes::from(b.content.clone())
1026 };
1027 Full::new(bytes)
1028 .map_err(|e| -> BoxError { e.into() })
1029 .boxed()
1030 } else {
1031 http_body_util::Empty::new()
1032 .map_err(|_| -> BoxError { unreachable!() })
1033 .boxed()
1034 }
1035 } else {
1036 http_body_util::Empty::new()
1037 .map_err(|_| -> BoxError { unreachable!() })
1038 .boxed()
1039 }
1040 } else {
1041 http_body_util::Empty::new()
1042 .map_err(|_| -> BoxError { unreachable!() })
1043 .boxed()
1044 }
1045 } else {
1046 crate::streams::create_body_from_resource(&res)
1047 }
1048 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
1049 if let Some(resp) = &http.response {
1050 if let Some(b) = &resp.body {
1051 let bytes: Bytes = if b.encoding == "base64" {
1052 base64::engine::general_purpose::STANDARD
1053 .decode(&b.content)
1054 .unwrap_or_default()
1055 .into()
1056 } else {
1057 Bytes::from(b.content.clone())
1058 };
1059 Full::new(bytes)
1060 .map_err(|e| -> BoxError { e.into() })
1061 .boxed()
1062 } else {
1063 http_body_util::Empty::new()
1064 .map_err(|_| -> BoxError { unreachable!() })
1065 .boxed()
1066 }
1067 } else {
1068 http_body_util::Empty::new()
1069 .map_err(|_| -> BoxError { unreachable!() })
1070 .boxed()
1071 }
1072 } else {
1073 http_body_util::Empty::new()
1074 .map_err(|_| -> BoxError { unreachable!() })
1075 .boxed()
1076 };
1077
1078 Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
1079 }
1080
1081 fn handle_on_websocket_message(
1082 runtime: &mut JsRuntime,
1083 flow: Flow,
1084 message: WebSocketMessage,
1085 ) -> Result<WebSocketMessageAction, String> {
1086 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1087 let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
1088
1089 let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
1090 let exists = runtime
1091 .execute_script("check_onWebSocketMessage", check_code)
1092 .map_err(|e| {
1093 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
1094 e.to_string()
1095 })?;
1096 {
1097 let mut scope = runtime.handle_scope();
1098 let exists_val = deno_core::v8::Local::new(&mut scope, exists);
1099 if !exists_val.is_true() {
1100 return Ok(WebSocketMessageAction::Continue(message));
1101 }
1102 }
1103
1104 let code = format!(
1105 "globalThis.onWebSocketMessage({{}}, {}, {})",
1106 flow_json, message_json
1107 );
1108 let result = runtime
1109 .execute_script("call_onWebSocketMessage", code)
1110 .map_err(|e| {
1111 Self::try_call_on_error(runtime, &flow, &e.to_string(), "onWebSocketMessage");
1112 e.to_string()
1113 })?;
1114
1115 let mut scope = runtime.handle_scope();
1116 let result_val = deno_core::v8::Local::new(&mut scope, result);
1117
1118 if result_val.is_undefined() || result_val.is_null() {
1119 return Ok(WebSocketMessageAction::Continue(message));
1120 }
1121
1122 if result_val.is_string() {
1123 let s = result_val.to_rust_string_lossy(&mut scope);
1124 if s == "DROP" {
1125 return Ok(WebSocketMessageAction::Drop);
1126 }
1127 }
1128
1129 let deser: Result<WebSocketMessage, _> =
1130 deno_core::serde_v8::from_v8(&mut scope, result_val);
1131 drop(scope);
1132 let modified_message = match deser {
1133 Ok(m) => m,
1134 Err(e) => {
1135 let err_str = format!("Failed to deserialize message: {}", e);
1136 Self::try_call_on_error(runtime, &flow, &err_str, "onWebSocketMessage");
1137 return Err(err_str);
1138 }
1139 };
1140
1141 Ok(WebSocketMessageAction::Continue(modified_message))
1142 }
1143
1144 fn handle_on_connect(
1145 runtime: &mut JsRuntime,
1146 conn: ConnectionInfo,
1147 ) -> Result<ConnectAction, String> {
1148 let check_code = "typeof globalThis.onConnect === 'function'";
1149 let exists = runtime
1150 .execute_script("check_onConnect", check_code)
1151 .ok()
1152 .map(|v| {
1153 let mut scope = runtime.handle_scope();
1154 deno_core::v8::Local::new(&mut scope, v).is_true()
1155 })
1156 .unwrap_or(false);
1157 if !exists {
1158 return Ok(ConnectAction::Allow);
1159 }
1160
1161 let conn_json = serde_json::json!({
1162 "id": conn.id.to_string(),
1163 "client_addr": conn.client_addr.to_string(),
1164 "server_addr": conn.server_addr.map(|a| a.to_string()),
1165 "tls_sni": conn.tls_sni,
1166 });
1167 let code = format!(
1168 "globalThis.onConnect({{}}, {})",
1169 serde_json::to_string(&conn_json).unwrap_or_default()
1170 );
1171 let result = runtime
1172 .execute_script("call_onConnect", code)
1173 .map_err(|e| {
1174 tracing::warn!("onConnect script error: {}", e);
1175 e.to_string()
1176 })?;
1177 let mut scope = runtime.handle_scope();
1178 let result_val = deno_core::v8::Local::new(&mut scope, result);
1179 if result_val.is_undefined() || result_val.is_null() {
1180 return Ok(ConnectAction::Allow);
1181 }
1182 if result_val.is_object() {
1183 let Some(obj) = result_val.to_object(&mut scope) else {
1184 return Ok(ConnectAction::Allow);
1185 };
1186 let drop_key: deno_core::v8::Local<deno_core::v8::Value> =
1187 deno_core::v8::String::new(&mut scope, "drop")
1188 .expect("v8 string")
1189 .into();
1190 if let Some(drop_val) = obj.get(&mut scope, drop_key)
1191 && drop_val.is_true()
1192 {
1193 let reason_key: deno_core::v8::Local<deno_core::v8::Value> =
1194 deno_core::v8::String::new(&mut scope, "reason")
1195 .expect("v8 string")
1196 .into();
1197 let reason = obj
1198 .get(&mut scope, reason_key)
1199 .map(|v| v.to_rust_string_lossy(&mut scope))
1200 .unwrap_or_else(|| "script onConnect drop".to_string());
1201 return Ok(ConnectAction::Drop { reason });
1202 }
1203 }
1204 Ok(ConnectAction::Allow)
1205 }
1206
1207 fn handle_on_disconnect(
1208 runtime: &mut JsRuntime,
1209 conn: ConnectionInfo,
1210 stats: ConnectionStats,
1211 ) -> Result<(), String> {
1212 let check_code = "typeof globalThis.onDisconnect === 'function'";
1213 let exists = runtime
1214 .execute_script("check_onDisconnect", check_code)
1215 .ok()
1216 .map(|v| {
1217 let mut scope = runtime.handle_scope();
1218 deno_core::v8::Local::new(&mut scope, v).is_true()
1219 })
1220 .unwrap_or(false);
1221 if !exists {
1222 return Ok(());
1223 }
1224
1225 let conn_json = serde_json::json!({
1226 "id": conn.id.to_string(),
1227 "client_addr": conn.client_addr.to_string(),
1228 "server_addr": conn.server_addr.map(|a| a.to_string()),
1229 "tls_sni": conn.tls_sni,
1230 });
1231 let stats_json = serde_json::json!({
1232 "duration_ms": stats.duration_ms,
1233 "bytes_sent": stats.bytes_sent,
1234 "bytes_received": stats.bytes_received,
1235 "flows_count": stats.flows_count,
1236 });
1237 let code = format!(
1238 "globalThis.onDisconnect({{}}, {}, {})",
1239 serde_json::to_string(&conn_json).unwrap_or_default(),
1240 serde_json::to_string(&stats_json).unwrap_or_default()
1241 );
1242 let _ = runtime.execute_script("call_onDisconnect", code);
1243 Ok(())
1244 }
1245
1246 fn handle_on_websocket_start(
1247 runtime: &mut JsRuntime,
1248 flow: Flow,
1249 ) -> Result<Option<Flow>, String> {
1250 let check_code = "typeof globalThis.onWebSocketStart === 'function'";
1251 let exists = runtime
1252 .execute_script("check_onWebSocketStart", check_code)
1253 .ok()
1254 .map(|v| {
1255 let mut scope = runtime.handle_scope();
1256 deno_core::v8::Local::new(&mut scope, v).is_true()
1257 })
1258 .unwrap_or(false);
1259 if !exists {
1260 return Ok(None);
1261 }
1262
1263 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1264 let code = format!("globalThis.onWebSocketStart({{}}, {})", flow_json);
1265 let result = runtime
1266 .execute_script("call_onWebSocketStart", code)
1267 .map_err(|e| e.to_string())?;
1268 let mut scope = runtime.handle_scope();
1269 let result_val = deno_core::v8::Local::new(&mut scope, result);
1270 if result_val.is_undefined() || result_val.is_null() {
1271 return Ok(None);
1272 }
1273 let deser: Flow =
1274 deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1275 Ok(Some(deser))
1276 }
1277
1278 fn handle_on_websocket_end(
1279 runtime: &mut JsRuntime,
1280 flow: Flow,
1281 close_code: u16,
1282 close_reason: String,
1283 ) -> Result<Option<Flow>, String> {
1284 let check_code = "typeof globalThis.onWebSocketEnd === 'function'";
1285 let exists = runtime
1286 .execute_script("check_onWebSocketEnd", check_code)
1287 .ok()
1288 .map(|v| {
1289 let mut scope = runtime.handle_scope();
1290 deno_core::v8::Local::new(&mut scope, v).is_true()
1291 })
1292 .unwrap_or(false);
1293 if !exists {
1294 return Ok(None);
1295 }
1296
1297 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1298 let reason_json =
1299 serde_json::to_string(&close_reason).unwrap_or_else(|_| "null".to_string());
1300 let code = format!(
1301 "globalThis.onWebSocketEnd({{}}, {}, {}, {})",
1302 flow_json, close_code, reason_json
1303 );
1304 let result = runtime
1305 .execute_script("call_onWebSocketEnd", code)
1306 .map_err(|e| e.to_string())?;
1307 let mut scope = runtime.handle_scope();
1308 let result_val = deno_core::v8::Local::new(&mut scope, result);
1309 if result_val.is_undefined() || result_val.is_null() {
1310 return Ok(None);
1311 }
1312 let deser: Flow =
1313 deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1314 Ok(Some(deser))
1315 }
1316
1317 fn handle_on_websocket_error(
1318 runtime: &mut JsRuntime,
1319 flow: Flow,
1320 error: String,
1321 ) -> Result<Option<Flow>, String> {
1322 let check_code = "typeof globalThis.onWebSocketError === 'function'";
1323 let exists = runtime
1324 .execute_script("check_onWebSocketError", check_code)
1325 .ok()
1326 .map(|v| {
1327 let mut scope = runtime.handle_scope();
1328 deno_core::v8::Local::new(&mut scope, v).is_true()
1329 })
1330 .unwrap_or(false);
1331 if !exists {
1332 return Ok(None);
1333 }
1334
1335 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
1336 let error_json = serde_json::to_string(&error).unwrap_or_else(|_| "null".to_string());
1337 let code = format!(
1338 "globalThis.onWebSocketError({{}}, {}, {})",
1339 flow_json, error_json
1340 );
1341 let result = runtime
1342 .execute_script("call_onWebSocketError", code)
1343 .map_err(|e| e.to_string())?;
1344 let mut scope = runtime.handle_scope();
1345 let result_val = deno_core::v8::Local::new(&mut scope, result);
1346 if result_val.is_undefined() || result_val.is_null() {
1347 return Ok(None);
1348 }
1349 let deser: Flow =
1350 deno_core::serde_v8::from_v8(&mut scope, result_val).map_err(|e| e.to_string())?;
1351 Ok(Some(deser))
1352 }
1353}
1354
1355#[async_trait]
1356impl ScriptEngineTrait for DenoScriptEngine {
1357 async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
1358 let (tx, rx) = oneshot::channel();
1359 self.tx
1360 .send(DenoCommand::LoadScript(script.to_string(), tx))
1361 .await
1362 .map_err(|e| Box::new(e) as BoxError)?;
1363 rx.await
1364 .map_err(|e| Box::new(e) as BoxError)?
1365 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1366 }
1367
1368 async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
1369 let (tx, rx) = oneshot::channel();
1370 let flow_clone = flow.clone();
1371 self.tx
1372 .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
1373 .await
1374 .map_err(|e| Box::new(e) as BoxError)?;
1375 let res = rx
1376 .await
1377 .map_err(|e| Box::new(e) as BoxError)?
1378 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1379
1380 if let Some(new_flow) = &res {
1381 *flow = new_flow.clone();
1382 }
1383 Ok(res)
1384 }
1385
1386 async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
1387 let (tx, rx) = oneshot::channel();
1388 let flow_clone = flow.clone();
1389 self.tx
1390 .send(DenoCommand::OnRequest(flow_clone, body, tx))
1391 .await
1392 .map_err(|e| Box::new(e) as BoxError)?;
1393 let (new_flow, action) = rx
1394 .await
1395 .map_err(|e| Box::new(e) as BoxError)?
1396 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1397
1398 if let Some(f) = new_flow {
1399 *flow = f;
1400 }
1401 Ok(action)
1402 }
1403
1404 async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
1405 let (tx, rx) = oneshot::channel();
1406 let flow_clone = flow.clone();
1407 self.tx
1408 .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
1409 .await
1410 .map_err(|e| Box::new(e) as BoxError)?;
1411 let res = rx
1412 .await
1413 .map_err(|e| Box::new(e) as BoxError)?
1414 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1415
1416 if let Some(new_flow) = &res {
1417 *flow = new_flow.clone();
1418 }
1419 Ok(res)
1420 }
1421
1422 async fn on_response(
1423 &self,
1424 flow: &mut Flow,
1425 body: HttpBody,
1426 ) -> Result<ResponseAction, BoxError> {
1427 let (tx, rx) = oneshot::channel();
1428 let flow_clone = flow.clone();
1429 self.tx
1430 .send(DenoCommand::OnResponse(flow_clone, body, tx))
1431 .await
1432 .map_err(|e| Box::new(e) as BoxError)?;
1433 let (new_flow, action) = rx
1434 .await
1435 .map_err(|e| Box::new(e) as BoxError)?
1436 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1437
1438 if let Some(f) = new_flow {
1439 *flow = f;
1440 }
1441 Ok(action)
1442 }
1443
1444 async fn on_websocket_message(
1445 &self,
1446 _flow: &mut Flow,
1447 message: &mut WebSocketMessage,
1448 ) -> Result<WebSocketMessageAction, BoxError> {
1449 let (tx, rx) = oneshot::channel();
1450 let flow_clone = _flow.clone();
1451 let message_clone = message.clone();
1452 self.tx
1453 .send(DenoCommand::OnWebSocketMessage(
1454 flow_clone,
1455 message_clone,
1456 tx,
1457 ))
1458 .await
1459 .map_err(|e| Box::new(e) as BoxError)?;
1460 let res = rx
1461 .await
1462 .map_err(|e| Box::new(e) as BoxError)?
1463 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1464
1465 Ok(res)
1466 }
1467
1468 async fn on_connect(&self, conn: &ConnectionInfo) -> Result<ConnectAction, BoxError> {
1469 let (tx, rx) = oneshot::channel();
1470 let conn_clone = conn.clone();
1471 self.tx
1472 .send(DenoCommand::OnConnect(conn_clone, tx))
1473 .await
1474 .map_err(|e| Box::new(e) as BoxError)?;
1475 rx.await
1476 .map_err(|e| Box::new(e) as BoxError)?
1477 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1478 }
1479
1480 async fn on_disconnect(
1481 &self,
1482 conn: &ConnectionInfo,
1483 stats: &ConnectionStats,
1484 ) -> Result<(), BoxError> {
1485 let (tx, rx) = oneshot::channel();
1486 let conn_clone = conn.clone();
1487 let stats_clone = stats.clone();
1488 self.tx
1489 .send(DenoCommand::OnDisconnect(conn_clone, stats_clone, tx))
1490 .await
1491 .map_err(|e| Box::new(e) as BoxError)?;
1492 rx.await
1493 .map_err(|e| Box::new(e) as BoxError)?
1494 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
1495 }
1496
1497 async fn on_websocket_start(&self, flow: &mut Flow) -> Result<(), BoxError> {
1498 let (tx, rx) = oneshot::channel();
1499 let flow_clone = flow.clone();
1500 self.tx
1501 .send(DenoCommand::OnWebSocketStart(flow_clone, tx))
1502 .await
1503 .map_err(|e| Box::new(e) as BoxError)?;
1504 let res = rx
1505 .await
1506 .map_err(|e| Box::new(e) as BoxError)?
1507 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1508
1509 if let Some(new_flow) = res {
1510 *flow = new_flow;
1511 }
1512 Ok(())
1513 }
1514
1515 async fn on_websocket_end(
1516 &self,
1517 flow: &mut Flow,
1518 close_code: u16,
1519 close_reason: &str,
1520 ) -> Result<(), BoxError> {
1521 let (tx, rx) = oneshot::channel();
1522 let flow_clone = flow.clone();
1523 let reason_owned = close_reason.to_string();
1524 self.tx
1525 .send(DenoCommand::OnWebSocketEnd(
1526 flow_clone,
1527 close_code,
1528 reason_owned,
1529 tx,
1530 ))
1531 .await
1532 .map_err(|e| Box::new(e) as BoxError)?;
1533 let res = rx
1534 .await
1535 .map_err(|e| Box::new(e) as BoxError)?
1536 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1537
1538 if let Some(new_flow) = res {
1539 *flow = new_flow;
1540 }
1541 Ok(())
1542 }
1543
1544 async fn on_websocket_error(&self, flow: &mut Flow, error: &str) -> Result<(), BoxError> {
1545 let (tx, rx) = oneshot::channel();
1546 let flow_clone = flow.clone();
1547 let error_owned = error.to_string();
1548 self.tx
1549 .send(DenoCommand::OnWebSocketError(flow_clone, error_owned, tx))
1550 .await
1551 .map_err(|e| Box::new(e) as BoxError)?;
1552 let res = rx
1553 .await
1554 .map_err(|e| Box::new(e) as BoxError)?
1555 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
1556
1557 if let Some(new_flow) = res {
1558 *flow = new_flow;
1559 }
1560 Ok(())
1561 }
1562}