connect_rpc/
reqwest.rs

1use std::future::Future;
2
3use bytes::Bytes;
4
5use crate::{
6    request::{ConnectRequest, UnaryGetRequest, UnaryRequest},
7    response::{
8        error::{ConnectCode, ConnectError},
9        UnaryResponse, ValidateOpts,
10    },
11    Error,
12};
13
14pub trait ReqwestClientExt {
15    /// Executes a Connect RPC [`UnaryRequest`].
16    fn execute_unary(
17        &self,
18        req: UnaryRequest<impl Into<reqwest::Body>>,
19    ) -> impl Future<Output = Result<UnaryResponse<Bytes>, Error>>;
20
21    /// Executes a Connect RPC [`UnaryGetRequest`].
22    fn execute_unary_get(
23        &self,
24        req: UnaryGetRequest,
25    ) -> impl Future<Output = Result<UnaryResponse<Bytes>, Error>>;
26}
27
28impl ReqwestClientExt for reqwest::Client {
29    async fn execute_unary(
30        &self,
31        req: UnaryRequest<impl Into<reqwest::Body>>,
32    ) -> Result<UnaryResponse<Bytes>, Error> {
33        let validate_opts = ValidateOpts::from_request(&req);
34        let resp = self.execute(req.try_into()?).await?;
35        let connect_resp: UnaryResponse<_> = response_to_http_bytes(resp).await?.into();
36        connect_resp.result(&validate_opts)
37    }
38
39    async fn execute_unary_get(&self, req: UnaryGetRequest) -> Result<UnaryResponse<Bytes>, Error> {
40        let validate_opts = ValidateOpts::from_request(&req);
41        let resp = self.execute(req.try_into()?).await?;
42        let connect_resp: UnaryResponse<_> = response_to_http_bytes(resp).await?.into();
43        connect_resp.result(&validate_opts)
44    }
45}
46
47async fn response_to_http_bytes(
48    mut resp: reqwest::Response,
49) -> Result<http::Response<Bytes>, Error> {
50    let status = resp.status();
51    let headers = std::mem::take(resp.headers_mut());
52    let body = resp.bytes().await?;
53    let mut http_resp = http::Response::new(body);
54    *http_resp.status_mut() = status;
55    *http_resp.headers_mut() = headers;
56    Ok(http_resp)
57}
58
59impl<T: Into<reqwest::Body>> TryFrom<UnaryRequest<T>> for reqwest::Request {
60    type Error = Error;
61
62    fn try_from(req: UnaryRequest<T>) -> Result<Self, Self::Error> {
63        let timeout = req.timeout();
64        let mut req = reqwest::Request::try_from(http::Request::from(req))?;
65        *req.timeout_mut() = timeout;
66        Ok(req)
67    }
68}
69
70impl TryFrom<UnaryGetRequest> for reqwest::Request {
71    type Error = Error;
72
73    fn try_from(req: UnaryGetRequest) -> Result<Self, Self::Error> {
74        let timeout = req.timeout();
75        let http_req = http::Request::from(req).map(|()| reqwest::Body::default());
76        let mut req = reqwest::Request::try_from(http_req)?;
77        *req.timeout_mut() = timeout;
78        Ok(req)
79    }
80}
81
82impl From<reqwest::Error> for Error {
83    fn from(err: reqwest::Error) -> Self {
84        if err.is_timeout() {
85            Self::ConnectError(ConnectError::new(
86                ConnectCode::DeadlineExceeded,
87                "request timed out",
88            ))
89        } else {
90            Self::ReqwestError(err)
91        }
92    }
93}