awc/responses/
response.rs

1use std::{
2    cell::{Ref, RefCell, RefMut},
3    fmt, mem,
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9use actix_http::{
10    error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage,
11    Payload, ResponseHead, StatusCode, Version,
12};
13use actix_rt::time::{sleep, Sleep};
14use bytes::Bytes;
15use futures_core::Stream;
16use pin_project_lite::pin_project;
17use serde::de::DeserializeOwned;
18
19#[cfg(feature = "cookies")]
20use crate::cookie::{Cookie, ParseError as CookieParseError};
21
22use super::{JsonBody, ResponseBody, ResponseTimeout};
23
24pin_project! {
25    /// Client Response
26    pub struct ClientResponse<S = BoxedPayloadStream> {
27        pub(crate) head: ResponseHead,
28        #[pin]
29        pub(crate) payload: Payload<S>,
30        pub(crate) timeout: ResponseTimeout,
31        pub(crate) extensions: RefCell<Extensions>,
32
33    }
34}
35
36impl<S> ClientResponse<S> {
37    /// Create new Request instance
38    pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
39        ClientResponse {
40            head,
41            payload,
42            timeout: ResponseTimeout::default(),
43            extensions: RefCell::new(Extensions::new()),
44        }
45    }
46
47    #[inline]
48    pub(crate) fn head(&self) -> &ResponseHead {
49        &self.head
50    }
51
52    /// Read the Request Version.
53    #[inline]
54    pub fn version(&self) -> Version {
55        self.head().version
56    }
57
58    /// Get the status from the server.
59    #[inline]
60    pub fn status(&self) -> StatusCode {
61        self.head().status
62    }
63
64    #[inline]
65    /// Returns request's headers.
66    pub fn headers(&self) -> &HeaderMap {
67        &self.head().headers
68    }
69
70    /// Map the current body type to another using a closure. Returns a new response.
71    ///
72    /// Closure receives the response head and the current body type.
73    pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
74    where
75        F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
76    {
77        let payload = f(&mut self.head, self.payload);
78
79        ClientResponse {
80            payload,
81            head: self.head,
82            timeout: self.timeout,
83            extensions: self.extensions,
84        }
85    }
86
87    /// Set a timeout duration for [`ClientResponse`](self::ClientResponse).
88    ///
89    /// This duration covers the duration of processing the response body stream
90    /// and would end it as timeout error when deadline met.
91    ///
92    /// Disabled by default.
93    pub fn timeout(self, dur: Duration) -> Self {
94        let timeout = match self.timeout {
95            ResponseTimeout::Disabled(Some(mut timeout))
96            | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
97                Some(deadline) => {
98                    timeout.as_mut().reset(deadline.into());
99                    ResponseTimeout::Enabled(timeout)
100                }
101                None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
102            },
103            _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
104        };
105
106        Self {
107            payload: self.payload,
108            head: self.head,
109            timeout,
110            extensions: self.extensions,
111        }
112    }
113
114    /// This method does not enable timeout. It's used to pass the boxed `Sleep` from
115    /// `SendClientRequest` and reuse it's heap allocation together with it's slot in
116    /// timer wheel.
117    pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
118        self.timeout = ResponseTimeout::Disabled(timeout);
119        self
120    }
121
122    /// Load request cookies.
123    #[cfg(feature = "cookies")]
124    pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
125        struct Cookies(Vec<Cookie<'static>>);
126
127        if self.extensions().get::<Cookies>().is_none() {
128            let mut cookies = Vec::new();
129            for hdr in self.headers().get_all(&actix_http::header::SET_COOKIE) {
130                let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
131                cookies.push(Cookie::parse_encoded(s)?.into_owned());
132            }
133            self.extensions_mut().insert(Cookies(cookies));
134        }
135
136        Ok(Ref::map(self.extensions(), |ext| {
137            &ext.get::<Cookies>().unwrap().0
138        }))
139    }
140
141    /// Return request cookie.
142    #[cfg(feature = "cookies")]
143    pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
144        if let Ok(cookies) = self.cookies() {
145            for cookie in cookies.iter() {
146                if cookie.name() == name {
147                    return Some(cookie.to_owned());
148                }
149            }
150        }
151        None
152    }
153}
154
155impl<S> ClientResponse<S>
156where
157    S: Stream<Item = Result<Bytes, PayloadError>>,
158{
159    /// Returns a [`Future`] that consumes the body stream and resolves to [`Bytes`].
160    ///
161    /// # Errors
162    /// `Future` implementation returns error if:
163    /// - content length is greater than [limit](ResponseBody::limit) (default: 2 MiB)
164    ///
165    /// # Examples
166    /// ```no_run
167    /// # use awc::Client;
168    /// # use bytes::Bytes;
169    /// # #[actix_rt::main]
170    /// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
171    /// let client = Client::default();
172    /// let mut res = client.get("https://httpbin.org/robots.txt").send().await?;
173    /// let body: Bytes = res.body().await?;
174    /// # Ok(())
175    /// # }
176    /// ```
177    ///
178    /// [`Future`]: std::future::Future
179    pub fn body(&mut self) -> ResponseBody<S> {
180        ResponseBody::new(self)
181    }
182
183    /// Returns a [`Future`] consumes the body stream, parses JSON, and resolves to a deserialized
184    /// `T` value.
185    ///
186    /// # Errors
187    /// Future returns error if:
188    /// - content type is not `application/json`;
189    /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
190    ///
191    /// # Examples
192    /// ```no_run
193    /// # use awc::Client;
194    /// # #[actix_rt::main]
195    /// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
196    /// let client = Client::default();
197    /// let mut res = client.get("https://httpbin.org/json").send().await?;
198    /// let val = res.json::<serde_json::Value>().await?;
199    /// assert!(val.is_object());
200    /// # Ok(())
201    /// # }
202    /// ```
203    ///
204    /// [`Future`]: std::future::Future
205    pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
206        JsonBody::new(self)
207    }
208}
209
210impl<S> fmt::Debug for ClientResponse<S> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
213        writeln!(f, "  headers:")?;
214        for (key, val) in self.headers().iter() {
215            writeln!(f, "    {:?}: {:?}", key, val)?;
216        }
217        Ok(())
218    }
219}
220
221impl<S> HttpMessage for ClientResponse<S> {
222    type Stream = S;
223
224    fn headers(&self) -> &HeaderMap {
225        &self.head.headers
226    }
227
228    fn take_payload(&mut self) -> Payload<S> {
229        mem::replace(&mut self.payload, Payload::None)
230    }
231
232    fn extensions(&self) -> Ref<'_, Extensions> {
233        self.extensions.borrow()
234    }
235
236    fn extensions_mut(&self) -> RefMut<'_, Extensions> {
237        self.extensions.borrow_mut()
238    }
239}
240
241impl<S> Stream for ClientResponse<S>
242where
243    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
244{
245    type Item = Result<Bytes, PayloadError>;
246
247    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248        let this = self.project();
249        this.timeout.poll_timeout(cx)?;
250        this.payload.poll_next(cx)
251    }
252}
253
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::*;
    use crate::any_body::AnyBody;

    // Compile-time checks: `ClientResponse` must be `Unpin` with the default payload stream
    // type as well as with the other stream type parameters listed here.
    assert_impl_all!(ClientResponse: Unpin);
    assert_impl_all!(ClientResponse<()>: Unpin);
    assert_impl_all!(ClientResponse<AnyBody>: Unpin);
}