1#![cfg_attr(feature = "docs", feature(doc_cfg))]
2#![deny(
3 single_use_lifetimes,
4 missing_debug_implementations,
5 large_assignments,
6 exported_private_dependencies,
7 absolute_paths_not_starting_with_crate,
8 anonymous_parameters,
9 explicit_outlives_requirements,
10 keyword_idents,
11 macro_use_extern_crate,
12 meta_variable_misuse,
13 missing_docs,
14 non_ascii_idents,
15 indirect_structural_match,
16 trivial_casts,
17 trivial_numeric_casts,
18 unreachable_pub,
19 unsafe_code,
20 unused_crate_dependencies,
21 unused_extern_crates,
22 unused_import_braces,
23 unused_lifetimes,
24 unused_qualifications
25)]
26
27mod client;
35mod extensions;
36
37pub use client::Client;
38pub use extensions::*;
39
40pub use isahc;
41pub use qiniu_http as http;
42
#[cfg(test)]
mod tests {
    //! End-to-end tests: a local warp server checks the uploaded payload and
    //! answers with a fresh payload, which the client side then checks in turn.
    //! Every payload is `BUF_LEN` random bytes followed by their MD5 digest.

    use super::*;
    use bytes::Bytes;
    use futures::channel::oneshot::channel;
    use isahc::http::header::{CONTENT_LENGTH, USER_AGENT};
    use md5::{Digest, Md5};
    use qiniu_http::{HttpCaller, Method, OnProgressCallback, SyncRequest, SyncRequestBody, TransferProgressInfo};
    use rand::{thread_rng, RngCore};
    use std::{
        io::{copy as io_copy, Read},
        sync::{
            atomic::{AtomicU64, Ordering::Relaxed},
            Arc,
        },
        time::Duration,
    };
    use tokio::task::spawn_blocking;
    use warp::{
        filters::{body::bytes, method::post},
        header::value as header_value,
        http::header::HeaderValue,
        path,
        reply::Response,
        Filter,
    };

    #[cfg(feature = "async")]
    use {
        futures::io::{copy as async_io_copy, AsyncReadExt},
        qiniu_http::{AsyncRequest, AsyncRequestBody},
    };

