rig/http_client/
mod.rs

1use crate::http_client::sse::BoxedStream;
2use bytes::Bytes;
3use http::StatusCode;
4pub use http::{HeaderMap, HeaderValue, Method, Request, Response, Uri, request::Builder};
5use reqwest::{Body, multipart::Form};
6
7pub mod retry;
8pub mod sse;
9
10use std::pin::Pin;
11
12use crate::wasm_compat::*;
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16    #[error("Http error: {0}")]
17    Protocol(#[from] http::Error),
18    #[error("Invalid status code: {0}")]
19    InvalidStatusCode(StatusCode),
20    #[error("Invalid status code {0} with message: {1}")]
21    InvalidStatusCodeWithMessage(StatusCode, String),
22    #[error("Stream ended")]
23    StreamEnded,
24    #[error("Invalid content type was returned: {0:?}")]
25    InvalidContentType(HeaderValue),
26    #[cfg(not(target_family = "wasm"))]
27    #[error("Http client error: {0}")]
28    Instance(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
29
30    #[cfg(target_family = "wasm")]
31    #[error("Http client error: {0}")]
32    Instance(#[from] Box<dyn std::error::Error + 'static>),
33}
34
35pub type Result<T> = std::result::Result<T, Error>;
36
37#[cfg(not(target_family = "wasm"))]
38pub(crate) fn instance_error<E: std::error::Error + Send + Sync + 'static>(error: E) -> Error {
39    Error::Instance(error.into())
40}
41
/// Boxes an arbitrary client error and wraps it in [`Error::Instance`] (wasm targets).
///
/// Visibility is `pub(crate)` so that it matches the non-wasm definition; otherwise code
/// elsewhere in the crate that calls this helper would compile natively but not on wasm.
#[cfg(target_family = "wasm")]
pub(crate) fn instance_error<E>(error: E) -> Error
where
    E: std::error::Error + 'static,
{
    Error::Instance(error.into())
}
46
47pub type LazyBytes = WasmBoxedFuture<'static, Result<Bytes>>;
48pub type LazyBody<T> = WasmBoxedFuture<'static, Result<T>>;
49
50pub type StreamingResponse<T> = Response<T>;
51
/// Marker type for a request that carries no body.
///
/// It converts into an empty body via the `From<NoBody>` implementations in this module.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoBody;
53
54impl From<NoBody> for Bytes {
55    fn from(_: NoBody) -> Self {
56        Bytes::new()
57    }
58}
59
60impl From<NoBody> for Body {
61    fn from(_: NoBody) -> Self {
62        reqwest::Body::default()
63    }
64}
65
66pub async fn text(response: Response<LazyBody<Vec<u8>>>) -> Result<String> {
67    let text = response.into_body().await?;
68    Ok(String::from(String::from_utf8_lossy(&text)))
69}
70
71pub fn with_bearer_auth(req: Builder, auth: &str) -> Result<Builder> {
72    let auth_header =
73        HeaderValue::from_str(&format!("Bearer {}", auth)).map_err(http::Error::from)?;
74
75    Ok(req.header("Authorization", auth_header))
76}
77
78/// A helper trait to make generic requests (both regular and SSE) possible.
79pub trait HttpClientExt: WasmCompatSend + WasmCompatSync {
80    /// Send a HTTP request, get a response back (as bytes). Response must be able to be turned back into Bytes.
81    fn send<T, U>(
82        &self,
83        req: Request<T>,
84    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
85    where
86        T: Into<Bytes>,
87        T: WasmCompatSend,
88        U: From<Bytes>,
89        U: WasmCompatSend + 'static;
90
91    /// Send a HTTP request with a multipart body, get a response back (as bytes). Response must be able to be turned back into Bytes (although usually for the response, you will probably want to specify Bytes anyway).
92    fn send_multipart<U>(
93        &self,
94        req: Request<Form>,
95    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
96    where
97        U: From<Bytes>,
98        U: WasmCompatSend + 'static;
99
100    /// Send a HTTP request, get a streamed response back (as a stream of [`bytes::Bytes`].)
101    fn send_streaming<T>(
102        &self,
103        req: Request<T>,
104    ) -> impl Future<Output = Result<StreamingResponse<BoxedStream>>> + WasmCompatSend
105    where
106        T: Into<Bytes>;
107}
108
109impl HttpClientExt for reqwest::Client {
110    fn send<T, U>(
111        &self,
112        req: Request<T>,
113    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
114    where
115        T: Into<Bytes>,
116        U: From<Bytes> + WasmCompatSend,
117    {
118        let (parts, body) = req.into_parts();
119        let req = self
120            .request(parts.method, parts.uri.to_string())
121            .headers(parts.headers)
122            .body(body.into());
123
124        async move {
125            let response = req.send().await.map_err(instance_error)?;
126            if !response.status().is_success() {
127                return Err(Error::InvalidStatusCodeWithMessage(
128                    response.status(),
129                    response.text().await.unwrap(),
130                ));
131            }
132
133            let mut res = Response::builder().status(response.status());
134
135            if let Some(hs) = res.headers_mut() {
136                *hs = response.headers().clone();
137            }
138
139            let body: LazyBody<U> = Box::pin(async {
140                let bytes = response
141                    .bytes()
142                    .await
143                    .map_err(|e| Error::Instance(e.into()))?;
144
145                let body = U::from(bytes);
146                Ok(body)
147            });
148
149            res.body(body).map_err(Error::Protocol)
150        }
151    }
152
153    fn send_multipart<U>(
154        &self,
155        req: Request<Form>,
156    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
157    where
158        U: From<Bytes>,
159        U: WasmCompatSend + 'static,
160    {
161        let (parts, body) = req.into_parts();
162        let req = self
163            .request(parts.method, parts.uri.to_string())
164            .headers(parts.headers)
165            .multipart(body);
166
167        async move {
168            let response = req.send().await.map_err(instance_error)?;
169            if !response.status().is_success() {
170                return Err(Error::InvalidStatusCodeWithMessage(
171                    response.status(),
172                    response.text().await.unwrap(),
173                ));
174            }
175
176            let mut res = Response::builder().status(response.status());
177
178            if let Some(hs) = res.headers_mut() {
179                *hs = response.headers().clone();
180            }
181
182            let body: LazyBody<U> = Box::pin(async {
183                let bytes = response
184                    .bytes()
185                    .await
186                    .map_err(|e| Error::Instance(e.into()))?;
187
188                let body = U::from(bytes);
189                Ok(body)
190            });
191
192            res.body(body).map_err(Error::Protocol)
193        }
194    }
195
196    fn send_streaming<T>(
197        &self,
198        req: Request<T>,
199    ) -> impl Future<Output = Result<StreamingResponse<BoxedStream>>> + WasmCompatSend
200    where
201        T: Into<Bytes>,
202    {
203        let (parts, body) = req.into_parts();
204
205        let req = self
206            .request(parts.method, parts.uri.to_string())
207            .headers(parts.headers)
208            .body(body.into())
209            .build()
210            .map_err(|x| Error::Instance(x.into()))
211            .unwrap();
212
213        let client = self.clone();
214
215        async move {
216            let response: reqwest::Response = client.execute(req).await.map_err(instance_error)?;
217            if !response.status().is_success() {
218                return Err(Error::InvalidStatusCodeWithMessage(
219                    response.status(),
220                    response.text().await.unwrap(),
221                ));
222            }
223
224            #[cfg(not(target_family = "wasm"))]
225            let mut res = Response::builder()
226                .status(response.status())
227                .version(response.version());
228
229            #[cfg(target_family = "wasm")]
230            let mut res = Response::builder().status(response.status());
231
232            if let Some(hs) = res.headers_mut() {
233                *hs = response.headers().clone();
234            }
235
236            use futures::StreamExt;
237
238            let mapped_stream: Pin<Box<dyn WasmCompatSendStream<InnerItem = Result<Bytes>>>> =
239                Box::pin(
240                    response
241                        .bytes_stream()
242                        .map(|chunk| chunk.map_err(|e| Error::Instance(Box::new(e)))),
243                );
244
245            res.body(mapped_stream).map_err(Error::Protocol)
246        }
247    }
248}