//! Crate root: wires the private implementation modules together and defines
//! the public surface.
//!
//! Re-exported from here: everything public in `extensions`, the `qiniu_http`
//! crate under the name `http`, the `reqwest` crate itself, and `SyncClient`.
//! `AsyncClient` is additionally re-exported when the `async` feature is
//! enabled.
1#![cfg_attr(feature = "docs", feature(doc_cfg))]
// Every lint listed below is promoted to a hard error for the whole crate.
2#![deny(
3 single_use_lifetimes,
4 missing_debug_implementations,
5 large_assignments,
6 exported_private_dependencies,
7 absolute_paths_not_starting_with_crate,
8 anonymous_parameters,
9 explicit_outlives_requirements,
10 keyword_idents,
11 macro_use_extern_crate,
12 meta_variable_misuse,
13 missing_docs,
14 non_ascii_idents,
15 indirect_structural_match,
16 trivial_casts,
17 trivial_numeric_casts,
18 unreachable_pub,
19 unsafe_code,
20 unused_crate_dependencies,
21 unused_extern_crates,
22 unused_import_braces,
23 unused_lifetimes,
24 unused_qualifications
25)]
26
// Private implementation modules; selected items are re-exported below.
27mod extensions;
36mod sync_client;
37
// The asynchronous client module is only compiled with the `async` feature.
38#[cfg(feature = "async")]
39mod async_client;
40
// Public surface of the crate.
41pub use extensions::*;
42pub use qiniu_http as http;
43pub use reqwest;
44pub use sync_client::SyncClient;
45
// Mirrors the feature gate on the `async_client` module declaration above.
46#[cfg(feature = "async")]
47pub use async_client::AsyncClient;
48
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::channel::oneshot::channel;
    use md5::{Digest, Md5};
    use qiniu_http::{HttpCaller, Method, SyncRequest, SyncRequestBody, TransferProgressInfo};
    use rand::{thread_rng, RngCore};
    use reqwest::header::{CONTENT_LENGTH, USER_AGENT};
    use std::{
        io::{copy as io_copy, Read},
        sync::{
            atomic::{AtomicU64, Ordering::Relaxed},
            Arc,
        },
        time::Duration,
    };
    use tokio::task::spawn_blocking;
    use warp::{
        filters::{body::bytes, method::post},
        header::value as header_value,
        http::header::HeaderValue,
        path,
        reply::Response,
        Filter,
    };

    #[cfg(feature = "async")]
    use {
        futures::io::{copy as async_io_copy, AsyncReadExt},
        qiniu_http::{AsyncRequest, AsyncRequestBody, OnProgressCallback},
    };

    // Serves `$routes` on an ephemeral 127.0.0.1 port, binds the chosen
    // address to `$addr`, runs `$code`, then shuts the server down.
    //
    // `$code` must evaluate to `anyhow::Result<()>`. It is executed inside its
    // own `async` block so that a `?` within it (or an `Err` it evaluates to)
    // cannot return from the enclosing test before the shutdown signal has
    // been sent and the server task has been joined. The captured result is
    // propagated with `?` only after that teardown.
    macro_rules! starts_with_server {
        ($addr:ident, $routes:ident, $code:block) => {{
            let (tx, rx) = channel();
            let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                rx.await.ok();
            });
            let handler = tokio::spawn(server);
            let outcome: anyhow::Result<()> = async { $code }.await;
            tx.send(()).ok();
            handler.await.ok();
            outcome?;
        }};
    }

    /// Number of random data bytes in every request and response body.
    const BUF_LEN: usize = 1 << 20;
    /// Length in bytes of the MD5 digest appended after the data bytes.
    const MD5_LEN: usize = 16;

    /// Computes the MD5 digest of `data`.
    fn md5_of(data: &[u8]) -> [u8; MD5_LEN] {
        let mut hasher = Md5::new();
        hasher.update(data);
        let mut digest = [0u8; MD5_LEN];
        digest.copy_from_slice(hasher.finalize().as_slice());
        digest
    }

    /// Builds a body of `BUF_LEN` random bytes followed by their MD5 digest.
    fn checksummed_payload() -> Vec<u8> {
        let mut payload = vec![0u8; BUF_LEN + MD5_LEN];
        thread_rng().fill_bytes(&mut payload[..BUF_LEN]);
        let digest = md5_of(&payload[..BUF_LEN]);
        payload[BUF_LEN..].copy_from_slice(&digest);
        payload
    }

    /// Asserts that `payload` is `BUF_LEN` data bytes followed by the MD5
    /// digest of exactly those bytes.
    fn assert_checksummed(payload: &[u8]) {
        assert_eq!(payload.len(), BUF_LEN + MD5_LEN);
        let (data, checksum) = payload.split_at(BUF_LEN);
        assert_eq!(md5_of(data).as_slice(), checksum);
    }

    /// Server-side handler shared by both tests.
    ///
    /// Validates the uploaded body and the `User-Agent` header (which must
    /// start with `QiniuRust/` and end with `user_agent_suffix`), then replies
    /// with a freshly generated checksummed payload.
    fn checksummed_reply(user_agent: &HeaderValue, req_body: &[u8], user_agent_suffix: &[u8]) -> Response {
        assert_checksummed(req_body);

        assert!(user_agent.as_bytes().starts_with(b"QiniuRust/"));
        assert!(user_agent.as_bytes().ends_with(user_agent_suffix));

        Response::new(checksummed_payload().into())
    }

    /// Round-trips a checksummed payload through `SyncClient` and verifies the
    /// response headers, upload progress reporting, extension propagation and
    /// response body integrity.
    #[tokio::test]
    async fn sync_http_test() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let routes = path!("dir1" / "dir2" / "file")
            .and(post())
            .and(header_value(USER_AGENT.as_str()))
            .and(bytes())
            .map(|user_agent: HeaderValue, req_body: Bytes| checksummed_reply(&user_agent, &req_body, b"/sync"));
        starts_with_server!(addr, routes, {
            // The synchronous client is driven from a blocking task rather
            // than directly on the async test task.
            spawn_blocking(move || {
                let request_body = checksummed_payload();

                let last_uploaded = Arc::new(AtomicU64::new(0));
                let last_total = Arc::new(AtomicU64::new(0));
                let mut response = {
                    let last_uploaded = Arc::clone(&last_uploaded);
                    let last_total = Arc::clone(&last_total);
                    // Remember only the most recent progress report.
                    let callback = move |info: TransferProgressInfo| {
                        last_uploaded.store(info.transferred_bytes(), Relaxed);
                        last_total.store(info.total_bytes(), Relaxed);
                        Ok(())
                    };
                    let mut request = SyncRequest::builder()
                        .method(Method::POST)
                        .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
                        .body(SyncRequestBody::from_referenced_bytes(&request_body))
                        .on_uploading_progress(&callback)
                        .add_extension(TimeoutExtension::new(Duration::from_secs(1)))
                        .build();
                    SyncClient::default().call(&mut request)?
                };
                assert_eq!(
                    response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
                    Some((BUF_LEN + MD5_LEN).to_string().as_bytes())
                );
                assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
                assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
                assert_eq!(
                    response.extensions().get::<TimeoutExtension>().unwrap().get(),
                    Duration::from_secs(1)
                );

                {
                    let mut body_part = Vec::new();
                    let mut checksum_part = Vec::new();

                    // Read the data bytes first, then the trailing digest.
                    assert_eq!(
                        io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
                        BUF_LEN as u64
                    );
                    assert_eq!(
                        io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
                        MD5_LEN as u64
                    );

                    assert_eq!(md5_of(&body_part).as_slice(), checksum_part.as_slice());
                }
                Ok::<_, anyhow::Error>(())
            })
            .await?
        });

        Ok(())
    }

    /// Round-trips a checksummed payload through `AsyncClient` and verifies
    /// the response headers, upload progress reporting, extension propagation
    /// and response body integrity.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_http_test() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let routes = path!("dir1" / "dir2" / "file")
            .and(post())
            .and(header_value(USER_AGENT.as_str()))
            .and(bytes())
            .map(|user_agent: HeaderValue, req_body: Bytes| checksummed_reply(&user_agent, &req_body, b"/async"));
        starts_with_server!(addr, routes, {
            let request_body = checksummed_payload();
            let last_uploaded = Arc::new(AtomicU64::new(0));
            let last_total = Arc::new(AtomicU64::new(0));

            let mut response = {
                let last_uploaded = Arc::clone(&last_uploaded);
                let last_total = Arc::clone(&last_total);
                // Remember only the most recent progress report.
                let callback = move |info: TransferProgressInfo| {
                    last_uploaded.store(info.transferred_bytes(), Relaxed);
                    last_total.store(info.total_bytes(), Relaxed);
                    Ok(())
                };
                let mut request = AsyncRequest::builder()
                    .method(Method::POST)
                    .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
                    .body(AsyncRequestBody::from_referenced_bytes(&request_body))
                    .on_uploading_progress(OnProgressCallback::reference(&callback))
                    .add_extension(TimeoutExtension::new(Duration::from_secs(1)))
                    .build();
                AsyncClient::default().async_call(&mut request).await?
            };
            assert_eq!(
                response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
                Some((BUF_LEN + MD5_LEN).to_string().as_bytes())
            );
            assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
            assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
            assert_eq!(
                response.extensions().get::<TimeoutExtension>().unwrap().get(),
                Duration::from_secs(1)
            );

            {
                let mut body_part = Vec::new();
                let mut checksum_part = Vec::new();

                // Read the data bytes first, then the trailing digest.
                assert_eq!(
                    async_io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part).await?,
                    BUF_LEN as u64
                );
                assert_eq!(
                    async_io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part).await?,
                    MD5_LEN as u64
                );

                assert_eq!(md5_of(&body_part).as_slice(), checksum_part.as_slice());
            }
            Ok::<_, anyhow::Error>(())
        });

        Ok(())
    }
}