1use super::extensions::TimeoutExtension;
2use anyhow::Error as AnyError;
3use qiniu_http::{
4 HeaderMap, HeaderValue, HttpCaller, RequestParts, ResponseError, ResponseErrorKind, StatusCode, SyncRequest,
5 SyncResponse, SyncResponseBody, SyncResponseResult, TransferProgressInfo,
6};
7use reqwest::{
8 blocking::{
9 Body as SyncBody, Client as SyncReqwestClient, Request as SyncReqwestRequest, Response as SyncReqwestResponse,
10 },
11 header::USER_AGENT,
12 Error as ReqwestError, Url,
13};
14use std::{
15 io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
16 mem::take,
17 mem::transmute,
18 num::NonZeroU16,
19};
20
21#[cfg(feature = "async")]
22use {
23 futures::future::BoxFuture,
24 qiniu_http::{AsyncRequest, AsyncResponseResult},
25};
26
/// Blocking HTTP client backed by a reqwest blocking client.
///
/// The `HttpCaller` implementation for this type provides the blocking `call`;
/// its `async_call` is unimplemented and panics when invoked.
#[derive(Debug, Default)]
pub struct SyncClient {
    /// Underlying blocking reqwest client used to execute every request.
    sync_client: SyncReqwestClient,
}
32
33impl SyncClient {
34 #[inline]
36 pub fn new(sync_client: SyncReqwestClient) -> Self {
37 Self { sync_client }
38 }
39}
40
41impl From<SyncReqwestClient> for SyncClient {
42 #[inline]
43 fn from(sync_client: SyncReqwestClient) -> Self {
44 Self::new(sync_client)
45 }
46}
47
48impl HttpCaller for SyncClient {
49 fn call<'a>(&'a self, request: &'a mut SyncRequest<'_>) -> SyncResponseResult {
50 let mut user_cancelled_error: Option<ResponseError> = None;
51 let reqwest_request = make_sync_reqwest_request(request, &mut user_cancelled_error)?;
52 match self.sync_client.execute(reqwest_request) {
53 Ok(reqwest_response) => from_sync_response(reqwest_response, request),
54 Err(err) => user_cancelled_error.map_or_else(|| Err(from_reqwest_error(err, request)), Err),
55 }
56 }
57
58 #[inline]
59 #[cfg(feature = "async")]
60 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
61 fn async_call<'a>(&'a self, _request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
62 unimplemented!("SyncClient does not support async call")
63 }
64}
65
66fn make_sync_reqwest_request(
67 request: &mut SyncRequest,
68 user_cancelled_error: &mut Option<ResponseError>,
69) -> Result<SyncReqwestRequest, ResponseError> {
70 let url = Url::parse(&request.url().to_string()).map_err(|err| {
71 ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
72 .uri(request.url())
73 .build()
74 })?;
75 let mut reqwest_request = SyncReqwestRequest::new(request.method().to_owned(), url);
76 for (header_name, header_value) in request.headers() {
77 reqwest_request
78 .headers_mut()
79 .insert(header_name, header_value.to_owned());
80 }
81 reqwest_request
82 .headers_mut()
83 .insert(USER_AGENT, make_user_agent(request, "sync")?);
84 *reqwest_request.body_mut() = Some(SyncBody::sized(
85 RequestBodyWithCallbacks::new(request, user_cancelled_error),
86 request.body().size(),
87 ));
88
89 if let Some(timeout) = request.extensions().get::<TimeoutExtension>() {
90 *reqwest_request.timeout_mut() = Some(timeout.get());
91 }
92
93 return Ok(reqwest_request);
94
95 struct RequestBodyWithCallbacks {
96 request: &'static mut SyncRequest<'static>,
97 user_cancelled_error: &'static mut Option<ResponseError>,
98 have_read: u64,
99 }
100
101 impl RequestBodyWithCallbacks {
102 fn new(request: &mut SyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
103 #[allow(unsafe_code)]
104 Self {
105 have_read: 0,
106 request: unsafe { transmute(request) },
107 user_cancelled_error: unsafe { transmute(user_cancelled_error) },
108 }
109 }
110 }
111
112 impl Read for RequestBodyWithCallbacks {
113 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
114 let n = self.request.body_mut().read(buf)?;
115 match n {
116 0 => Ok(0),
117 n => {
118 let buf = &buf[..n];
119 self.have_read += n as u64;
120 if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
121 on_uploading_progress(TransferProgressInfo::new(
122 self.have_read,
123 self.request.body().size(),
124 buf,
125 ))
126 .map_err(|err| {
127 *self.user_cancelled_error = Some(make_callback_error(err, self.request));
128 IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
129 })?;
130 }
131 Ok(n)
132 }
133 }
134 }
135 }
136}
137
138pub(super) fn make_user_agent(request: &RequestParts, suffix: &str) -> Result<HeaderValue, ResponseError> {
139 HeaderValue::from_str(&format!(
140 "{}/qiniu-reqwest-{}/{}",
141 request.user_agent(),
142 env!("CARGO_PKG_VERSION"),
143 suffix
144 ))
145 .map_err(|err| {
146 ResponseError::builder(ResponseErrorKind::InvalidHeader, err)
147 .uri(request.url())
148 .build()
149 })
150}
151
152fn from_sync_response(mut response: SyncReqwestResponse, request: &mut SyncRequest) -> SyncResponseResult {
153 call_response_callbacks(request, response.status(), response.headers())?;
154 let mut response_builder = SyncResponse::builder();
155 response_builder
156 .status_code(response.status())
157 .version(response.version())
158 .headers(take(response.headers_mut()))
159 .extensions(take(request.extensions_mut()));
160 if let Some(port) = response.url().port_or_known_default().and_then(NonZeroU16::new) {
161 response_builder.server_port(port);
162 }
163 if let Some(remote_addr) = response.remote_addr() {
164 response_builder.server_ip(remote_addr.ip());
165 if let Some(port) = NonZeroU16::new(remote_addr.port()) {
166 response_builder.server_port(port);
167 }
168 }
169 response_builder.body(SyncResponseBody::from_reader(response));
170 Ok(response_builder.build())
171}
172
173pub(super) fn from_reqwest_error(err: ReqwestError, request: &RequestParts) -> ResponseError {
174 if err.url().is_some() {
175 ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
176 .uri(request.url())
177 .build()
178 } else if err.is_redirect() {
179 ResponseError::builder(ResponseErrorKind::TooManyRedirect, err)
180 .uri(request.url())
181 .build()
182 } else if err.is_timeout() {
183 ResponseError::builder(ResponseErrorKind::TimeoutError, err)
184 .uri(request.url())
185 .build()
186 } else if err.is_request() {
187 ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
188 .uri(request.url())
189 .build()
190 } else if err.is_connect() {
191 ResponseError::builder(ResponseErrorKind::ConnectError, err)
192 .uri(request.url())
193 .build()
194 } else {
195 ResponseError::builder(ResponseErrorKind::UnknownError, err)
196 .uri(request.url())
197 .build()
198 }
199}
200
201pub(super) fn call_response_callbacks(
202 request: &RequestParts,
203 status_code: StatusCode,
204 headers: &HeaderMap,
205) -> Result<(), ResponseError> {
206 if let Some(on_receive_response_status) = request.on_receive_response_status() {
207 on_receive_response_status(status_code).map_err(|err| make_callback_error(err, request))?;
208 }
209 if let Some(on_receive_response_header) = request.on_receive_response_header() {
210 headers.iter().try_for_each(|(header_name, header_value)| {
211 on_receive_response_header(header_name, header_value).map_err(|err| make_callback_error(err, request))
212 })?;
213 }
214 Ok(())
215}
216
217pub(super) fn make_callback_error(err: AnyError, request: &RequestParts) -> ResponseError {
218 ResponseError::builder(ResponseErrorKind::CallbackError, err)
219 .uri(request.url())
220 .build()
221}