golem_wasi_http/wasi/
body.rs

1use crate::Error;
2use bytes::Bytes;
3use std::fmt;
4use std::fs::File;
5use std::io::{self, Cursor, Read};
6use wasi::http::types::IncomingBody;
7use wasi::io::streams::InputStream;
8use wasi::io::*;
9
/// An asynchronous request body.
///
/// The payload lives in `kind`. The consuming conversions on `Body`
/// (`into_reader`, `into_raw_input_stream`, ...) take the value out of the
/// `Option` when they extract it.
#[derive(Debug)]
pub struct Body {
    // `None` once a consuming method has taken the payload out.
    kind: Option<Kind>,
}
15
/// Iterator over consecutive, non-overlapping sub-slices of a byte slice.
///
/// Every yielded chunk is `chunk_size` bytes long, except possibly the last
/// one, which carries whatever remains.
// Within this file only the blocking `Body::write` path and the unit tests
// construct this type, so it is unused when the `async` feature is enabled.
#[allow(dead_code)]
struct ChunkIterator<'a> {
    /// Bytes that have not been yielded yet.
    bytes: &'a [u8],
    /// Maximum length of each yielded chunk; guaranteed non-zero by `new`.
    chunk_size: usize,
}

#[allow(dead_code)]
impl<'a> ChunkIterator<'a> {
    /// Creates an iterator that splits `bytes` into chunks of at most
    /// `chunk_size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero, mirroring `slice::chunks`. A zero
    /// chunk size could never make progress and would otherwise yield empty
    /// chunks forever for any non-empty input.
    fn new(bytes: &'a [u8], chunk_size: usize) -> Self {
        assert!(chunk_size != 0, "chunk size must be non-zero");
        ChunkIterator { bytes, chunk_size }
    }
}

#[allow(dead_code)]
impl<'a> Iterator for ChunkIterator<'a> {
    type Item = &'a [u8];

