Skip to main content

dav_server/
body.rs

1//! Definitions for the Request and Response bodies.
2
3use std::error::Error as StdError;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures_util::stream::Stream;
10use http_body::Body as HttpBody;
11use pin_project_lite::pin_project;
12
13use crate::async_stream::AsyncStream;
14
15/// Body is returned by the webdav handler, and implements both `Stream`
16/// and `http_body::Body`.
17pub struct Body {
18    pub inner: BodyType,
19}
20
21pub enum BodyType {
22    Bytes(Option<Bytes>),
23    AsyncStream(AsyncStream<Bytes, io::Error>),
24    Empty,
25}
26
27impl Body {
28    /// Return an empty body.
29    pub fn empty() -> Body {
30        Body {
31            inner: BodyType::Empty,
32        }
33    }
34}
35
36impl Stream for Body {
37    type Item = io::Result<Bytes>;
38
39    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
40        match self.inner {
41            BodyType::Bytes(ref mut strm) => Poll::Ready(strm.take().map(Ok)),
42            BodyType::AsyncStream(ref mut strm) => {
43                let strm = Pin::new(strm);
44                strm.poll_next(cx)
45            }
46            BodyType::Empty => Poll::Ready(None),
47        }
48    }
49}
50
51impl HttpBody for Body {
52    type Data = Bytes;
53    type Error = io::Error;
54
55    fn poll_frame(
56        self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
59        self.poll_next(cx).map_ok(http_body::Frame::data)
60    }
61}
62
63impl From<String> for Body {
64    fn from(t: String) -> Body {
65        Body {
66            inner: BodyType::Bytes(Some(Bytes::from(t))),
67        }
68    }
69}
70
71impl From<&str> for Body {
72    fn from(t: &str) -> Body {
73        Body {
74            inner: BodyType::Bytes(Some(Bytes::from(t.to_string()))),
75        }
76    }
77}
78
79impl From<Bytes> for Body {
80    fn from(t: Bytes) -> Body {
81        Body {
82            inner: BodyType::Bytes(Some(t)),
83        }
84    }
85}
86
87impl From<AsyncStream<Bytes, io::Error>> for Body {
88    fn from(s: AsyncStream<Bytes, io::Error>) -> Body {
89        Body {
90            inner: BodyType::AsyncStream(s),
91        }
92    }
93}
94
95pin_project! {
96    //
97    // A struct that contains a Stream, and implements http_body::Body.
98    //
99    pub(crate) struct StreamBody<B> {
100        #[pin]
101        body: B,
102    }
103}
104
105impl<ReqBody, ReqData, ReqError> HttpBody for StreamBody<ReqBody>
106where
107    ReqData: Buf + Send,
108    ReqError: StdError + Send + Sync + 'static,
109    ReqBody: Stream<Item = Result<ReqData, ReqError>>,
110{
111    type Data = ReqData;
112    type Error = ReqError;
113
114    fn poll_frame(
115        self: Pin<&mut Self>,
116        cx: &mut Context<'_>,
117    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
118        let this = self.project();
119        this.body.poll_next(cx).map_ok(http_body::Frame::data)
120    }
121}
122
123impl<ReqBody, ReqData, ReqError> StreamBody<ReqBody>
124where
125    ReqData: Buf + Send,
126    ReqError: StdError + Send + Sync + 'static,
127    ReqBody: Stream<Item = Result<ReqData, ReqError>>,
128{
129    pub fn new(body: ReqBody) -> StreamBody<ReqBody> {
130        StreamBody { body }
131    }
132}