qiniu_isahc/
client.rs

1use anyhow::Error as AnyError;
2use isahc::{
3    config::{Configurable, Dialer},
4    error::{Error as IsahcError, ErrorKind as IsahcErrorKind},
5    http::{header::USER_AGENT, request::Builder as IsahcRequestBuilder, uri::Scheme},
6    Body as IsahcBody, HttpClient as IsahcHttpClient, Metrics as IsahcMetrics, Response as IsahcResponse, ResponseExt,
7};
8use qiniu_http::{
9    HeaderValue, HttpCaller, Metrics, Request, RequestParts, ResponseError, ResponseErrorKind, SyncRequest,
10    SyncResponse, SyncResponseBody, SyncResponseResult, TransferProgressInfo, Uri,
11};
12use std::{
13    io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
14    mem::{take, transmute},
15    net::{IpAddr, SocketAddr},
16    num::NonZeroU16,
17};
18
/// Isahc request type used on the synchronous path (body is an [`IsahcBody`]).
type IsahcSyncRequest = isahc::Request<IsahcBody>;
/// Isahc response type used on the synchronous path (body is an [`IsahcBody`]).
type IsahcSyncResponse = isahc::Response<IsahcBody>;
21
22#[cfg(feature = "async")]
23use {
24    futures::{future::BoxFuture, ready, AsyncRead},
25    isahc::AsyncBody as IsahcAsyncBody,
26    qiniu_http::{AsyncRequest, AsyncResponse, AsyncResponseBody, AsyncResponseResult},
27    std::{
28        pin::Pin,
29        task::{Context, Poll},
30    },
31};
32
/// Isahc request type used on the asynchronous path (body is an [`IsahcAsyncBody`]).
#[cfg(feature = "async")]
type IsahcAsyncRequest = isahc::Request<IsahcAsyncBody>;