    /// Returns the next chunk, or `None` once every byte has been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        if self.bytes.is_empty() {
            return None;
        }

        // The final chunk may be shorter than `chunk_size`.
        let take = self.chunk_size.min(self.bytes.len());
        let (chunk, rest) = self.bytes.split_at(take);
        self.bytes = rest;

        Some(chunk)
    }
}
45
46impl Body {
47    /// Instantiate a `Body` from a reader.
48    ///
49    /// # Note
50    ///
51    /// While allowing for many types to be used, these bodies do not have
52    /// a way to reset to the beginning and be reused. This means that when
53    /// encountering a 307 or 308 status code, instead of repeating the
54    /// request at the new location, the `Response` will be returned with
55    /// the redirect status code set.
56    pub fn new<R: Read + 'static>(reader: R) -> Body {
57        Body {
58            kind: Some(Kind::Reader(Box::from(reader), None)),
59        }
60    }
61
62    /// Create a `Body` from a `Read` where the size is known in advance
63    /// but the data should not be fully loaded into memory. This will
64    /// set the `Content-Length` header and stream from the `Read`.
65    pub fn sized<R: Read + 'static>(reader: R, len: u64) -> Body {
66        Body {
67            kind: Some(Kind::Reader(Box::from(reader), Some(len))),
68        }
69    }
70
71    /// Creates a Body that consumes the given stream
72    #[cfg(feature = "async")]
73    pub fn from_stream<S>(stream: S) -> Body
74    where
75        S: futures::stream::Stream<Item = Result<Vec<u8>, Error>> + 'static,
76    {
77        Body {
78            kind: Some(Kind::Stream(Box::pin(stream))),
79        }
80    }
81
82    /// Returns the body as a byte slice if the body is already buffered in
83    /// memory. For streamed requests this method returns `None`.
84    pub fn as_bytes(&self) -> Option<&[u8]> {
85        match self.kind {
86            Some(Kind::Reader(_, _)) => None,
87            Some(Kind::Bytes(ref bytes)) => Some(bytes.as_ref()),
88            Some(Kind::Incoming { .. }) => None,
89            #[cfg(feature = "async")]
90            Some(Kind::Stream(_)) => None,
91            None => None,
92        }
93    }
94
95    /// Converts streamed requests to their buffered equivalent and
96    /// returns a reference to the buffer. If the request is already
97    /// buffered, this has no effect.
98    ///
99    /// Be aware that for large requests this method is expensive
100    /// and may cause your program to run out of memory.
101    #[cfg(not(feature = "async"))]
102    pub fn buffer(&mut self) -> Result<&[u8], Error> {
103        match self.kind {
104            Some(Kind::Reader(ref mut reader, maybe_len)) => {
105                let mut bytes = if let Some(len) = maybe_len {
106                    Vec::with_capacity(len as usize)
107                } else {
108                    Vec::new()
109                };
110                io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
111                self.kind = Some(Kind::Bytes(bytes.into()));
112                self.buffer()
113            }
114            Some(Kind::Bytes(ref bytes)) => Ok(bytes.as_ref()),
115            Some(Kind::Incoming { ref stream, .. }) => {
116                let mut body = Vec::new();
117                let mut eof = false;
118                while !eof {
119                    match stream.blocking_read(u64::MAX) {
120                        Ok(mut body_chunk) => {
121                            body.append(&mut body_chunk);
122                        }
123                        Err(streams::StreamError::Closed) => {
124                            eof = true;
125                        }
126                        Err(streams::StreamError::LastOperationFailed(err)) => {
127                            return Err(Error::new(
128                                crate::error::Kind::Body,
129                                Some(err.to_debug_string()),
130                            ));
131                        }
132                    }
133                }
134                self.kind = Some(Kind::Bytes(body.into()));
135                self.buffer()
136            }
137            None => panic!("Body has already been extracted"),
138        }
139    }
140
141    /// Converts streamed requests to their buffered equivalent and
142    /// returns a reference to the buffer. If the request is already
143    /// buffered, this has no effect.
144    ///
145    /// Be aware that for large requests this method is expensive
146    /// and may cause your program to run out of memory.
147    #[cfg(feature = "async")]
148    pub async fn buffer(&mut self) -> Result<&[u8], Error> {
149        match self.kind {
150            Some(Kind::Reader(ref mut reader, maybe_len)) => {
151                let mut bytes = if let Some(len) = maybe_len {
152                    Vec::with_capacity(len as usize)
153                } else {
154                    Vec::new()
155                };
156                io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
157                self.kind = Some(Kind::Bytes(bytes.into()));
158                let Some(Kind::Bytes(bytes)) = &self.kind else {
159                    unreachable!()
160                };
161                Ok(bytes.as_ref())
162            }
163            Some(Kind::Bytes(ref bytes)) => Ok(bytes.as_ref()),
164            Some(Kind::Incoming { ref stream, .. }) => {
165                let mut body = Vec::new();
166                let mut eof = false;
167                while !eof {
168                    match stream.blocking_read(u64::MAX) {
169                        Ok(mut body_chunk) => {
170                            body.append(&mut body_chunk);
171                        }
172                        Err(streams::StreamError::Closed) => {
173                            eof = true;
174                        }
175                        Err(streams::StreamError::LastOperationFailed(err)) => {
176                            return Err(Error::new(
177                                crate::error::Kind::Body,
178                                Some(err.to_debug_string()),
179                            ));
180                        }
181                    }
182                }
183                self.kind = Some(Kind::Bytes(body.into()));
184                let Some(Kind::Bytes(bytes)) = &self.kind else {
185                    unreachable!()
186                };
187                Ok(bytes.as_ref())
188            }
189            Some(Kind::Stream(ref mut stream)) => {
190                use futures::StreamExt;
191
192                let mut bytes = Vec::new();
193
194                while let Some(chunk) = stream.next().await {
195                    match chunk {
196                        Ok(data) => bytes.extend(data),
197                        Err(err) => return Err(err),
198                    }
199                }
200
201                self.kind = Some(Kind::Bytes(Bytes::from(bytes)));
202                let Some(Kind::Bytes(bytes)) = &self.kind else {
203                    unreachable!()
204                };
205                Ok(bytes.as_ref())
206            }
207            None => panic!("Body has already been extracted"),
208        }
209    }
210
211    pub(crate) fn from_incoming(stream: InputStream, incoming_body: IncomingBody) -> Body {
212        Body {
213            kind: Some(Kind::Incoming {
214                stream,
215                incoming_body,
216            }),
217        }
218    }
219
220    pub(crate) fn into_raw_input_stream(mut self) -> (InputStream, IncomingBody) {
221        match self.kind.take() {
222            Some(Kind::Reader(_, _)) => panic!("Body is not backed up by an input stream"),
223            Some(Kind::Bytes(_)) => panic!("Body is not backed up by an input stream"),
224            Some(Kind::Incoming {
225                stream,
226                incoming_body,
227            }) => (stream, incoming_body),
228            #[cfg(feature = "async")]
229            Some(Kind::Stream(_)) => panic!("Body is not backed up by an input stream"),
230            None => panic!("Body has already been extracted"),
231        }
232    }
233
234    pub(crate) fn into_reader(mut self) -> Reader {
235        match self.kind.take() {
236            Some(Kind::Reader(r, _)) => Reader::IoRead(r),
237            Some(Kind::Bytes(b)) => Reader::Bytes(Cursor::new(b)),
238            Some(Kind::Incoming {
239                stream,
240                incoming_body,
241            }) => Reader::Wasi {
242                body_stream: stream,
243                _incoming_body: incoming_body,
244            },
245            #[cfg(feature = "async")]
246            Some(Kind::Stream(_)) => {
247                panic!("Stream cannot be converted to Reader, use into_async_reader instead")
248            }
249            None => panic!("Body has already been extracted"),
250        }
251    }
252
253    #[cfg(feature = "async")]
254    pub(crate) fn into_async_reader(mut self) -> AsyncReader {
255        if matches!(self.kind, Some(Kind::Stream(_))) {
256            match self.kind.take() {
257                Some(Kind::Stream(stream)) => AsyncReader::StreamBased { stream },
258                _ => unreachable!(),
259            }
260        } else {
261            self.into_reader().into_async()
262        }
263    }
264
265    pub(crate) fn try_clone(&self) -> Option<Body> {
266        self.kind
267            .as_ref()
268            .unwrap()
269            .try_clone()
270            .map(|kind| Body { kind: Some(kind) })
271    }
272
273    #[allow(dead_code)]
274    pub(crate) fn len(&self) -> Option<u64> {
275        match self.kind.as_ref()? {
276            Kind::Reader(_, len) => *len,
277            Kind::Bytes(bytes) => Some(bytes.len() as u64),
278            Kind::Incoming { .. } => None,
279            #[cfg(feature = "async")]
280            Kind::Stream(_) => None,
281        }
282    }
283
284    #[cfg(not(feature = "async"))]
285    pub(crate) fn write(
286        mut self,
287        mut f: impl FnMut(&[u8]) -> Result<(), Error>,
288    ) -> Result<(), Error> {
289        match self.kind.take().expect("Body has already been extracted") {
290            Kind::Reader(mut reader, _) => {
291                let mut buf = [0; 4 * 1024];
292                loop {
293                    let len = reader.read(&mut buf).map_err(crate::error::builder)?;
294                    if len == 0 {
295                        break;
296                    }
297                    f(&buf[..len])?;
298                }
299                Ok(())
300            }
301            Kind::Bytes(bytes) => ChunkIterator::new(&bytes, 4 * 1024).try_for_each(&mut f),
302            Kind::Incoming { ref stream, .. } => {
303                let mut eof = false;
304                while !eof {
305                    match stream.blocking_read(u64::MAX) {
306                        Ok(body_chunk) => {
307                            f(&body_chunk)?;
308                        }
309                        Err(streams::StreamError::Closed) => {
310                            eof = true;
311                        }
312                        Err(streams::StreamError::LastOperationFailed(err)) => {
313                            return Err(Error::new(
314                                crate::error::Kind::Body,
315                                Some(err.to_debug_string()),
316                            ));
317                        }
318                    }
319                }
320                Ok(())
321            }
322        }
323    }
324
325    #[cfg(feature = "async")]
326    pub(crate) async fn async_write(
327        self,
328        mut target: impl crate::wasi::async_client::AsyncWriteTarget,
329    ) -> Result<(), Error> {
330        use async_iterator::Iterator;
331
332        let mut async_reader = self.into_async_reader();
333        while let Some(chunk) = async_reader.next().await {
334            match chunk {
335                Ok(data) => target.write(&data).await?,
336                Err(err) => return Err(err),
337            }
338        }
339        Ok(())
340    }
341}
342
/// Internal representation of the payload carried by a `Body`.
enum Kind {
    /// A boxed reader plus its declared length, if one was given
    /// (`Body::sized` stores `Some(len)`, `Body::new` stores `None`).
    Reader(Box<dyn Read>, Option<u64>),
    /// A payload that is already held in memory.
    Bytes(Bytes),
    /// A WASI input stream together with the `IncomingBody` passed to
    /// `Body::from_incoming` alongside it.
    Incoming {
        stream: InputStream,
        incoming_body: IncomingBody,
    },
    /// A pinned, boxed stream of byte chunks; only present with the `async` feature.
    #[cfg(feature = "async")]
    Stream(std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, Error>>>>),
}
353
354impl Kind {
355    fn try_clone(&self) -> Option<Kind> {
356        match self {
357            Kind::Reader(..) => None,
358            Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
359            Kind::Incoming { .. } => None,
360            #[cfg(feature = "async")]
361            Kind::Stream(_) => None,
362        }
363    }
364}
365
366impl From<Vec<u8>> for Body {
367    #[inline]
368    fn from(v: Vec<u8>) -> Body {
369        Body {
370            kind: Some(Kind::Bytes(v.into())),
371        }
372    }
373}
374
375impl From<String> for Body {
376    #[inline]
377    fn from(s: String) -> Body {
378        s.into_bytes().into()
379    }
380}
381
382impl From<&'static [u8]> for Body {
383    #[inline]
384    fn from(s: &'static [u8]) -> Body {
385        Body {
386            kind: Some(Kind::Bytes(Bytes::from_static(s))),
387        }
388    }
389}
390
391impl From<&'static str> for Body {
392    #[inline]
393    fn from(s: &'static str) -> Body {
394        s.as_bytes().into()
395    }
396}
397
398impl From<File> for Body {
399    #[inline]
400    fn from(f: File) -> Body {
401        let len = f.metadata().map(|m| m.len()).ok();
402        Body {
403            kind: Some(Kind::Reader(Box::new(f), len)),
404        }
405    }
406}
407
408impl From<Bytes> for Body {
409    #[inline]
410    fn from(b: Bytes) -> Body {
411        Body {
412            kind: Some(Kind::Bytes(b)),
413        }
414    }
415}
416
417impl fmt::Debug for Kind {
418    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419        match *self {
420            Kind::Reader(_, ref v) => f
421                .debug_struct("Reader")
422                .field("length", &DebugLength(v))
423                .finish(),
424            Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
425            Kind::Incoming { .. } => f.debug_struct("Incoming").finish(),
426            #[cfg(feature = "async")]
427            Kind::Stream(_) => f.debug_struct("Stream").finish(),
428        }
429    }
430}
431
/// Debug helper that prints an optional length, showing `Unknown` when absent.
struct DebugLength<'a>(&'a Option<u64>);

