qiniu_ureq/
lib.rs

1#![cfg_attr(feature = "docs", feature(doc_cfg))]
2#![deny(
3    single_use_lifetimes,
4    missing_debug_implementations,
5    large_assignments,
6    exported_private_dependencies,
7    absolute_paths_not_starting_with_crate,
8    anonymous_parameters,
9    explicit_outlives_requirements,
10    keyword_idents,
11    macro_use_extern_crate,
12    meta_variable_misuse,
13    missing_docs,
14    non_ascii_idents,
15    indirect_structural_match,
16    trivial_casts,
17    trivial_numeric_casts,
18    unreachable_pub,
19    unsafe_code,
20    unused_crate_dependencies,
21    unused_extern_crates,
22    unused_import_braces,
23    unused_lifetimes,
24    unused_qualifications
25)]
26
27//! # qiniu-ureq
28//!
29//! ## 七牛 Ureq HTTP 客户端实现
30//!
31//! 基于 Ureq 库提供 HTTP 客户端接口实现,仅提供阻塞接口的实现,不提供异步接口的实现。
32
33mod client;
34mod extensions;
35
36pub use client::Client;
37pub use extensions::*;
38
39pub use qiniu_http as http;
40pub use ureq;
41
42#[cfg(feature = "async")]
43use std::{future::Future, pin::Pin};
44
45#[cfg(feature = "async")]
46type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
47
48#[cfg(test)]
49mod tests {
50    use super::*;
51    use bytes::Bytes;
52    use futures::channel::oneshot::channel;
53    use md5::{Digest, Md5};
54    use qiniu_http::HttpCaller;
55    use qiniu_http::{
56        header::{CONTENT_LENGTH, USER_AGENT},
57        Method, OnProgressCallback, SyncRequest, SyncRequestBody, TransferProgressInfo,
58    };
59    use rand::{thread_rng, RngCore};
60    use std::{
61        io::{copy as io_copy, Read},
62        sync::{
63            atomic::{AtomicU64, Ordering::Relaxed},
64            Arc,
65        },
66    };
67    use tokio::task::spawn_blocking;
68    use warp::{
69        filters::{body::bytes, method::post},
70        header::value as header_value,
71        http::header::HeaderValue,
72        path,
73        reply::Response,
74        Filter,
75    };
76
77    macro_rules! starts_with_server {
78        ($addr:ident, $routes:ident, $code:block) => {{
79            let (tx, rx) = channel();
80            let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
81                rx.await.ok();
82            });
83            let handler = tokio::spawn(server);
84            $code?;
85            tx.send(()).ok();
86            handler.await.ok();
87        }};
88    }
89
90    const BUF_LEN: usize = 1 << 20;
91    const MD5_LEN: usize = 16;
92
93    #[tokio::test]
94    async fn sync_http_test() -> anyhow::Result<()> {
95        env_logger::builder().is_test(true).try_init().ok();
96
97        let routes = path!("dir1" / "dir2" / "file")
98            .and(post())
99            .and(header_value(USER_AGENT.as_str()))
100            .and(bytes())
101            .map(|user_agent: HeaderValue, req_body: Bytes| {
102                assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
103                {
104                    let mut hasher = Md5::new();
105                    hasher.update(&req_body[..BUF_LEN]);
106                    assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
107                }
108
109                assert!(user_agent.to_str().unwrap().starts_with("QiniuRust/"));
110                assert!(user_agent.to_str().unwrap().ends_with("/qiniu-ureq"));
111
112                let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
113                thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
114                {
115                    let mut hasher = Md5::new();
116                    hasher.update(&resp_body[..BUF_LEN]);
117                    resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
118                }
119                Response::new(resp_body.into())
120            });
121        starts_with_server!(addr, routes, {
122            spawn_blocking(move || {
123                let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
124                thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
125                {
126                    let mut hasher = Md5::new();
127                    hasher.update(&request_body[..BUF_LEN]);
128                    request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
129                }
130
131                let last_uploaded = Arc::new(AtomicU64::new(0));
132                let last_total = Arc::new(AtomicU64::new(0));
133                let mut response = {
134                    let last_uploaded = last_uploaded.to_owned();
135                    let last_total = last_total.to_owned();
136                    let callback = move |info: TransferProgressInfo| {
137                        last_uploaded.store(info.transferred_bytes(), Relaxed);
138                        last_total.store(info.total_bytes(), Relaxed);
139                        Ok(())
140                    };
141                    let mut request = SyncRequest::builder()
142                        .method(Method::POST)
143                        .url(format!("http://{addr}/dir1/dir2/file").parse().expect("invalid uri"))
144                        .body(SyncRequestBody::from_referenced_bytes(&request_body))
145                        .on_uploading_progress(OnProgressCallback::reference(&callback))
146                        .build();
147                    Client::default().call(&mut request)?
148                };
149                assert_eq!(
150                    response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
151                    Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
152                );
153                assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
154                assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
155
156                {
157                    let mut body_part = Vec::new();
158                    let mut checksum_part = Vec::new();
159
160                    assert_eq!(
161                        io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
162                        BUF_LEN as u64
163                    );
164                    assert_eq!(
165                        io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
166                        MD5_LEN as u64
167                    );
168
169                    let mut hasher = Md5::new();
170                    hasher.update(&body_part);
171                    assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
172                }
173                Ok::<_, anyhow::Error>(())
174            })
175            .await?
176        });
177
178        Ok(())
179    }
180}