rig/
http_client.rs

1use crate::if_wasm;
2use bytes::Bytes;
3#[cfg(not(target_family = "wasm"))]
4use futures::stream::BoxStream;
5#[cfg(target_family = "wasm")]
6use futures::stream::Stream;
7pub use http::{HeaderMap, HeaderValue, Method, Request, Response, Uri, request::Builder};
8use reqwest::Body;
9use std::future::Future;
10
11if_wasm! {
12    use std::pin::Pin;
13}
14
15use crate::wasm_compat::*;
16
17#[derive(Debug, thiserror::Error)]
18pub enum Error {
19    #[error("Http error: {0}")]
20    Protocol(#[from] http::Error),
21    #[cfg(not(target_family = "wasm"))]
22    #[error("Http client error: {0}")]
23    Instance(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
24
25    #[cfg(target_family = "wasm")]
26    #[error("Http client error: {0}")]
27    Instance(#[from] Box<dyn std::error::Error + 'static>),
28}
29
30pub type Result<T> = std::result::Result<T, Error>;
31
32#[cfg(not(target_family = "wasm"))]
33fn instance_error<E: std::error::Error + Send + Sync + 'static>(error: E) -> Error {
34    Error::Instance(error.into())
35}
36
/// Type-erases a client-side error into [`Error::Instance`] (wasm targets, where
/// the boxed error carries no `Send`/`Sync` bound).
#[cfg(target_family = "wasm")]
fn instance_error<E>(error: E) -> Error
where
    E: std::error::Error + 'static,
{
    // The `Box<E>` is unsize-coerced to the boxed trait object the variant stores.
    Error::Instance(Box::new(error))
}
41
42pub type LazyBytes = WasmBoxedFuture<'static, Result<Bytes>>;
43pub type LazyBody<T> = WasmBoxedFuture<'static, Result<T>>;
44
45#[cfg(not(target_family = "wasm"))]
46pub type ByteStream = BoxStream<'static, Result<Bytes>>;
47
48#[cfg(target_family = "wasm")]
49pub type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + 'static>>;
50
51pub type StreamingResponse = Response<ByteStream>;
52
/// Marker type for a request that carries no body.
///
/// It converts into an empty `Bytes` and into a default `reqwest::Body` through the
/// `From` impls in this module.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoBody;
54
55impl From<NoBody> for Bytes {
56    fn from(_: NoBody) -> Self {
57        Bytes::new()
58    }
59}
60
61impl From<NoBody> for Body {
62    fn from(_: NoBody) -> Self {
63        reqwest::Body::default()
64    }
65}
66
67pub async fn text(response: Response<LazyBody<Vec<u8>>>) -> Result<String> {
68    let text = response.into_body().await?;
69    Ok(String::from(String::from_utf8_lossy(&text)))
70}
71
72pub fn with_bearer_auth(req: Builder, auth: &str) -> Result<Builder> {
73    let auth_header =
74        HeaderValue::from_str(&format!("Bearer {}", auth)).map_err(http::Error::from)?;
75
76    Ok(req.header("Authorization", auth_header))
77}
78
79pub trait HttpClientExt: WasmCompatSend + WasmCompatSync {
80    fn send<T, U>(
81        &self,
82        req: Request<T>,
83    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
84    where
85        T: Into<Bytes>,
86        T: WasmCompatSend,
87        U: From<Bytes>,
88        U: WasmCompatSend + 'static;
89
90    fn send_streaming<T>(
91        &self,
92        req: Request<T>,
93    ) -> impl Future<Output = Result<StreamingResponse>> + WasmCompatSend + 'static
94    where
95        T: Into<Bytes>;
96}
97
98impl HttpClientExt for reqwest::Client {
99    fn send<T, U>(
100        &self,
101        req: Request<T>,
102    ) -> impl Future<Output = Result<Response<LazyBody<U>>>> + WasmCompatSend + 'static
103    where
104        T: Into<Bytes>,
105        U: From<Bytes> + WasmCompatSend,
106    {
107        let (parts, body) = req.into_parts();
108        let req = self
109            .request(parts.method, parts.uri.to_string())
110            .headers(parts.headers)
111            .body(body.into());
112
113        async move {
114            let response = req.send().await.map_err(instance_error)?;
115
116            let mut res = Response::builder().status(response.status());
117
118            if let Some(hs) = res.headers_mut() {
119                *hs = response.headers().clone();
120            }
121
122            let body: LazyBody<U> = Box::pin(async {
123                let bytes = response
124                    .bytes()
125                    .await
126                    .map_err(|e| Error::Instance(e.into()))?;
127
128                let body = U::from(bytes);
129                Ok(body)
130            });
131
132            res.body(body).map_err(Error::Protocol)
133        }
134    }
135
136    fn send_streaming<T>(
137        &self,
138        req: Request<T>,
139    ) -> impl Future<Output = Result<StreamingResponse>> + WasmCompatSend + 'static
140    where
141        T: Into<Bytes>,
142    {
143        let (parts, body) = req.into_parts();
144        let req = self
145            .request(parts.method, parts.uri.to_string())
146            .headers(parts.headers)
147            .body(body.into());
148
149        async move {
150            let response: reqwest::Response = req.send().await.map_err(instance_error)?;
151
152            #[cfg(not(target_family = "wasm"))]
153            let mut res = Response::builder()
154                .status(response.status())
155                .version(response.version());
156
157            #[cfg(target_family = "wasm")]
158            let mut res = Response::builder().status(response.status());
159
160            if let Some(hs) = res.headers_mut() {
161                *hs = response.headers().clone();
162            }
163
164            let stream: ByteStream = {
165                use futures::TryStreamExt;
166                Box::pin(response.bytes_stream().map_err(instance_error))
167            };
168
169            Ok(res.body(stream)?)
170        }
171    }
172}