api_req/
request.rs

1//! Request from `ApiCaller` and `Payload`, a future that can be awaited to get the response
2
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8#[cfg(feature = "stream")]
9use futures_util::{Stream, TryStreamExt as _};
10use pin_project::pin_project;
11use reqwest::{Client, Url};
12use serde::de::DeserializeOwned;
13
14use crate::{Payload, error::ApiErr};
15
16/// A request to the API, wrapping the payload into a future.
17///
18/// Create from a ApiCaller and a Payload.
19///
20/// # Example
21/// ```no_run
22/// use api_req::{Payload, ApiCaller, Method, ApiCaller as _};
23/// use serde::{Serialize, Deserialize};
24///
25/// #[derive(Debug, Default, Clone, Serialize, Payload)]
26/// #[api_req(
27///     path = "/payments/{customer_id}",   // customer_id from struct field
28///     method = Method::POST,
29/// )]
30/// pub struct ExamplePayload {
31///     #[serde(skip_serializing)]
32///     customer_id: String,
33///     amount: usize,
34/// }
35///
36/// #[derive(Debug, Deserialize)]
37/// struct ExampleResponse {
38///     client_secret: String,
39/// }
40///
41/// #[derive(ApiCaller)]
42/// #[api_req(base_url = "http://example.com")]
43/// struct ExampleApi;
44/// # async {
45/// let payload = ExamplePayload::default();
46/// let _resp: ExampleResponse = ExampleApi::request(payload).await.unwrap();
47/// # };
48/// // this will send a POST request to http://example.com/payments/{customer_id}
49/// // with json `{"amount": 100}`
50/// ```
51#[allow(clippy::type_complexity)]
52#[pin_project]
53pub struct Request<P, O, M>
54where
55    P: Payload,
56{
57    client: Client,
58    base_url: String,
59    payload: Option<P>,
60    future: Option<Pin<Box<dyn Future<Output = Result<O, ApiErr>> + Send + Sync + 'static>>>,
61    _marker: std::marker::PhantomData<M>,
62}
63
64impl<P, O, M> Request<P, O, M>
65where
66    P: Payload,
67{
68    /// Create a new request
69    pub fn new(payload: P, base_url: String, client: Client) -> Self {
70        Self {
71            client,
72            base_url,
73            payload: Some(payload),
74            future: None,
75            _marker: std::marker::PhantomData,
76        }
77    }
78}
79
80impl<P, O> Future for Request<P, O, ()>
81where
82    P: Payload,
83    O: DeserializeOwned,
84{
85    type Output = Result<O, ApiErr>;
86
87    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88        let this = self.project();
89        if this.payload.is_some() {
90            let payload = this.payload.take().unwrap();
91            let client = this.client.clone();
92            let base_url = this.base_url.drain(..).collect::<String>();
93            let future = Box::pin(async move {
94                let mut req = client.request(
95                    P::METHOD,
96                    Url::parse(&base_url)
97                        .unwrap()
98                        .join(&payload.path().unwrap_or_default())
99                        .unwrap(),
100                );
101                req = payload.req_option(req);
102                let resp = req.send().await?;
103                let mut body = resp.text().await?;
104                if let Some(pre_op) = P::before_deserialize() {
105                    body = pre_op(body)?;
106                }
107                P::deserialize(body)
108            });
109            *this.future = Some(future);
110        }
111        let future = this.future.as_mut().unwrap().as_mut();
112        future.poll(cx)
113    }
114}
115
116/// `Bytes` response stream name alias
117///
118/// `api_req` re-export `StreamExt` from `futures-util`.
119#[cfg(feature = "stream")]
120pub type RespStream = Box<dyn Stream<Item = Result<bytes::Bytes, ApiErr>> + Send>;
121
122#[cfg(feature = "stream")]
123impl<P> Future for Request<P, RespStream, ((),)>
124where
125    P: Payload,
126{
127    type Output = Result<RespStream, ApiErr>;
128
129    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        let this = self.project();
131        if this.payload.is_some() {
132            let payload = this.payload.take().unwrap();
133            let client = this.client.clone();
134            let base_url = this.base_url.drain(..).collect::<String>();
135            let future = Box::pin(async move {
136                let mut req = client.request(
137                    P::METHOD,
138                    Url::parse(&base_url)
139                        .unwrap()
140                        .join(&payload.path().unwrap_or_default())
141                        .unwrap(),
142                );
143                req = payload.req_option(req);
144                let resp = req.send().await?;
145                let stream = resp.bytes_stream().err_into();
146                Ok(Box::new(stream) as RespStream)
147            });
148            *this.future = Some(future);
149        }
150        let future = this.future.as_mut().unwrap().as_mut();
151        future.poll(cx)
152    }
153}