impl fmt::Debug for DebugLength<'_> {
    /// Writes the length with the formatter's settings, or the literal
    /// `Unknown` when there is none.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(len) = self.0 {
            fmt::Debug::fmt(len, f)
        } else {
            f.write_str("Unknown")
        }
    }
}
442
/// Blocking reader over the content of a `Body`, produced by `Body::into_reader`.
pub(crate) enum Reader {
    /// Reads from an arbitrary boxed `Read` implementation.
    IoRead(Box<dyn Read>),
    /// Reads from an in-memory buffer.
    Bytes(Cursor<Bytes>),
    /// Reads from a WASI input stream.
    Wasi {
        body_stream: InputStream,
        // Never read in this file; presumably held so the incoming body
        // outlives `body_stream` — TODO confirm.
        _incoming_body: IncomingBody,
    },
}
451
452impl Reader {
453    #[cfg(feature = "async")]
454    pub(crate) fn into_async(self) -> AsyncReader {
455        AsyncReader::ReaderBased { reader: self }
456    }
457}
458
459impl Read for Reader {
460    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
461        match self {
462            Reader::IoRead(rdr) => rdr.read(buf),
463            Reader::Bytes(rdr) => rdr.read(buf),
464            Reader::Wasi { body_stream, .. } => match body_stream.blocking_read(buf.len() as u64) {
465                Ok(body_chunk) => {
466                    let len = body_chunk.len();
467                    buf[..len].copy_from_slice(&body_chunk);
468                    Ok(len)
469                }
470                Err(streams::StreamError::Closed) => Ok(0),
471                Err(streams::StreamError::LastOperationFailed(err)) => {
472                    Err(io::Error::other(err.to_debug_string()))
473                }
474            },
475        }
476    }
477}
478
/// Asynchronous source of body chunks, produced by `Body::into_async_reader`.
#[cfg(feature = "async")]
pub(crate) enum AsyncReader {
    /// Chunks are pulled from a blocking `Reader`.
    ReaderBased {
        reader: Reader,
    },
    /// Chunks are forwarded from a stream of byte vectors.
    StreamBased {
        stream: std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, Error>>>>,
    },
}
488
#[cfg(feature = "async")]
impl async_iterator::Iterator for AsyncReader {
    type Item = Result<Vec<u8>, Error>;

