1use crate::engine_trait::ScriptEngineTrait;
2use crate::streams::HttpBodyResource;
3use async_trait::async_trait;
4use base64::Engine as _;
5use bytes::Bytes;
6use deno_core::{
7 Extension, JsRuntime, Op, OpState, ResourceId, RuntimeOptions, error::AnyError, op2,
8};
9use http_body_util::{BodyExt, Full};
10use relay_core_api::flow::{Flow, WebSocketMessage};
11use relay_core_lib::interceptor::{
12 BoxError, HttpBody, RequestAction, ResponseAction, WebSocketMessageAction,
13};
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::thread;
17use tokio::sync::{mpsc, oneshot};
18
19#[op2(fast)]
20fn op_log(#[string] msg: String) {
21 println!("[Deno] {}", msg);
22}
23
24#[op2(async)]
25#[serde]
26async fn op_read_body(
27 state: Rc<RefCell<OpState>>,
28 #[smi] rid: ResourceId,
29 #[smi] limit: usize,
30) -> Result<Vec<u8>, AnyError> {
31 let resource = {
32 let state = state.borrow();
33 state.resource_table.get_any(rid)?
34 };
35 let view = resource.read(limit).await?;
36 Ok(view.to_vec())
37}
38
39#[op2(fast)]
40fn op_close_body(state: &mut OpState, #[smi] rid: ResourceId) {
41 state.resource_table.take_any(rid).ok();
42}
43
44enum DenoCommand {
45 LoadScript(String, oneshot::Sender<Result<(), String>>),
46 OnRequestHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
47 OnRequest(
48 Flow,
49 HttpBody,
50 oneshot::Sender<Result<(Option<Flow>, RequestAction), String>>,
51 ),
52 OnResponseHeaders(Flow, oneshot::Sender<Result<Option<Flow>, String>>),
53 OnResponse(
54 Flow,
55 HttpBody,
56 oneshot::Sender<Result<(Option<Flow>, ResponseAction), String>>,
57 ),
58 OnWebSocketMessage(
59 Flow,
60 WebSocketMessage,
61 oneshot::Sender<Result<WebSocketMessageAction, String>>,
62 ),
63}
64
65#[derive(Clone)]
66pub struct DenoScriptEngine {
67 tx: mpsc::Sender<DenoCommand>,
68}
69
70impl Default for DenoScriptEngine {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl DenoScriptEngine {
77 pub fn new() -> Self {
78 let (tx, mut rx) = mpsc::channel(32);
79
80 thread::spawn(move || {
81 let rt = tokio::runtime::Builder::new_current_thread()
82 .enable_all()
83 .build()
84 .unwrap();
85
86 rt.block_on(async move {
87 let ext = Extension {
88 name: "relay_core",
89 ops: std::borrow::Cow::Borrowed(&[
90 op_log::DECL,
91 op_read_body::DECL,
92 op_close_body::DECL,
93 ]),
94 ..Default::default()
95 };
96
97 let mut js_runtime = JsRuntime::new(RuntimeOptions {
98 extensions: vec![ext],
99 ..Default::default()
100 });
101
102 let bootstrap = r#"
104 globalThis.console = {
105 log: (...args) => {
106 const msg = args.map(arg => {
107 if (typeof arg === 'object') {
108 try {
109 return JSON.stringify(arg);
110 } catch {
111 return String(arg);
112 }
113 }
114 return String(arg);
115 }).join(" ");
116 Deno.core.ops.op_log(msg);
117 }
118 };
119
120 class RelayBody {
121 constructor(rid) {
122 this.rid = rid;
123 }
124
125 async read(limit) {
126 return await Deno.core.ops.op_read_body(this.rid, limit || 65536);
127 }
128
129 close() {
130 Deno.core.ops.op_close_body(this.rid);
131 }
132
133 async text() {
134 const bytes = await this.read(10 * 1024 * 1024); // 10MB limit
135 return new TextDecoder().decode(bytes);
136 }
137
138 async json() {
139 const txt = await this.text();
140 return JSON.parse(txt);
141 }
142 }
143 globalThis.RelayBody = RelayBody;
144
145 globalThis.relay = {
146 log: globalThis.console.log,
147 // Future: add more helpers like base64, etc.
148 };
149 "#;
150 js_runtime.execute_script("bootstrap", bootstrap).unwrap();
151
152 while let Some(cmd) = rx.recv().await {
153 match cmd {
154 DenoCommand::LoadScript(script, resp) => {
155 let res = js_runtime.execute_script("<anon>", script);
156 let res = if let Err(e) = res {
157 Err(e.to_string())
158 } else {
159 js_runtime
160 .run_event_loop(Default::default())
161 .await
162 .map(|_| ())
163 .map_err(|e| e.to_string())
164 };
165 let _ = resp.send(res);
166 }
167 DenoCommand::OnRequestHeaders(flow, resp) => {
168 let res = Self::handle_on_request_headers(&mut js_runtime, flow);
169 let _ = resp.send(res);
170 }
171 DenoCommand::OnRequest(flow, body, resp) => {
172 let res = Self::handle_on_request(&mut js_runtime, flow, body).await;
173 let _ = resp.send(res);
174 }
175 DenoCommand::OnResponseHeaders(flow, resp) => {
176 let res = Self::handle_on_response_headers(&mut js_runtime, flow);
177 let _ = resp.send(res);
178 }
179 DenoCommand::OnResponse(flow, body, resp) => {
180 let res = Self::handle_on_response(&mut js_runtime, flow, body).await;
181 let _ = resp.send(res);
182 }
183 DenoCommand::OnWebSocketMessage(flow, message, resp) => {
184 let res =
185 Self::handle_on_websocket_message(&mut js_runtime, flow, message);
186 let _ = resp.send(res);
187 }
188 }
189 }
190 });
191 });
192
193 Self { tx }
194 }
195
196 fn handle_on_request_headers(
197 runtime: &mut JsRuntime,
198 flow: Flow,
199 ) -> Result<Option<Flow>, String> {
200 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
201 let check_code = "typeof globalThis.onRequestHeaders === 'function'";
202 let exists = runtime
203 .execute_script("check_onRequestHeaders", check_code)
204 .map_err(|e| e.to_string())?;
205 {
206 let scope = &mut runtime.handle_scope();
207 let exists_val = deno_core::v8::Local::new(scope, exists);
208 if !exists_val.is_true() {
209 return Ok(None);
210 }
211 }
212 let code = format!("globalThis.onRequestHeaders({{}}, {})", flow_json);
213 let result = runtime
214 .execute_script("call_onRequestHeaders", code)
215 .map_err(|e| e.to_string())?;
216 let scope = &mut runtime.handle_scope();
217 let result_val = deno_core::v8::Local::new(scope, result);
218 if result_val.is_undefined() || result_val.is_null() {
219 return Ok(None);
220 }
221 let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
222 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
223 Ok(Some(modified_flow))
224 }
225
226 fn handle_on_response_headers(
227 runtime: &mut JsRuntime,
228 flow: Flow,
229 ) -> Result<Option<Flow>, String> {
230 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
231 let check_code = "typeof globalThis.onResponseHeaders === 'function'";
232 let exists = runtime
233 .execute_script("check_onResponseHeaders", check_code)
234 .map_err(|e| e.to_string())?;
235 {
236 let scope = &mut runtime.handle_scope();
237 let exists_val = deno_core::v8::Local::new(scope, exists);
238 if !exists_val.is_true() {
239 return Ok(None);
240 }
241 }
242 let code = format!("globalThis.onResponseHeaders({{}}, {})", flow_json);
243 let result = runtime
244 .execute_script("call_onResponseHeaders", code)
245 .map_err(|e| e.to_string())?;
246 let scope = &mut runtime.handle_scope();
247 let result_val = deno_core::v8::Local::new(scope, result);
248 if result_val.is_undefined() || result_val.is_null() {
249 return Ok(None);
250 }
251 let modified_flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
252 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
253 Ok(Some(modified_flow))
254 }
255
256 async fn handle_on_request(
257 runtime: &mut JsRuntime,
258 flow: Flow,
259 body: HttpBody,
260 ) -> Result<(Option<Flow>, RequestAction), String> {
261 let resource = HttpBodyResource::new(body);
262 let rid = {
263 let op_state_rc = runtime.op_state();
264 let mut state = op_state_rc.borrow_mut();
265 state.resource_table.add(resource)
266 };
267
268 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
269
270 let check_code = "typeof globalThis.onRequest === 'function'";
271 let exists = runtime
272 .execute_script("check_onRequest", check_code)
273 .map_err(|e| e.to_string())?;
274
275 let exists_bool = {
276 let scope = &mut runtime.handle_scope();
277 let exists_val = deno_core::v8::Local::new(scope, exists);
278 exists_val.is_true()
279 };
280
281 if !exists_bool {
282 let resource = {
284 let op_state_rc = runtime.op_state();
285 let mut state = op_state_rc.borrow_mut();
286 state.resource_table.take::<HttpBodyResource>(rid).ok()
287 };
288 if let Some(res) = resource {
289 let body = crate::streams::create_body_from_resource(&res);
290 return Ok((None, RequestAction::Continue(body)));
291 } else {
292 return Ok((
293 None,
294 RequestAction::Continue(
295 http_body_util::Empty::new()
296 .map_err(|_| -> BoxError { unreachable!() })
297 .boxed(),
298 ),
299 ));
300 }
301 }
302
303 let code = format!(
304 "globalThis.onRequest(new RelayBody({}), {})",
305 rid, flow_json
306 );
307 let result = runtime
308 .execute_script("call_onRequest", code)
309 .map_err(|e| e.to_string())?;
310
311 let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
312
313 let (is_empty, modified_flow) = {
314 let scope = &mut runtime.handle_scope();
315 let result_val = deno_core::v8::Local::new(scope, result);
316
317 if result_val.is_undefined() || result_val.is_null() {
318 (true, None)
319 } else {
320 let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
321 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
322 (false, Some(flow))
323 }
324 };
325
326 if is_empty {
327 let resource = {
329 let op_state_rc = runtime.op_state();
330 let mut state = op_state_rc.borrow_mut();
331 state.resource_table.take::<HttpBodyResource>(rid).ok()
332 };
333 if let Some(res) = resource {
334 let body = crate::streams::create_body_from_resource(&res);
335 return Ok((None, RequestAction::Continue(body)));
336 } else {
337 return Ok((
338 None,
339 RequestAction::Continue(
340 http_body_util::Empty::new()
341 .map_err(|_| -> BoxError { unreachable!() })
342 .boxed(),
343 ),
344 ));
345 }
346 }
347
348 let modified_flow = modified_flow.unwrap();
349
350 let resource = {
351 let op_state_rc = runtime.op_state();
352 let mut state = op_state_rc.borrow_mut();
353 state.resource_table.take::<HttpBodyResource>(rid).ok()
354 };
355
356 let new_body: HttpBody = if let Some(res) = resource {
357 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
360 {
361 http.request
362 .body
363 .as_ref()
364 .map(|b| !b.content.is_empty())
365 .unwrap_or(false)
366 } else {
367 false
368 };
369
370 if has_new_body {
371 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
374 if let Some(b) = &http.request.body {
375 let bytes: Bytes = if b.encoding == "base64" {
376 base64::engine::general_purpose::STANDARD
377 .decode(&b.content)
378 .unwrap_or_default()
379 .into()
380 } else {
381 Bytes::from(b.content.clone())
382 };
383 Full::new(bytes)
384 .map_err(|e| -> BoxError { e.into() })
385 .boxed()
386 } else {
387 http_body_util::Empty::new()
388 .map_err(|_| -> BoxError { unreachable!() })
389 .boxed()
390 }
391 } else {
392 http_body_util::Empty::new()
393 .map_err(|_| -> BoxError { unreachable!() })
394 .boxed()
395 }
396 } else {
397 crate::streams::create_body_from_resource(&res)
399 }
400 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
401 if let Some(b) = &http.request.body {
404 let bytes: Bytes = if b.encoding == "base64" {
405 base64::engine::general_purpose::STANDARD
406 .decode(&b.content)
407 .unwrap_or_default()
408 .into()
409 } else {
410 Bytes::from(b.content.clone())
411 };
412 Full::new(bytes)
413 .map_err(|e| -> BoxError { e.into() })
414 .boxed()
415 } else {
416 http_body_util::Empty::new()
417 .map_err(|_| -> BoxError { unreachable!() })
418 .boxed()
419 }
420 } else {
421 http_body_util::Empty::new()
422 .map_err(|_| -> BoxError { unreachable!() })
423 .boxed()
424 };
425
426 Ok((Some(modified_flow), RequestAction::Continue(new_body)))
427 }
428
429 async fn handle_on_response(
430 runtime: &mut JsRuntime,
431 flow: Flow,
432 body: HttpBody,
433 ) -> Result<(Option<Flow>, ResponseAction), String> {
434 let resource = HttpBodyResource::new(body);
435 let rid = {
436 let op_state_rc = runtime.op_state();
437 let mut state = op_state_rc.borrow_mut();
438 state.resource_table.add(resource)
439 };
440
441 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
442
443 let check_code = "typeof globalThis.onResponse === 'function'";
444 let exists = runtime
445 .execute_script("check_onResponse", check_code)
446 .map_err(|e| e.to_string())?;
447
448 let exists_bool = {
449 let scope = &mut runtime.handle_scope();
450 let exists_val = deno_core::v8::Local::new(scope, exists);
451 exists_val.is_true()
452 };
453
454 if !exists_bool {
455 let resource = {
456 let op_state_rc = runtime.op_state();
457 let mut state = op_state_rc.borrow_mut();
458 state.resource_table.take::<HttpBodyResource>(rid).ok()
459 };
460 if let Some(res) = resource {
461 let body = crate::streams::create_body_from_resource(&res);
462 return Ok((None, ResponseAction::Continue(body)));
463 } else {
464 return Ok((
465 None,
466 ResponseAction::Continue(
467 http_body_util::Empty::new()
468 .map_err(|_| -> BoxError { unreachable!() })
469 .boxed(),
470 ),
471 ));
472 }
473 }
474
475 let code = format!(
476 "globalThis.onResponse(new RelayBody({}), {})",
477 rid, flow_json
478 );
479 let result = runtime
480 .execute_script("call_onResponse", code)
481 .map_err(|e| e.to_string())?;
482 let result = runtime.resolve(result).await.map_err(|e| e.to_string())?;
483
484 let (is_empty, modified_flow) = {
485 let scope = &mut runtime.handle_scope();
486 let result_val = deno_core::v8::Local::new(scope, result);
487
488 if result_val.is_undefined() || result_val.is_null() {
489 (true, None)
490 } else {
491 let flow: Flow = deno_core::serde_v8::from_v8(scope, result_val)
492 .map_err(|e| format!("Failed to deserialize flow: {}", e))?;
493 (false, Some(flow))
494 }
495 };
496
497 if is_empty {
498 let resource = {
499 let op_state_rc = runtime.op_state();
500 let mut state = op_state_rc.borrow_mut();
501 state.resource_table.take::<HttpBodyResource>(rid).ok()
502 };
503 if let Some(res) = resource {
504 let body = crate::streams::create_body_from_resource(&res);
505 return Ok((None, ResponseAction::Continue(body)));
506 } else {
507 return Ok((
508 None,
509 ResponseAction::Continue(
510 http_body_util::Empty::new()
511 .map_err(|_| -> BoxError { unreachable!() })
512 .boxed(),
513 ),
514 ));
515 }
516 }
517
518 let modified_flow = modified_flow.unwrap();
519
520 let resource = {
521 let op_state_rc = runtime.op_state();
522 let mut state = op_state_rc.borrow_mut();
523 state.resource_table.take::<HttpBodyResource>(rid).ok()
524 };
525
526 let new_body: HttpBody = if let Some(res) = resource {
527 let has_new_body = if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer
528 {
529 http.response
530 .as_ref()
531 .and_then(|r| r.body.as_ref())
532 .map(|b| !b.content.is_empty())
533 .unwrap_or(false)
534 } else {
535 false
536 };
537
538 if has_new_body {
539 if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
540 if let Some(resp) = &http.response {
541 if let Some(b) = &resp.body {
542 let bytes: Bytes = if b.encoding == "base64" {
543 base64::engine::general_purpose::STANDARD
544 .decode(&b.content)
545 .unwrap_or_default()
546 .into()
547 } else {
548 Bytes::from(b.content.clone())
549 };
550 Full::new(bytes)
551 .map_err(|e| -> BoxError { e.into() })
552 .boxed()
553 } else {
554 http_body_util::Empty::new()
555 .map_err(|_| -> BoxError { unreachable!() })
556 .boxed()
557 }
558 } else {
559 http_body_util::Empty::new()
560 .map_err(|_| -> BoxError { unreachable!() })
561 .boxed()
562 }
563 } else {
564 http_body_util::Empty::new()
565 .map_err(|_| -> BoxError { unreachable!() })
566 .boxed()
567 }
568 } else {
569 crate::streams::create_body_from_resource(&res)
570 }
571 } else if let relay_core_api::flow::Layer::Http(http) = &modified_flow.layer {
572 if let Some(resp) = &http.response {
573 if let Some(b) = &resp.body {
574 let bytes: Bytes = if b.encoding == "base64" {
575 base64::engine::general_purpose::STANDARD
576 .decode(&b.content)
577 .unwrap_or_default()
578 .into()
579 } else {
580 Bytes::from(b.content.clone())
581 };
582 Full::new(bytes)
583 .map_err(|e| -> BoxError { e.into() })
584 .boxed()
585 } else {
586 http_body_util::Empty::new()
587 .map_err(|_| -> BoxError { unreachable!() })
588 .boxed()
589 }
590 } else {
591 http_body_util::Empty::new()
592 .map_err(|_| -> BoxError { unreachable!() })
593 .boxed()
594 }
595 } else {
596 http_body_util::Empty::new()
597 .map_err(|_| -> BoxError { unreachable!() })
598 .boxed()
599 };
600
601 Ok((Some(modified_flow), ResponseAction::Continue(new_body)))
602 }
603
604 fn handle_on_websocket_message(
605 runtime: &mut JsRuntime,
606 flow: Flow,
607 message: WebSocketMessage,
608 ) -> Result<WebSocketMessageAction, String> {
609 let flow_json = serde_json::to_string(&flow).map_err(|e| e.to_string())?;
610 let message_json = serde_json::to_string(&message).map_err(|e| e.to_string())?;
611
612 let check_code = "typeof globalThis.onWebSocketMessage === 'function'";
613 let exists = runtime
614 .execute_script("check_onWebSocketMessage", check_code)
615 .map_err(|e| e.to_string())?;
616 {
617 let scope = &mut runtime.handle_scope();
618 let exists_val = deno_core::v8::Local::new(scope, exists);
619 if !exists_val.is_true() {
620 return Ok(WebSocketMessageAction::Continue(message));
621 }
622 }
623
624 let code = format!(
625 "globalThis.onWebSocketMessage({{}}, {}, {})",
626 flow_json, message_json
627 );
628 let result = runtime
629 .execute_script("call_onWebSocketMessage", code)
630 .map_err(|e| e.to_string())?;
631
632 let scope = &mut runtime.handle_scope();
633 let result_val = deno_core::v8::Local::new(scope, result);
634
635 if result_val.is_undefined() || result_val.is_null() {
636 return Ok(WebSocketMessageAction::Continue(message));
637 }
638
639 if result_val.is_string() {
641 let s = result_val.to_rust_string_lossy(scope);
642 if s == "DROP" {
643 return Ok(WebSocketMessageAction::Drop);
644 }
645 }
646
647 let modified_message: WebSocketMessage = deno_core::serde_v8::from_v8(scope, result_val)
648 .map_err(|e| format!("Failed to deserialize message: {}", e))?;
649
650 Ok(WebSocketMessageAction::Continue(modified_message))
651 }
652}
653
654#[async_trait]
655impl ScriptEngineTrait for DenoScriptEngine {
656 async fn load_script(&mut self, script: &str) -> Result<(), BoxError> {
657 let (tx, rx) = oneshot::channel();
658 self.tx
659 .send(DenoCommand::LoadScript(script.to_string(), tx))
660 .await
661 .map_err(|e| Box::new(e) as BoxError)?;
662 rx.await
663 .map_err(|e| Box::new(e) as BoxError)?
664 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)
665 }
666
667 async fn on_request_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
668 let (tx, rx) = oneshot::channel();
669 let flow_clone = flow.clone();
670 self.tx
671 .send(DenoCommand::OnRequestHeaders(flow_clone, tx))
672 .await
673 .map_err(|e| Box::new(e) as BoxError)?;
674 let res = rx
675 .await
676 .map_err(|e| Box::new(e) as BoxError)?
677 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
678
679 if let Some(new_flow) = &res {
680 *flow = new_flow.clone();
681 }
682 Ok(res)
683 }
684
685 async fn on_request(&self, flow: &mut Flow, body: HttpBody) -> Result<RequestAction, BoxError> {
686 let (tx, rx) = oneshot::channel();
687 let flow_clone = flow.clone();
688 self.tx
689 .send(DenoCommand::OnRequest(flow_clone, body, tx))
690 .await
691 .map_err(|e| Box::new(e) as BoxError)?;
692 let (new_flow, action) = rx
693 .await
694 .map_err(|e| Box::new(e) as BoxError)?
695 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
696
697 if let Some(f) = new_flow {
698 *flow = f;
699 }
700 Ok(action)
701 }
702
703 async fn on_response_headers(&self, flow: &mut Flow) -> Result<Option<Flow>, BoxError> {
704 let (tx, rx) = oneshot::channel();
705 let flow_clone = flow.clone();
706 self.tx
707 .send(DenoCommand::OnResponseHeaders(flow_clone, tx))
708 .await
709 .map_err(|e| Box::new(e) as BoxError)?;
710 let res = rx
711 .await
712 .map_err(|e| Box::new(e) as BoxError)?
713 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
714
715 if let Some(new_flow) = &res {
716 *flow = new_flow.clone();
717 }
718 Ok(res)
719 }
720
721 async fn on_response(
722 &self,
723 flow: &mut Flow,
724 body: HttpBody,
725 ) -> Result<ResponseAction, BoxError> {
726 let (tx, rx) = oneshot::channel();
727 let flow_clone = flow.clone();
728 self.tx
729 .send(DenoCommand::OnResponse(flow_clone, body, tx))
730 .await
731 .map_err(|e| Box::new(e) as BoxError)?;
732 let (new_flow, action) = rx
733 .await
734 .map_err(|e| Box::new(e) as BoxError)?
735 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
736
737 if let Some(f) = new_flow {
738 *flow = f;
739 }
740 Ok(action)
741 }
742
743 async fn on_websocket_message(
744 &self,
745 _flow: &mut Flow,
746 message: &mut WebSocketMessage,
747 ) -> Result<WebSocketMessageAction, BoxError> {
748 let (tx, rx) = oneshot::channel();
749 let flow_clone = _flow.clone();
750 let message_clone = message.clone();
751 self.tx
752 .send(DenoCommand::OnWebSocketMessage(
753 flow_clone,
754 message_clone,
755 tx,
756 ))
757 .await
758 .map_err(|e| Box::new(e) as BoxError)?;
759 let res = rx
760 .await
761 .map_err(|e| Box::new(e) as BoxError)?
762 .map_err(|e| Box::new(std::io::Error::other(e)) as BoxError)?;
763
764 Ok(res)
765 }
766}