qiniu_reqwest/
sync_client.rs

1use super::extensions::TimeoutExtension;
2use anyhow::Error as AnyError;
3use qiniu_http::{
4    HeaderMap, HeaderValue, HttpCaller, RequestParts, ResponseError, ResponseErrorKind, StatusCode, SyncRequest,
5    SyncResponse, SyncResponseBody, SyncResponseResult, TransferProgressInfo,
6};
7use reqwest::{
8    blocking::{
9        Body as SyncBody, Client as SyncReqwestClient, Request as SyncReqwestRequest, Response as SyncReqwestResponse,
10    },
11    header::USER_AGENT,
12    Error as ReqwestError, Url,
13};
14use std::{
15    io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
16    mem::take,
17    mem::transmute,
18    num::NonZeroU16,
19};
20
21#[cfg(feature = "async")]
22use {
23    futures::future::BoxFuture,
24    qiniu_http::{AsyncRequest, AsyncResponseResult},
25};
26
27/// Reqwest 阻塞客户端
28#[derive(Debug, Default)]
29pub struct SyncClient {
30    sync_client: SyncReqwestClient,
31}
32
33impl SyncClient {
34    /// 创建 Reqwest 阻塞客户端
35    #[inline]
36    pub fn new(sync_client: SyncReqwestClient) -> Self {
37        Self { sync_client }
38    }
39}
40
41impl From<SyncReqwestClient> for SyncClient {
42    #[inline]
43    fn from(sync_client: SyncReqwestClient) -> Self {
44        Self::new(sync_client)
45    }
46}
47
48impl HttpCaller for SyncClient {
49    fn call<'a>(&'a self, request: &'a mut SyncRequest<'_>) -> SyncResponseResult {
50        let mut user_cancelled_error: Option<ResponseError> = None;
51        let reqwest_request = make_sync_reqwest_request(request, &mut user_cancelled_error)?;
52        match self.sync_client.execute(reqwest_request) {
53            Ok(reqwest_response) => from_sync_response(reqwest_response, request),
54            Err(err) => user_cancelled_error.map_or_else(|| Err(from_reqwest_error(err, request)), Err),
55        }
56    }
57
58    #[inline]
59    #[cfg(feature = "async")]
60    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
61    fn async_call<'a>(&'a self, _request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
62        unimplemented!("SyncClient does not support async call")
63    }
64}
65
66fn make_sync_reqwest_request(
67    request: &mut SyncRequest,
68    user_cancelled_error: &mut Option<ResponseError>,
69) -> Result<SyncReqwestRequest, ResponseError> {
70    let url = Url::parse(&request.url().to_string()).map_err(|err| {
71        ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
72            .uri(request.url())
73            .build()
74    })?;
75    let mut reqwest_request = SyncReqwestRequest::new(request.method().to_owned(), url);
76    for (header_name, header_value) in request.headers() {
77        reqwest_request
78            .headers_mut()
79            .insert(header_name, header_value.to_owned());
80    }
81    reqwest_request
82        .headers_mut()
83        .insert(USER_AGENT, make_user_agent(request, "sync")?);
84    *reqwest_request.body_mut() = Some(SyncBody::sized(
85        RequestBodyWithCallbacks::new(request, user_cancelled_error),
86        request.body().size(),
87    ));
88
89    if let Some(timeout) = request.extensions().get::<TimeoutExtension>() {
90        *reqwest_request.timeout_mut() = Some(timeout.get());
91    }
92
93    return Ok(reqwest_request);
94
95    struct RequestBodyWithCallbacks {
96        request: &'static mut SyncRequest<'static>,
97        user_cancelled_error: &'static mut Option<ResponseError>,
98        have_read: u64,
99    }
100
101    impl RequestBodyWithCallbacks {
102        fn new(request: &mut SyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
103            #[allow(unsafe_code)]
104            Self {
105                have_read: 0,
106                request: unsafe { transmute(request) },
107                user_cancelled_error: unsafe { transmute(user_cancelled_error) },
108            }
109        }
110    }
111
112    impl Read for RequestBodyWithCallbacks {
113        fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
114            let n = self.request.body_mut().read(buf)?;
115            match n {
116                0 => Ok(0),
117                n => {
118                    let buf = &buf[..n];
119                    self.have_read += n as u64;
120                    if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
121                        on_uploading_progress(TransferProgressInfo::new(
122                            self.have_read,
123                            self.request.body().size(),
124                            buf,
125                        ))
126                        .map_err(|err| {
127                            *self.user_cancelled_error = Some(make_callback_error(err, self.request));
128                            IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
129                        })?;
130                    }
131                    Ok(n)
132                }
133            }
134        }
135    }
136}
137
138pub(super) fn make_user_agent(request: &RequestParts, suffix: &str) -> Result<HeaderValue, ResponseError> {
139    HeaderValue::from_str(&format!(
140        "{}/qiniu-reqwest-{}/{}",
141        request.user_agent(),
142        env!("CARGO_PKG_VERSION"),
143        suffix
144    ))
145    .map_err(|err| {
146        ResponseError::builder(ResponseErrorKind::InvalidHeader, err)
147            .uri(request.url())
148            .build()
149    })
150}
151
152fn from_sync_response(mut response: SyncReqwestResponse, request: &mut SyncRequest) -> SyncResponseResult {
153    call_response_callbacks(request, response.status(), response.headers())?;
154    let mut response_builder = SyncResponse::builder();
155    response_builder
156        .status_code(response.status())
157        .version(response.version())
158        .headers(take(response.headers_mut()))
159        .extensions(take(request.extensions_mut()));
160    if let Some(port) = response.url().port_or_known_default().and_then(NonZeroU16::new) {
161        response_builder.server_port(port);
162    }
163    if let Some(remote_addr) = response.remote_addr() {
164        response_builder.server_ip(remote_addr.ip());
165        if let Some(port) = NonZeroU16::new(remote_addr.port()) {
166            response_builder.server_port(port);
167        }
168    }
169    response_builder.body(SyncResponseBody::from_reader(response));
170    Ok(response_builder.build())
171}
172
173pub(super) fn from_reqwest_error(err: ReqwestError, request: &RequestParts) -> ResponseError {
174    if err.url().is_some() {
175        ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
176            .uri(request.url())
177            .build()
178    } else if err.is_redirect() {
179        ResponseError::builder(ResponseErrorKind::TooManyRedirect, err)
180            .uri(request.url())
181            .build()
182    } else if err.is_timeout() {
183        ResponseError::builder(ResponseErrorKind::TimeoutError, err)
184            .uri(request.url())
185            .build()
186    } else if err.is_request() {
187        ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
188            .uri(request.url())
189            .build()
190    } else if err.is_connect() {
191        ResponseError::builder(ResponseErrorKind::ConnectError, err)
192            .uri(request.url())
193            .build()
194    } else {
195        ResponseError::builder(ResponseErrorKind::UnknownError, err)
196            .uri(request.url())
197            .build()
198    }
199}
200
201pub(super) fn call_response_callbacks(
202    request: &RequestParts,
203    status_code: StatusCode,
204    headers: &HeaderMap,
205) -> Result<(), ResponseError> {
206    if let Some(on_receive_response_status) = request.on_receive_response_status() {
207        on_receive_response_status(status_code).map_err(|err| make_callback_error(err, request))?;
208    }
209    if let Some(on_receive_response_header) = request.on_receive_response_header() {
210        headers.iter().try_for_each(|(header_name, header_value)| {
211            on_receive_response_header(header_name, header_value).map_err(|err| make_callback_error(err, request))
212        })?;
213    }
214    Ok(())
215}
216
217pub(super) fn make_callback_error(err: AnyError, request: &RequestParts) -> ResponseError {
218    ResponseError::builder(ResponseErrorKind::CallbackError, err)
219        .uri(request.url())
220        .build()
221}