Skip to main content

relay_core_script/
streams.rs

1use std::borrow::Cow;
2use std::io;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use deno_core::{Resource, AsyncResult, BufView};
8use bytes::Bytes;
9use relay_core_lib::interceptor::{HttpBody, BoxError};
10use futures_util::Stream;
11use tokio_util::io::StreamReader;
12use http_body::{Body, Frame};
13use http_body_util::{BodyExt, StreamBody};
14use tokio::io::AsyncReadExt;
15use tokio::sync::Mutex;
16
17pub struct BodyStreamAdapter {
18    body: HttpBody,
19}
20
21impl BodyStreamAdapter {
22    pub fn new(body: HttpBody) -> Self {
23        Self { body }
24    }
25}
26
27impl Stream for BodyStreamAdapter {
28    type Item = io::Result<Bytes>;
29
30    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        loop {
32            match Pin::new(&mut self.body).poll_frame(cx) {
33                Poll::Ready(Some(Ok(frame))) => {
34                    if let Ok(data) = frame.into_data() {
35                        return Poll::Ready(Some(Ok(data)));
36                    } else {
37                        // Skip non-data frames (trailers)
38                        continue;
39                    }
40                }
41                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(io::Error::other(e)))),
42                Poll::Ready(None) => return Poll::Ready(None),
43                Poll::Pending => return Poll::Pending,
44            }
45        }
46    }
47}
48
49type SharedReader = Arc<Mutex<StreamReader<BodyStreamAdapter, Bytes>>>;
50
51pub struct HttpBodyResource {
52    reader: SharedReader,
53}
54
55impl HttpBodyResource {
56    pub fn new(body: HttpBody) -> Self {
57        let adapter = BodyStreamAdapter::new(body);
58        let reader = StreamReader::new(adapter);
59        Self {
60            reader: Arc::new(Mutex::new(reader)),
61        }
62    }
63
64    pub fn clone_reader(&self) -> SharedReader {
65        self.reader.clone()
66    }
67}
68
69impl Resource for HttpBodyResource {
70    fn name(&self) -> Cow<'_, str> {
71        "httpBody".into()
72    }
73
74    fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
75        let reader = self.reader.clone();
76        Box::pin(async move {
77            let mut reader = reader.lock().await;
78            let mut buf = vec![0; limit];
79            let n = reader.read(&mut buf).await?;
80            buf.truncate(n);
81            Ok(BufView::from(buf))
82        })
83    }
84}
85
86pub fn create_body_from_resource(resource: &HttpBodyResource) -> HttpBody {
87    let reader = resource.clone_reader();
88    let stream = futures_util::stream::unfold(reader, |reader| async move {
89        let mut locked = reader.lock().await;
90        let mut buf = vec![0; 4096];
91        match locked.read(&mut buf).await {
92            Ok(0) => None, // EOF
93            Ok(n) => {
94                buf.truncate(n);
95                Some((Ok(Frame::data(Bytes::from(buf))), reader.clone()))
96            },
97            Err(e) => Some((Err(Box::new(e) as BoxError), reader.clone())),
98        }
99    });
100    
101    // StreamBody expects Frame<D, E>.
102    StreamBody::new(stream).boxed()
103}
104