actix_http/body/
sized_stream.rs

1use std::{
2    error::Error as StdError,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::{ready, Stream};
9use pin_project_lite::pin_project;
10
11use super::{BodySize, MessageBody};
12
13pin_project! {
14    /// Known sized streaming response wrapper.
15    ///
16    /// This body implementation should be used if total size of stream is known. Data is sent as-is
17    /// without using chunked transfer encoding.
18    pub struct SizedStream<S> {
19        size: u64,
20        #[pin]
21        stream: S,
22    }
23}
24
25impl<S, E> SizedStream<S>
26where
27    S: Stream<Item = Result<Bytes, E>>,
28    E: Into<Box<dyn StdError>> + 'static,
29{
30    #[inline]
31    pub fn new(size: u64, stream: S) -> Self {
32        SizedStream { size, stream }
33    }
34}
35
36// TODO: from_infallible method
37
38impl<S, E> MessageBody for SizedStream<S>
39where
40    S: Stream<Item = Result<Bytes, E>>,
41    E: Into<Box<dyn StdError>> + 'static,
42{
43    type Error = E;
44
45    #[inline]
46    fn size(&self) -> BodySize {
47        BodySize::Sized(self.size)
48    }
49
50    /// Attempts to pull out the next value of the underlying [`Stream`].
51    ///
52    /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
53    /// ended on a zero-length chunk, but rather proceed until the underlying
54    /// [`Stream`] ends.
55    fn poll_next(
56        mut self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
59        loop {
60            let stream = self.as_mut().project().stream;
61
62            let chunk = match ready!(stream.poll_next(cx)) {
63                Some(Ok(ref bytes)) if bytes.is_empty() => continue,
64                val => val,
65            };
66
67            return Poll::Ready(chunk);
68        }
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use std::convert::Infallible;
75
76    use actix_rt::pin;
77    use actix_utils::future::poll_fn;
78    use futures_util::stream;
79    use static_assertions::{assert_impl_all, assert_not_impl_any};
80
81    use super::*;
82    use crate::body::to_bytes;
83
84    assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
85    assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
86    assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
87    assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
88    assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
89
90    assert_not_impl_any!(SizedStream<stream::Empty<Bytes>>: MessageBody);
91    assert_not_impl_any!(SizedStream<stream::Repeat<Bytes>>: MessageBody);
92    // crate::Error is not Clone
93    assert_not_impl_any!(SizedStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
94
95    #[actix_rt::test]
96    async fn skips_empty_chunks() {
97        let body = SizedStream::new(
98            2,
99            stream::iter(
100                ["1", "", "2"]
101                    .iter()
102                    .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
103            ),
104        );
105
106        pin!(body);
107
108        assert_eq!(
109            poll_fn(|cx| body.as_mut().poll_next(cx))
110                .await
111                .unwrap()
112                .ok(),
113            Some(Bytes::from("1")),
114        );
115
116        assert_eq!(
117            poll_fn(|cx| body.as_mut().poll_next(cx))
118                .await
119                .unwrap()
120                .ok(),
121            Some(Bytes::from("2")),
122        );
123    }
124
125    #[actix_rt::test]
126    async fn read_to_bytes() {
127        let body = SizedStream::new(
128            2,
129            stream::iter(
130                ["1", "", "2"]
131                    .iter()
132                    .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
133            ),
134        );
135
136        assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
137    }
138
139    #[actix_rt::test]
140    async fn stream_string_error() {
141        // `&'static str` does not impl `Error`
142        // but it does impl `Into<Box<dyn Error>>`
143
144        let body = SizedStream::new(0, stream::once(async { Err("stringy error") }));
145        assert_eq!(to_bytes(body).await, Ok(Bytes::new()));
146
147        let body = SizedStream::new(1, stream::once(async { Err("stringy error") }));
148        assert!(matches!(to_bytes(body).await, Err("stringy error")));
149    }
150
151    #[actix_rt::test]
152    async fn stream_boxed_error() {
153        // `Box<dyn Error>` does not impl `Error`
154        // but it does impl `Into<Box<dyn Error>>`
155
156        let body = SizedStream::new(
157            0,
158            stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
159        );
160        assert_eq!(to_bytes(body).await.unwrap(), Bytes::new());
161
162        let body = SizedStream::new(
163            1,
164            stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
165        );
166        assert_eq!(
167            to_bytes(body).await.unwrap_err().to_string(),
168            "stringy error"
169        );
170    }
171}