1use crate::http_client::sse::BoxedStream;
2use bytes::Bytes;
3use http::StatusCode;
4pub use http::{HeaderMap, HeaderValue, Method, Request, Response, Uri, request::Builder};
5use reqwest::{Body, multipart::Form};
6
7pub mod retry;
8pub mod sse;
9
10use std::pin::Pin;
11
12use crate::wasm_compat::*;
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16 #[error("Http error: {0}")]
17 Protocol(#[from] http::Error),
18 #[error("Invalid status code: {0}")]
19 InvalidStatusCode(StatusCode),
20 #[error("Invalid status code {0} with message: {1}")]
21 InvalidStatusCodeWithMessage(StatusCode, String),
22 #[error("Stream ended")]
23 StreamEnded,
24 #[error("Invalid content type was returned: {0:?}")]
25 InvalidContentType(HeaderValue),
26 #[cfg(not(target_family = "wasm"))]
27 #[error("Http client error: {0}")]
28 Instance(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
29
30 #[cfg(target_family = "wasm")]
31 #[error("Http client error: {0}")]
32 Instance(#[from] Box<dyn std::error::Error + 'static>),
33}
34
35pub type Result<T> = std::result::Result<T, Error>;
36
37#[cfg(not(target_family = "wasm"))]
38pub(crate) fn instance_error<E: std::error::Error + Send + Sync + 'static>(error: E) -> Error {
39 Error::Instance(error.into())
40}
41
42#[cfg(target_family = "wasm")]
43fn instance_error<E: std::error::Error + 'static>(error: E) -> Error {
44 Error::Instance(error.into())
45}
46
47pub type LazyBytes = WasmBoxedFuture<'static, Result<Bytes>>;
48pub type LazyBody<T> = WasmBoxedFuture<'static, Result<T>>;
49
50pub type StreamingResponse<T> = Response<T>;
51
52pub struct NoBody;
53
54impl From<NoBody> for Bytes {
55 fn from(_: NoBody) -> Self {
56 Bytes::new()
57 }
58}
59
60impl From<NoBody> for Body {
61 fn from(_: NoBody) -> Self {
62 reqwest::Body::default()
63 }
64}
65
66pub async fn text(response: Response<LazyBody<Vec<u8>>>) -> Result<String> {
67 let text = response.into_body().await?;
68 Ok(String::from(String::from_utf8_lossy(&text)))
69}
70
71pub fn with_bearer_auth(req: Builder, auth: &str) -> Result<Builder> {
72 let auth_header =
73 HeaderValue::from_str(&format!("Bearer {}", auth)).map_err(http::Error::from)?;
74
75 Ok(req.header("Authorization", auth_header))
76}
77
78pub trait HttpClientExt: WasmCompatSend + WasmCompatSync {
80 fn send<T, U>(
82 &self,
83 req: Request<T>,
84 ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
85 where
86 T: Into<Bytes>,
87 T: WasmCompatSend,
88 U: From<Bytes>,
89 U: WasmCompatSend + 'static;
90
91 fn send_multipart<U>(
93 &self,
94 req: Request<Form>,
95 ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
96 where
97 U: From<Bytes>,
98 U: WasmCompatSend + 'static;
99
100 fn send_streaming<T>(
102 &self,
103 req: Request<T>,
104 ) -> impl Future<Output = Result<StreamingResponse<BoxedStream>>> + WasmCompatSend
105 where
106 T: Into<Bytes>;
107}
108
109impl HttpClientExt for reqwest::Client {
110 fn send<T, U>(
111 &self,
112 req: Request<T>,
113 ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
114 where
115 T: Into<Bytes>,
116 U: From<Bytes> + WasmCompatSend,
117 {
118 let (parts, body) = req.into_parts();
119 let req = self
120 .request(parts.method, parts.uri.to_string())
121 .headers(parts.headers)
122 .body(body.into());
123
124 async move {
125 let response = req.send().await.map_err(instance_error)?;
126 if !response.status().is_success() {
127 return Err(Error::InvalidStatusCodeWithMessage(
128 response.status(),
129 response.text().await.unwrap(),
130 ));
131 }
132
133 let mut res = Response::builder().status(response.status());
134
135 if let Some(hs) = res.headers_mut() {
136 *hs = response.headers().clone();
137 }
138
139 let body: LazyBody<U> = Box::pin(async {
140 let bytes = response
141 .bytes()
142 .await
143 .map_err(|e| Error::Instance(e.into()))?;
144
145 let body = U::from(bytes);
146 Ok(body)
147 });
148
149 res.body(body).map_err(Error::Protocol)
150 }
151 }
152
153 fn send_multipart<U>(
154 &self,
155 req: Request<Form>,
156 ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
157 where
158 U: From<Bytes>,
159 U: WasmCompatSend + 'static,
160 {
161 let (parts, body) = req.into_parts();
162 let req = self
163 .request(parts.method, parts.uri.to_string())
164 .headers(parts.headers)
165 .multipart(body);
166
167 async move {
168 let response = req.send().await.map_err(instance_error)?;
169 if !response.status().is_success() {
170 return Err(Error::InvalidStatusCodeWithMessage(
171 response.status(),
172 response.text().await.unwrap(),
173 ));
174 }
175
176 let mut res = Response::builder().status(response.status());
177
178 if let Some(hs) = res.headers_mut() {
179 *hs = response.headers().clone();
180 }
181
182 let body: LazyBody<U> = Box::pin(async {
183 let bytes = response
184 .bytes()
185 .await
186 .map_err(|e| Error::Instance(e.into()))?;
187
188 let body = U::from(bytes);
189 Ok(body)
190 });
191
192 res.body(body).map_err(Error::Protocol)
193 }
194 }
195
196 fn send_streaming<T>(
197 &self,
198 req: Request<T>,
199 ) -> impl Future<Output = Result<StreamingResponse<BoxedStream>>> + WasmCompatSend
200 where
201 T: Into<Bytes>,
202 {
203 let (parts, body) = req.into_parts();
204
205 let req = self
206 .request(parts.method, parts.uri.to_string())
207 .headers(parts.headers)
208 .body(body.into())
209 .build()
210 .map_err(|x| Error::Instance(x.into()))
211 .unwrap();
212
213 let client = self.clone();
214
215 async move {
216 let response: reqwest::Response = client.execute(req).await.map_err(instance_error)?;
217 if !response.status().is_success() {
218 return Err(Error::InvalidStatusCodeWithMessage(
219 response.status(),
220 response.text().await.unwrap(),
221 ));
222 }
223
224 #[cfg(not(target_family = "wasm"))]
225 let mut res = Response::builder()
226 .status(response.status())
227 .version(response.version());
228
229 #[cfg(target_family = "wasm")]
230 let mut res = Response::builder().status(response.status());
231
232 if let Some(hs) = res.headers_mut() {
233 *hs = response.headers().clone();
234 }
235
236 use futures::StreamExt;
237
238 let mapped_stream: Pin<Box<dyn WasmCompatSendStream<InnerItem = Result<Bytes>>>> =
239 Box::pin(
240 response
241 .bytes_stream()
242 .map(|chunk| chunk.map_err(|e| Error::Instance(Box::new(e)))),
243 );
244
245 res.body(mapped_stream).map_err(Error::Protocol)
246 }
247 }
248}