1use anyhow::Error as AnyError;
2use isahc::{
3 config::{Configurable, Dialer},
4 error::{Error as IsahcError, ErrorKind as IsahcErrorKind},
5 http::{header::USER_AGENT, request::Builder as IsahcRequestBuilder, uri::Scheme},
6 Body as IsahcBody, HttpClient as IsahcHttpClient, Metrics as IsahcMetrics, Response as IsahcResponse, ResponseExt,
7};
8use qiniu_http::{
9 HeaderValue, HttpCaller, Metrics, Request, RequestParts, ResponseError, ResponseErrorKind, SyncRequest,
10 SyncResponse, SyncResponseBody, SyncResponseResult, TransferProgressInfo, Uri,
11};
12use std::{
13 io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
14 mem::{take, transmute},
15 net::{IpAddr, SocketAddr},
16 num::NonZeroU16,
17};
18
/// isahc request whose payload is the blocking [`IsahcBody`].
type IsahcSyncRequest = isahc::Request<IsahcBody>;
/// isahc response whose payload is the blocking [`IsahcBody`].
type IsahcSyncResponse = isahc::Response<IsahcBody>;
21
22#[cfg(feature = "async")]
23use {
24 futures::{future::BoxFuture, ready, AsyncRead},
25 isahc::AsyncBody as IsahcAsyncBody,
26 qiniu_http::{AsyncRequest, AsyncResponse, AsyncResponseBody, AsyncResponseResult},
27 std::{
28 pin::Pin,
29 task::{Context, Poll},
30 },
31};
32
/// isahc request whose payload is an [`IsahcAsyncBody`]; compiled only with the `async` feature.
#[cfg(feature = "async")]
type IsahcAsyncRequest = isahc::Request<IsahcAsyncBody>;

