1use crate::Error;
2use bytes::Bytes;
3use std::fmt;
4use std::fs::File;
5use std::io::{self, Cursor, Read};
6use wasi::http::types::IncomingBody;
7use wasi::io::streams::InputStream;
8use wasi::io::*;
9
10#[derive(Debug)]
12pub struct Body {
13 kind: Option<Kind>,
14}
15
16#[allow(dead_code)]
17struct ChunkIterator<'a> {
18 bytes: &'a [u8],
19 chunk_size: usize,
20}
21
22#[allow(dead_code)]
23impl<'a> ChunkIterator<'a> {
24 fn new(bytes: &'a [u8], chunk_size: usize) -> Self {
25 ChunkIterator { bytes, chunk_size }
26 }
27}
28
29#[allow(dead_code)]
30impl<'a> Iterator for ChunkIterator<'a> {
31 type Item = &'a [u8];
32
33 fn next(&mut self) -> Option<Self::Item> {
34 if self.bytes.is_empty() {
35 return None;
36 }
37
38 let chunk_size = std::cmp::min(self.chunk_size, self.bytes.len());
39 let (chunk, rest) = self.bytes.split_at(chunk_size);
40 self.bytes = rest;
41
42 Some(chunk)
43 }
44}
45
46impl Body {
47 pub fn new<R: Read + 'static>(reader: R) -> Body {
57 Body {
58 kind: Some(Kind::Reader(Box::from(reader), None)),
59 }
60 }
61
62 pub fn sized<R: Read + 'static>(reader: R, len: u64) -> Body {
66 Body {
67 kind: Some(Kind::Reader(Box::from(reader), Some(len))),
68 }
69 }
70
71 #[cfg(feature = "async")]
73 pub fn from_stream<S>(stream: S) -> Body
74 where
75 S: futures::stream::Stream<Item = Result<Vec<u8>, Error>> + 'static,
76 {
77 Body {
78 kind: Some(Kind::Stream(Box::pin(stream))),
79 }
80 }
81
82 pub fn as_bytes(&self) -> Option<&[u8]> {
85 match self.kind {
86 Some(Kind::Reader(_, _)) => None,
87 Some(Kind::Bytes(ref bytes)) => Some(bytes.as_ref()),
88 Some(Kind::Incoming { .. }) => None,
89 #[cfg(feature = "async")]
90 Some(Kind::Stream(_)) => None,
91 None => None,
92 }
93 }
94
95 #[cfg(not(feature = "async"))]
102 pub fn buffer(&mut self) -> Result<&[u8], Error> {
103 match self.kind {
104 Some(Kind::Reader(ref mut reader, maybe_len)) => {
105 let mut bytes = if let Some(len) = maybe_len {
106 Vec::with_capacity(len as usize)
107 } else {
108 Vec::new()
109 };
110 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
111 self.kind = Some(Kind::Bytes(bytes.into()));
112 self.buffer()
113 }
114 Some(Kind::Bytes(ref bytes)) => Ok(bytes.as_ref()),
115 Some(Kind::Incoming { ref stream, .. }) => {
116 let mut body = Vec::new();
117 let mut eof = false;
118 while !eof {
119 match stream.blocking_read(u64::MAX) {
120 Ok(mut body_chunk) => {
121 body.append(&mut body_chunk);
122 }
123 Err(streams::StreamError::Closed) => {
124 eof = true;
125 }
126 Err(streams::StreamError::LastOperationFailed(err)) => {
127 return Err(Error::new(
128 crate::error::Kind::Body,
129 Some(err.to_debug_string()),
130 ));
131 }
132 }
133 }
134 self.kind = Some(Kind::Bytes(body.into()));
135 self.buffer()
136 }
137 None => panic!("Body has already been extracted"),
138 }
139 }
140
141 #[cfg(feature = "async")]
148 pub async fn buffer(&mut self) -> Result<&[u8], Error> {
149 match self.kind {
150 Some(Kind::Reader(ref mut reader, maybe_len)) => {
151 let mut bytes = if let Some(len) = maybe_len {
152 Vec::with_capacity(len as usize)
153 } else {
154 Vec::new()
155 };
156 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
157 self.kind = Some(Kind::Bytes(bytes.into()));
158 let Some(Kind::Bytes(bytes)) = &self.kind else {
159 unreachable!()
160 };
161 Ok(bytes.as_ref())
162 }
163 Some(Kind::Bytes(ref bytes)) => Ok(bytes.as_ref()),
164 Some(Kind::Incoming { ref stream, .. }) => {
165 let mut body = Vec::new();
166 let mut eof = false;
167 while !eof {
168 match stream.blocking_read(u64::MAX) {
169 Ok(mut body_chunk) => {
170 body.append(&mut body_chunk);
171 }
172 Err(streams::StreamError::Closed) => {
173 eof = true;
174 }
175 Err(streams::StreamError::LastOperationFailed(err)) => {
176 return Err(Error::new(
177 crate::error::Kind::Body,
178 Some(err.to_debug_string()),
179 ));
180 }
181 }
182 }
183 self.kind = Some(Kind::Bytes(body.into()));
184 let Some(Kind::Bytes(bytes)) = &self.kind else {
185 unreachable!()
186 };
187 Ok(bytes.as_ref())
188 }
189 Some(Kind::Stream(ref mut stream)) => {
190 use futures::StreamExt;
191
192 let mut bytes = Vec::new();
193
194 while let Some(chunk) = stream.next().await {
195 match chunk {
196 Ok(data) => bytes.extend(data),
197 Err(err) => return Err(err),
198 }
199 }
200
201 self.kind = Some(Kind::Bytes(Bytes::from(bytes)));
202 let Some(Kind::Bytes(bytes)) = &self.kind else {
203 unreachable!()
204 };
205 Ok(bytes.as_ref())
206 }
207 None => panic!("Body has already been extracted"),
208 }
209 }
210
211 pub(crate) fn from_incoming(stream: InputStream, incoming_body: IncomingBody) -> Body {
212 Body {
213 kind: Some(Kind::Incoming {
214 stream,
215 incoming_body,
216 }),
217 }
218 }
219
220 pub(crate) fn into_raw_input_stream(mut self) -> (InputStream, IncomingBody) {
221 match self.kind.take() {
222 Some(Kind::Reader(_, _)) => panic!("Body is not backed up by an input stream"),
223 Some(Kind::Bytes(_)) => panic!("Body is not backed up by an input stream"),
224 Some(Kind::Incoming {
225 stream,
226 incoming_body,
227 }) => (stream, incoming_body),
228 #[cfg(feature = "async")]
229 Some(Kind::Stream(_)) => panic!("Body is not backed up by an input stream"),
230 None => panic!("Body has already been extracted"),
231 }
232 }
233
234 pub(crate) fn into_reader(mut self) -> Reader {
235 match self.kind.take() {
236 Some(Kind::Reader(r, _)) => Reader::IoRead(r),
237 Some(Kind::Bytes(b)) => Reader::Bytes(Cursor::new(b)),
238 Some(Kind::Incoming {
239 stream,
240 incoming_body,
241 }) => Reader::Wasi {
242 body_stream: stream,
243 _incoming_body: incoming_body,
244 },
245 #[cfg(feature = "async")]
246 Some(Kind::Stream(_)) => {
247 panic!("Stream cannot be converted to Reader, use into_async_reader instead")
248 }
249 None => panic!("Body has already been extracted"),
250 }
251 }
252
253 #[cfg(feature = "async")]
254 pub(crate) fn into_async_reader(mut self) -> AsyncReader {
255 if matches!(self.kind, Some(Kind::Stream(_))) {
256 match self.kind.take() {
257 Some(Kind::Stream(stream)) => AsyncReader::StreamBased { stream },
258 _ => unreachable!(),
259 }
260 } else {
261 self.into_reader().into_async()
262 }
263 }
264
265 pub(crate) fn try_clone(&self) -> Option<Body> {
266 self.kind
267 .as_ref()
268 .unwrap()
269 .try_clone()
270 .map(|kind| Body { kind: Some(kind) })
271 }
272
273 #[allow(dead_code)]
274 pub(crate) fn len(&self) -> Option<u64> {
275 match self.kind.as_ref()? {
276 Kind::Reader(_, len) => *len,
277 Kind::Bytes(bytes) => Some(bytes.len() as u64),
278 Kind::Incoming { .. } => None,
279 #[cfg(feature = "async")]
280 Kind::Stream(_) => None,
281 }
282 }
283
284 #[cfg(not(feature = "async"))]
285 pub(crate) fn write(
286 mut self,
287 mut f: impl FnMut(&[u8]) -> Result<(), Error>,
288 ) -> Result<(), Error> {
289 match self.kind.take().expect("Body has already been extracted") {
290 Kind::Reader(mut reader, _) => {
291 let mut buf = [0; 4 * 1024];
292 loop {
293 let len = reader.read(&mut buf).map_err(crate::error::builder)?;
294 if len == 0 {
295 break;
296 }
297 f(&buf[..len])?;
298 }
299 Ok(())
300 }
301 Kind::Bytes(bytes) => ChunkIterator::new(&bytes, 4 * 1024).try_for_each(&mut f),
302 Kind::Incoming { ref stream, .. } => {
303 let mut eof = false;
304 while !eof {
305 match stream.blocking_read(u64::MAX) {
306 Ok(body_chunk) => {
307 f(&body_chunk)?;
308 }
309 Err(streams::StreamError::Closed) => {
310 eof = true;
311 }
312 Err(streams::StreamError::LastOperationFailed(err)) => {
313 return Err(Error::new(
314 crate::error::Kind::Body,
315 Some(err.to_debug_string()),
316 ));
317 }
318 }
319 }
320 Ok(())
321 }
322 }
323 }
324
325 #[cfg(feature = "async")]
326 pub(crate) async fn async_write(
327 self,
328 mut target: impl crate::wasi::async_client::AsyncWriteTarget,
329 ) -> Result<(), Error> {
330 use async_iterator::Iterator;
331
332 let mut async_reader = self.into_async_reader();
333 while let Some(chunk) = async_reader.next().await {
334 match chunk {
335 Ok(data) => target.write(&data).await?,
336 Err(err) => return Err(err),
337 }
338 }
339 Ok(())
340 }
341}
342
343enum Kind {
344 Reader(Box<dyn Read>, Option<u64>),
345 Bytes(Bytes),
346 Incoming {
347 stream: InputStream,
348 incoming_body: IncomingBody,
349 },
350 #[cfg(feature = "async")]
351 Stream(std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, Error>>>>),
352}
353
354impl Kind {
355 fn try_clone(&self) -> Option<Kind> {
356 match self {
357 Kind::Reader(..) => None,
358 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
359 Kind::Incoming { .. } => None,
360 #[cfg(feature = "async")]
361 Kind::Stream(_) => None,
362 }
363 }
364}
365
366impl From<Vec<u8>> for Body {
367 #[inline]
368 fn from(v: Vec<u8>) -> Body {
369 Body {
370 kind: Some(Kind::Bytes(v.into())),
371 }
372 }
373}
374
375impl From<String> for Body {
376 #[inline]
377 fn from(s: String) -> Body {
378 s.into_bytes().into()
379 }
380}
381
382impl From<&'static [u8]> for Body {
383 #[inline]
384 fn from(s: &'static [u8]) -> Body {
385 Body {
386 kind: Some(Kind::Bytes(Bytes::from_static(s))),
387 }
388 }
389}
390
391impl From<&'static str> for Body {
392 #[inline]
393 fn from(s: &'static str) -> Body {
394 s.as_bytes().into()
395 }
396}
397
398impl From<File> for Body {
399 #[inline]
400 fn from(f: File) -> Body {
401 let len = f.metadata().map(|m| m.len()).ok();
402 Body {
403 kind: Some(Kind::Reader(Box::new(f), len)),
404 }
405 }
406}
407
408impl From<Bytes> for Body {
409 #[inline]
410 fn from(b: Bytes) -> Body {
411 Body {
412 kind: Some(Kind::Bytes(b)),
413 }
414 }
415}
416
417impl fmt::Debug for Kind {
418 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419 match *self {
420 Kind::Reader(_, ref v) => f
421 .debug_struct("Reader")
422 .field("length", &DebugLength(v))
423 .finish(),
424 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
425 Kind::Incoming { .. } => f.debug_struct("Incoming").finish(),
426 #[cfg(feature = "async")]
427 Kind::Stream(_) => f.debug_struct("Stream").finish(),
428 }
429 }
430}
431
432struct DebugLength<'a>(&'a Option<u64>);
433
434impl<'a> fmt::Debug for DebugLength<'a> {
435 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
436 match *self.0 {
437 Some(ref len) => fmt::Debug::fmt(len, f),
438 None => f.write_str("Unknown"),
439 }
440 }
441}
442
443pub(crate) enum Reader {
444 IoRead(Box<dyn Read>),
445 Bytes(Cursor<Bytes>),
446 Wasi {
447 body_stream: InputStream,
448 _incoming_body: IncomingBody,
449 },
450}
451
452impl Reader {
453 #[cfg(feature = "async")]
454 pub(crate) fn into_async(self) -> AsyncReader {
455 AsyncReader::ReaderBased { reader: self }
456 }
457}
458
459impl Read for Reader {
460 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
461 match self {
462 Reader::IoRead(rdr) => rdr.read(buf),
463 Reader::Bytes(rdr) => rdr.read(buf),
464 Reader::Wasi { body_stream, .. } => match body_stream.blocking_read(buf.len() as u64) {
465 Ok(body_chunk) => {
466 let len = body_chunk.len();
467 buf[..len].copy_from_slice(&body_chunk);
468 Ok(len)
469 }
470 Err(streams::StreamError::Closed) => Ok(0),
471 Err(streams::StreamError::LastOperationFailed(err)) => {
472 Err(io::Error::other(err.to_debug_string()))
473 }
474 },
475 }
476 }
477}
478
479#[cfg(feature = "async")]
480pub(crate) enum AsyncReader {
481 ReaderBased {
482 reader: Reader,
483 },
484 StreamBased {
485 stream: std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, Error>>>>,
486 },
487}
488
489#[cfg(feature = "async")]
490impl async_iterator::Iterator for AsyncReader {
491 type Item = Result<Vec<u8>, Error>;
492
493 async fn next(&mut self) -> Option<Self::Item> {
494 use futures::StreamExt;
495
496 const CHUNK_SIZE: usize = 4096; match self {
499 Self::ReaderBased { reader } => match reader {
500 Reader::IoRead(rdr) => {
501 let mut buf = vec![0; CHUNK_SIZE];
502 rdr.read(&mut buf)
503 .map(|n| {
504 if n == 0 {
505 None
506 } else {
507 Some(buf[..n].to_vec())
508 }
509 })
510 .map_err(|err| Error::new(crate::error::Kind::Body, Some(err)))
511 .transpose()
512 }
513 Reader::Bytes(rdr) => {
514 let mut buf = vec![0; CHUNK_SIZE];
515 rdr.read(&mut buf)
516 .map(|n| {
517 if n == 0 {
518 None
519 } else {
520 Some(buf[..n].to_vec())
521 }
522 })
523 .map_err(|err| Error::new(crate::error::Kind::Body, Some(err)))
524 .transpose()
525 }
526 Reader::Wasi { body_stream, .. } => {
527 let pollable = body_stream.subscribe();
528 wstd::runtime::AsyncPollable::new(pollable).wait_for().await;
529
530 let mut buf = vec![0; CHUNK_SIZE];
531 let result = body_stream.read(&mut buf);
532 match result {
533 Ok(n) => {
534 if n == 0 {
535 None
536 } else {
537 Some(Ok(buf))
538 }
539 }
540 Err(err) => Some(Err(Error::new(
541 crate::error::Kind::Body,
542 Some(err.to_string()),
543 ))),
544 }
545 }
546 },
547 AsyncReader::StreamBased { stream } => stream.next().await,
548 }
549 }
550}
551
552#[cfg(test)]
553mod tests {
554 use super::*;
555
556 #[test]
557 fn test_chunk_iterator_regular_slices() {
558 let data = b"hello world".to_vec();
559 let mut chunk_iter = ChunkIterator::new(&data, 5);
560
561 assert_eq!(chunk_iter.next(), Some(&b"hello"[..]));
562 assert_eq!(chunk_iter.next(), Some(&b" worl"[..]));
563 assert_eq!(chunk_iter.next(), Some(&b"d"[..]));
564 assert_eq!(chunk_iter.next(), None);
565 }
566
567 #[test]
568 fn test_chunk_iterator_only_one_chunk() {
569 let data = b"hello world".to_vec();
570 let mut chunk_iter = ChunkIterator::new(&data, 11);
571
572 assert_eq!(chunk_iter.next(), Some(&b"hello world"[..]));
573 assert_eq!(chunk_iter.next(), None);
574 }
575
576 #[test]
577 fn test_chunk_iterator_single_byte() {
578 let data = b"x".to_vec();
579 let mut chunk_iter = ChunkIterator::new(&data, 2);
580
581 assert_eq!(chunk_iter.next(), Some(&b"x"[..]));
582 assert_eq!(chunk_iter.next(), None);
583 }
584
585 #[test]
586 fn test_chunk_iterator_empty_slice() {
587 let data: Vec<u8> = vec![];
588 let mut chunk_iter = ChunkIterator::new(&data, 5);
589
590 assert_eq!(chunk_iter.next(), None);
591 }
592}