relay_core_script/
streams.rs1use bytes::Bytes;
2use deno_core::{AsyncResult, BufView, Resource};
3use futures_util::Stream;
4use http_body::{Body, Frame};
5use http_body_util::{BodyExt, StreamBody};
6use relay_core_lib::interceptor::{BoxError, HttpBody};
7use std::borrow::Cow;
8use std::io;
9use std::pin::Pin;
10use std::rc::Rc;
11use std::sync::Arc;
12use std::task::{Context, Poll};
13use tokio::io::AsyncReadExt;
14use tokio::sync::Mutex;
15use tokio_util::io::StreamReader;
16
17pub struct BodyStreamAdapter {
18 body: HttpBody,
19}
20
21impl BodyStreamAdapter {
22 pub fn new(body: HttpBody) -> Self {
23 Self { body }
24 }
25}
26
27impl Stream for BodyStreamAdapter {
28 type Item = io::Result<Bytes>;
29
30 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 loop {
32 match Pin::new(&mut self.body).poll_frame(cx) {
33 Poll::Ready(Some(Ok(frame))) => {
34 if let Ok(data) = frame.into_data() {
35 return Poll::Ready(Some(Ok(data)));
36 } else {
37 continue;
39 }
40 }
41 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(io::Error::other(e)))),
42 Poll::Ready(None) => return Poll::Ready(None),
43 Poll::Pending => return Poll::Pending,
44 }
45 }
46 }
47}
48
49type SharedReader = Arc<Mutex<StreamReader<BodyStreamAdapter, Bytes>>>;
50
51pub struct HttpBodyResource {
52 reader: SharedReader,
53}
54
55impl HttpBodyResource {
56 pub fn new(body: HttpBody) -> Self {
57 let adapter = BodyStreamAdapter::new(body);
58 let reader = StreamReader::new(adapter);
59 Self {
60 reader: Arc::new(Mutex::new(reader)),
61 }
62 }
63
64 pub fn clone_reader(&self) -> SharedReader {
65 self.reader.clone()
66 }
67}
68
69impl Resource for HttpBodyResource {
70 fn name(&self) -> Cow<'_, str> {
71 "httpBody".into()
72 }
73
74 fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
75 let reader = self.reader.clone();
76 Box::pin(async move {
77 let mut reader = reader.lock().await;
78 let mut buf = vec![0; limit];
79 let n = reader.read(&mut buf).await?;
80 buf.truncate(n);
81 Ok(BufView::from(buf))
82 })
83 }
84}
85
86pub fn create_body_from_resource(resource: &HttpBodyResource) -> HttpBody {
87 let reader = resource.clone_reader();
88 let stream = futures_util::stream::unfold(reader, |reader| async move {
89 let mut locked = reader.lock().await;
90 let mut buf = vec![0; 4096];
91 match locked.read(&mut buf).await {
92 Ok(0) => None, Ok(n) => {
94 buf.truncate(n);
95 Some((Ok(Frame::data(Bytes::from(buf))), reader.clone()))
96 }
97 Err(e) => Some((Err(Box::new(e) as BoxError), reader.clone())),
98 }
99 });
100
101 StreamBody::new(stream).boxed()
103}