1#![cfg_attr(feature = "docs", feature(doc_cfg))]
2#![deny(
3 single_use_lifetimes,
4 missing_debug_implementations,
5 large_assignments,
6 exported_private_dependencies,
7 absolute_paths_not_starting_with_crate,
8 anonymous_parameters,
9 explicit_outlives_requirements,
10 keyword_idents,
11 macro_use_extern_crate,
12 meta_variable_misuse,
13 missing_docs,
14 non_ascii_idents,
15 indirect_structural_match,
16 trivial_casts,
17 trivial_numeric_casts,
18 unreachable_pub,
19 unsafe_code,
20 unused_crate_dependencies,
21 unused_extern_crates,
22 unused_import_braces,
23 unused_lifetimes,
24 unused_qualifications
25)]
26
27mod client;
34mod extensions;
35
36pub use client::Client;
37pub use extensions::*;
38
39pub use qiniu_http as http;
40pub use ureq;
41
42#[cfg(feature = "async")]
43use std::{future::Future, pin::Pin};
44
45#[cfg(feature = "async")]
46type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
47
48#[cfg(test)]
49mod tests {
50 use super::*;
51 use bytes::Bytes;
52 use futures::channel::oneshot::channel;
53 use md5::{Digest, Md5};
54 use qiniu_http::HttpCaller;
55 use qiniu_http::{
56 header::{CONTENT_LENGTH, USER_AGENT},
57 Method, OnProgressCallback, SyncRequest, SyncRequestBody, TransferProgressInfo,
58 };
59 use rand::{thread_rng, RngCore};
60 use std::{
61 io::{copy as io_copy, Read},
62 sync::{
63 atomic::{AtomicU64, Ordering::Relaxed},
64 Arc,
65 },
66 };
67 use tokio::task::spawn_blocking;
68 use warp::{
69 filters::{body::bytes, method::post},
70 header::value as header_value,
71 http::header::HeaderValue,
72 path,
73 reply::Response,
74 Filter,
75 };
76
77 macro_rules! starts_with_server {
78 ($addr:ident, $routes:ident, $code:block) => {{
79 let (tx, rx) = channel();
80 let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
81 rx.await.ok();
82 });
83 let handler = tokio::spawn(server);
84 $code?;
85 tx.send(()).ok();
86 handler.await.ok();
87 }};
88 }
89
90 const BUF_LEN: usize = 1 << 20;
91 const MD5_LEN: usize = 16;
92
93 #[tokio::test]
94 async fn sync_http_test() -> anyhow::Result<()> {
95 env_logger::builder().is_test(true).try_init().ok();
96
97 let routes = path!("dir1" / "dir2" / "file")
98 .and(post())
99 .and(header_value(USER_AGENT.as_str()))
100 .and(bytes())
101 .map(|user_agent: HeaderValue, req_body: Bytes| {
102 assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
103 {
104 let mut hasher = Md5::new();
105 hasher.update(&req_body[..BUF_LEN]);
106 assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
107 }
108
109 assert!(user_agent.to_str().unwrap().starts_with("QiniuRust/"));
110 assert!(user_agent.to_str().unwrap().ends_with("/qiniu-ureq"));
111
112 let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
113 thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
114 {
115 let mut hasher = Md5::new();
116 hasher.update(&resp_body[..BUF_LEN]);
117 resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
118 }
119 Response::new(resp_body.into())
120 });
121 starts_with_server!(addr, routes, {
122 spawn_blocking(move || {
123 let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
124 thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
125 {
126 let mut hasher = Md5::new();
127 hasher.update(&request_body[..BUF_LEN]);
128 request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
129 }
130
131 let last_uploaded = Arc::new(AtomicU64::new(0));
132 let last_total = Arc::new(AtomicU64::new(0));
133 let mut response = {
134 let last_uploaded = last_uploaded.to_owned();
135 let last_total = last_total.to_owned();
136 let callback = move |info: TransferProgressInfo| {
137 last_uploaded.store(info.transferred_bytes(), Relaxed);
138 last_total.store(info.total_bytes(), Relaxed);
139 Ok(())
140 };
141 let mut request = SyncRequest::builder()
142 .method(Method::POST)
143 .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
144 .body(SyncRequestBody::from_referenced_bytes(&request_body))
145 .on_uploading_progress(OnProgressCallback::reference(&callback))
146 .build();
147 Client::default().call(&mut request)?
148 };
149 assert_eq!(
150 response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
151 Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
152 );
153 assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
154 assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
155
156 {
157 let mut body_part = Vec::new();
158 let mut checksum_part = Vec::new();
159
160 assert_eq!(
161 io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
162 BUF_LEN as u64
163 );
164 assert_eq!(
165 io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
166 MD5_LEN as u64
167 );
168
169 let mut hasher = Md5::new();
170 hasher.update(&body_part);
171 assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
172 }
173 Ok::<_, anyhow::Error>(())
174 })
175 .await?
176 });
177
178 Ok(())
179 }
180}