Struct qiniu_reqwest::TimeoutExtension
source · pub struct TimeoutExtension(_);
Expand description
请求超时时长扩展
Implementations§
source§impl TimeoutExtension
impl TimeoutExtension
sourcepub fn get(&self) -> Duration
pub fn get(&self) -> Duration
获取请求超时时长扩展的值
Examples found in repository?
src/sync_client.rs (line 90)
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
fn make_sync_reqwest_request(
request: &mut SyncRequest,
user_cancelled_error: &mut Option<ResponseError>,
) -> Result<SyncReqwestRequest, ResponseError> {
let url = Url::parse(&request.url().to_string()).map_err(|err| {
ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
.uri(request.url())
.build()
})?;
let mut reqwest_request = SyncReqwestRequest::new(request.method().to_owned(), url);
for (header_name, header_value) in request.headers() {
reqwest_request
.headers_mut()
.insert(header_name, header_value.to_owned());
}
reqwest_request
.headers_mut()
.insert(USER_AGENT, make_user_agent(request, "sync")?);
*reqwest_request.body_mut() = Some(SyncBody::sized(
RequestBodyWithCallbacks::new(request, user_cancelled_error),
request.body().size(),
));
if let Some(timeout) = request.extensions().get::<TimeoutExtension>() {
*reqwest_request.timeout_mut() = Some(timeout.get());
}
return Ok(reqwest_request);
struct RequestBodyWithCallbacks {
request: &'static mut SyncRequest<'static>,
user_cancelled_error: &'static mut Option<ResponseError>,
have_read: u64,
}
impl RequestBodyWithCallbacks {
fn new(request: &mut SyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
#[allow(unsafe_code)]
Self {
have_read: 0,
request: unsafe { transmute(request) },
user_cancelled_error: unsafe { transmute(user_cancelled_error) },
}
}
}
impl Read for RequestBodyWithCallbacks {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let n = self.request.body_mut().read(buf)?;
match n {
0 => Ok(0),
n => {
let buf = &buf[..n];
self.have_read += n as u64;
if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
on_uploading_progress(TransferProgressInfo::new(
self.have_read,
self.request.body().size(),
buf,
))
.map_err(|err| {
*self.user_cancelled_error = Some(make_callback_error(err, self.request));
IoError::new(IoErrorKind::Other, "on_uploading_progress() callback returns error")
})?;
}
Ok(n)
}
}
}
}
}
More examples
src/async_client.rs (line 90)
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
fn make_async_reqwest_request(
request: &mut AsyncRequest,
user_cancelled_error: &mut Option<ResponseError>,
) -> Result<AsyncReqwestRequest, ResponseError> {
let url = Url::parse(&request.url().to_string()).map_err(|err| {
ResponseError::builder(ResponseErrorKind::InvalidUrl, err)
.uri(request.url())
.build()
})?;
let mut reqwest_request = AsyncReqwestRequest::new(request.method().to_owned(), url);
for (header_name, header_value) in request.headers() {
reqwest_request
.headers_mut()
.insert(header_name, header_value.to_owned());
}
reqwest_request
.headers_mut()
.insert(USER_AGENT, make_user_agent(request, "async")?);
*reqwest_request.body_mut() = Some(AsyncBody::wrap_stream(RequestBodyWithCallbacks::new(
request,
user_cancelled_error,
)));
if let Some(timeout) = request.extensions().get::<TimeoutExtension>() {
*reqwest_request.timeout_mut() = Some(timeout.get());
}
return Ok(reqwest_request);
struct RequestBodyWithCallbacks {
request: &'static mut AsyncRequest<'static>,
have_read: u64,
user_cancelled_error: &'static mut Option<ResponseError>,
}
impl RequestBodyWithCallbacks {
fn new(request: &mut AsyncRequest, user_cancelled_error: &mut Option<ResponseError>) -> Self {
#[allow(unsafe_code)]
Self {
have_read: 0,
request: unsafe { transmute(request) },
user_cancelled_error: unsafe { transmute(user_cancelled_error) },
}
}
}
impl Stream for RequestBodyWithCallbacks {
type Item = Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
const BUF_LEN: usize = 32 * 1024;
let mut buf = [0u8; BUF_LEN];
match ready!(Pin::new(&mut self.request.body_mut()).poll_read(cx, &mut buf)) {
Err(err) => Poll::Ready(Some(Err(Box::new(err)))),
Ok(0) => Poll::Ready(None),
Ok(n) => {
let buf = &buf[..n];
self.have_read += n as u64;
if let Some(on_uploading_progress) = self.request.on_uploading_progress() {
if let Err(err) = on_uploading_progress(TransferProgressInfo::new(
self.have_read,
self.request.body().size(),
buf,
)) {
*self.user_cancelled_error = Some(make_callback_error(err, self.request));
return Poll::Ready(Some(Err(Box::new(IoError::new(
IoErrorKind::Other,
"on_uploading_progress() callback returns error",
)))));
}
}
Poll::Ready(Some(Ok(buf.to_vec())))
}
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.have_read as usize, Some(self.request.body().size() as usize))
}
}
}