    /// Produces the next chunk of the body.
    ///
    /// Returns `None` once the underlying source is exhausted, `Some(Ok(..))`
    /// with the bytes that were actually read (at most `CHUNK_SIZE` for
    /// reader-based sources), and `Some(Err(..))` when reading fails.
    async fn next(&mut self) -> Option<Self::Item> {
        use futures::StreamExt;

        /// Upper bound for a single chunk read from a reader-based source.
        const CHUNK_SIZE: usize = 4096;

        /// Reads one chunk from a blocking reader, keeping only the bytes read.
        fn read_chunk(rdr: &mut dyn Read) -> Option<Result<Vec<u8>, Error>> {
            let mut buf = vec![0; CHUNK_SIZE];
            match rdr.read(&mut buf) {
                Ok(0) => None,
                Ok(n) => {
                    buf.truncate(n);
                    Some(Ok(buf))
                }
                Err(err) => Some(Err(Error::new(crate::error::Kind::Body, Some(err)))),
            }
        }

        match self {
            Self::ReaderBased { reader } => match reader {
                Reader::IoRead(rdr) => read_chunk(rdr),
                Reader::Bytes(rdr) => read_chunk(rdr),
                Reader::Wasi { body_stream, .. } => {
                    // Wait for the stream's pollable before issuing the read.
                    let pollable = body_stream.subscribe();
                    wstd::runtime::AsyncPollable::new(pollable).wait_for().await;

                    let mut buf = vec![0; CHUNK_SIZE];
                    match body_stream.read(&mut buf) {
                        Ok(0) => None,
                        Ok(n) => {
                            // Only the first `n` bytes were filled in; without this
                            // the zero padding of the scratch buffer would be
                            // emitted as body data.
                            buf.truncate(n);
                            Some(Ok(buf))
                        }
                        Err(err) => Some(Err(Error::new(
                            crate::error::Kind::Body,
                            Some(err.to_string()),
                        ))),
                    }
                }
            },
            AsyncReader::StreamBased { stream } => stream.next().await,
        }
    }
}
551
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects every chunk `ChunkIterator` yields for `data`.
    fn chunks_of(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
        ChunkIterator::new(data, chunk_size).collect()
    }

    // Input longer than the chunk size: full chunks followed by a short tail.
    #[test]
    fn test_chunk_iterator_regular_slices() {
        let data = b"hello world".to_vec();
        let expected: Vec<&[u8]> = vec![b"hello", b" worl", b"d"];

        assert_eq!(chunks_of(&data, 5), expected);
    }

    // Chunk size equal to the input length: exactly one chunk.
    #[test]
    fn test_chunk_iterator_only_one_chunk() {
        let data = b"hello world".to_vec();
        let expected: Vec<&[u8]> = vec![b"hello world"];

        assert_eq!(chunks_of(&data, 11), expected);
    }

    // Input shorter than the chunk size: the single chunk is the whole input.
    #[test]
    fn test_chunk_iterator_single_byte() {
        let data = b"x".to_vec();
        let expected: Vec<&[u8]> = vec![b"x"];

        assert_eq!(chunks_of(&data, 2), expected);
    }

    // Empty input: nothing is yielded.
    #[test]
    fn test_chunk_iterator_empty_slice() {
        let data: Vec<u8> = vec![];

        assert!(chunks_of(&data, 5).is_empty());
    }
}