1use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8#[cfg(feature = "stream")]
9use futures_util::{Stream, TryStreamExt as _};
10use pin_project::pin_project;
11use reqwest::{Client, Url};
12use serde::de::DeserializeOwned;
13
14use crate::{Payload, error::ApiErr};
15
16#[allow(clippy::type_complexity)]
52#[pin_project]
53pub struct Request<P, O, M>
54where
55 P: Payload,
56{
57 client: Client,
58 base_url: String,
59 payload: Option<P>,
60 future: Option<Pin<Box<dyn Future<Output = Result<O, ApiErr>> + Send + Sync + 'static>>>,
61 _marker: std::marker::PhantomData<M>,
62}
63
64impl<P, O, M> Request<P, O, M>
65where
66 P: Payload,
67{
68 pub fn new(payload: P, base_url: String, client: Client) -> Self {
70 Self {
71 client,
72 base_url,
73 payload: Some(payload),
74 future: None,
75 _marker: std::marker::PhantomData,
76 }
77 }
78}
79
80impl<P, O> Future for Request<P, O, ()>
81where
82 P: Payload,
83 O: DeserializeOwned,
84{
85 type Output = Result<O, ApiErr>;
86
87 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88 let this = self.project();
89 if this.payload.is_some() {
90 let payload = this.payload.take().unwrap();
91 let client = this.client.clone();
92 let base_url = this.base_url.drain(..).collect::<String>();
93 let future = Box::pin(async move {
94 let mut req = client.request(
95 P::METHOD,
96 Url::parse(&base_url)
97 .unwrap()
98 .join(&payload.path().unwrap_or_default())
99 .unwrap(),
100 );
101 req = payload.req_option(req);
102 let resp = req.send().await?;
103 let mut body = resp.text().await?;
104 if let Some(pre_op) = P::before_deserialize() {
105 body = pre_op(body)?;
106 }
107 P::deserialize(body)
108 });
109 *this.future = Some(future);
110 }
111 let future = this.future.as_mut().unwrap().as_mut();
112 future.poll(cx)
113 }
114}
115
116#[cfg(feature = "stream")]
120pub type RespStream = Box<dyn Stream<Item = Result<bytes::Bytes, ApiErr>> + Send>;
121
122#[cfg(feature = "stream")]
123impl<P> Future for Request<P, RespStream, ((),)>
124where
125 P: Payload,
126{
127 type Output = Result<RespStream, ApiErr>;
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 let this = self.project();
131 if this.payload.is_some() {
132 let payload = this.payload.take().unwrap();
133 let client = this.client.clone();
134 let base_url = this.base_url.drain(..).collect::<String>();
135 let future = Box::pin(async move {
136 let mut req = client.request(
137 P::METHOD,
138 Url::parse(&base_url)
139 .unwrap()
140 .join(&payload.path().unwrap_or_default())
141 .unwrap(),
142 );
143 req = payload.req_option(req);
144 let resp = req.send().await?;
145 let stream = resp.bytes_stream().err_into();
146 Ok(Box::new(stream) as RespStream)
147 });
148 *this.future = Some(future);
149 }
150 let future = this.future.as_mut().unwrap().as_mut();
151 future.poll(cx)
152 }
153}