twitter_client/
response.rs

1use std::future::Future;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8use futures_core::ready;
9use http::StatusCode;
10use http_body::Body;
11use pin_project_lite::pin_project;
12use serde::{de, Deserialize};
13
14use crate::error::{Error, ErrorCode, TwitterErrors};
15use crate::traits::HttpTryFuture;
16use crate::util::ConcatBody;
17use crate::RateLimit;
18
/// A response payload paired with the rate-limit information that accompanied it.
///
/// Dereferences to the payload, so `response.field` reaches into `data` directly.
#[derive(Debug)]
pub struct Response<T> {
    /// The response payload.
    pub data: T,
    /// Rate-limit information extracted from the response headers via
    /// `RateLimit::from_headers`; `None` when that extraction yields nothing.
    pub rate_limit: Option<RateLimit>,
}
24
pin_project! {
    /// A future that resolves to a [`Response<T>`] by deserializing, as JSON, the body bytes
    /// produced by the wrapped [`RawResponseFuture`].
    pub struct ResponseFuture<T, F: HttpTryFuture> {
        // The underlying future that yields the undecoded response bytes.
        #[pin]
        raw: RawResponseFuture<F>,
        // Records the target type `T` without storing a value of that type.
        marker: PhantomData<fn() -> T>,
    }
}
32
pin_project! {
    /// A future that resolves to the raw body bytes of an HTTP response (as a
    /// `Response<Bytes>`), without deserializing them.
    pub struct RawResponseFuture<F: HttpTryFuture> {
        // Two-stage state machine: waiting for the response, then collecting its body.
        #[pin]
        inner: ResponseFutureInner<F>,
    }
}
39
pin_project! {
    // Internal state of `RawResponseFuture`.
    //
    // The future starts in `Resp` and moves to `Body` once the response head has arrived.
    #[project = ResponseFutureInnerProj]
    enum ResponseFutureInner<F: HttpTryFuture> {
        // Stage 1: waiting for the HTTP response itself.
        Resp {
            #[pin]
            response: F,
        },
        // Stage 2: the status and rate limit have been captured from the response head and
        // the body is being concatenated.
        Body {
            status: StatusCode,
            rate_limit: Option<RateLimit>,
            #[pin]
            body: ConcatBody<F::Body>,
        },
    }
}
55
56impl<T> Deref for Response<T> {
57    type Target = T;
58
59    fn deref(&self) -> &T {
60        &self.data
61    }
62}
63
64impl<T, F: HttpTryFuture> From<RawResponseFuture<F>> for ResponseFuture<T, F> {
65    fn from(raw: RawResponseFuture<F>) -> Self {
66        ResponseFuture {
67            raw,
68            marker: PhantomData,
69        }
70    }
71}
72
73impl<T: de::DeserializeOwned, F: HttpTryFuture> Future for ResponseFuture<T, F> {
74    #[allow(clippy::type_complexity)]
75    type Output = Result<Response<T>, Error<F::Error, <F::Body as Body>::Error>>;
76
77    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        let res = ready!(self.project().raw.poll(cx)?);
79        let data = serde_json::from_slice(&res.data).map_err(Error::Deserializing)?;
80        Poll::Ready(Ok(Response {
81            data,
82            rate_limit: res.rate_limit,
83        }))
84    }
85}
86
87impl<F: HttpTryFuture> RawResponseFuture<F> {
88    pub(crate) fn new(response: F) -> Self
89    where
90        F: HttpTryFuture,
91    {
92        RawResponseFuture {
93            inner: ResponseFutureInner::Resp { response },
94        }
95    }
96}
97
98impl<T, F: HttpTryFuture> From<ResponseFuture<T, F>> for RawResponseFuture<F> {
99    fn from(future: ResponseFuture<T, F>) -> Self {
100        future.raw
101    }
102}
103
104impl<F: HttpTryFuture> Future for RawResponseFuture<F> {
105    #[allow(clippy::type_complexity)]
106    type Output = Result<Response<Bytes>, Error<F::Error, <F::Body as Body>::Error>>;
107
108    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109        if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project()
110        {
111            let res = ready!(response.try_poll(cx).map_err(Error::Service))?;
112
113            self.as_mut()
114                .project()
115                .inner
116                .set(ResponseFutureInner::Body {
117                    status: res.status(),
118                    rate_limit: RateLimit::from_headers(res.headers()),
119                    body: ConcatBody::new(res.into_body()),
120                });
121        }
122
123        if let ResponseFutureInnerProj::Body {
124            status,
125            rate_limit,
126            body,
127        } = self.project().inner.project()
128        {
129            let status = *status;
130            let rate_limit = *rate_limit;
131            let body = ready!(body.poll(cx).map_err(Error::Body))?;
132
133            let result = if let StatusCode::OK = status {
134                Ok(Response {
135                    data: body,
136                    rate_limit,
137                })
138            } else {
139                #[derive(Default, Deserialize)]
140                struct Errors {
141                    errors: Vec<ErrorCode>,
142                }
143                serde_json::from_slice(&body)
144                    .or_else(|_| Ok(Errors::default()))
145                    .and_then(|errors| {
146                        Err(Error::Twitter(TwitterErrors {
147                            status,
148                            errors: errors.errors,
149                            rate_limit,
150                        }))
151                    })
152            };
153
154            Poll::Ready(result)
155        } else {
156            unreachable!();
157        }
158    }
159}