awc/responses/
response.rs1use std::{
2 cell::{Ref, RefCell, RefMut},
3 fmt, mem,
4 pin::Pin,
5 task::{Context, Poll},
6 time::{Duration, Instant},
7};
8
9use actix_http::{
10 error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage,
11 Payload, ResponseHead, StatusCode, Version,
12};
13use actix_rt::time::{sleep, Sleep};
14use bytes::Bytes;
15use futures_core::Stream;
16use pin_project_lite::pin_project;
17use serde::de::DeserializeOwned;
18
19#[cfg(feature = "cookies")]
20use crate::cookie::{Cookie, ParseError as CookieParseError};
21
22use super::{JsonBody, ResponseBody, ResponseTimeout};
23
24pin_project! {
25 pub struct ClientResponse<S = BoxedPayloadStream> {
27 pub(crate) head: ResponseHead,
28 #[pin]
29 pub(crate) payload: Payload<S>,
30 pub(crate) timeout: ResponseTimeout,
31 pub(crate) extensions: RefCell<Extensions>,
32
33 }
34}
35
36impl<S> ClientResponse<S> {
37 pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
39 ClientResponse {
40 head,
41 payload,
42 timeout: ResponseTimeout::default(),
43 extensions: RefCell::new(Extensions::new()),
44 }
45 }
46
47 #[inline]
48 pub(crate) fn head(&self) -> &ResponseHead {
49 &self.head
50 }
51
52 #[inline]
54 pub fn version(&self) -> Version {
55 self.head().version
56 }
57
58 #[inline]
60 pub fn status(&self) -> StatusCode {
61 self.head().status
62 }
63
64 #[inline]
65 pub fn headers(&self) -> &HeaderMap {
67 &self.head().headers
68 }
69
70 pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
74 where
75 F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
76 {
77 let payload = f(&mut self.head, self.payload);
78
79 ClientResponse {
80 payload,
81 head: self.head,
82 timeout: self.timeout,
83 extensions: self.extensions,
84 }
85 }
86
87 pub fn timeout(self, dur: Duration) -> Self {
94 let timeout = match self.timeout {
95 ResponseTimeout::Disabled(Some(mut timeout))
96 | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
97 Some(deadline) => {
98 timeout.as_mut().reset(deadline.into());
99 ResponseTimeout::Enabled(timeout)
100 }
101 None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
102 },
103 _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
104 };
105
106 Self {
107 payload: self.payload,
108 head: self.head,
109 timeout,
110 extensions: self.extensions,
111 }
112 }
113
114 pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
118 self.timeout = ResponseTimeout::Disabled(timeout);
119 self
120 }
121
122 #[cfg(feature = "cookies")]
124 pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
125 struct Cookies(Vec<Cookie<'static>>);
126
127 if self.extensions().get::<Cookies>().is_none() {
128 let mut cookies = Vec::new();
129 for hdr in self.headers().get_all(&actix_http::header::SET_COOKIE) {
130 let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
131 cookies.push(Cookie::parse_encoded(s)?.into_owned());
132 }
133 self.extensions_mut().insert(Cookies(cookies));
134 }
135
136 Ok(Ref::map(self.extensions(), |ext| {
137 &ext.get::<Cookies>().unwrap().0
138 }))
139 }
140
141 #[cfg(feature = "cookies")]
143 pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
144 if let Ok(cookies) = self.cookies() {
145 for cookie in cookies.iter() {
146 if cookie.name() == name {
147 return Some(cookie.to_owned());
148 }
149 }
150 }
151 None
152 }
153}
154
155impl<S> ClientResponse<S>
156where
157 S: Stream<Item = Result<Bytes, PayloadError>>,
158{
159 pub fn body(&mut self) -> ResponseBody<S> {
180 ResponseBody::new(self)
181 }
182
183 pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
206 JsonBody::new(self)
207 }
208}
209
210impl<S> fmt::Debug for ClientResponse<S> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
213 writeln!(f, " headers:")?;
214 for (key, val) in self.headers().iter() {
215 writeln!(f, " {:?}: {:?}", key, val)?;
216 }
217 Ok(())
218 }
219}
220
221impl<S> HttpMessage for ClientResponse<S> {
222 type Stream = S;
223
224 fn headers(&self) -> &HeaderMap {
225 &self.head.headers
226 }
227
228 fn take_payload(&mut self) -> Payload<S> {
229 mem::replace(&mut self.payload, Payload::None)
230 }
231
232 fn extensions(&self) -> Ref<'_, Extensions> {
233 self.extensions.borrow()
234 }
235
236 fn extensions_mut(&self) -> RefMut<'_, Extensions> {
237 self.extensions.borrow_mut()
238 }
239}
240
241impl<S> Stream for ClientResponse<S>
242where
243 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
244{
245 type Item = Result<Bytes, PayloadError>;
246
247 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248 let this = self.project();
249 this.timeout.poll_timeout(cx)?;
250 this.payload.poll_next(cx)
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use static_assertions::assert_impl_all;
257
258 use super::*;
259 use crate::any_body::AnyBody;
260
261 assert_impl_all!(ClientResponse: Unpin);
262 assert_impl_all!(ClientResponse<()>: Unpin);
263 assert_impl_all!(ClientResponse<AnyBody>: Unpin);
264}