/// isahc response whose payload is an [`IsahcAsyncBody`]; compiled only with the `async` feature.
#[cfg(feature = "async")]
type IsahcAsyncResponse = isahc::Response<IsahcAsyncBody>;
38
39#[derive(Debug, Clone)]
41pub struct Client {
42 isahc_client: IsahcHttpClient,
43}
44
45impl Client {
46 #[inline]
48 pub fn new(isahc_client: IsahcHttpClient) -> Self {
49 Client { isahc_client }
50 }
51
52 #[inline]
54 pub fn default_client() -> Result<Self, IsahcError> {
55 Ok(Self::new(IsahcHttpClient::new()?))
56 }
57}
58
59impl From<IsahcHttpClient> for Client {
60 #[inline]
61 fn from(isahc_client: IsahcHttpClient) -> Self {
62 Self::new(isahc_client)
63 }
64}
65
66impl HttpCaller for Client {
67 fn call<'a>(&'a self, request: &'a mut SyncRequest<'_>) -> SyncResponseResult {
68 let mut user_cancelled_error: Option<ResponseError> = None;
69
70 let isahc_result = match request.resolved_ip_addrs().map(|ips| ips.to_owned()) {
71 Some(ips) if !ips.is_empty() => {
72 let mut last_result = None;
73 for ip in ips {
74 let isahc_request = match make_sync_isahc_request(request, Some(ip), &mut user_cancelled_error) {
75 Ok(request) => request,
76 Err(err) => {
77 last_result = Some(Err(err));
78 break;
79 }
80 };
81 match self.isahc_client.send(isahc_request) {
82 Ok(isahc_response) => {
83 last_result = Some(Ok(isahc_response));
84 break;
85 }
86 Err(err) => {
87 let should_retry = should_retry(&err);
88 last_result = Some(Err(from_isahc_error(err, request)));
89 if !should_retry {
90 break;
91 }
92 }
93 }
94 }
95 last_result.unwrap()
96 }
97 _ => {
98 let isahc_request = make_sync_isahc_request(request, None, &mut user_cancelled_error)?;
99 self.isahc_client
100 .send(isahc_request)
101 .map_err(|err| from_isahc_error(err, request))
102 }
103 };
104
105 match isahc_result {
106 Ok(isahc_response) => make_sync_response(isahc_response, request),
107 Err(err) => user_cancelled_error.map_or(Err(err), Err),
108 }
109 }
110
111 #[cfg(feature = "async")]
112 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
113 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
114 Box::pin(async move {
115 let mut user_cancelled_error: Option<ResponseError> = None;
116
117 let isahc_result = match request.resolved_ip_addrs().map(|ips| ips.to_owned()) {
118 Some(ips) if !ips.is_empty() => {
119 let mut last_result = None;
120 for ip in ips {
121 let isahc_request = match make_async_isahc_request(request, Some(ip), &mut user_cancelled_error)
122 {
123 Ok(request) => request,
124 Err(err) => {
125 last_result = Some(Err(err));
126 break;
127 }
128 };
129 match self.isahc_client.send_async(isahc_request).await {
130 Ok(isahc_response) => {
131 last_result = Some(Ok(isahc_response));
132 break;
133 }
134 Err(err) => {
135 let should_retry = should_retry(&err);
136 last_result = Some(Err(from_isahc_error(err, request)));
137 if !should_retry {
138 break;
139 }
140 }
141 }
142 }
143 last_result.unwrap()
144 }
145 _ => {
146 let isahc_request = make_async_isahc_request(request, None, &mut user_cancelled_error)?;
147 self.isahc_client
148 .send_async(isahc_request)
149 .await
150 .map_err(|err| from_isahc_error(err, request))
151 }
152 };
153
154 match isahc_result {
155 Ok(isahc_response) => make_async_response(isahc_response, request),
156 Err(err) => user_cancelled_error.map_or(Err(err), Err),
157 }
158 })
159 }
160
161 #[inline]
162 fn is_resolved_ip_addrs_supported(&self) -> bool {
163 true
164 }
165
166 #[inline]
167 fn is_response_metrics_supported(&self) -> bool {
168 true
169 }
170}
171
172fn make_user_agent(request: &RequestParts) -> Result<HeaderValue, ResponseError> {
173 HeaderValue::from_str(&format!("{}/qiniu-{}", request.user_agent(), isahc::version(),)).map_err(|err| {
174 ResponseError::builder(ResponseErrorKind::InvalidHeader, err)
175 .uri(request.url())
176 .build()
177 })
178}
179
180fn make_sync_response(mut response: IsahcSyncResponse, request: &mut SyncRequest) -> SyncResponseResult {
181 call_response_callbacks(request, &response)?;
182
183 let mut response_builder = SyncResponse::builder();
184 response_builder
185 .status_code(response.status())
186 .version(response.version())
187 .headers(take(response.headers_mut()))
188 .extensions(take(request.extensions_mut()));
189 if let Some(remote_addr) = response.remote_addr() {
190 response_builder.server_ip(remote_addr.ip());
191 if let Some(port) = NonZeroU16::new(remote_addr.port()) {
192 response_builder.server_port(port);
193 }
194 }
195 if let Some(metrics) = response.metrics() {
196 response_builder.metrics(make_metrics_from_isahc(metrics));
197 }
198 response_builder.body(SyncResponseBody::from_reader(response.into_body()));
199 Ok(response_builder.build())
200}
201
/// Converts an isahc response into an [`AsyncResponse`].
///
/// The response callbacks registered on `request` run first; if one of them fails, its error
/// is returned and no response is built. Headers are moved out of the isahc response and the
/// extensions are moved out of `request`. The server address and the metrics are copied only
/// when isahc reports them, and a port of `0` is not recorded.
#[cfg(feature = "async")]
fn make_async_response(mut response: IsahcAsyncResponse, request: &mut AsyncRequest) -> AsyncResponseResult {
    call_response_callbacks(request, &response)?;

    let mut builder = AsyncResponse::builder();
    builder.status_code(response.status());
    builder.version(response.version());
    builder.headers(take(response.headers_mut()));
    builder.extensions(take(request.extensions_mut()));

    if let Some(peer) = response.remote_addr() {
        builder.server_ip(peer.ip());
        if let Some(port) = NonZeroU16::new(peer.port()) {
            builder.server_port(port);
        }
    }

    if let Some(metrics) = response.metrics().map(make_metrics_from_isahc) {
        builder.metrics(metrics);
    }

    // The body is consumed last because `into_body` takes the isahc response by value.
    builder.body(AsyncResponseBody::from_reader(response.into_body()));
    Ok(builder.build())
}
224
225fn call_response_callbacks<ReqBody, RespBody>(
226 request: &Request<ReqBody>,
227 response: &IsahcResponse<RespBody>,
228) -> Result<(), ResponseError> {
229 if let Some(on_receive_response_status) = request.on_receive_response_status() {
230 on_receive_response_status(response.status()).map_err(|err| make_callback_error(err, request))?;
231 }
232 if let Some(on_receive_response_header) = request.on_receive_response_header() {
233 response.headers().iter().try_for_each(|(header_name, header_value)| {
234 on_receive_response_header(header_name, header_value).map_err(|err| make_callback_error(err, request))
235 })?;
236 }
237 Ok(())
238}
239
240fn make_callback_error(err: AnyError, request: &RequestParts) -> ResponseError {
241 ResponseError::builder(ResponseErrorKind::CallbackError, err)
242 .uri(request.url())
243 .build()
244}
245
246fn should_retry(err: &IsahcError) -> bool {
247 err.kind() == IsahcErrorKind::ConnectionFailed
248}
249
250fn make_metrics_from_isahc(metrics: &IsahcMetrics) -> Metrics {
251 Metrics::builder()
252 .total_duration(metrics.total_time())
253 .name_lookup_duration(metrics.name_lookup_time())
254 .connect_duration(metrics.connect_time())
255 .secure_connect_duration(metrics.redirect_time())
256 .redirect_duration(metrics.redirect_time())
257 .transfer_duration(metrics.transfer_time())
258 .build()
259}
260
261fn make_sync_isahc_request(
262 request: &mut SyncRequest,
263 ip_addr: Option<IpAddr>,
264 user_cancelled_error: &mut Option<ResponseError>,
265) -> Result<IsahcSyncRequest, ResponseError> {
266 let mut isahc_request_builder = isahc::Request::builder().uri(request.url()).method(request.method());
267 for (header_name, header_value) in request.headers() {
268 isahc_request_builder = isahc_request_builder.header(header_name, header_value);
269 }
270 isahc_request_builder = add_extensions_to_isahc_request_builder(request, ip_addr, isahc_request_builder)?;
271
272 isahc_request_builder = isahc_request_builder.header(USER_AGENT, make_user_agent(request)?);
273
274 let isahc_request = isahc_request_builder
275 .body(IsahcBody::from_reader_sized(
276 RequestBodyWithCallbacks::new(request, user_cancelled_error),
277 request.body().size(),
278 ))
279 .map_err(|err| {
280 ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
281 .uri(request.url())
282 .build()
283 })?;
284 return Ok(isahc_request);
285
286 struct RequestBodyWithCallbacks {
287 request: &'static mut SyncRequest<'static>,
288 user_cancelled_error: &'static mut Option<ResponseError>,
289 have_read: u64,
290 }
291
292 impl RequestBodyWithCallbacks {
293 fn new(request: &mut SyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
294 #[allow(unsafe_code)]
295 Self {
296 have_read: 0,
297 request: unsafe { transmute(request) },
298 user_cancelled_error: unsafe { transmute(user_cancelled_error) },
299 }
300 }
301 }
302
303 impl Read for RequestBodyWithCallbacks {
304 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
305 let n = self.request.body_mut().read(buf)?;
306 match n {
307 0 => Ok(0),
308 n => {
309 let buf = &buf[..n];
310 self.have_read += n as u64;
311 if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
312 on_uploading_progress(TransferProgressInfo::new(
313 self.have_read,
314 self.request.body().size(),
315 buf,
316 ))
317 .map_err(|err| {
318 *self.user_cancelled_error = Some(make_callback_error(err, self.request));
319 IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
320 })?;
321 }
322 Ok(n)
323 }
324 }
325 }
326 }
327}
328
/// Translates an [`AsyncRequest`] into an isahc request with an asynchronous body.
///
/// URL, method and headers are copied, the request extensions are applied through
/// `add_extensions_to_isahc_request_builder` (with `ip_addr` as the optional dial target),
/// and the `User-Agent` header is set from `make_user_agent`. The body is a sized async
/// reader that pulls from the request body and reports progress to the upload-progress
/// callback. When that callback fails, the wrapped error is stored in `user_cancelled_error`.
///
/// # Errors
///
/// Returns the error from applying the extensions or building the user agent, or an
/// `InvalidRequestResponse` error when isahc rejects the assembled request.
#[cfg(feature = "async")]
fn make_async_isahc_request(
    request: &mut AsyncRequest,
    ip_addr: Option<IpAddr>,
    user_cancelled_error: &mut Option<ResponseError>,
) -> Result<IsahcAsyncRequest, ResponseError> {
    use futures::pin_mut;

    /// Async reader over the request body that reports upload progress after each non-empty read.
    struct RequestBodyWithCallbacks {
        request: &'static mut AsyncRequest<'static>,
        user_cancelled_error: &'static mut Option<ResponseError>,
        /// Number of body bytes handed out so far.
        have_read: u64,
    }

    impl RequestBodyWithCallbacks {
        // NOTE(review): both borrows are lifetime-erased to `'static` so the reader can be
        // stored inside the isahc body. This presumably relies on the isahc request being
        // gone before the caller's `request` and `user_cancelled_error` are released;
        // confirm against the call sites.
        #[allow(unsafe_code)]
        fn new(request: &mut AsyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
            Self {
                have_read: 0,
                request: unsafe { transmute(request) },
                user_cancelled_error: unsafe { transmute(user_cancelled_error) },
            }
        }
    }

    impl AsyncRead for RequestBodyWithCallbacks {
        fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            // The struct only holds references and an integer, so it is `Unpin`.
            let this = self.get_mut();

            let polled = {
                let body = this.request.body_mut();
                pin_mut!(body);
                ready!(body.poll_read(cx, buf))
            };
            let n = match polled {
                Ok(0) => return Poll::Ready(Ok(0)),
                Ok(n) => n,
                Err(err) => return Poll::Ready(Err(err)),
            };

            this.have_read += n as u64;
            if let Some(on_uploading_progress) = this.request.on_uploading_progress() {
                let progress = TransferProgressInfo::new(this.have_read, this.request.body().size(), &buf[..n]);
                if let Err(err) = on_uploading_progress(progress) {
                    // Keep the real callback error for the caller; isahc only sees a generic I/O error.
                    *this.user_cancelled_error = Some(make_callback_error(err, this.request));
                    return Poll::Ready(Err(IoError::new(
                        IoErrorKind::Other,
                        "on_uploading_progress() callback returns error",
                    )));
                }
            }
            Poll::Ready(Ok(n))
        }
    }

    let base_builder = isahc::Request::builder().uri(request.url()).method(request.method());
    let builder = request
        .headers()
        .iter()
        .fold(base_builder, |builder, (header_name, header_value)| {
            builder.header(header_name, header_value)
        });
    let builder = add_extensions_to_isahc_request_builder(request, ip_addr, builder)?;
    let user_agent = make_user_agent(request)?;
    let builder = builder.header(USER_AGENT, user_agent);

    let body_size = request.body().size();
    let body_reader = RequestBodyWithCallbacks::new(request, user_cancelled_error);
    builder
        .body(IsahcAsyncBody::from_reader_sized(body_reader, body_size))
        .map_err(|err| {
            ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
                .uri(request.url())
                .build()
        })
}
403
404fn add_extensions_to_isahc_request_builder(
405 request: &RequestParts,
406 ip_addr: Option<IpAddr>,
407 mut isahc_request_builder: IsahcRequestBuilder,
408) -> Result<IsahcRequestBuilder, ResponseError> {
409 use super::extensions::*;
410
411 isahc_request_builder = isahc_request_builder.metrics(true);
412
413 if let Some(extension) = request.extensions().get::<TimeoutRequestExtension>() {
414 isahc_request_builder = isahc_request_builder.timeout(extension.get().to_owned());
415 }
416
417 if let Some(extension) = request.extensions().get::<ConnectTimeoutRequestExtension>() {
418 isahc_request_builder = isahc_request_builder.connect_timeout(extension.get().to_owned());
419 }
420
421 if let Some(extension) = request.extensions().get::<LowSpeedTimeoutRequestExtension>() {
422 isahc_request_builder =
423 isahc_request_builder.low_speed_timeout(extension.get().0.to_owned(), extension.get().1.to_owned());
424 }
425
426 if let Some(extension) = request.extensions().get::<VersionNegotiationRequestExtension>() {
427 isahc_request_builder = isahc_request_builder.version_negotiation(extension.get().to_owned());
428 }
429
430 if let Some(extension) = request.extensions().get::<RedirectPolicyRequestExtension>() {
431 isahc_request_builder = isahc_request_builder.redirect_policy(extension.get().to_owned());
432 }
433
434 if request.extensions().get::<AutoRefererRequestExtension>().is_some() {
435 isahc_request_builder = isahc_request_builder.auto_referer();
436 }
437
438 if let Some(extension) = request.extensions().get::<AutomaticDecompressionRequestExtension>() {
439 isahc_request_builder = isahc_request_builder.automatic_decompression(extension.get().to_owned());
440 }
441
442 if let Some(extension) = request.extensions().get::<TcpKeepaliveRequestExtension>() {
443 isahc_request_builder = isahc_request_builder.tcp_keepalive(extension.get().to_owned());
444 }
445
446 if request.extensions().get::<TcpNodelayRequestExtension>().is_some() {
447 isahc_request_builder = isahc_request_builder.tcp_nodelay();
448 }
449
450 if let Some(extension) = request.extensions().get::<NetworkInterfaceRequestExtension>() {
451 isahc_request_builder = isahc_request_builder.interface(extension.get().to_owned());
452 }
453
454 if let Some(extension) = request.extensions().get::<IpVersionRequestExtension>() {
455 isahc_request_builder = isahc_request_builder.ip_version(extension.get().to_owned());
456 }
457
458 if let Some(extension) = request.extensions().get::<DialRequestExtension>() {
459 isahc_request_builder = isahc_request_builder.dial(extension.get().to_owned());
460 } else if let Some(ip_addr) = ip_addr {
461 isahc_request_builder = isahc_request_builder.dial(Dialer::ip_socket(SocketAddr::new(
462 ip_addr,
463 extract_port_for_uri(request.url())?,
464 )));
465 }
466
467 if let Some(extension) = request.extensions().get::<ProxyRequestExtension>() {
468 isahc_request_builder = isahc_request_builder.proxy(extension.get().to_owned());
469 }
470
471 if let Some(extension) = request.extensions().get::<ProxyBlacklistRequestExtension>() {
472 isahc_request_builder = isahc_request_builder.proxy_blacklist(extension.get().to_owned());
473 }
474
475 if let Some(extension) = request.extensions().get::<ProxyAuthenticationRequestExtension>() {
476 isahc_request_builder = isahc_request_builder.proxy_authentication(extension.get().to_owned());
477 }
478
479 if let Some(extension) = request.extensions().get::<ProxyCredentialsRequestExtension>() {
480 isahc_request_builder = isahc_request_builder.proxy_credentials(extension.get().to_owned());
481 }
482
483 if let Some(extension) = request.extensions().get::<MaxUploadSpeedRequestExtension>() {
484 isahc_request_builder = isahc_request_builder.max_upload_speed(extension.get().to_owned());
485 }
486
487 if let Some(extension) = request.extensions().get::<MaxDownloadSpeedRequestExtension>() {
488 isahc_request_builder = isahc_request_builder.max_download_speed(extension.get().to_owned());
489 }
490
491 if let Some(extension) = request.extensions().get::<SslClientCertificateRequestExtension>() {
492 isahc_request_builder = isahc_request_builder.ssl_client_certificate(extension.get().to_owned());
493 }
494
495 if let Some(extension) = request.extensions().get::<SslCaCertificateRequestExtension>() {
496 isahc_request_builder = isahc_request_builder.ssl_ca_certificate(extension.get().to_owned());
497 }
498
499 if let Some(extension) = request.extensions().get::<SslCiphersRequestExtension>() {
500 isahc_request_builder = isahc_request_builder.ssl_ciphers(extension.get().to_owned());
501 }
502
503 if let Some(extension) = request.extensions().get::<SslOptionsRequestExtension>() {
504 isahc_request_builder = isahc_request_builder.ssl_options(extension.get().to_owned());
505 }
506
507 if let Some(extension) = request.extensions().get::<TitleCaseHeadersRequestExtension>() {
508 isahc_request_builder = isahc_request_builder.title_case_headers(extension.get().to_owned());
509 }
510
511 return Ok(isahc_request_builder);
512
513 fn extract_port_for_uri(uri: &Uri) -> Result<u16, ResponseError> {
514 const INVALID_URL: ResponseErrorKind = ResponseErrorKind::InvalidUrl;
515 uri.port_u16().map(Ok).unwrap_or_else(|| {
516 if let Some(scheme) = uri.scheme() {
517 if scheme == &Scheme::HTTP {
518 Ok(80)
519 } else if scheme == &Scheme::HTTPS {
520 Ok(443)
521 } else {
522 Err(ResponseError::builder_with_msg(INVALID_URL, "unknown port for url").build())
523 }
524 } else {
525 Err(ResponseError::builder_with_msg(INVALID_URL, "empty scheme for url").build())
526 }
527 })
528 }
529}
530
531fn from_isahc_error(err: IsahcError, request: &RequestParts) -> ResponseError {
532 let error_builder = match err.kind() {
533 IsahcErrorKind::BadClientCertificate => ResponseError::builder(ResponseErrorKind::ClientCertError, err),
534 IsahcErrorKind::BadServerCertificate => ResponseError::builder(ResponseErrorKind::ServerCertError, err),
535 IsahcErrorKind::ClientInitialization => ResponseError::builder(ResponseErrorKind::LocalIoError, err),
536 IsahcErrorKind::ConnectionFailed => ResponseError::builder(ResponseErrorKind::ConnectError, err),
537 IsahcErrorKind::InvalidContentEncoding => ResponseError::builder(ResponseErrorKind::InvalidHeader, err),
538 IsahcErrorKind::InvalidCredentials => ResponseError::builder(ResponseErrorKind::InvalidHeader, err),
539 IsahcErrorKind::InvalidRequest => ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err),
540 IsahcErrorKind::Io => ResponseError::builder(ResponseErrorKind::SendError, err),
541 IsahcErrorKind::NameResolution => ResponseError::builder(ResponseErrorKind::DnsServerError, err),
542 IsahcErrorKind::ProtocolViolation => ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err),
543 IsahcErrorKind::Timeout => ResponseError::builder(ResponseErrorKind::TimeoutError, err),
544 IsahcErrorKind::TlsEngine => ResponseError::builder(ResponseErrorKind::SslError, err),
545 IsahcErrorKind::TooManyRedirects => ResponseError::builder(ResponseErrorKind::TooManyRedirect, err),
546 _ => ResponseError::builder(ResponseErrorKind::UnknownError, err),
547 };
548 error_builder.uri(request.url()).build()
549}