/// Isahc response type used on the asynchronous path (body is an [`IsahcAsyncBody`]).
#[cfg(feature = "async")]
type IsahcAsyncResponse = isahc::Response<IsahcAsyncBody>;
38
/// Isahc client
///
/// Thin wrapper around an [`IsahcHttpClient`]; the [`HttpCaller`] implementation
/// in this file sends requests through the wrapped client.
#[derive(Debug, Clone)]
pub struct Client {
    // The wrapped isahc HTTP client used to send every request.
    isahc_client: IsahcHttpClient,
}
44
45impl Client {
46    /// 创建 Isahc 客户端
47    #[inline]
48    pub fn new(isahc_client: IsahcHttpClient) -> Self {
49        Client { isahc_client }
50    }
51
52    /// 创建默认的 Isahc 客户端
53    #[inline]
54    pub fn default_client() -> Result<Self, IsahcError> {
55        Ok(Self::new(IsahcHttpClient::new()?))
56    }
57}
58
59impl From<IsahcHttpClient> for Client {
60    #[inline]
61    fn from(isahc_client: IsahcHttpClient) -> Self {
62        Self::new(isahc_client)
63    }
64}
65
66impl HttpCaller for Client {
67    fn call<'a>(&'a self, request: &'a mut SyncRequest<'_>) -> SyncResponseResult {
68        let mut user_cancelled_error: Option<ResponseError> = None;
69
70        let isahc_result = match request.resolved_ip_addrs().map(|ips| ips.to_owned()) {
71            Some(ips) if !ips.is_empty() => {
72                let mut last_result = None;
73                for ip in ips {
74                    let isahc_request = match make_sync_isahc_request(request, Some(ip), &mut user_cancelled_error) {
75                        Ok(request) => request,
76                        Err(err) => {
77                            last_result = Some(Err(err));
78                            break;
79                        }
80                    };
81                    match self.isahc_client.send(isahc_request) {
82                        Ok(isahc_response) => {
83                            last_result = Some(Ok(isahc_response));
84                            break;
85                        }
86                        Err(err) => {
87                            let should_retry = should_retry(&err);
88                            last_result = Some(Err(from_isahc_error(err, request)));
89                            if !should_retry {
90                                break;
91                            }
92                        }
93                    }
94                }
95                last_result.unwrap()
96            }
97            _ => {
98                let isahc_request = make_sync_isahc_request(request, None, &mut user_cancelled_error)?;
99                self.isahc_client
100                    .send(isahc_request)
101                    .map_err(|err| from_isahc_error(err, request))
102            }
103        };
104
105        match isahc_result {
106            Ok(isahc_response) => make_sync_response(isahc_response, request),
107            Err(err) => user_cancelled_error.map_or(Err(err), Err),
108        }
109    }
110
111    #[cfg(feature = "async")]
112    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
113    fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
114        Box::pin(async move {
115            let mut user_cancelled_error: Option<ResponseError> = None;
116
117            let isahc_result = match request.resolved_ip_addrs().map(|ips| ips.to_owned()) {
118                Some(ips) if !ips.is_empty() => {
119                    let mut last_result = None;
120                    for ip in ips {
121                        let isahc_request = match make_async_isahc_request(request, Some(ip), &mut user_cancelled_error)
122                        {
123                            Ok(request) => request,
124                            Err(err) => {
125                                last_result = Some(Err(err));
126                                break;
127                            }
128                        };
129                        match self.isahc_client.send_async(isahc_request).await {
130                            Ok(isahc_response) => {
131                                last_result = Some(Ok(isahc_response));
132                                break;
133                            }
134                            Err(err) => {
135                                let should_retry = should_retry(&err);
136                                last_result = Some(Err(from_isahc_error(err, request)));
137                                if !should_retry {
138                                    break;
139                                }
140                            }
141                        }
142                    }
143                    last_result.unwrap()
144                }
145                _ => {
146                    let isahc_request = make_async_isahc_request(request, None, &mut user_cancelled_error)?;
147                    self.isahc_client
148                        .send_async(isahc_request)
149                        .await
150                        .map_err(|err| from_isahc_error(err, request))
151                }
152            };
153
154            match isahc_result {
155                Ok(isahc_response) => make_async_response(isahc_response, request),
156                Err(err) => user_cancelled_error.map_or(Err(err), Err),
157            }
158        })
159    }
160
161    #[inline]
162    fn is_resolved_ip_addrs_supported(&self) -> bool {
163        true
164    }
165
166    #[inline]
167    fn is_response_metrics_supported(&self) -> bool {
168        true
169    }
170}
171
172fn make_user_agent(request: &RequestParts) -> Result<HeaderValue, ResponseError> {
173    HeaderValue::from_str(&format!("{}/qiniu-{}", request.user_agent(), isahc::version(),)).map_err(|err| {
174        ResponseError::builder(ResponseErrorKind::InvalidHeader, err)
175            .uri(request.url())
176            .build()
177    })
178}
179
180fn make_sync_response(mut response: IsahcSyncResponse, request: &mut SyncRequest) -> SyncResponseResult {
181    call_response_callbacks(request, &response)?;
182
183    let mut response_builder = SyncResponse::builder();
184    response_builder
185        .status_code(response.status())
186        .version(response.version())
187        .headers(take(response.headers_mut()))
188        .extensions(take(request.extensions_mut()));
189    if let Some(remote_addr) = response.remote_addr() {
190        response_builder.server_ip(remote_addr.ip());
191        if let Some(port) = NonZeroU16::new(remote_addr.port()) {
192            response_builder.server_port(port);
193        }
194    }
195    if let Some(metrics) = response.metrics() {
196        response_builder.metrics(make_metrics_from_isahc(metrics));
197    }
198    response_builder.body(SyncResponseBody::from_reader(response.into_body()));
199    Ok(response_builder.build())
200}
201
/// Convert an isahc response into an [`AsyncResponse`].
///
/// The response-status and response-header callbacks of `request` run first;
/// an error from either aborts the conversion. Headers are moved out of the
/// isahc response and extensions are moved out of `request`. Server address
/// and metrics are copied over when isahc reports them.
#[cfg(feature = "async")]
fn make_async_response(mut response: IsahcAsyncResponse, request: &mut AsyncRequest) -> AsyncResponseResult {
    call_response_callbacks(request, &response)?;

    let mut builder = AsyncResponse::builder();
    builder.status_code(response.status());
    builder.version(response.version());
    builder.headers(take(response.headers_mut()));
    builder.extensions(take(request.extensions_mut()));

    if let Some(addr) = response.remote_addr() {
        builder.server_ip(addr.ip());
        // A zero port cannot be represented as `NonZeroU16`; it is simply not recorded.
        if let Some(port) = NonZeroU16::new(addr.port()) {
            builder.server_port(port);
        }
    }

    if let Some(metrics) = response.metrics().map(make_metrics_from_isahc) {
        builder.metrics(metrics);
    }

    builder.body(AsyncResponseBody::from_reader(response.into_body()));
    Ok(builder.build())
}
224
225fn call_response_callbacks<ReqBody, RespBody>(
226    request: &Request<ReqBody>,
227    response: &IsahcResponse<RespBody>,
228) -> Result<(), ResponseError> {
229    if let Some(on_receive_response_status) = request.on_receive_response_status() {
230        on_receive_response_status(response.status()).map_err(|err| make_callback_error(err, request))?;
231    }
232    if let Some(on_receive_response_header) = request.on_receive_response_header() {
233        response.headers().iter().try_for_each(|(header_name, header_value)| {
234            on_receive_response_header(header_name, header_value).map_err(|err| make_callback_error(err, request))
235        })?;
236    }
237    Ok(())
238}
239
240fn make_callback_error(err: AnyError, request: &RequestParts) -> ResponseError {
241    ResponseError::builder(ResponseErrorKind::CallbackError, err)
242        .uri(request.url())
243        .build()
244}
245
246fn should_retry(err: &IsahcError) -> bool {
247    err.kind() == IsahcErrorKind::ConnectionFailed
248}
249
250fn make_metrics_from_isahc(metrics: &IsahcMetrics) -> Metrics {
251    Metrics::builder()
252        .total_duration(metrics.total_time())
253        .name_lookup_duration(metrics.name_lookup_time())
254        .connect_duration(metrics.connect_time())
255        .secure_connect_duration(metrics.redirect_time())
256        .redirect_duration(metrics.redirect_time())
257        .transfer_duration(metrics.transfer_time())
258        .build()
259}
260
/// Build the isahc request for one synchronous send attempt.
///
/// Copies the URL, method and headers from `request`, applies the request
/// extensions (and `ip_addr`, if given) via
/// `add_extensions_to_isahc_request_builder`, adds the `User-Agent` header,
/// and attaches a body reader that streams the request body while reporting
/// upload progress.
///
/// `user_cancelled_error` is the slot the body reader writes into when the
/// upload-progress callback returns an error.
///
/// # Errors
///
/// Propagates errors from the extension/user-agent helpers, and returns an
/// `InvalidRequestResponse` error when the isahc builder rejects the request.
fn make_sync_isahc_request(
    request: &mut SyncRequest,
    ip_addr: Option<IpAddr>,
    user_cancelled_error: &mut Option<ResponseError>,
) -> Result<IsahcSyncRequest, ResponseError> {
    let mut isahc_request_builder = isahc::Request::builder().uri(request.url()).method(request.method());
    for (header_name, header_value) in request.headers() {
        isahc_request_builder = isahc_request_builder.header(header_name, header_value);
    }
    isahc_request_builder = add_extensions_to_isahc_request_builder(request, ip_addr, isahc_request_builder)?;

    isahc_request_builder = isahc_request_builder.header(USER_AGENT, make_user_agent(request)?);

    let isahc_request = isahc_request_builder
        .body(IsahcBody::from_reader_sized(
            RequestBodyWithCallbacks::new(request, user_cancelled_error),
            request.body().size(),
        ))
        .map_err(|err| {
            ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
                .uri(request.url())
                .build()
        })?;
    // Explicit `return` because the helper item definitions follow below.
    return Ok(isahc_request);

    /// Reader handed to isahc as the request body: reads from the request's
    /// body and reports progress through `on_uploading_progress`.
    struct RequestBodyWithCallbacks {
        // Both references have their lifetimes erased to `'static` in `new`.
        request: &'static mut SyncRequest<'static>,
        user_cancelled_error: &'static mut Option<ResponseError>,
        // Total number of body bytes read so far.
        have_read: u64,
    }

    impl RequestBodyWithCallbacks {
        fn new(request: &mut SyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
            // The transmutes only extend the borrow lifetimes to `'static` so the
            // reader can be stored inside the isahc body.
            // NOTE(review): soundness relies on this reader never being used after
            // the caller's `request` / `user_cancelled_error` borrows end, and on
            // the caller not touching `request` while isahc reads the body —
            // confirm against how isahc holds and drops the request body.
            #[allow(unsafe_code)]
            Self {
                have_read: 0,
                request: unsafe { transmute(request) },
                user_cancelled_error: unsafe { transmute(user_cancelled_error) },
            }
        }
    }

    impl Read for RequestBodyWithCallbacks {
        fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
            let n = self.request.body_mut().read(buf)?;
            match n {
                // Zero bytes read: pass it through without invoking the callback.
                0 => Ok(0),
                n => {
                    // Only the bytes actually read in this call are reported.
                    let buf = &buf[..n];
                    self.have_read += n as u64;
                    if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
                        on_uploading_progress(TransferProgressInfo::new(
                            self.have_read,
                            self.request.body().size(),
                            buf,
                        ))
                        .map_err(|err| {
                            // Keep the callback's error for the caller; isahc only
                            // sees a generic I/O error that fails the read.
                            *self.user_cancelled_error = Some(make_callback_error(err, self.request));
                            IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
                        })?;
                    }
                    Ok(n)
                }
            }
        }
    }
}
328
/// Build the isahc request for one asynchronous send attempt.
///
/// Copies the URL, method and headers from `request`, applies the request
/// extensions (and `ip_addr`, if given) via
/// `add_extensions_to_isahc_request_builder`, adds the `User-Agent` header,
/// and attaches an async body reader that streams the request body while
/// reporting upload progress.
///
/// `user_cancelled_error` is the slot the body reader writes into when the
/// upload-progress callback returns an error.
///
/// # Errors
///
/// Propagates errors from the extension/user-agent helpers, and returns an
/// `InvalidRequestResponse` error when the isahc builder rejects the request.
#[cfg(feature = "async")]
fn make_async_isahc_request(
    request: &mut AsyncRequest,
    ip_addr: Option<IpAddr>,
    user_cancelled_error: &mut Option<ResponseError>,
) -> Result<IsahcAsyncRequest, ResponseError> {
    use futures::pin_mut;

    let mut isahc_request_builder = isahc::Request::builder().uri(request.url()).method(request.method());
    for (header_name, header_value) in request.headers() {
        isahc_request_builder = isahc_request_builder.header(header_name, header_value);
    }
    isahc_request_builder = add_extensions_to_isahc_request_builder(request, ip_addr, isahc_request_builder)?;
    isahc_request_builder = isahc_request_builder.header(USER_AGENT, make_user_agent(request)?);

    let isahc_request = isahc_request_builder
        .body(IsahcAsyncBody::from_reader_sized(
            RequestBodyWithCallbacks::new(request, user_cancelled_error),
            request.body().size(),
        ))
        .map_err(|err| {
            ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err)
                .uri(request.url())
                .build()
        })?;
    // Explicit `return` because the helper item definitions follow below.
    return Ok(isahc_request);

    /// Async reader handed to isahc as the request body: reads from the
    /// request's body and reports progress through `on_uploading_progress`.
    struct RequestBodyWithCallbacks {
        // Both references have their lifetimes erased to `'static` in `new`.
        request: &'static mut AsyncRequest<'static>,
        user_cancelled_error: &'static mut Option<ResponseError>,
        // Total number of body bytes read so far.
        have_read: u64,
    }

    impl RequestBodyWithCallbacks {
        fn new(request: &mut AsyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
            // The transmutes only extend the borrow lifetimes to `'static` so the
            // reader can be stored inside the isahc body.
            // NOTE(review): soundness relies on this reader never being used after
            // the caller's `request` / `user_cancelled_error` borrows end, and on
            // the caller not touching `request` while isahc reads the body —
            // confirm against how isahc holds and drops the request body.
            #[allow(unsafe_code)]
            Self {
                have_read: 0,
                request: unsafe { transmute(request) },
                user_cancelled_error: unsafe { transmute(user_cancelled_error) },
            }
        }
    }

    impl AsyncRead for RequestBodyWithCallbacks {
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            let request_mut = &mut self.as_mut().request;
            let body = request_mut.body_mut();
            // Pin the body reference locally so `poll_read` can be called on it.
            pin_mut!(body);
            // `ready!` returns `Poll::Pending` from this function when the body is not ready.
            match ready!(body.poll_read(cx, buf)) {
                Err(err) => Poll::Ready(Err(err)),
                // Zero bytes read: pass it through without invoking the callback.
                Ok(0) => Poll::Ready(Ok(0)),
                Ok(n) => {
                    // Only the bytes actually read in this call are reported.
                    let buf = &buf[..n];
                    self.as_mut().have_read += n as u64;
                    if let Some(on_uploading_progress) = self.as_ref().request.on_uploading_progress() {
                        if let Err(err) = on_uploading_progress(TransferProgressInfo::new(
                            self.as_ref().have_read,
                            self.as_ref().request.body().size(),
                            buf,
                        )) {
                            // Keep the callback's error for the caller; isahc only
                            // sees a generic I/O error that fails the read.
                            *self.user_cancelled_error = Some(make_callback_error(err, self.request));
                            return Poll::Ready(Err(IoError::new(
                                IoErrorKind::Other,
                                "on_uploading_progress() callback returns error",
                            )));
                        }
                    }
                    Poll::Ready(Ok(n))
                }
            }
        }
    }
}
403
404fn add_extensions_to_isahc_request_builder(
405    request: &RequestParts,
406    ip_addr: Option<IpAddr>,
407    mut isahc_request_builder: IsahcRequestBuilder,
408) -> Result<IsahcRequestBuilder, ResponseError> {
409    use super::extensions::*;
410
411    isahc_request_builder = isahc_request_builder.metrics(true);
412
413    if let Some(extension) = request.extensions().get::<TimeoutRequestExtension>() {
414        isahc_request_builder = isahc_request_builder.timeout(extension.get().to_owned());
415    }
416
417    if let Some(extension) = request.extensions().get::<ConnectTimeoutRequestExtension>() {
418        isahc_request_builder = isahc_request_builder.connect_timeout(extension.get().to_owned());
419    }
420
421    if let Some(extension) = request.extensions().get::<LowSpeedTimeoutRequestExtension>() {
422        isahc_request_builder =
423            isahc_request_builder.low_speed_timeout(extension.get().0.to_owned(), extension.get().1.to_owned());
424    }
425
426    if let Some(extension) = request.extensions().get::<VersionNegotiationRequestExtension>() {
427        isahc_request_builder = isahc_request_builder.version_negotiation(extension.get().to_owned());
428    }
429
430    if let Some(extension) = request.extensions().get::<RedirectPolicyRequestExtension>() {
431        isahc_request_builder = isahc_request_builder.redirect_policy(extension.get().to_owned());
432    }
433
434    if request.extensions().get::<AutoRefererRequestExtension>().is_some() {
435        isahc_request_builder = isahc_request_builder.auto_referer();
436    }
437
438    if let Some(extension) = request.extensions().get::<AutomaticDecompressionRequestExtension>() {
439        isahc_request_builder = isahc_request_builder.automatic_decompression(extension.get().to_owned());
440    }
441
442    if let Some(extension) = request.extensions().get::<TcpKeepaliveRequestExtension>() {
443        isahc_request_builder = isahc_request_builder.tcp_keepalive(extension.get().to_owned());
444    }
445
446    if request.extensions().get::<TcpNodelayRequestExtension>().is_some() {
447        isahc_request_builder = isahc_request_builder.tcp_nodelay();
448    }
449
450    if let Some(extension) = request.extensions().get::<NetworkInterfaceRequestExtension>() {
451        isahc_request_builder = isahc_request_builder.interface(extension.get().to_owned());
452    }
453
454    if let Some(extension) = request.extensions().get::<IpVersionRequestExtension>() {
455        isahc_request_builder = isahc_request_builder.ip_version(extension.get().to_owned());
456    }
457
458    if let Some(extension) = request.extensions().get::<DialRequestExtension>() {
459        isahc_request_builder = isahc_request_builder.dial(extension.get().to_owned());
460    } else if let Some(ip_addr) = ip_addr {
461        isahc_request_builder = isahc_request_builder.dial(Dialer::ip_socket(SocketAddr::new(
462            ip_addr,
463            extract_port_for_uri(request.url())?,
464        )));
465    }
466
467    if let Some(extension) = request.extensions().get::<ProxyRequestExtension>() {
468        isahc_request_builder = isahc_request_builder.proxy(extension.get().to_owned());
469    }
470
471    if let Some(extension) = request.extensions().get::<ProxyBlacklistRequestExtension>() {
472        isahc_request_builder = isahc_request_builder.proxy_blacklist(extension.get().to_owned());
473    }
474
475    if let Some(extension) = request.extensions().get::<ProxyAuthenticationRequestExtension>() {
476        isahc_request_builder = isahc_request_builder.proxy_authentication(extension.get().to_owned());
477    }
478
479    if let Some(extension) = request.extensions().get::<ProxyCredentialsRequestExtension>() {
480        isahc_request_builder = isahc_request_builder.proxy_credentials(extension.get().to_owned());
481    }
482
483    if let Some(extension) = request.extensions().get::<MaxUploadSpeedRequestExtension>() {
484        isahc_request_builder = isahc_request_builder.max_upload_speed(extension.get().to_owned());
485    }
486
487    if let Some(extension) = request.extensions().get::<MaxDownloadSpeedRequestExtension>() {
488        isahc_request_builder = isahc_request_builder.max_download_speed(extension.get().to_owned());
489    }
490
491    if let Some(extension) = request.extensions().get::<SslClientCertificateRequestExtension>() {
492        isahc_request_builder = isahc_request_builder.ssl_client_certificate(extension.get().to_owned());
493    }
494
495    if let Some(extension) = request.extensions().get::<SslCaCertificateRequestExtension>() {
496        isahc_request_builder = isahc_request_builder.ssl_ca_certificate(extension.get().to_owned());
497    }
498
499    if let Some(extension) = request.extensions().get::<SslCiphersRequestExtension>() {
500        isahc_request_builder = isahc_request_builder.ssl_ciphers(extension.get().to_owned());
501    }
502
503    if let Some(extension) = request.extensions().get::<SslOptionsRequestExtension>() {
504        isahc_request_builder = isahc_request_builder.ssl_options(extension.get().to_owned());
505    }
506
507    if let Some(extension) = request.extensions().get::<TitleCaseHeadersRequestExtension>() {
508        isahc_request_builder = isahc_request_builder.title_case_headers(extension.get().to_owned());
509    }
510
511    return Ok(isahc_request_builder);
512
513    fn extract_port_for_uri(uri: &Uri) -> Result<u16, ResponseError> {
514        const INVALID_URL: ResponseErrorKind = ResponseErrorKind::InvalidUrl;
515        uri.port_u16().map(Ok).unwrap_or_else(|| {
516            if let Some(scheme) = uri.scheme() {
517                if scheme == &Scheme::HTTP {
518                    Ok(80)
519                } else if scheme == &Scheme::HTTPS {
520                    Ok(443)
521                } else {
522                    Err(ResponseError::builder_with_msg(INVALID_URL, "unknown port for url").build())
523                }
524            } else {
525                Err(ResponseError::builder_with_msg(INVALID_URL, "empty scheme for url").build())
526            }
527        })
528    }
529}
530
531fn from_isahc_error(err: IsahcError, request: &RequestParts) -> ResponseError {
532    let error_builder = match err.kind() {
533        IsahcErrorKind::BadClientCertificate => ResponseError::builder(ResponseErrorKind::ClientCertError, err),
534        IsahcErrorKind::BadServerCertificate => ResponseError::builder(ResponseErrorKind::ServerCertError, err),
535        IsahcErrorKind::ClientInitialization => ResponseError::builder(ResponseErrorKind::LocalIoError, err),
536        IsahcErrorKind::ConnectionFailed => ResponseError::builder(ResponseErrorKind::ConnectError, err),
537        IsahcErrorKind::InvalidContentEncoding => ResponseError::builder(ResponseErrorKind::InvalidHeader, err),
538        IsahcErrorKind::InvalidCredentials => ResponseError::builder(ResponseErrorKind::InvalidHeader, err),
539        IsahcErrorKind::InvalidRequest => ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err),
540        IsahcErrorKind::Io => ResponseError::builder(ResponseErrorKind::SendError, err),
541        IsahcErrorKind::NameResolution => ResponseError::builder(ResponseErrorKind::DnsServerError, err),
542        IsahcErrorKind::ProtocolViolation => ResponseError::builder(ResponseErrorKind::InvalidRequestResponse, err),
543        IsahcErrorKind::Timeout => ResponseError::builder(ResponseErrorKind::TimeoutError, err),
544        IsahcErrorKind::TlsEngine => ResponseError::builder(ResponseErrorKind::SslError, err),
545        IsahcErrorKind::TooManyRedirects => ResponseError::builder(ResponseErrorKind::TooManyRedirect, err),
546        _ => ResponseError::builder(ResponseErrorKind::UnknownError, err),
547    };
548    error_builder.uri(request.url()).build()
549}