qiniu_reqwest/
lib.rs

1#![cfg_attr(feature = "docs", feature(doc_cfg))]
2#![deny(
3    single_use_lifetimes,
4    missing_debug_implementations,
5    large_assignments,
6    exported_private_dependencies,
7    absolute_paths_not_starting_with_crate,
8    anonymous_parameters,
9    explicit_outlives_requirements,
10    keyword_idents,
11    macro_use_extern_crate,
12    meta_variable_misuse,
13    missing_docs,
14    non_ascii_idents,
15    indirect_structural_match,
16    trivial_casts,
17    trivial_numeric_casts,
18    unreachable_pub,
19    unsafe_code,
20    unused_crate_dependencies,
21    unused_extern_crates,
22    unused_import_braces,
23    unused_lifetimes,
24    unused_qualifications
25)]
26
27//! # qiniu-reqwest
28//!
29//! ## 七牛 Reqwest HTTP 客户端实现
30//!
31//! 基于 Reqwest 库提供 HTTP 客户端接口实现(分别实现阻塞接口和异步接口,异步实现则需要启用 `async` 功能)
32//!
33//! 需要注意的是,如果使用阻塞接口,则必须使用 `SyncClient`,而如果使用异步接口则必须使用 `AsyncClient`,二者不能混用。
34
35mod extensions;
36mod sync_client;
37
38#[cfg(feature = "async")]
39mod async_client;
40
41pub use extensions::*;
42pub use qiniu_http as http;
43pub use reqwest;
44pub use sync_client::SyncClient;
45
46#[cfg(feature = "async")]
47pub use async_client::AsyncClient;
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use bytes::Bytes;
53    use futures::channel::oneshot::channel;
54    use md5::{Digest, Md5};
55    use qiniu_http::{HttpCaller, Method, SyncRequest, SyncRequestBody, TransferProgressInfo};
56    use rand::{thread_rng, RngCore};
57    use reqwest::header::{CONTENT_LENGTH, USER_AGENT};
58    use std::{
59        io::{copy as io_copy, Read},
60        sync::{
61            atomic::{AtomicU64, Ordering::Relaxed},
62            Arc,
63        },
64        time::Duration,
65    };
66    use tokio::task::spawn_blocking;
67    use warp::{
68        filters::{body::bytes, method::post},
69        header::value as header_value,
70        http::header::HeaderValue,
71        path,
72        reply::Response,
73        Filter,
74    };
75
76    #[cfg(feature = "async")]
77    use {
78        futures::io::{copy as async_io_copy, AsyncReadExt},
79        qiniu_http::{AsyncRequest, AsyncRequestBody, OnProgressCallback},
80    };
81
82    macro_rules! starts_with_server {
83        ($addr:ident, $routes:ident, $code:block) => {{
84            let (tx, rx) = channel();
85            let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
86                rx.await.ok();
87            });
88            let handler = tokio::spawn(server);
89            $code?;
90            tx.send(()).ok();
91            handler.await.ok();
92        }};
93    }
94
95    const BUF_LEN: usize = 1 << 20;
96    const MD5_LEN: usize = 16;
97
98    #[tokio::test]
99    async fn sync_http_test() -> anyhow::Result<()> {
100        env_logger::builder().is_test(true).try_init().ok();
101
102        let routes = path!("dir1" / "dir2" / "file")
103            .and(post())
104            .and(header_value(USER_AGENT.as_str()))
105            .and(bytes())
106            .map(|user_agent: HeaderValue, req_body: Bytes| {
107                assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
108                {
109                    let mut hasher = Md5::new();
110                    hasher.update(&req_body[..BUF_LEN]);
111                    assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
112                }
113
114                assert!(user_agent.as_bytes().starts_with(b"QiniuRust/"));
115                assert!(user_agent.as_bytes().ends_with(b"/sync"));
116
117                let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
118                thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
119                {
120                    let mut hasher = Md5::new();
121                    hasher.update(&resp_body[..BUF_LEN]);
122                    resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
123                }
124                Response::new(resp_body.into())
125            });
126        starts_with_server!(addr, routes, {
127            spawn_blocking(move || {
128                let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
129                thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
130                {
131                    let mut hasher = Md5::new();
132                    hasher.update(&request_body[..BUF_LEN]);
133                    request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
134                }
135
136                let last_uploaded = Arc::new(AtomicU64::new(0));
137                let last_total = Arc::new(AtomicU64::new(0));
138                let mut response = {
139                    let last_uploaded = last_uploaded.to_owned();
140                    let last_total = last_total.to_owned();
141                    let callback = move |info: TransferProgressInfo| {
142                        last_uploaded.store(info.transferred_bytes(), Relaxed);
143                        last_total.store(info.total_bytes(), Relaxed);
144                        Ok(())
145                    };
146                    let mut request = SyncRequest::builder()
147                        .method(Method::POST)
148                        .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
149                        .body(SyncRequestBody::from_referenced_bytes(&request_body))
150                        .on_uploading_progress(&callback)
151                        .add_extension(TimeoutExtension::new(Duration::from_secs(1)))
152                        .build();
153                    SyncClient::default().call(&mut request)?
154                };
155                assert_eq!(
156                    response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
157                    Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
158                );
159                assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
160                assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
161                assert_eq!(
162                    response.extensions().get::<TimeoutExtension>().unwrap().get(),
163                    Duration::from_secs(1)
164                );
165
166                {
167                    let mut body_part = Vec::new();
168                    let mut checksum_part = Vec::new();
169
170                    assert_eq!(
171                        io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
172                        BUF_LEN as u64
173                    );
174                    assert_eq!(
175                        io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
176                        MD5_LEN as u64
177                    );
178
179                    let mut hasher = Md5::new();
180                    hasher.update(&body_part);
181                    assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
182                }
183                Ok::<_, anyhow::Error>(())
184            })
185            .await?
186        });
187
188        Ok(())
189    }
190
191    #[cfg(feature = "async")]
192    #[tokio::test]
193    async fn async_http_test() -> anyhow::Result<()> {
194        env_logger::builder().is_test(true).try_init().ok();
195
196        let routes = path!("dir1" / "dir2" / "file")
197            .and(post())
198            .and(header_value(USER_AGENT.as_str()))
199            .and(bytes())
200            .map(|user_agent: HeaderValue, req_body: Bytes| {
201                assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
202                {
203                    let mut hasher = Md5::new();
204                    hasher.update(&req_body[..BUF_LEN]);
205                    assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
206                }
207
208                assert!(user_agent.as_bytes().starts_with(b"QiniuRust/"));
209                assert!(user_agent.as_bytes().ends_with(b"/async"));
210
211                let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
212                thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
213                {
214                    let mut hasher = Md5::new();
215                    hasher.update(&resp_body[..BUF_LEN]);
216                    resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
217                }
218                Response::new(resp_body.into())
219            });
220        starts_with_server!(addr, routes, {
221            let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
222            thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
223            {
224                let mut hasher = Md5::new();
225                hasher.update(&request_body[..BUF_LEN]);
226                request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
227            }
228            let last_uploaded = Arc::new(AtomicU64::new(0));
229            let last_total = Arc::new(AtomicU64::new(0));
230
231            let mut response = {
232                let last_uploaded = last_uploaded.to_owned();
233                let last_total = last_total.to_owned();
234                let callback = move |info: TransferProgressInfo| {
235                    last_uploaded.store(info.transferred_bytes(), Relaxed);
236                    last_total.store(info.total_bytes(), Relaxed);
237                    Ok(())
238                };
239                let mut request = AsyncRequest::builder()
240                    .method(Method::POST)
241                    .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
242                    .body(AsyncRequestBody::from_referenced_bytes(&request_body))
243                    .on_uploading_progress(OnProgressCallback::reference(&callback))
244                    .add_extension(TimeoutExtension::new(Duration::from_secs(1)))
245                    .build();
246                AsyncClient::default().async_call(&mut request).await?
247            };
248            assert_eq!(
249                response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
250                Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
251            );
252            assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
253            assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
254            assert_eq!(
255                response.extensions().get::<TimeoutExtension>().unwrap().get(),
256                Duration::from_secs(1)
257            );
258
259            {
260                let mut body_part = Vec::new();
261                let mut checksum_part = Vec::new();
262
263                assert_eq!(
264                    async_io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part).await?,
265                    BUF_LEN as u64
266                );
267                assert_eq!(
268                    async_io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part).await?,
269                    MD5_LEN as u64
270                );
271
272                let mut hasher = Md5::new();
273                hasher.update(&body_part);
274                assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
275            }
276            Ok::<_, anyhow::Error>(())
277        });
278
279        Ok(())
280    }
281}