qiniu_ureq/
client.rs

1use anyhow::Error as AnyError;
2use qiniu_http::{
3    header::{CONTENT_LENGTH, USER_AGENT},
4    HeaderName, HeaderValue, HttpCaller, RequestParts, ResponseError, ResponseErrorKind, StatusCode, SyncRequest,
5    SyncResponse, SyncResponseBody, SyncResponseResult, TransferProgressInfo, Version,
6};
7use std::{
8    fmt::{self, Display},
9    io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
10    mem::take,
11};
12use ureq::{Agent, Error as UreqError, ErrorKind as UreqErrorKind, Request as UreqRequest, Response as UreqResponse};
13
14#[cfg(feature = "async")]
15use {
16    super::BoxFuture,
17    qiniu_http::{AsyncRequest, AsyncResponseResult},
18};
19
20/// Ureq 客户端
21#[derive(Debug, Clone)]
22pub struct Client {
23    client: Agent,
24}
25
26impl Client {
27    /// 创建 Ureq 客户端
28    #[inline]
29    pub fn new(client: Agent) -> Self {
30        Self { client }
31    }
32}
33
34impl From<Agent> for Client {
35    #[inline]
36    fn from(agent: Agent) -> Self {
37        Self::new(agent)
38    }
39}
40
41impl Default for Client {
42    #[inline]
43    fn default() -> Self {
44        Self { client: ureq::agent() }
45    }
46}
47
48impl HttpCaller for Client {
49    fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
50        let mut user_cancelled_error: Option<ResponseError> = None;
51
52        let ureq_request = make_ureq_request(&self.client, request)?;
53        match ureq_request.send(RequestBodyWithCallbacks::new(request, &mut user_cancelled_error)) {
54            Ok(response) => make_ureq_sync_response(response, request),
55            Err(err) => {
56                let kind = err.kind();
57                match err {
58                    UreqError::Status(_, response) => make_ureq_sync_response(response, request),
59                    UreqError::Transport(transport) => user_cancelled_error
60                        .map_or_else(|| Err(from_ureq_error(kind, AnyError::new(transport), request)), Err),
61                }
62            }
63        }
64    }
65
66    #[inline]
67    #[cfg(feature = "async")]
68    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
69    fn async_call<'a>(&'a self, _request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
70        unimplemented!("http_ureq::Client does not support async call")
71    }
72}
73
74fn make_user_agent(request: &RequestParts) -> Result<HeaderValue, ResponseError> {
75    let user_agent = format!("{}/qiniu-ureq", request.user_agent());
76    HeaderValue::from_str(&user_agent).map_err(|err| build_header_value_error(request, &user_agent, &err))
77}
78
79fn make_ureq_request(agent: &Agent, request: &SyncRequest) -> Result<UreqRequest, ResponseError> {
80    let mut request_builder = agent.request(request.method().as_str(), &request.url().to_string());
81    for (header_name, header_value) in request.headers() {
82        request_builder = set_header_for_request_builder(request_builder, request, header_name, header_value)?;
83    }
84    request_builder =
85        set_header_for_request_builder(request_builder, request, &USER_AGENT, &make_user_agent(request)?)?;
86    request_builder = request_builder.set(CONTENT_LENGTH.as_str(), &request.body().size().to_string());
87    request_builder = add_extensions_to_request_builder(request, request_builder);
88    Ok(request_builder)
89}
90
91fn make_ureq_sync_response(response: UreqResponse, request: &mut SyncRequest) -> SyncResponseResult {
92    call_response_callbacks(request, &response)?;
93
94    let mut response_builder = SyncResponse::builder();
95    response_builder
96        .status_code(status_code_of_response(&response, request)?)
97        .version(parse_http_version(response.http_version(), request)?)
98        .extensions(take(request.extensions_mut()));
99    for header_name_str in response.headers_names().into_iter() {
100        if let Some(header_value_str) = response.header(&header_name_str) {
101            let header_name = HeaderName::from_bytes(header_name_str.as_bytes())
102                .map_err(|err| build_header_name_error(request, &header_name_str, &err))?;
103            let header_value = HeaderValue::from_bytes(header_value_str.as_bytes())
104                .map_err(|err| build_header_value_error(request, header_value_str, &err))?;
105            response_builder.header(header_name, header_value);
106        }
107    }
108    response_builder.body(SyncResponseBody::from_reader(ResponseReaderWrapper(
109        response.into_reader(),
110    )));
111    return Ok(response_builder.build());
112
113    struct ResponseReaderWrapper<R>(R);
114
115    impl<R: Read> Read for ResponseReaderWrapper<R> {
116        #[inline]
117        fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
118            self.0.read(buf)
119        }
120    }
121
122    impl<R> fmt::Debug for ResponseReaderWrapper<R> {
123        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124            f.debug_tuple("ResponseReaderWrapper").finish()
125        }
126    }
127}
128
129fn add_extensions_to_request_builder(request: &RequestParts, mut request_builder: UreqRequest) -> UreqRequest {
130    use super::extensions::TimeoutExtension;
131
132    if let Some(extension) = request.extensions().get::<TimeoutExtension>() {
133        request_builder = request_builder.timeout(extension.get());
134    }
135
136    request_builder
137}
138
139fn call_response_callbacks(request: &RequestParts, response: &UreqResponse) -> Result<(), ResponseError> {
140    if let Some(on_receive_response_status) = request.on_receive_response_status() {
141        on_receive_response_status(status_code_of_response(response, request)?)
142            .map_err(|err| build_on_receive_response_status_error(request, err))?;
143    }
144    if let Some(on_receive_response_header) = request.on_receive_response_header() {
145        for header_name_str in response.headers_names().into_iter() {
146            if let Some(header_value_str) = response.header(&header_name_str) {
147                let header_name = HeaderName::from_bytes(header_name_str.as_bytes())
148                    .map_err(|err| build_header_name_error(request, &header_name_str, &err))?;
149                let header_value = HeaderValue::from_bytes(header_value_str.as_bytes())
150                    .map_err(|err| build_header_value_error(request, header_value_str, &err))?;
151                on_receive_response_header(&header_name, &header_value)
152                    .map_err(|err| build_on_receive_response_header_error(request, err))?
153            }
154        }
155    }
156    Ok(())
157}
158
159fn build_on_receive_response_status_error(request: &RequestParts, err: AnyError) -> ResponseError {
160    ResponseError::builder(ResponseErrorKind::CallbackError, err)
161        .uri(request.url())
162        .build()
163}
164
165fn build_on_receive_response_header_error(request: &RequestParts, err: AnyError) -> ResponseError {
166    ResponseError::builder(ResponseErrorKind::CallbackError, err)
167        .uri(request.url())
168        .build()
169}
170
171fn build_status_code_error(request: &RequestParts, code: u16, err: &dyn Display) -> ResponseError {
172    ResponseError::builder_with_msg(
173        ResponseErrorKind::InvalidRequestResponse,
174        format!("invalid status code({code}): {err}"),
175    )
176    .uri(request.url())
177    .build()
178}
179
180fn build_header_name_error(request: &RequestParts, header_name: &str, err: &dyn Display) -> ResponseError {
181    ResponseError::builder_with_msg(
182        ResponseErrorKind::InvalidHeader,
183        format!("invalid header name({header_name}): {err}"),
184    )
185    .uri(request.url())
186    .build()
187}
188
189fn build_header_value_error(request: &RequestParts, header_value: &str, err: &dyn Display) -> ResponseError {
190    ResponseError::builder_with_msg(
191        ResponseErrorKind::InvalidHeader,
192        format!("invalid header value({header_value}): {err}"),
193    )
194    .uri(request.url())
195    .build()
196}
197
198fn convert_header_value_error(request: &RequestParts, header_value: &HeaderValue, err: &dyn Display) -> ResponseError {
199    ResponseError::builder_with_msg(
200        ResponseErrorKind::InvalidHeader,
201        format!("invalid header value({header_value:?}): {err}"),
202    )
203    .uri(request.url())
204    .build()
205}
206
207fn set_header_for_request_builder(
208    request_builder: UreqRequest,
209    request: &RequestParts,
210    header_name: &HeaderName,
211    header_value: &HeaderValue,
212) -> Result<UreqRequest, ResponseError> {
213    Ok(request_builder.set(
214        header_name.as_str(),
215        header_value
216            .to_str()
217            .map_err(|err| convert_header_value_error(request, header_value, &err))?,
218    ))
219}
220
221fn status_code_of_response(response: &UreqResponse, request: &RequestParts) -> Result<StatusCode, ResponseError> {
222    StatusCode::from_u16(response.status()).map_err(|err| build_status_code_error(request, response.status(), &err))
223}
224
225fn parse_http_version(version: &str, request: &RequestParts) -> Result<Version, ResponseError> {
226    match version {
227        "HTTP/0.9" => Ok(Version::HTTP_09),
228        "HTTP/1.0" => Ok(Version::HTTP_10),
229        "HTTP/1.1" => Ok(Version::HTTP_11),
230        "HTTP/2.0" => Ok(Version::HTTP_2),
231        "HTTP/3.0" => Ok(Version::HTTP_3),
232        _ => Err(ResponseError::builder_with_msg(
233            ResponseErrorKind::InvalidRequestResponse,
234            format!("invalid http version: {version}"),
235        )
236        .uri(request.url())
237        .build()),
238    }
239}
240
241fn from_ureq_error(kind: UreqErrorKind, err: AnyError, request: &RequestParts) -> ResponseError {
242    let response_error_kind = match kind {
243        UreqErrorKind::InvalidUrl => ResponseErrorKind::InvalidUrl,
244        UreqErrorKind::UnknownScheme => ResponseErrorKind::InvalidUrl,
245        UreqErrorKind::Dns => ResponseErrorKind::DnsServerError,
246        UreqErrorKind::ConnectionFailed => ResponseErrorKind::ConnectError,
247        UreqErrorKind::TooManyRedirects => ResponseErrorKind::TooManyRedirect,
248        UreqErrorKind::BadStatus => ResponseErrorKind::InvalidRequestResponse,
249        UreqErrorKind::BadHeader => ResponseErrorKind::InvalidHeader,
250        UreqErrorKind::Io => ResponseErrorKind::LocalIoError,
251        UreqErrorKind::InvalidProxyUrl => ResponseErrorKind::ProxyError,
252        UreqErrorKind::ProxyConnect => ResponseErrorKind::ProxyError,
253        UreqErrorKind::ProxyUnauthorized => ResponseErrorKind::ProxyError,
254        UreqErrorKind::HTTP => ResponseErrorKind::InvalidRequestResponse,
255        UreqErrorKind::InsecureRequestHttpsOnly => ResponseErrorKind::SslError,
256    };
257    ResponseError::builder(response_error_kind, err)
258        .uri(request.url())
259        .build()
260}
261
262struct RequestBodyWithCallbacks<'a, 'r> {
263    request: &'a mut SyncRequest<'r>,
264    have_read: u64,
265    user_cancelled_error: &'a mut Option<ResponseError>,
266}
267
268impl<'a, 'r> RequestBodyWithCallbacks<'a, 'r> {
269    fn new(request: &'a mut SyncRequest<'r>, user_cancelled_error: &'a mut Option<ResponseError>) -> Self {
270        Self {
271            request,
272            user_cancelled_error,
273            have_read: 0,
274        }
275    }
276}
277
278impl Read for RequestBodyWithCallbacks<'_, '_> {
279    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
280        let n = self.request.body_mut().read(buf)?;
281        match n {
282            0 => Ok(0),
283            n => {
284                self.have_read += n as u64;
285                let buf = &buf[..n];
286                if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
287                    on_uploading_progress(TransferProgressInfo::new(
288                        self.have_read,
289                        self.request.body().size(),
290                        buf,
291                    ))
292                    .map_err(|err| {
293                        *self.user_cancelled_error = Some(
294                            ResponseError::builder(ResponseErrorKind::CallbackError, err)
295                                .uri(self.request.url())
296                                .build(),
297                        );
298                        IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
299                    })?;
300                }
301                Ok(n)
302            }
303        }
304    }
305}