qiniu_isahc/
lib.rs

1#![cfg_attr(feature = "docs", feature(doc_cfg))]
2#![deny(
3    single_use_lifetimes,
4    missing_debug_implementations,
5    large_assignments,
6    exported_private_dependencies,
7    absolute_paths_not_starting_with_crate,
8    anonymous_parameters,
9    explicit_outlives_requirements,
10    keyword_idents,
11    macro_use_extern_crate,
12    meta_variable_misuse,
13    missing_docs,
14    non_ascii_idents,
15    indirect_structural_match,
16    trivial_casts,
17    trivial_numeric_casts,
18    unreachable_pub,
19    unsafe_code,
20    unused_crate_dependencies,
21    unused_extern_crates,
22    unused_import_braces,
23    unused_lifetimes,
24    unused_qualifications
25)]
26
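//! # qiniu-isahc
//!
//! ## Qiniu Isahc HTTP client implementation
//!
//! Provides an implementation of the HTTP client interface on top of the Isahc library. Both the
//! blocking and the asynchronous interfaces are implemented; the asynchronous implementation
//! requires the `async` feature to be enabled.
//! However, because the core of the Isahc library is itself built on asynchronous interfaces,
//! asynchronous libraries are pulled in even when the `async` feature is not enabled.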
33
34mod client;
35mod extensions;
36
37pub use client::Client;
38pub use extensions::*;
39
40pub use isahc;
41pub use qiniu_http as http;
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use bytes::Bytes;
47    use futures::channel::oneshot::channel;
48    use isahc::http::header::{CONTENT_LENGTH, USER_AGENT};
49    use md5::{Digest, Md5};
50    use qiniu_http::{HttpCaller, Method, OnProgressCallback, SyncRequest, SyncRequestBody, TransferProgressInfo};
51    use rand::{thread_rng, RngCore};
52    use std::{
53        io::{copy as io_copy, Read},
54        sync::{
55            atomic::{AtomicU64, Ordering::Relaxed},
56            Arc,
57        },
58        time::Duration,
59    };
60    use tokio::task::spawn_blocking;
61    use warp::{
62        filters::{body::bytes, method::post},
63        header::value as header_value,
64        http::header::HeaderValue,
65        path,
66        reply::Response,
67        Filter,
68    };
69
70    #[cfg(feature = "async")]
71    use {
72        futures::io::{copy as async_io_copy, AsyncReadExt},
73        qiniu_http::{AsyncRequest, AsyncRequestBody},
74    };
75
76    macro_rules! starts_with_server {
77        ($addr:ident, $routes:ident, $code:block) => {{
78            let (tx, rx) = channel();
79            let ($addr, server) = warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
80                rx.await.ok();
81            });
82            let handler = tokio::spawn(server);
83            $code?;
84            tx.send(()).ok();
85            handler.await.ok();
86        }};
87    }
88
89    const BUF_LEN: usize = 1 << 20;
90    const MD5_LEN: usize = 16;
91
92    #[tokio::test]
93    async fn sync_http_test() -> anyhow::Result<()> {
94        env_logger::builder().is_test(true).try_init().ok();
95
96        let routes = path!("dir1" / "dir2" / "file")
97            .and(post())
98            .and(header_value(USER_AGENT.as_str()))
99            .and(bytes())
100            .map(|user_agent: HeaderValue, req_body: Bytes| {
101                assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
102                {
103                    let mut hasher = Md5::new();
104                    hasher.update(&req_body[..BUF_LEN]);
105                    assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
106                }
107
108                assert!(user_agent.to_str().unwrap().starts_with("QiniuRust/"));
109                assert!(user_agent.to_str().unwrap().contains("/qiniu-isahc/"));
110
111                let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
112                thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
113                {
114                    let mut hasher = Md5::new();
115                    hasher.update(&resp_body[..BUF_LEN]);
116                    resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
117                }
118                Response::new(resp_body.into())
119            });
120        starts_with_server!(addr, routes, {
121            spawn_blocking(move || {
122                let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
123                thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
124                {
125                    let mut hasher = Md5::new();
126                    hasher.update(&request_body[..BUF_LEN]);
127                    request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
128                }
129
130                let last_uploaded = Arc::new(AtomicU64::new(0));
131                let last_total = Arc::new(AtomicU64::new(0));
132                let mut response = {
133                    let last_uploaded = last_uploaded.to_owned();
134                    let last_total = last_total.to_owned();
135                    let callback = move |info: TransferProgressInfo| {
136                        last_uploaded.store(info.transferred_bytes(), Relaxed);
137                        last_total.store(info.total_bytes(), Relaxed);
138                        Ok(())
139                    };
140                    let resolved_ip_addrs = [addr.ip()];
141                    let mut request = SyncRequest::builder()
142                        .method(Method::POST)
143                        .url(
144                            format!("http://fakehost:{}/dir1/dir2/file", addr.port())
145                                .parse()
146                                .expect("invalid uri"),
147                        )
148                        .body(SyncRequestBody::from_referenced_bytes(&request_body))
149                        .resolved_ip_addrs(resolved_ip_addrs.as_ref())
150                        .on_uploading_progress(OnProgressCallback::reference(&callback))
151                        .add_extension(TimeoutRequestExtension::new(Duration::from_secs(1)))
152                        .build();
153                    Client::default_client()?.call(&mut request)?
154                };
155                assert_eq!(
156                    response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
157                    Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
158                );
159                assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
160                assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
161                assert_eq!(
162                    response.extensions().get::<TimeoutRequestExtension>().unwrap().get(),
163                    &Duration::from_secs(1)
164                );
165
166                {
167                    let mut body_part = Vec::new();
168                    let mut checksum_part = Vec::new();
169
170                    assert_eq!(
171                        io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part)?,
172                        BUF_LEN as u64
173                    );
174                    assert_eq!(
175                        io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part)?,
176                        MD5_LEN as u64
177                    );
178
179                    let mut hasher = Md5::new();
180                    hasher.update(&body_part);
181                    assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
182                }
183                Ok::<_, anyhow::Error>(())
184            })
185            .await?
186        });
187
188        Ok(())
189    }
190
191    #[cfg(feature = "async")]
192    #[tokio::test]
193    async fn async_http_test() -> anyhow::Result<()> {
194        env_logger::builder().is_test(true).try_init().ok();
195
196        let routes = path!("dir1" / "dir2" / "file")
197            .and(post())
198            .and(header_value(USER_AGENT.as_str()))
199            .and(bytes())
200            .map(|user_agent: HeaderValue, req_body: Bytes| {
201                assert_eq!(req_body.len(), BUF_LEN + MD5_LEN);
202                {
203                    let mut hasher = Md5::new();
204                    hasher.update(&req_body[..BUF_LEN]);
205                    assert_eq!(hasher.finalize().as_slice(), &req_body[BUF_LEN..]);
206                }
207
208                assert!(user_agent.to_str().unwrap().starts_with("QiniuRust/"));
209                assert!(user_agent.to_str().unwrap().contains("/qiniu-isahc/"));
210
211                let mut resp_body = vec![0u8; BUF_LEN + MD5_LEN];
212                thread_rng().fill_bytes(&mut resp_body[..BUF_LEN]);
213                {
214                    let mut hasher = Md5::new();
215                    hasher.update(&resp_body[..BUF_LEN]);
216                    resp_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
217                }
218                Response::new(resp_body.into())
219            });
220        starts_with_server!(addr, routes, {
221            let mut request_body = vec![0u8; BUF_LEN + MD5_LEN];
222            thread_rng().fill_bytes(&mut request_body[..BUF_LEN]);
223            {
224                let mut hasher = Md5::new();
225                hasher.update(&request_body[..BUF_LEN]);
226                request_body[BUF_LEN..].copy_from_slice(hasher.finalize().as_slice());
227            }
228            let last_uploaded = Arc::new(AtomicU64::new(0));
229            let last_total = Arc::new(AtomicU64::new(0));
230
231            let mut response = {
232                let last_uploaded = last_uploaded.to_owned();
233                let last_total = last_total.to_owned();
234                let callback = move |info: TransferProgressInfo| {
235                    last_uploaded.store(info.transferred_bytes(), Relaxed);
236                    last_total.store(info.total_bytes(), Relaxed);
237                    Ok(())
238                };
239                let resolved_ip_addrs = [addr.ip()];
240                let mut request = AsyncRequest::builder()
241                    .method(Method::POST)
242                    .url(
243                        format!("http://fakehost:{}/dir1/dir2/file", addr.port())
244                            .parse()
245                            .expect("invalid uri"),
246                    )
247                    .body(AsyncRequestBody::from_referenced_bytes(&request_body))
248                    .resolved_ip_addrs(resolved_ip_addrs.as_ref())
249                    .on_uploading_progress(OnProgressCallback::reference(&callback))
250                    .add_extension(TimeoutRequestExtension::new(Duration::from_secs(1)))
251                    .build();
252                Client::default_client()?.async_call(&mut request).await?
253            };
254            assert_eq!(
255                response.header(&CONTENT_LENGTH).map(|h| h.as_bytes()),
256                Some(format!("{}", BUF_LEN + MD5_LEN).as_bytes())
257            );
258            assert_eq!(last_uploaded.load(Relaxed), request_body.len() as u64);
259            assert_eq!(last_total.load(Relaxed), request_body.len() as u64);
260            assert_eq!(
261                response.extensions().get::<TimeoutRequestExtension>().unwrap().get(),
262                &Duration::from_secs(1)
263            );
264
265            {
266                let mut body_part = Vec::new();
267                let mut checksum_part = Vec::new();
268
269                assert_eq!(
270                    async_io_copy(&mut response.body_mut().take(BUF_LEN as u64), &mut body_part).await?,
271                    BUF_LEN as u64
272                );
273                assert_eq!(
274                    async_io_copy(&mut response.body_mut().take(MD5_LEN as u64), &mut checksum_part).await?,
275                    MD5_LEN as u64
276                );
277
278                let mut hasher = Md5::new();
279                hasher.update(&body_part);
280                assert_eq!(hasher.finalize().as_slice(), checksum_part.as_slice());
281            }
282            Ok::<_, anyhow::Error>(())
283        });
284
285        Ok(())
286    }
287}