    // Serves `$routes` on an ephemeral loopback port, binds the listening
    // address to `$addr`, runs `$code` (a block evaluating to
    // `anyhow::Result<()>`) and finally shuts the server down.
    //
    // The body is evaluated inside an `async` block so that any `?` within it
    // resolves to `outcome` instead of returning from the test function. This
    // guarantees the shutdown signal is sent and the server task is joined
    // even when the body fails; the failure is propagated afterwards.
    macro_rules! starts_with_server {
        ($addr:ident, $routes:ident, $code:block) => {{
            let (tx, rx) = channel();
            let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                rx.await.ok();
            });
            let handler = tokio::spawn(server);
            let outcome: anyhow::Result<()> = async { $code }.await;
            // Best effort: the receiver is gone only if the server already stopped.
            tx.send(()).ok();
            handler.await.ok();
            outcome?;
        }};
    }

    /// Number of random bytes in every payload.
    const BUF_LEN: usize = 1 << 20;
    /// Length of the MD5 digest appended to every payload.
    const MD5_LEN: usize = 16;
    /// Timeout attached to each request through `TimeoutRequestExtension`.
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);

    /// Returns the MD5 digest of `data`.
    fn md5_of(data: &[u8]) -> Vec<u8> {
        let mut hasher = Md5::new();
        hasher.update(data);
        hasher.finalize().as_slice().to_vec()
    }

    /// Builds a payload of `BUF_LEN` random bytes followed by their MD5 digest.
    fn random_payload_with_checksum() -> Vec<u8> {
        let mut payload = vec![0u8; BUF_LEN + MD5_LEN];
        let (data, checksum) = payload.split_at_mut(BUF_LEN);
        thread_rng().fill_bytes(data);
        checksum.copy_from_slice(&md5_of(data));
        payload
    }

    /// Server-side handler shared by both tests.
    ///
    /// Asserts that the request body has the expected length and a matching
    /// trailing MD5 digest, and that the `User-Agent` header carries the SDK
    /// markers, then replies with a freshly generated checksummed payload.
    fn verify_request_and_reply(user_agent: HeaderValue, req_body: Bytes) -> Response {
        assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
        assert_eq!(md5_of(&req_body[..BUF_LEN]).as_slice(), &req_body[BUF_LEN..]);

        let ua = user_agent.to_str().unwrap();
        assert!(ua.starts_with("QiniuRust/"));
        assert!(ua.contains("/qiniu-isahc/"));

        Response::new(random_payload_with_checksum().into())
    }

    /// Exercises the blocking `call` API against the local server.
    #[tokio::test]
    async fn sync_http_test() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let routes = path!("dir1" / "dir2" / "file")
            .and(post())
            .and(header_value(USER_AGENT.as_str()))
            .and(bytes())
            .map(verify_request_and_reply);
        starts_with_server!(addr, routes, {
            // The blocking client must not run on the async executor thread.
            spawn_blocking(move || {
                let request_body = random_payload_with_checksum();

                let last_uploaded = Arc::new(AtomicU64::new(0));
                let last_total = Arc::new(AtomicU64::new(0));
                let mut response = {
                    let last_uploaded = Arc::clone(&last_uploaded);
                    let last_total = Arc::clone(&last_total);
                    let callback = move |info: TransferProgressInfo| {
                        last_uploaded.store(info.transferred_bytes(), Relaxed);
                        last_total.store(info.total_bytes(), Relaxed);
                        Ok(())
                    };
                    // "fakehost" is not resolvable; the request reaches the
                    // server only through the pre-resolved address below.
                    let resolved_ip_addrs = [addr.ip()];
                    let mut request = SyncRequest::builder()
                        .method(Method::POST)
                        .url(
                            format!("http://fakehost:{}/dir1/dir2/file", addr.port())
                                .parse()
                                .expect("invalid uri"),
                        )
                        .body(SyncRequestBody::from_referenced_bytes(&request_body))
                        .resolved_ip_addrs(resolved_ip_addrs.as_ref())
                        .on_uploading_progress(OnProgressCallback::reference(&callback))
                        .add_extension(TimeoutRequestExtension::new(REQUEST_TIMEOUT))
                        .build();
                    Client::default_client()?.call(&mut request)?
                };
                assert_eq!(
                    response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
                    Some((BUF_LEN + MD5_LEN).to_string().as_bytes())
                );
                assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
                assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
                assert_eq!(
                    response.extensions().get::<TimeoutRequestExtension>().unwrap().get(),
                    &REQUEST_TIMEOUT
                );

                {
                    let mut body_part: Vec<u8> = Vec::with_capacity(BUF_LEN);
                    let mut checksum_part: Vec<u8> = Vec::with_capacity(MD5_LEN);

                    assert_eq!(
                        io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
                        BUF_LEN as u64
                    );
                    assert_eq!(
                        io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
                        MD5_LEN as u64
                    );

                    assert_eq!(md5_of(&body_part).as_slice(), checksum_part.as_slice());
                }
                Ok::<_, anyhow::Error>(())
            })
            .await?
        });

        Ok(())
    }

    /// Exercises the `async_call` API against the local server.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_http_test() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let routes = path!("dir1" / "dir2" / "file")
            .and(post())
            .and(header_value(USER_AGENT.as_str()))
            .and(bytes())
            .map(verify_request_and_reply);
        starts_with_server!(addr, routes, {
            let request_body = random_payload_with_checksum();
            let last_uploaded = Arc::new(AtomicU64::new(0));
            let last_total = Arc::new(AtomicU64::new(0));

            let mut response = {
                let last_uploaded = Arc::clone(&last_uploaded);
                let last_total = Arc::clone(&last_total);
                let callback = move |info: TransferProgressInfo| {
                    last_uploaded.store(info.transferred_bytes(), Relaxed);
                    last_total.store(info.total_bytes(), Relaxed);
                    Ok(())
                };
                // "fakehost" is not resolvable; the request reaches the
                // server only through the pre-resolved address below.
                let resolved_ip_addrs = [addr.ip()];
                let mut request = AsyncRequest::builder()
                    .method(Method::POST)
                    .url(
                        format!("http://fakehost:{}/dir1/dir2/file", addr.port())
                            .parse()
                            .expect("invalid uri"),
                    )
                    .body(AsyncRequestBody::from_referenced_bytes(&request_body))
                    .resolved_ip_addrs(resolved_ip_addrs.as_ref())
                    .on_uploading_progress(OnProgressCallback::reference(&callback))
                    .add_extension(TimeoutRequestExtension::new(REQUEST_TIMEOUT))
                    .build();
                Client::default_client()?.async_call(&mut request).await?
            };
            assert_eq!(
                response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
                Some((BUF_LEN + MD5_LEN).to_string().as_bytes())
            );
            assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
            assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
            assert_eq!(
                response.extensions().get::<TimeoutRequestExtension>().unwrap().get(),
                &REQUEST_TIMEOUT
            );

            {
                let mut body_part: Vec<u8> = Vec::with_capacity(BUF_LEN);
                let mut checksum_part: Vec<u8> = Vec::with_capacity(MD5_LEN);

                assert_eq!(
                    async_io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part).await?,
                    BUF_LEN as u64
                );
                assert_eq!(
                    async_io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part).await?,
                    MD5_LEN as u64
                );

                assert_eq!(md5_of(&body_part).as_slice(), checksum_part.as_slice());
            }
            // The explicit error type lets the `?`s above infer the block's result type.
            Ok::<_, anyhow::Error>(())
        });

        Ok(())
    }
}