relay_core_script/
streams.rs1use std::borrow::Cow;
2use std::io;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use deno_core::{Resource, AsyncResult, BufView};
8use bytes::Bytes;
9use relay_core_lib::interceptor::{HttpBody, BoxError};
10use futures_util::Stream;
11use tokio_util::io::StreamReader;
12use http_body::{Body, Frame};
13use http_body_util::{BodyExt, StreamBody};
14use tokio::io::AsyncReadExt;
15use tokio::sync::Mutex;
16
17pub struct BodyStreamAdapter {
18 body: HttpBody,
19}
20
21impl BodyStreamAdapter {
22 pub fn new(body: HttpBody) -> Self {
23 Self { body }
24 }
25}
26
27impl Stream for BodyStreamAdapter {
28 type Item = io::Result<Bytes>;
29
30 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 loop {
32 match Pin::new(&mut self.body).poll_frame(cx) {
33 Poll::Ready(Some(Ok(frame))) => {
34 if let Ok(data) = frame.into_data() {
35 return Poll::Ready(Some(Ok(data)));
36 } else {
37 continue;
39 }
40 }
41 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(io::Error::other(e)))),
42 Poll::Ready(None) => return Poll::Ready(None),
43 Poll::Pending => return Poll::Pending,
44 }
45 }
46 }
47}
48
49type SharedReader = Arc<Mutex<StreamReader<BodyStreamAdapter, Bytes>>>;
50
51pub struct HttpBodyResource {
52 reader: SharedReader,
53}
54
55impl HttpBodyResource {
56 pub fn new(body: HttpBody) -> Self {
57 let adapter = BodyStreamAdapter::new(body);
58 let reader = StreamReader::new(adapter);
59 Self {
60 reader: Arc::new(Mutex::new(reader)),
61 }
62 }
63
64 pub fn clone_reader(&self) -> SharedReader {
65 self.reader.clone()
66 }
67}
68
69impl Resource for HttpBodyResource {
70 fn name(&self) -> Cow<'_, str> {
71 "httpBody".into()
72 }
73
74 fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
75 let reader = self.reader.clone();
76 Box::pin(async move {
77 let mut reader = reader.lock().await;
78 let mut buf = vec![0; limit];
79 let n = reader.read(&mut buf).await?;
80 buf.truncate(n);
81 Ok(BufView::from(buf))
82 })
83 }
84}
85
86pub fn create_body_from_resource(resource: &HttpBodyResource) -> HttpBody {
87 let reader = resource.clone_reader();
88 let stream = futures_util::stream::unfold(reader, |reader| async move {
89 let mut locked = reader.lock().await;
90 let mut buf = vec![0; 4096];
91 match locked.read(&mut buf).await {
92 Ok(0) => None, Ok(n) => {
94 buf.truncate(n);
95 Some((Ok(Frame::data(Bytes::from(buf))), reader.clone()))
96 },
97 Err(e) => Some((Err(Box::new(e) as BoxError), reader.clone())),
98 }
99 });
100
101 StreamBody::new(stream).boxed()
103}
104