qiniu_download/async_api/
download.rs

1#![allow(clippy::too_many_arguments)]
2
3use super::{
4    super::{
5        base::{credential::Credential, download::RangeReaderBuilder as BaseRangeReaderBuilder},
6        config::{build_range_reader_builder_from_config, Config, Timeouts},
7    },
8    dot::{ApiName, DotType, Dotter},
9    host_selector::{HostInfo, HostSelector, HostSelectorBuilder},
10    query::HostsQuerier,
11    req_id::{get_req_id2, REQUEST_ID_HEADER},
12};
13use async_once_cell::Lazy as AsyncLazy;
14use futures::{AsyncReadExt, TryStreamExt};
15use hyper::HeaderMap;
16use log::{debug, info, warn};
17use mime::{Mime, BOUNDARY};
18use multer::Multipart;
19use reqwest::{
20    header::{HeaderValue, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, RANGE},
21    Client as HttpClient, Error as ReqwestError, Method, RequestBuilder as HttpRequestBuilder,
22    Response as HttpResponse, StatusCode, Url,
23};
24use std::{
25    collections::HashSet,
26    error::Error as StdError,
27    fmt::{self, Debug},
28    future::Future,
29    io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult},
30    mem::take,
31    ops::Deref,
32    sync::{
33        atomic::{AtomicUsize, Ordering::Relaxed},
34        Arc,
35    },
36    time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH},
37};
38use tap::prelude::*;
39use text_io::{try_scan as try_scan_text, Error as TextIoError};
40use tokio::{
41    io::{copy as io_copy, AsyncWrite},
42    spawn,
43    sync::Mutex,
44};
45use tokio_util::{compat::FuturesAsyncReadCompatExt, either::Either};
46
47/// 为私有空间签发对象下载 URL
48/// # Arguments
49///
50/// * `c` - 私有空间所在账户的凭证
51/// * `url` - 对象下载 URL
52/// * `deadline` - 下载 URL 有效截止时间
53pub fn sign_download_url_with_deadline(
54    c: &Credential,
55    url: Url,
56    deadline: SystemTime,
57) -> Result<String, SystemTimeError> {
58    let mut signed_url = url.to_string();
59
60    if signed_url.contains('?') {
61        signed_url.push_str("&e=");
62    } else {
63        signed_url.push_str("?e=");
64    }
65
66    let deadline = deadline.duration_since(UNIX_EPOCH)?.as_secs().to_string();
67    signed_url.push_str(&deadline);
68    let signature = c.sign(signed_url.as_bytes());
69    signed_url.push_str("&token=");
70    signed_url.push_str(&signature);
71    Ok(signed_url)
72}
73
74/// 为私有空间签发对象下载 URL
75/// # Arguments
76///
77/// * `c` - 私有空间所在账户的凭证
78/// * `url` - 对象下载 URL
79/// * `lifetime` - 下载 URL 有效期
80pub fn sign_download_url_with_lifetime(
81    c: &Credential,
82    url: Url,
83    lifetime: Duration,
84) -> Result<String, SystemTimeError> {
85    let deadline = SystemTime::now() + lifetime;
86    sign_download_url_with_deadline(c, url, deadline)
87}
88
/// Newtype wrapper around [`BaseRangeReaderBuilder`] whose `build` method produces the
/// asynchronous [`AsyncRangeReader`].
#[derive(Debug)]
pub(super) struct AsyncRangeReaderBuilder(BaseRangeReaderBuilder);
91
92impl From<BaseRangeReaderBuilder> for AsyncRangeReaderBuilder {
93    fn from(builder: BaseRangeReaderBuilder) -> Self {
94        Self(builder)
95    }
96}
97
98impl From<AsyncRangeReaderBuilder> for BaseRangeReaderBuilder {
99    fn from(builder: AsyncRangeReaderBuilder) -> Self {
100        builder.0
101    }
102}
103
104impl AsyncRangeReaderBuilder {
105    pub(super) fn take_key(&mut self) -> String {
106        take(&mut self.0.key)
107    }
108
109    pub(super) fn build(self) -> AsyncRangeReader {
110        AsyncRangeReader(Arc::new(AsyncLazy::new(Box::pin(async move {
111            self.build_inner().await
112        }))))
113    }
114
115    async fn build_inner(self) -> Arc<AsyncRangeReaderInner> {
116        let builder = self.0;
117        let http_client =
118            Timeouts::new(builder.base_timeout, builder.dial_timeout).async_http_client();
119        let dotter = Dotter::new(
120            http_client.to_owned(),
121            builder.credential.to_owned(),
122            builder.bucket.to_owned(),
123            builder.monitor_urls,
124            builder.dot_interval,
125            builder.max_dot_buffer_size,
126            builder.dot_tries,
127            builder.punish_duration,
128            builder.max_punished_times,
129            builder.max_punished_hosts_percent,
130            builder.base_timeout,
131        )
132        .await;
133
134        let params = HostSelectorParams {
135            update_interval: builder.update_interval,
136            punish_duration: builder.punish_duration,
137            max_punished_times: builder.max_punished_times,
138            max_punished_hosts_percent: builder.max_punished_hosts_percent,
139            base_timeout: builder.base_timeout,
140        };
141
142        let io_querier = if builder.uc_urls.is_empty() {
143            None
144        } else {
145            Some(HostsQuerier::new(
146                make_uc_host_selector(builder.uc_urls, &params).await,
147                builder.uc_tries,
148                dotter.to_owned(),
149                http_client.to_owned(),
150            ))
151        };
152        let io_selector = make_io_selector(
153            builder.io_urls,
154            io_querier,
155            builder.credential.access_key().to_owned(),
156            builder.bucket.to_owned(),
157            builder.use_https,
158            &params,
159        )
160        .await;
161
162        return Arc::new(AsyncRangeReaderInner {
163            io_selector,
164            dotter,
165            http_client,
166            credential: builder.credential,
167            bucket: builder.bucket,
168            use_getfile_api: builder.use_getfile_api,
169            normalize_key: builder.normalize_key,
170            use_https: builder.use_https,
171            private_url_lifetime: builder.private_url_lifetime,
172        });
173
174        #[derive(Clone, Debug)]
175        struct HostSelectorParams {
176            update_interval: Option<Duration>,
177            punish_duration: Option<Duration>,
178            max_punished_times: Option<usize>,
179            max_punished_hosts_percent: Option<u8>,
180            base_timeout: Option<Duration>,
181        }
182
183        impl HostSelectorParams {
184            fn set_builder(&self, mut builder: HostSelectorBuilder) -> HostSelectorBuilder {
185                if let Some(update_interval) = self.update_interval {
186                    builder = builder.update_interval(update_interval);
187                }
188                if let Some(punish_duration) = self.punish_duration {
189                    builder = builder.punish_duration(punish_duration);
190                }
191                if let Some(max_punished_times) = self.max_punished_times {
192                    builder = builder.max_punished_times(max_punished_times);
193                }
194                if let Some(max_punished_hosts_percent) = self.max_punished_hosts_percent {
195                    builder = builder.max_punished_hosts_percent(max_punished_hosts_percent);
196                }
197                if let Some(base_timeout) = self.base_timeout {
198                    builder = builder.base_timeout(base_timeout);
199                }
200                builder
201            }
202        }
203
204        async fn make_uc_host_selector(
205            uc_urls: Vec<String>,
206            params: &HostSelectorParams,
207        ) -> HostSelector {
208            params
209                .set_builder(HostSelector::builder(uc_urls))
210                .build()
211                .await
212        }
213
214        async fn make_io_selector(
215            io_urls: Vec<String>,
216            io_querier: Option<HostsQuerier>,
217            access_key: String,
218            bucket: String,
219            use_https: bool,
220            params: &HostSelectorParams,
221        ) -> HostSelector {
222            let builder = HostSelector::builder(io_urls)
223                .update_callback(Some(Box::new(move || {
224                    let io_querier = io_querier.to_owned();
225                    let access_key = access_key.to_owned();
226                    let bucket = bucket.to_owned();
227                    Box::pin(async move {
228                        if let Some(io_querier) = io_querier.as_ref() {
229                            io_querier
230                                .query_for_io_urls(&access_key, &bucket, use_https)
231                                .await
232                        } else {
233                            Ok(vec![])
234                        }
235                    })
236                })))
237                .should_punish_callback(Some(Box::new(|error| {
238                    let kind = error.kind();
239                    Box::pin(async move { !matches!(kind, IoErrorKind::InvalidData) })
240                })));
241            params.set_builder(builder).build().await
242        }
243    }
244
245    pub(crate) fn from_config(key: String, config: &Config) -> Self {
246        build_range_reader_builder_from_config(key, config).into()
247    }
248}
249
/// Cloneable handle to the asynchronous range reader; clones share the same
/// `AsyncLazy`-wrapped [`AsyncRangeReaderInner`] through an `Arc`.
#[derive(Clone)]
pub(super) struct AsyncRangeReader(Arc<AsyncLazy<Arc<AsyncRangeReaderInner>>>);
252
/// Shared state behind an [`AsyncRangeReader`], assembled by
/// `AsyncRangeReaderBuilder::build_inner`.
#[derive(Debug)]
struct AsyncRangeReaderInner {
    /// Selector over the IO hosts.
    io_selector: HostSelector,
    /// Dotter that `AsyncRangeReader::dot` forwards to.
    dotter: Dotter,
    /// Account credential taken from the builder.
    credential: Credential,
    /// HTTP client; clones of it are also handed to the dotter and the hosts querier.
    http_client: Arc<HttpClient>,
    /// Bucket name taken from the builder.
    bucket: String,
    // NOTE(review): presumably selects the `getfile` API for building download URLs — the
    // code that reads this flag is outside this view; confirm.
    use_getfile_api: bool,
    // NOTE(review): presumably controls normalization of the object key when building URLs —
    // confirm against the URL-building code.
    normalize_key: bool,
    /// Whether `io_urls` prefixes scheme-less hosts with `https://` instead of `http://`.
    use_https: bool,
    // NOTE(review): presumably the lifetime of signed private download URLs (`None` = no
    // signing?) — confirm against the URL-building code.
    private_url_lifetime: Option<Duration>,
}
265
266impl AsyncRangeReader {
267    pub(super) async fn dot(
268        &self,
269        dot_type: DotType,
270        api_name: ApiName,
271        successful: bool,
272        elapsed_duration: Duration,
273    ) -> IoResult<()> {
274        let inner = self.0.get().await;
275        inner
276            .dotter
277            .dot(dot_type, api_name, successful, elapsed_duration)
278            .await
279    }
280
281    pub(super) async fn update_urls(&self) -> bool {
282        self.inner().await.io_selector.update_hosts().await
283    }
284
285    pub(super) async fn io_urls(&self) -> Vec<String> {
286        let inner = self.inner().await;
287        return inner
288            .io_selector
289            .hosts()
290            .await
291            .iter()
292            .map(|host| normalize_host(host, inner.use_https))
293            .collect();
294
295        fn normalize_host(host: &str, use_https: bool) -> String {
296            if host.contains("://") {
297                host.to_string()
298            } else if use_https {
299                "https://".to_owned() + host
300            } else {
301                "http://".to_owned() + host
302            }
303        }
304    }
305
306    pub(super) async fn base_timeout(&self) -> Duration {
307        self.inner().await.io_selector.base_timeout()
308    }
309
310    pub(super) async fn increase_timeout_power_by(&self, host: &str, timeout_power: usize) {
311        self.inner()
312            .await
313            .io_selector
314            .increase_timeout_power_by(host, timeout_power)
315            .await
316    }
317
318    pub(super) async fn read_at<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
319        &self,
320        pos: u64,
321        size: u64,
322        key: &str,
323        async_task_id: u32,
324        tries_info: TriesInfo<'_>,
325        trying_hosts: &TryingHosts,
326        on_host_selected: F,
327    ) -> IoResult3<Vec<u8>> {
328        if size == 0 {
329            return Ok(Default::default()).into();
330        }
331        return self.with_retries(
332            key,
333            Method::GET,
334            async_task_id,
335            tries_info,
336            trying_hosts,
337            on_host_selected,
338            |tries, request_builder, req_id, download_url, host_info| {
339                async move {
340                    let range = generate_range_header(pos, size);
341                    debug!(
342                        "{{{}}} [{}] read_at url: {}, req_id: {:?}, range: {}",
343                        async_task_id, tries, download_url, req_id, &range
344                    );
345                    let begin_at = Instant::now();
346                    let result = request_builder
347                        .header(RANGE, &range)
348                        .send()
349                        .await;
350                        if let Err(err) = &result {
351                            self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
352                        }
353                    let result = result
354                        .map_err(io_error_from(IoErrorKind::ConnectionAborted))
355                        .and_then(|resp| {
356                            if resp.status() != StatusCode::PARTIAL_CONTENT && resp.status() != StatusCode::OK {
357                                return Err(unexpected_status_code(&resp));
358                            }
359                            Ok(resp)
360                        })
361                        .map(|resp| {
362                            let max_size = parse_content_length(&resp).min(size);
363                            (resp, max_size)
364                        });
365                    match result {
366                        Ok((resp, max_size)) => {
367                            read_response_body(resp, Some(max_size)).await
368                        }
369                        Err(err) => Err(err),
370                    }
371                        .tap_ok(|_| {
372                            info!(
373                                "{{{}}} [{}] read_at ok url: {}, range: {}, req_id: {:?}, elapsed: {:?}",
374                                async_task_id,
375                                tries,
376                                download_url,
377                                range,
378                                req_id,
379                                begin_at.elapsed(),
380                            );
381                        })
382                        .tap_err(|err| {
383                            warn!(
384                                "{{{}}} [{}] read_at error url: {}, range: {}, error: {}, req_id: {:?}, elapsed: {:?}",
385                                async_task_id, tries, download_url, range, err, req_id, begin_at.elapsed(),
386                            );
387                        })
388                }
389            },
390        )
391        .await;
392
393        fn generate_range_header(pos: u64, size: u64) -> String {
394            format!("bytes={}-{}", pos, pos + size - 1)
395        }
396    }
397
398    pub(super) async fn read_multi_ranges<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
399        &self,
400        ranges: &[(u64, u64)],
401        key: &str,
402        async_task_id: u32,
403        tries_info: TriesInfo<'_>,
404        trying_hosts: &TryingHosts,
405        on_host_selected: F,
406    ) -> IoResult3<Vec<RangePart>> {
407        return self
408            .with_retries(
409                key,
410                Method::GET,
411                async_task_id,
412                tries_info,
413                trying_hosts,
414                on_host_selected,
415                |tries, request_builder, req_id, download_url, host_info| async move {
416                    debug!(
417                        "{{{}}} [{}] read_multi_ranges url: {}, req_id: {:?}, range counts: {}",
418                        async_task_id,
419                        tries,
420                        download_url,
421                        req_id,
422                        ranges.len(),
423                    );
424                    let range = generate_range_header(ranges);
425                    let begin_at = Instant::now();
426                    let result = request_builder
427                        .header(RANGE, &range)
428                        .send()
429                        .await;
430                    if let Err(err) = &result {
431                        self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
432                    }
433                    let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted));
434                    match result {
435                        Ok(resp) => {
436                            let mut parts = Vec::with_capacity(ranges.len());
437                            match resp.status() {
438                                StatusCode::OK => {
439                                    let body = read_response_body(resp, None).await?;
440                                    for &(from, len) in ranges.iter() {
441                                        let from = (from as usize).min(body.len());
442                                        let len = (len as usize).min(body.len() - from);
443                                        if len > 0 {
444                                            parts.push(RangePart {
445                                                data: body[from..(from + len)].to_vec(),
446                                                range: (from as u64, len as u64),
447                                            });
448                                        }
449                                    }
450                                }
451                                StatusCode::PARTIAL_CONTENT if ranges.len() > 1 => {
452                                    let content_type = resp
453                                        .headers()
454                                        .get(CONTENT_TYPE)
455                                        .ok_or_else(new_io_error(
456                                            IoErrorKind::InvalidInput,
457                                            "Content-Type must be existed",
458                                        ))?;
459                                    let content_type: Mime = content_type
460                                        .to_str()
461                                        .map_err(io_error_from(IoErrorKind::InvalidInput))?
462                                        .parse()
463                                        .map_err(io_error_from(IoErrorKind::InvalidInput))?;
464                                    let boundary = content_type.get_param(BOUNDARY).unwrap();
465                                    let mut multipart =
466                                        Multipart::new(resp.bytes_stream(), boundary.as_str());
467                                    while let Some(field) = multipart
468                                        .next_field()
469                                        .await
470                                        .map_err(io_error_from(IoErrorKind::BrokenPipe))?
471                                    {
472                                        let (from, to, _) = extract_range_header(field.headers())?;
473                                        let len = to - from + 1;
474                                        parts.push(RangePart {
475                                            data: field
476                                                .bytes()
477                                                .await
478                                                .map(|b| b.to_vec())
479                                                .map_err(io_error_from(IoErrorKind::BrokenPipe))?,
480                                            range: (from, len),
481                                        });
482                                    }
483                                }
484                                StatusCode::PARTIAL_CONTENT => {
485                                    let (from, to, _) = extract_range_header(resp.headers())?;
486                                    let len = to - from + 1;
487                                    parts.push(RangePart {
488                                        data: read_response_body(resp, None).await?,
489                                        range: (from, len),
490                                    });
491                                }
492                                _ => {
493                                    return Err(unexpected_status_code(&resp));
494                                }
495                            }
496                            Ok(parts)
497                        }
498                        Err(err) => Err(err),
499                    }
500                    .tap_ok(|_| {
501                        info!(
502                            "{{{}}} [{}] read_multi_ranges ok url: {}, req_id: {:?}, elapsed: {:?}",
503                            async_task_id, tries, download_url, req_id, begin_at.elapsed(),
504                        );
505                    })
506                    .tap_err(|err| {
507                        warn!(
508                            "{{{}}} [{}] read_multi_ranges error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
509                            async_task_id, tries, download_url, err, req_id, begin_at.elapsed(),
510                        );
511                    })
512                },
513            )
514            .await;
515
516        fn generate_range_header(ranges: &[(u64, u64)]) -> String {
517            let range = ranges
518                .iter()
519                .map(|range| {
520                    let start = range.0;
521                    let end = start + range.1 - 1;
522                    format!("{}-{}", start, end)
523                })
524                .collect::<Vec<_>>()
525                .join(",");
526            format!("bytes={}", range)
527        }
528    }
529
530    pub(super) async fn exist<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
531        &self,
532        key: &str,
533        async_task_id: u32,
534        tries_info: TriesInfo<'_>,
535        trying_hosts: &TryingHosts,
536        on_host_selected: F,
537    ) -> IoResult3<bool> {
538        self.with_retries(
539            key,
540            Method::HEAD,
541            async_task_id,
542            tries_info,
543            trying_hosts,
544            on_host_selected,
545            |tries, request_builder, req_id, download_url, host_info| async move {
546                debug!(
547                    "{{{}}} [{}] exist url: {}, req_id: {:?}",
548                    async_task_id, tries, download_url, req_id
549                );
550                let begin_at = Instant::now();
551                let result = request_builder.send().await;
552                if let Err(err) = &result {
553                    self.punish_if_needed(host_info.host(), host_info.timeout_power(), err)
554                        .await;
555                }
556                result
557                    .map_err(io_error_from(IoErrorKind::ConnectionAborted))
558                    .and_then(|resp| match resp.status() {
559                        StatusCode::OK => Ok(true),
560                        StatusCode::NOT_FOUND => Ok(false),
561                        _ => Err(unexpected_status_code(&resp)),
562                    })
563                    .tap_ok(|_| {
564                        info!(
565                            "{{{}}} [{}] exist ok url: {}, req_id: {:?}, elapsed: {:?}",
566                            async_task_id,
567                            tries,
568                            download_url,
569                            req_id,
570                            begin_at.elapsed(),
571                        );
572                    })
573                    .tap_err(|err| {
574                        warn!(
575                            "{{{}}} [{}] exist error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
576                            async_task_id,
577                            tries,
578                            download_url,
579                            err,
580                            req_id,
581                            begin_at.elapsed(),
582                        );
583                    })
584            },
585        )
586        .await
587    }
588
589    pub(super) async fn file_size<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
590        &self,
591        key: &str,
592        async_task_id: u32,
593        tries_info: TriesInfo<'_>,
594        trying_hosts: &TryingHosts,
595        on_host_selected: F,
596    ) -> IoResult3<u64> {
597        self.with_retries(
598            key,
599            Method::HEAD,
600            async_task_id,
601            tries_info,
602            trying_hosts,
603            on_host_selected,
604            |tries, request_builder, req_id, download_url, host_info| async move {
605                debug!(
606                    "{{{}}} [{}] file_size url: {}, req_id: {:?}",
607                    async_task_id, tries, download_url, req_id
608                );
609                let begin_at = Instant::now();
610                let result = request_builder.send().await;
611                if let Err(err) = &result {
612                    self.punish_if_needed(host_info.host(), host_info.timeout_power(), err)
613                        .await;
614                }
615                result
616                    .map_err(io_error_from(IoErrorKind::ConnectionAborted))
617                    .and_then(|resp| {
618                        if resp.status() == StatusCode::OK {
619                            Ok(parse_content_length(&resp))
620                        } else {
621                            Err(unexpected_status_code(&resp))
622                        }
623                    })
624                    .tap_ok(|_| {
625                        info!(
626                            "{{{}}} [{}] file_size ok url: {}, req_id: {:?}, elapsed: {:?}",
627                            async_task_id,
628                            tries,
629                            download_url,
630                            req_id,
631                            begin_at.elapsed(),
632                        );
633                    })
634                    .tap_err(|err| {
635                        warn!(
636                            "{{{}}} [{}] file_size error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
637                            async_task_id,
638                            tries,
639                            download_url,
640                            err,
641                            req_id,
642                            begin_at.elapsed(),
643                        );
644                    })
645            },
646        )
647        .await
648    }
649
650    pub(super) async fn download<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
651        &self,
652        key: &str,
653        async_task_id: u32,
654        tries_info: TriesInfo<'_>,
655        trying_hosts: &TryingHosts,
656        mut on_host_selected: F,
657    ) -> IoResult3<Vec<u8>> {
658        let mut result = Vec::new();
659        loop {
660            let (chunk, mut completed) = match self
661                ._download(
662                    key,
663                    async_task_id,
664                    result.len() as u64,
665                    tries_info,
666                    trying_hosts,
667                    &mut on_host_selected,
668                )
669                .await
670            {
671                Result3::Ok(result) => result,
672                Result3::Err(err) => return Result3::Err(err),
673                Result3::NoMoreTries(err) => return Result3::NoMoreTries(err),
674            };
675            if result.is_empty() {
676                result = chunk;
677            } else if chunk.is_empty() {
678                completed = true;
679            } else {
680                result.extend(chunk);
681            }
682            if completed {
683                return Result3::Ok(result);
684            } else {
685                info!("Early EOF Response Body is detected in {}::download(), will start a new GET request for the rest body", module_path!());
686            }
687        }
688    }
689
690    async fn _download<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
691        &self,
692        key: &str,
693        async_task_id: u32,
694        init_from: u64,
695        tries_info: TriesInfo<'_>,
696        trying_hosts: &TryingHosts,
697        on_host_selected: F,
698    ) -> IoResult3<(Vec<u8>, bool)> {
699        let mut buf = Vec::new();
700        let buf_cursor = Arc::new(Mutex::new(Cursor::new(&mut buf)));
701        let result = self
702            .with_retries(
703                key,
704                Method::GET,
705                async_task_id,
706                tries_info,
707                trying_hosts,
708                on_host_selected,
709                move |tries, mut request_builder, req_id, download_url, host_info| {
710                    let buf_cursor = buf_cursor.to_owned();
711                    async move {
712                        let mut buf_cursor = buf_cursor.lock().await;
713                        let start_from = init_from + buf_cursor.position();
714                        debug!(
715                            "{{{}}} [{}] download_to url: {}, req_id: {:?}, start_from: {}",
716                            async_task_id, tries, download_url, req_id, start_from
717                        );
718                        let begin_at = Instant::now();
719                        if start_from > 0 {
720                            request_builder =
721                                request_builder.header(RANGE, format!("bytes={}-", start_from));
722                        }
723                        let result = request_builder
724                            .send()
725                            .await;
726                        if let Err(err) = &result {
727                            self.punish_if_needed(
728                                host_info.host(),
729                                host_info.timeout_power(),
730                                err,
731                            ).await;
732                        }
733                        let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted));
734                        match result {
735                            Ok(resp) => {
736                                let content_length = parse_content_length(&resp);
737                                write_to_writer(resp,  &mut *buf_cursor).await.map(|actually_downloaded| {
738                                    if let Some(actually_downloaded) = actually_downloaded {
739                                        (actually_downloaded, actually_downloaded < content_length)
740                                    } else {
741                                        (0, false)
742                                    }
743                                })
744                            },
745                            Err(err) => Err(err),
746                        }
747                        .tap_ok(|(downloaded, incompleted)| {
748                            info!(
749                                "{{{}}} [{}] download ok url: {}, start_from: {}, downloaded: {}, completed: {:?}, req_id: {:?}, elapsed: {:?}",
750                                async_task_id, tries, download_url, start_from, downloaded, !incompleted, req_id, begin_at.elapsed(),
751                            );
752                        })
753                        .tap_err(|err| {
754                            warn!(
755                                "{{{}}} [{}] download error url: {}, start_from: {}, error: {}, req_id: {:?}, elapsed: {:?}",
756                                async_task_id, tries, download_url, start_from, err, req_id, begin_at.elapsed(),
757                            );
758                        })
759                    }
760                },
761            )
762            .await;
763        return match result {
764            Result3::Ok((_, incompleted)) => Ok((buf, !incompleted)).into(),
765            Result3::Err(err) => Result3::Err(err),
766            Result3::NoMoreTries(err) => Result3::NoMoreTries(err),
767        };
768
769        async fn write_to_writer<W: AsyncWrite + Unpin>(
770            resp: HttpResponse,
771            mut writer: W,
772        ) -> IoResult<Option<u64>> {
773            if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE {
774                Ok(None)
775            } else if resp.status() != StatusCode::OK
776                && resp.status() != StatusCode::PARTIAL_CONTENT
777            {
778                Err(unexpected_status_code(&resp))
779            } else {
780                let body = resp
781                    .bytes_stream()
782                    .map_err(io_error_from(IoErrorKind::BrokenPipe));
783                io_copy(&mut body.into_async_read().compat(), &mut writer)
784                    .await
785                    .map(Some)
786            }
787        }
788    }
789
790    pub(super) async fn read_last_bytes<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
791        &self,
792        size: u64,
793        key: &str,
794        async_task_id: u32,
795        tries_info: TriesInfo<'_>,
796        trying_hosts: &TryingHosts,
797        on_host_selected: F,
798    ) -> IoResult3<(Vec<u8>, u64)> {
799        return self.with_retries(
800            key,
801            Method::GET,
802            async_task_id,
803            tries_info,
804            trying_hosts,
805            on_host_selected,
806            move |tries, request_builder, req_id, download_url, host_info| async move {
807                debug!(
808                    "{{{}}} [{}] read_last_bytes url: {}, req_id: {:?}, len: {}",
809                    async_task_id, tries, download_url, req_id, size,
810                );
811                let begin_at = Instant::now();
812                let result = request_builder
813                    .header(RANGE, format!("bytes=-{}", size))
814                    .send()
815                    .await;
816                    if let Err(err) = &result {
817                        self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
818                    }
819                    let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted))
820                    .and_then(|resp| {
821                        if resp.status() == StatusCode::PARTIAL_CONTENT {
822                            Ok(resp)
823                        } else {
824                            Err(unexpected_status_code(&resp))
825                        }
826                    });
827                match result {
828                    Ok(resp) => get_response_body_and_total_size(resp, size).await,
829                    Err(err) => Err(err),
830                }
831                .tap_ok(|_| {
832                    info!(
833                        "{{{}}} [{}] download ok url: {}, len: {}, req_id: {:?}, elapsed: {:?}",
834                        async_task_id, tries, download_url, size, req_id, begin_at.elapsed(),
835                    );
836                })
837                .tap_err(|err| {
838                    warn!(
839                        "{{{}}} [{}] download error url: {}, len: {}, error: {}, req_id: {:?}, elapsed: {:?}",
840                        async_task_id, tries, download_url, size, err, req_id, begin_at.elapsed(),
841                    );
842                })
843            }
844        )
845        .await;
846
847        async fn get_response_body_and_total_size(
848            resp: HttpResponse,
849            limit: u64,
850        ) -> IoResult<(Vec<u8>, u64)> {
851            let (_, _, total_size) = extract_range_header(resp.headers())?;
852            let last_bytes = read_response_body(resp, Some(limit)).await?;
853            Ok((last_bytes, total_size))
854        }
855    }
856
    /// Returns a reference to the shared inner state by awaiting `get()` on `self.0`.
    // NOTE(review): `self.0` is presumably the `AsyncLazy` imported at the top of the file, in
    // which case the first call would drive its initialization — confirm against the struct.
    async fn inner(&self) -> &Arc<AsyncRangeReaderInner> {
        self.0.get().await
    }
860
861    async fn with_retries<
862        T,
863        F: FnMut(usize, HttpRequestBuilder, HeaderValue, Url, HostInfo) -> Fut,
864        Fut: Future<Output = IoResult<T>>,
865        F2: FnMut(HostInfo) -> Fut2,
866        Fut2: Future<Output = ()>,
867    >(
868        &self,
869        key: &str,
870        method: Method,
871        async_task_id: u32,
872        tries_info: TriesInfo<'_>,
873        trying_hosts: &TryingHosts,
874        mut on_host_selected: F2,
875        mut for_each_url: F,
876    ) -> IoResult3<T> {
877        let begin_at = SystemTime::now();
878        let mut last_error: Option<IoError> = None;
879        let inner = self.inner().await;
880
881        loop {
882            let tries = tries_info.have_tried.fetch_add(1, Relaxed);
883            if tries >= tries_info.total_tries {
884                return IoResult3::NoMoreTries(last_error);
885            }
886
887            let chosen_io_info = {
888                let mut guard = trying_hosts.lock().await;
889                if let Some(chosen) = inner.io_selector.select_host(&guard).await {
890                    guard.insert(chosen.host().to_owned());
891                    drop(guard);
892                    TryingHostInfo {
893                        host_info: chosen,
894                        trying_hosts: trying_hosts.to_owned(),
895                    }
896                } else {
897                    return IoResult3::NoMoreTries(last_error);
898                }
899            };
900            on_host_selected(chosen_io_info.to_owned()).await;
901            let download_url = sign_download_url_if_needed(
902                &make_download_url(
903                    chosen_io_info.host(),
904                    inner.credential.access_key(),
905                    &inner.bucket,
906                    key,
907                    inner.use_getfile_api,
908                    inner.normalize_key,
909                ),
910                inner.private_url_lifetime,
911                &inner.credential,
912            );
913            let req_id = get_req_id2(
914                begin_at,
915                tries,
916                async_task_id,
917                chosen_io_info.host_info.timeout(),
918            );
919            let request_begin_at_instant = Instant::now();
920            let request_builder = inner
921                .http_client
922                .request(method.to_owned(), download_url.to_owned())
923                .header(REQUEST_ID_HEADER, req_id.to_owned());
924            match for_each_url(
925                tries,
926                request_builder,
927                req_id,
928                download_url,
929                chosen_io_info.to_owned(),
930            )
931            .await
932            {
933                Ok(result) => {
934                    inner.io_selector.reward(chosen_io_info.host()).await;
935                    inner
936                        .dotter
937                        .dot(
938                            DotType::Http,
939                            ApiName::IoGetfile,
940                            true,
941                            request_begin_at_instant.elapsed(),
942                        )
943                        .await
944                        .ok();
945                    return Ok(result).into();
946                }
947                Err(err) => {
948                    let punished = inner
949                        .io_selector
950                        .punish(chosen_io_info.host(), &err, &inner.dotter)
951                        .await;
952                    inner
953                        .dotter
954                        .dot(
955                            DotType::Http,
956                            ApiName::IoGetfile,
957                            false,
958                            request_begin_at_instant.elapsed(),
959                        )
960                        .await
961                        .ok();
962                    if punished {
963                        last_error = Some(err);
964                    } else {
965                        return Err(err).into();
966                    }
967                }
968            }
969        }
970
971        fn make_download_url(
972            io_url: &str,
973            access_key: &str,
974            bucket: &str,
975            key: &str,
976            use_getfile_api: bool,
977            normalize_key: bool,
978        ) -> String {
979            let mut url = if use_getfile_api {
980                format!("{}/getfile/{}/{}", io_url, access_key, bucket)
981            } else {
982                io_url.to_owned()
983            };
984            if normalize_key {
985                if url.ends_with('/') && key.starts_with('/') {
986                    url.truncate(url.len() - 1);
987                } else if !url.ends_with('/') && !key.starts_with('/') {
988                    url.push('/');
989                }
990            }
991            url.push_str(key);
992            url
993        }
994
995        fn sign_download_url_if_needed(
996            url: &str,
997            private_url_lifetime: Option<Duration>,
998            credential: &Credential,
999        ) -> Url {
1000            if let Some(private_url_lifetime) = private_url_lifetime {
1001                Url::parse(
1002                    &sign_download_url_with_lifetime(
1003                        credential,
1004                        Url::parse(url).unwrap(),
1005                        private_url_lifetime,
1006                    )
1007                    .unwrap(),
1008                )
1009                .unwrap()
1010            } else {
1011                Url::parse(url).unwrap()
1012            }
1013        }
1014    }
1015
1016    async fn punish_if_needed(&self, host: &str, timeout_power: usize, err: &ReqwestError) {
1017        if err.is_timeout() {
1018            self.inner()
1019                .await
1020                .io_selector
1021                .increase_timeout_power_by(host, timeout_power)
1022                .await
1023        } else if err.is_connect() {
1024            self.inner()
1025                .await
1026                .io_selector
1027                .mark_connection_as_failed(host)
1028                .await
1029        }
1030    }
1031}
1032
1033impl Debug for AsyncRangeReader {
1034    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1035        f.debug_tuple("AsyncRangeReader")
1036            .field(&self.0.try_get())
1037            .finish()
1038    }
1039}
1040
/// A region of a file together with its data, as obtained through
/// `RangeReader::read_multi_ranges()`.
#[derive(Debug, Clone)]
pub struct RangePart {
    /// The data belonging to the region.
    pub data: Vec<u8>,
    /// The start offset and the length of the region.
    pub range: (u64, u64),
}
1049
/// Three-way outcome of an operation that may be retried.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) enum Result3<T, E> {
    /// The operation succeeded with a value.
    Ok(T),
    /// The operation failed with an error that is handed back as-is.
    Err(E),
    /// No further attempt could be made; carries the last error seen, if there was one.
    NoMoreTries(Option<E>),
}
1056
/// [`Result3`] specialised to `std::io::Error` as the error type.
pub(super) type IoResult3<T> = Result3<T, IoError>;
1058
1059impl<T, E> From<Result<T, E>> for Result3<T, E> {
1060    fn from(r: Result<T, E>) -> Self {
1061        match r {
1062            Ok(r) => Result3::Ok(r),
1063            Err(e) => Result3::Err(e),
1064        }
1065    }
1066}
1067
/// Shared, async-mutex-guarded set of host strings that are currently being tried.
pub(super) type TryingHosts = Arc<Mutex<HashSet<String>>>;
1069
/// A selected host paired with the shared set of hosts in use; its `Drop` implementation
/// removes the host from that set again.
struct TryingHostInfo {
    /// The host chosen for the current attempt.
    host_info: HostInfo,
    /// Handle to the shared set in which `host_info`'s host has been recorded.
    trying_hosts: TryingHosts,
}
1074
1075impl Deref for TryingHostInfo {
1076    type Target = HostInfo;
1077
1078    fn deref(&self) -> &Self::Target {
1079        &self.host_info
1080    }
1081}
1082
1083impl Drop for TryingHostInfo {
1084    fn drop(&mut self) {
1085        if let Ok(mut trying_hosts) = self.trying_hosts.try_lock() {
1086            trying_hosts.remove(self.host_info.host());
1087            return;
1088        }
1089        let trying_hosts = take(&mut self.trying_hosts);
1090        let host_info = take(&mut self.host_info);
1091        spawn(async move {
1092            trying_hosts.lock().await.remove(host_info.host());
1093        });
1094    }
1095}
1096
/// Retry budget shared by the retrying download helpers.
#[derive(Clone, Copy, Debug)]
pub(super) struct TriesInfo<'a> {
    /// Counter of attempts already claimed; `with_retries` increments it atomically.
    have_tried: &'a AtomicUsize,
    /// Upper bound on attempts: no new attempt starts once `have_tried` has reached it.
    total_tries: usize,
}
1102
1103impl<'a> TriesInfo<'a> {
1104    pub(super) fn new(have_tried: &'a AtomicUsize, total_tries: usize) -> Self {
1105        Self {
1106            have_tried,
1107            total_tries,
1108        }
1109    }
1110}
1111
1112fn unexpected_status_code(resp: &HttpResponse) -> IoError {
1113    let error_kind = if resp.status().is_client_error() {
1114        IoErrorKind::InvalidData
1115    } else {
1116        IoErrorKind::Other
1117    };
1118    IoError::new(
1119        error_kind,
1120        format!("Unexpected status code {}", resp.status().as_u16()),
1121    )
1122}
1123
1124fn parse_content_length(resp: &HttpResponse) -> u64 {
1125    resp.content_length()
1126        .and_then(|s| if s > 0 { Some(s) } else { None })
1127        .or_else(|| {
1128            resp.headers()
1129                .get(CONTENT_LENGTH)
1130                .and_then(|length| length.to_str().ok())
1131                .and_then(|length| length.parse().ok())
1132        })
1133        .expect("Content-Length must be existed")
1134}
1135
1136fn extract_range_header(headers: &HeaderMap) -> IoResult<(u64, u64, u64)> {
1137    let content_range = headers
1138        .get(CONTENT_RANGE)
1139        .ok_or_else(new_io_error(
1140            IoErrorKind::InvalidInput,
1141            "Content-Range must be existed",
1142        ))?
1143        .to_str()
1144        .map_err(io_error_from(IoErrorKind::InvalidInput))?;
1145    let (from, to, total_size) =
1146        parse_range_header(content_range).map_err(io_error_from(IoErrorKind::InvalidInput))?;
1147    Ok((from, to, total_size))
1148}
1149
/// Parses a `Content-Range` value of the form `bytes <from>-<to>/<total_size>`.
///
/// Returns the three numbers as `(from, to, total_size)`; a scan failure is returned as a
/// `TextIoError`.
fn parse_range_header(range: &str) -> Result<(u64, u64, u64), TextIoError> {
    // Declared without a value: the scan macro below fills in these bindings.
    let from: u64;
    let to: u64;
    let total_size: u64;
    try_scan_text!(range.bytes() => "bytes {}-{}/{}", from, to, total_size);
    Ok((from, to, total_size))
}
1157
1158async fn read_response_body(resp: HttpResponse, limit: Option<u64>) -> IoResult<Vec<u8>> {
1159    let mut buf_cursor = Cursor::new(Vec::<u8>::new());
1160    let body = resp
1161        .bytes_stream()
1162        .map_err(io_error_from(IoErrorKind::BrokenPipe))
1163        .into_async_read();
1164    let mut copy_from = if let Some(limit) = limit {
1165        Either::Left(body.take(limit).compat())
1166    } else {
1167        Either::Right(body.compat())
1168    };
1169    io_copy(&mut copy_from, &mut buf_cursor).await?;
1170    Ok(buf_cursor.into_inner())
1171}
1172
/// Returns a reusable closure that wraps any error convertible into a boxed
/// `std::error::Error` as an `std::io::Error` of the given `kind`.
///
/// Intended for use with `map_err`.
fn io_error_from<E>(kind: IoErrorKind) -> impl Fn(E) -> IoError
where
    E: Into<Box<dyn StdError + Send + Sync>>,
{
    move |source| IoError::new(kind, source)
}
1178
/// Returns a one-shot closure that builds an `std::io::Error` of the given `kind` from
/// `err`, deferring construction until the closure is called (e.g. by `ok_or_else`).
fn new_io_error<E>(kind: IoErrorKind, err: E) -> impl FnOnce() -> IoError
where
    E: Into<Box<dyn StdError + Send + Sync>>,
{
    move || {
        let source: Box<dyn StdError + Send + Sync> = err.into();
        IoError::new(kind, source)
    }
}
1185
1186#[cfg(test)]
1187mod tests {
1188    use super::{
1189        super::{
1190            cache_dir::cache_dir_path_of,
1191            dot::{AsyncDotRecordsMap, DotRecordKey, DotRecords, DOT_FILE_NAME},
1192            query::CACHE_FILE_NAME,
1193        },
1194        *,
1195    };
1196    use futures::channel::oneshot::channel;
1197    use multipart::client::lazy::Multipart as LazyMultipart;
1198    use serde_json::{json, to_vec as json_to_vec};
1199    use std::{
1200        io::Read,
1201        sync::{
1202            atomic::{AtomicUsize, Ordering::Relaxed},
1203            Arc,
1204        },
1205    };
1206    use tokio::{fs::remove_file, task::spawn, time::sleep};
1207    use warp::{
1208        header,
1209        http::{header::AUTHORIZATION, HeaderValue, StatusCode},
1210        hyper::Body,
1211        path,
1212        reply::Response,
1213        Filter,
1214    };
1215
    // Starts one or two local warp servers on ephemeral 127.0.0.1 ports, runs `$code` while
    // they are up, then signals graceful shutdown through oneshot channels.
    // The three arms are told apart by the number of arguments.
    macro_rules! starts_with_server {
        // Single server: `$addr` is bound to the address of the server serving `$routes`.
        ($addr:ident, $routes:ident, $code:block) => {{
            let (tx, rx) = channel();
            let ($addr, server) =
                warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    rx.await.unwrap();
                });
            spawn(server);
            $code;
            tx.send(()).unwrap();
        }};
        // IO server plus a monitor server: the monitor accepts `v1/stat` requests, asserts the
        // `Authorization` header starts with "UpToken " and merges the posted records into
        // `$records_map`, which `$code` can inspect.
        ($io_addr:ident, $monitor_addr:ident, $io_routes:ident, $records_map:ident, $code:block) => {{
            let (io_tx, io_rx) = channel();
            let (monitor_tx, monitor_rx) = channel();
            let ($io_addr, io_server) = warp::serve($io_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    io_rx.await.unwrap();
                },
            );
            let $records_map = Arc::new(AsyncDotRecordsMap::default());
            let monitor_routes = {
                let records_map = $records_map.to_owned();
                path!("v1" / "stat")
                    .and(warp::header::value(AUTHORIZATION.as_str()))
                    .and(warp::body::json())
                    .then(move |authorization: HeaderValue, records: DotRecords| {
                        assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                        let records_map = records_map.to_owned();
                        async move {
                            records_map.merge_with_records(records).await;
                            Response::new(Body::empty())
                        }
                    })
            };
            let ($monitor_addr, monitor_server) = warp::serve(monitor_routes)
                .bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    monitor_rx.await.unwrap();
                });
            spawn(io_server);
            spawn(monitor_server);
            $code;
            io_tx.send(()).unwrap();
            monitor_tx.send(()).unwrap();
        }};
        // IO server plus a UC server: the UC server answers `v4/query` with a JSON document
        // listing the IO server's address as the only IO domain.
        ($io_addr:ident, $uc_addr:ident, $io_routes:ident, $code:block) => {{
            let (io_tx, io_rx) = channel();
            let (uc_tx, uc_rx) = channel();
            let ($io_addr, io_server) = warp::serve($io_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    io_rx.await.unwrap();
                },
            );
            let io_addr = $io_addr.to_owned();
            let uc_routes = {
                path!("v4" / "query")
                    .map(move || {
                        Response::new(json_to_vec(&json!({
                            "hosts": [{
                                "ttl": 86400,
                                "io": {
                                    "domains": [io_addr]
                                },
                                "uc": {
                                    "domains": []
                                }
                            }]
                        })).unwrap().into())
                    })
            };
            let ($uc_addr, uc_server) = warp::serve(uc_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    uc_rx.await.unwrap();
                },
            );
            spawn(io_server);
            spawn(uc_server);
            $code;
            io_tx.send(()).unwrap();
            uc_tx.send(()).unwrap();
        }};
    }
1300
    /// Two successful `read_at` calls against a server that always answers with the
    /// 10-byte body `1234567890`; afterwards the monitor must have recorded two successes
    /// and no failures for the `Http` / `IoGetfile` key.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_routes = {
            // Each route asserts the exact `Range` header the client is expected to send.
            let action_1 =
                path!("file")
                    .and(header::value(RANGE.as_str()))
                    .map(|range: HeaderValue| {
                        assert_eq!(range.to_str().unwrap(), "bytes=5-10");
                        Response::new("1234567890".into())
                    });
            let action_2 =
                path!("file2")
                    .and(header::value(RANGE.as_str()))
                    .map(|range: HeaderValue| {
                        assert_eq!(range.to_str().unwrap(), "bytes=5-16");
                        Response::new("1234567890".into())
                    });
            action_1.or(action_2)
        };

        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let io_urls = vec![format!("http://{}", io_addr)];
            {
                let have_tried = AtomicUsize::new(0);
                let io_urls = io_urls.to_owned();
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "file".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false)
                    .normalize_key(true)
                    .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                    .dot_interval(Duration::from_millis(0))
                    .max_dot_buffer_size(1),
                )
                .build();

                // Offset 5, length 6: only the first 6 bytes of the body are expected back.
                match downloader
                    .read_at(
                        5,
                        6,
                        "file",
                        0,
                        TriesInfo::new(&have_tried, 1),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::Ok(buf) => {
                        assert_eq!(&buf, b"123456")
                    }
                    _ => unreachable!(),
                }
            }
            {
                let have_tried = AtomicUsize::new(0);
                let io_urls = io_urls.to_owned();
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "file2".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false)
                    .normalize_key(true)
                    .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                    .dot_interval(Duration::from_millis(0))
                    .max_dot_buffer_size(1),
                )
                .build();

                // Offset 5, length 12: the server only has 10 bytes, which are checked here.
                match downloader
                    .read_at(
                        5,
                        12,
                        "file2",
                        0,
                        TriesInfo::new(&have_tried, 1),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::Ok(buf) => {
                        assert_eq!(&buf[..10], b"1234567890")
                    }
                    _ => unreachable!(),
                }
            }

            // Wait before inspecting what the monitor server received.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(2));
                assert_eq!(record.failed_count(), Some(0));
            }
        });
        Ok(())
    }
1414
    /// `read_at` against a server that always answers `501 Not Implemented`: with a budget
    /// of three tries the call must end in `NoMoreTries` after exactly three requests, and the
    /// monitor must have recorded three failures.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at_2() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        // Counts how many requests reached the IO server.
        let io_called = Arc::new(AtomicUsize::new(0));
        let io_routes = {
            let io_called = io_called.to_owned();
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=1-5");
                    io_called.fetch_add(1, Relaxed);
                    let mut resp = Response::new("12345".into());
                    *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
                    resp
                })
        };
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_at(
                    1,
                    5,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::NoMoreTries(..) => {}
                _ => unreachable!(),
            }
            assert_eq!(io_called.load(Relaxed), 3);

            // Wait before inspecting what the monitor server received.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(0));
                assert_eq!(record.failed_count(), Some(3));
            }
        });
        Ok(())
    }
1483
    /// `read_at` against a server that always answers `400 Bad Request`: with a budget of a
    /// single try the call must end in `Result3::Err` after exactly one request, and the
    /// monitor must have recorded one failure.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at_3() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        // Counts how many requests reached the IO server.
        let io_called = Arc::new(AtomicUsize::new(0));
        let io_routes = {
            let io_called = io_called.to_owned();
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=1-5");
                    io_called.fetch_add(1, Relaxed);
                    let mut resp = Response::new("12345".into());
                    *resp.status_mut() = StatusCode::BAD_REQUEST;
                    resp
                })
        };
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_at(
                    1,
                    5,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Err(..) => {}
                _ => unreachable!(),
            }
            assert_eq!(io_called.load(Relaxed), 1);

            // Wait before inspecting what the monitor server received.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(0));
                assert_eq!(record.failed_count(), Some(1));
            }
        });
        Ok(())
    }
1552
    /// `read_last_bytes(10, ..)` must send `Range: bytes=-10`, return the 10-byte body and
    /// report the total size parsed from the `Content-Range` header; the monitor must have
    /// recorded one success and no failures.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_last_bytes() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_routes =
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(|range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=-10");
                    let mut resp = Response::new("1234567890".into());
                    *resp.status_mut() = StatusCode::PARTIAL_CONTENT;
                    // The total size after the slash is what the client is expected to report.
                    resp.headers_mut().insert(
                        CONTENT_RANGE,
                        "bytes 157286390-157286399/157286400".parse().unwrap(),
                    );
                    resp
                });
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_last_bytes(
                    10,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok((buf, total_size)) => {
                    assert_eq!(&buf, b"1234567890");
                    assert_eq!(total_size, 157286400);
                }
                _ => unreachable!(),
            }

            // Wait before inspecting what the monitor server received.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(1));
                assert_eq!(record.failed_count(), Some(0));
            }
        });
        Ok(())
    }
1622
1623    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1624    async fn test_download_file() -> anyhow::Result<()> {
1625        env_logger::try_init().ok();
1626        clear_cache().await?;
1627
1628        let io_routes = { path!("file").map(|| Response::new("1234567890".into())) };
1629        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
1630            let io_urls = vec![format!("http://{}", io_addr)];
1631            let downloader = AsyncRangeReaderBuilder::from(
1632                BaseRangeReaderBuilder::new(
1633                    "bucket".to_owned(),
1634                    "file".to_owned(),
1635                    get_credential(),
1636                    io_urls,
1637                )
1638                .use_getfile_api(false)
1639                .normalize_key(true)
1640                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
1641                .dot_interval(Duration::from_millis(0))
1642                .max_dot_buffer_size(1),
1643            )
1644            .build();
1645
1646            let have_tried = AtomicUsize::new(0);
1647            match downloader
1648                .exist(
1649                    "file",
1650                    0,
1651                    TriesInfo::new(&have_tried, 1),
1652                    &Default::default(),
1653                    |_| async {},
1654                )
1655                .await
1656            {
1657                Result3::Ok(existed) => {
1658                    assert!(existed);
1659                }
1660                _ => unreachable!(),
1661            }
1662
1663            let have_tried = AtomicUsize::new(0);
1664            match downloader
1665                .file_size(
1666                    "file",
1667                    0,
1668                    TriesInfo::new(&have_tried, 1),
1669                    &Default::default(),
1670                    |_| async {},
1671                )
1672                .await
1673            {
1674                Result3::Ok(file_size) => {
1675                    assert_eq!(file_size, 10);
1676                }
1677                _ => unreachable!(),
1678            }
1679
1680            let have_tried = AtomicUsize::new(0);
1681            match downloader
1682                .download(
1683                    "file",
1684                    0,
1685                    TriesInfo::new(&have_tried, 1),
1686                    &Default::default(),
1687                    |_| async {},
1688                )
1689                .await
1690            {
1691                Result3::Ok(buf) => {
1692                    assert_eq!(&buf, b"1234567890");
1693                }
1694                _ => unreachable!(),
1695            }
1696
1697            sleep(Duration::from_secs(5)).await;
1698            {
1699                let record = records_map
1700                    .read_async(
1701                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
1702                        |_, record| record.to_owned(),
1703                    )
1704                    .await
1705                    .unwrap();
1706                assert_eq!(record.success_count(), Some(3));
1707                assert_eq!(record.failed_count(), Some(0));
1708            }
1709        });
1710        Ok(())
1711    }
1712
1713    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1714    async fn test_download_file_2() -> anyhow::Result<()> {
1715        env_logger::try_init().ok();
1716        clear_cache().await?;
1717
1718        let counter = Arc::new(AtomicUsize::new(0));
1719        let routes = {
1720            let counter = counter.to_owned();
1721            path!("file").map(move || {
1722                counter.fetch_add(1, Relaxed);
1723                let mut resp = Response::new("12345".into());
1724                *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
1725                resp
1726            })
1727        };
1728
1729        starts_with_server!(addr, monitor_addr, routes, records_map, {
1730            let io_urls = vec![format!("http://{}", addr)];
1731            let downloader = AsyncRangeReaderBuilder::from(
1732                BaseRangeReaderBuilder::new(
1733                    "bucket".to_owned(),
1734                    "file".to_owned(),
1735                    get_credential(),
1736                    io_urls,
1737                )
1738                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
1739                .use_getfile_api(false)
1740                .normalize_key(true)
1741                .dot_interval(Duration::from_millis(0))
1742                .max_dot_buffer_size(1),
1743            )
1744            .build();
1745
1746            let have_tried = AtomicUsize::new(0);
1747            match downloader
1748                .exist(
1749                    "file",
1750                    0,
1751                    TriesInfo::new(&have_tried, 3),
1752                    &Default::default(),
1753                    |_| async {},
1754                )
1755                .await
1756            {
1757                Result3::NoMoreTries(_) => {}
1758                _ => unreachable!(),
1759            }
1760
1761            let have_tried = AtomicUsize::new(0);
1762            match downloader
1763                .file_size(
1764                    "file",
1765                    0,
1766                    TriesInfo::new(&have_tried, 3),
1767                    &Default::default(),
1768                    |_| async {},
1769                )
1770                .await
1771            {
1772                Result3::NoMoreTries(_) => {}
1773                _ => unreachable!(),
1774            }
1775
1776            let have_tried = AtomicUsize::new(0);
1777            match downloader
1778                .download(
1779                    "file",
1780                    0,
1781                    TriesInfo::new(&have_tried, 3),
1782                    &Default::default(),
1783                    |_| async {},
1784                )
1785                .await
1786            {
1787                Result3::NoMoreTries(_) => {}
1788                _ => unreachable!(),
1789            }
1790
1791            assert_eq!(counter.load(Relaxed), 3 * 3);
1792
1793            sleep(Duration::from_secs(5)).await;
1794            {
1795                let record = records_map
1796                    .read_async(
1797                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
1798                        |_, record| record.to_owned(),
1799                    )
1800                    .await
1801                    .unwrap();
1802                assert_eq!(record.success_count(), Some(0));
1803                assert_eq!(record.failed_count(), Some(9));
1804            }
1805            {
1806                let record = records_map
1807                    .read_async(&DotRecordKey::punished(), |_, record| record.to_owned())
1808                    .await
1809                    .unwrap();
1810                assert_eq!(record.punished_count(), Some(4));
1811            }
1812        });
1813        Ok(())
1814    }
1815
1816    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1817    async fn test_download_file_3() -> anyhow::Result<()> {
1818        env_logger::try_init().ok();
1819        clear_cache().await?;
1820
1821        let counter = Arc::new(AtomicUsize::new(0));
1822        let routes = {
1823            let counter = counter.to_owned();
1824            path!("file").map(move || {
1825                counter.fetch_add(1, Relaxed);
1826                let mut resp = Response::new("12345".into());
1827                *resp.status_mut() = StatusCode::BAD_REQUEST;
1828                resp
1829            })
1830        };
1831        starts_with_server!(addr, routes, {
1832            let io_urls = vec![format!("http://{}", addr)];
1833
1834            let downloader = AsyncRangeReaderBuilder::from(
1835                BaseRangeReaderBuilder::new(
1836                    "bucket".to_owned(),
1837                    "file".to_owned(),
1838                    get_credential(),
1839                    io_urls,
1840                )
1841                .use_getfile_api(false)
1842                .normalize_key(true),
1843            )
1844            .build();
1845
1846            let have_tried = AtomicUsize::new(0);
1847            match downloader
1848                .exist(
1849                    "file",
1850                    0,
1851                    TriesInfo::new(&have_tried, 3),
1852                    &Default::default(),
1853                    |_| async {},
1854                )
1855                .await
1856            {
1857                Result3::Err(_) => {}
1858                _ => unreachable!(),
1859            }
1860
1861            let have_tried = AtomicUsize::new(0);
1862            match downloader
1863                .file_size(
1864                    "file",
1865                    0,
1866                    TriesInfo::new(&have_tried, 3),
1867                    &Default::default(),
1868                    |_| async {},
1869                )
1870                .await
1871            {
1872                Result3::Err(_) => {}
1873                _ => unreachable!(),
1874            }
1875
1876            let have_tried = AtomicUsize::new(0);
1877            match downloader
1878                .download(
1879                    "file",
1880                    0,
1881                    TriesInfo::new(&have_tried, 3),
1882                    &Default::default(),
1883                    |_| async {},
1884                )
1885                .await
1886            {
1887                Result3::Err(_) => {}
1888                _ => unreachable!(),
1889            }
1890            assert_eq!(counter.load(Relaxed), 3);
1891        });
1892        Ok(())
1893    }
1894
1895    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1896    async fn test_download_file_4() -> anyhow::Result<()> {
1897        env_logger::try_init().ok();
1898        clear_cache().await?;
1899
1900        let routes = { path!("file").map(|| Response::new("1234567890".into())) };
1901        starts_with_server!(addr, routes, {
1902            let io_urls = vec![format!("http://{}", addr)];
1903
1904            let downloader = AsyncRangeReaderBuilder::from(
1905                BaseRangeReaderBuilder::new(
1906                    "bucket".to_owned(),
1907                    "file".to_owned(),
1908                    get_credential(),
1909                    io_urls,
1910                )
1911                .use_getfile_api(false)
1912                .normalize_key(true),
1913            )
1914            .build();
1915
1916            let have_tried = AtomicUsize::new(0);
1917            match downloader
1918                .exist(
1919                    "file",
1920                    0,
1921                    TriesInfo::new(&have_tried, 1),
1922                    &Default::default(),
1923                    |_| async {},
1924                )
1925                .await
1926            {
1927                Result3::Ok(existed) => {
1928                    assert!(existed);
1929                }
1930                _ => unreachable!(),
1931            }
1932
1933            let have_tried = AtomicUsize::new(0);
1934            match downloader
1935                .file_size(
1936                    "file",
1937                    0,
1938                    TriesInfo::new(&have_tried, 1),
1939                    &Default::default(),
1940                    |_| async {},
1941                )
1942                .await
1943            {
1944                Result3::Ok(file_size) => {
1945                    assert_eq!(file_size, 10);
1946                }
1947                _ => unreachable!(),
1948            }
1949
1950            let have_tried = AtomicUsize::new(0);
1951            match downloader
1952                .download(
1953                    "file",
1954                    0,
1955                    TriesInfo::new(&have_tried, 1),
1956                    &Default::default(),
1957                    |_| async {},
1958                )
1959                .await
1960            {
1961                Result3::Ok(buf) => {
1962                    assert_eq!(buf, b"1234567890");
1963                }
1964                _ => unreachable!(),
1965            }
1966        });
1967        Ok(())
1968    }
1969
1970    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1971    async fn test_download_range() -> anyhow::Result<()> {
1972        env_logger::try_init().ok();
1973        clear_cache().await?;
1974
1975        let routes = {
1976            path!("file")
1977                .and(header::value(RANGE.as_str()))
1978                .map(move |range: HeaderValue| {
1979                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
1980                    let mut response_body = LazyMultipart::new();
1981                    response_body.add_stream(
1982                        "",
1983                        Cursor::new(b"12345"),
1984                        None,
1985                        None,
1986                        Some("bytes 0-4/10"),
1987                    );
1988                    response_body.add_stream(
1989                        "",
1990                        Cursor::new(b"67890"),
1991                        None,
1992                        None,
1993                        Some("bytes 5-9/19"),
1994                    );
1995                    let mut fields = response_body.prepare().unwrap();
1996                    let mut buffer = Vec::new();
1997                    fields.read_to_end(&mut buffer).unwrap();
1998                    let mut response = Response::new(buffer.into());
1999                    *response.status_mut() = StatusCode::PARTIAL_CONTENT;
2000                    response.headers_mut().insert(
2001                        CONTENT_TYPE,
2002                        ("multipart/form-data; boundary=".to_owned() + fields.boundary())
2003                            .parse()
2004                            .unwrap(),
2005                    );
2006                    response
2007                })
2008        };
2009
2010        starts_with_server!(addr, routes, {
2011            let io_urls = vec![format!("http://{}", addr)];
2012            let downloader = AsyncRangeReaderBuilder::from(
2013                BaseRangeReaderBuilder::new(
2014                    "bucket".to_owned(),
2015                    "file".to_owned(),
2016                    get_credential(),
2017                    io_urls,
2018                )
2019                .use_getfile_api(false)
2020                .normalize_key(true),
2021            )
2022            .build();
2023
2024            let ranges = [(0, 5), (5, 5)];
2025            let have_tried = AtomicUsize::new(0);
2026            match downloader
2027                .read_multi_ranges(
2028                    &ranges,
2029                    "file",
2030                    0,
2031                    TriesInfo::new(&have_tried, 1),
2032                    &Default::default(),
2033                    |_| async {},
2034                )
2035                .await
2036            {
2037                Result3::Ok(parts) => {
2038                    assert_eq!(parts.len(), 2);
2039                    assert_eq!(&parts.get(1).unwrap().data, b"12345");
2040                    assert_eq!(parts.get(1).unwrap().range, (0, 5));
2041                    assert_eq!(&parts.first().unwrap().data, b"67890");
2042                    assert_eq!(parts.first().unwrap().range, (5, 5));
2043                }
2044                _ => unreachable!(),
2045            }
2046        });
2047        Ok(())
2048    }
2049
2050    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2051    async fn test_download_range_2() -> anyhow::Result<()> {
2052        env_logger::try_init().ok();
2053        clear_cache().await?;
2054
2055        let routes = {
2056            path!("file")
2057                .and(header::value(RANGE.as_str()))
2058                .map(move |range: HeaderValue| {
2059                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
2060                    "12345678901357924680"
2061                })
2062        };
2063
2064        starts_with_server!(addr, routes, {
2065            let io_urls = vec![format!("http://{}", addr)];
2066            let downloader = AsyncRangeReaderBuilder::from(
2067                BaseRangeReaderBuilder::new(
2068                    "bucket".to_owned(),
2069                    "file".to_owned(),
2070                    get_credential(),
2071                    io_urls,
2072                )
2073                .use_getfile_api(false)
2074                .normalize_key(true),
2075            )
2076            .build();
2077
2078            let ranges = [(0, 5), (5, 5)];
2079            let have_tried = AtomicUsize::new(0);
2080            match downloader
2081                .read_multi_ranges(
2082                    &ranges,
2083                    "file",
2084                    0,
2085                    TriesInfo::new(&have_tried, 1),
2086                    &Default::default(),
2087                    |_| async {},
2088                )
2089                .await
2090            {
2091                Result3::Ok(parts) => {
2092                    assert_eq!(parts.len(), 2);
2093                    assert_eq!(&parts.first().unwrap().data, b"12345");
2094                    assert_eq!(parts.first().unwrap().range, (0, 5));
2095                    assert_eq!(&parts.get(1).unwrap().data, b"67890");
2096                    assert_eq!(parts.get(1).unwrap().range, (5, 5));
2097                }
2098                _ => unreachable!(),
2099            }
2100        });
2101
2102        Ok(())
2103    }
2104
2105    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2106    async fn test_download_range_3() -> anyhow::Result<()> {
2107        env_logger::try_init().ok();
2108        clear_cache().await?;
2109
2110        let counter = Arc::new(AtomicUsize::new(0));
2111        let routes = {
2112            let counter = counter.to_owned();
2113            path!("file")
2114                .and(header::value(RANGE.as_str()))
2115                .map(move |range: HeaderValue| {
2116                    counter.fetch_add(1, Relaxed);
2117                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
2118                    let mut resp = Response::new("12345".into());
2119                    *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
2120                    resp
2121                })
2122        };
2123
2124        starts_with_server!(addr, routes, {
2125            let c = counter.to_owned();
2126            spawn(async move {
2127                let io_urls = vec![format!("http://{}", addr)];
2128                let downloader = AsyncRangeReaderBuilder::from(
2129                    BaseRangeReaderBuilder::new(
2130                        "bucket".to_owned(),
2131                        "file".to_owned(),
2132                        get_credential(),
2133                        io_urls,
2134                    )
2135                    .use_getfile_api(false)
2136                    .normalize_key(true),
2137                )
2138                .build();
2139
2140                let ranges = [(0, 5), (5, 5)];
2141                let have_tried = AtomicUsize::new(0);
2142                match downloader
2143                    .read_multi_ranges(
2144                        &ranges,
2145                        "file",
2146                        0,
2147                        TriesInfo::new(&have_tried, 3),
2148                        &Default::default(),
2149                        |_| async {},
2150                    )
2151                    .await
2152                {
2153                    Result3::NoMoreTries(..) => {}
2154                    _ => unreachable!(),
2155                }
2156                assert_eq!(c.load(Relaxed), 3);
2157            })
2158            .await?;
2159
2160            let c = counter.to_owned();
2161            spawn(async move {
2162                let io_urls = vec![format!("http://{}", addr)];
2163                let downloader = AsyncRangeReaderBuilder::from(
2164                    BaseRangeReaderBuilder::new(
2165                        "bucket".to_owned(),
2166                        "/file".to_owned(),
2167                        get_credential(),
2168                        io_urls,
2169                    )
2170                    .use_getfile_api(false),
2171                )
2172                .build();
2173
2174                let ranges = [(0, 5), (5, 5)];
2175                let have_tried = AtomicUsize::new(0);
2176                match downloader
2177                    .read_multi_ranges(
2178                        &ranges,
2179                        "/file",
2180                        0,
2181                        TriesInfo::new(&have_tried, 3),
2182                        &Default::default(),
2183                        |_| async {},
2184                    )
2185                    .await
2186                {
2187                    Result3::NoMoreTries(..) => {}
2188                    _ => unreachable!(),
2189                }
2190                assert_eq!(c.load(Relaxed), 6);
2191            })
2192            .await?;
2193        });
2194
2195        Ok(())
2196    }
2197
2198    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2199    async fn test_download_range_4() -> anyhow::Result<()> {
2200        env_logger::try_init().ok();
2201        clear_cache().await?;
2202
2203        let routes = {
2204            path!("file")
2205                .and(header::value(RANGE.as_str()))
2206                .map(move |range: HeaderValue| {
2207                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
2208                    let mut response_body = LazyMultipart::new();
2209                    response_body.add_stream(
2210                        "",
2211                        Cursor::new(b"12345"),
2212                        None,
2213                        None,
2214                        Some("bytes 0-4/6"),
2215                    );
2216                    response_body.add_stream(
2217                        "",
2218                        Cursor::new(b"6"),
2219                        None,
2220                        None,
2221                        Some("bytes 5-5/6"),
2222                    );
2223                    let mut fields = response_body.prepare().unwrap();
2224                    let mut buffer = Vec::new();
2225                    fields.read_to_end(&mut buffer).unwrap();
2226                    let mut response = Response::new(buffer.into());
2227                    *response.status_mut() = StatusCode::PARTIAL_CONTENT;
2228                    response.headers_mut().insert(
2229                        CONTENT_TYPE,
2230                        ("multipart/form-data; boundary=".to_owned() + fields.boundary())
2231                            .parse()
2232                            .unwrap(),
2233                    );
2234                    response
2235                })
2236        };
2237
2238        starts_with_server!(addr, routes, {
2239            let io_urls = vec![format!("http://{}", addr)];
2240            let downloader = AsyncRangeReaderBuilder::from(
2241                BaseRangeReaderBuilder::new(
2242                    "bucket".to_owned(),
2243                    "file".to_owned(),
2244                    get_credential(),
2245                    io_urls,
2246                )
2247                .use_getfile_api(false)
2248                .normalize_key(true),
2249            )
2250            .build();
2251
2252            let ranges = [(0, 5), (5, 5)];
2253            let have_tried = AtomicUsize::new(0);
2254            match downloader
2255                .read_multi_ranges(
2256                    &ranges,
2257                    "file",
2258                    0,
2259                    TriesInfo::new(&have_tried, 1),
2260                    &Default::default(),
2261                    |_| async {},
2262                )
2263                .await
2264            {
2265                Result3::Ok(parts) => {
2266                    assert_eq!(parts.len(), 2);
2267                    assert_eq!(&parts.get(1).unwrap().data, b"12345");
2268                    assert_eq!(parts.get(1).unwrap().range, (0, 5));
2269                    assert_eq!(&parts.first().unwrap().data, b"6");
2270                    assert_eq!(parts.first().unwrap().range, (5, 1));
2271                }
2272                _ => unreachable!(),
2273            }
2274        });
2275
2276        Ok(())
2277    }
2278
2279    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2280    async fn test_download_range_5() -> anyhow::Result<()> {
2281        env_logger::try_init().ok();
2282        clear_cache().await?;
2283
2284        let routes = {
2285            path!("file")
2286                .and(header::value(RANGE.as_str()))
2287                .map(move |range: HeaderValue| {
2288                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-5");
2289                    "1234"
2290                })
2291        };
2292
2293        starts_with_server!(addr, routes, {
2294            let io_urls = vec![format!("http://{}", addr)];
2295            let downloader = AsyncRangeReaderBuilder::from(
2296                BaseRangeReaderBuilder::new(
2297                    "bucket".to_owned(),
2298                    "file".to_owned(),
2299                    get_credential(),
2300                    io_urls,
2301                )
2302                .use_getfile_api(false)
2303                .normalize_key(true),
2304            )
2305            .build();
2306
2307            let ranges = [(0, 5), (5, 1)];
2308            let have_tried = AtomicUsize::new(0);
2309            match downloader
2310                .read_multi_ranges(
2311                    &ranges,
2312                    "file",
2313                    0,
2314                    TriesInfo::new(&have_tried, 1),
2315                    &Default::default(),
2316                    |_| async {},
2317                )
2318                .await
2319            {
2320                IoResult3::Ok(parts) => {
2321                    assert_eq!(parts.len(), 1);
2322                    assert_eq!(&parts.first().unwrap().data, b"1234");
2323                    assert_eq!(parts.first().unwrap().range, (0, 4));
2324                }
2325                _ => unreachable!(),
2326            }
2327        });
2328
2329        Ok(())
2330    }
2331
2332    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2333    async fn test_download_range_6() -> anyhow::Result<()> {
2334        env_logger::try_init().ok();
2335        clear_cache().await?;
2336
2337        let routes = {
2338            path!("file")
2339                .and(header::value(RANGE.as_str()))
2340                .map(move |range: HeaderValue| {
2341                    assert_eq!(range.to_str().unwrap(), "bytes=0-3");
2342                    let mut response = Response::new("123".into());
2343                    response
2344                        .headers_mut()
2345                        .insert(CONTENT_RANGE, "bytes 0-3/3".parse().unwrap());
2346                    response
2347                })
2348        };
2349
2350        starts_with_server!(addr, routes, {
2351            let io_urls = vec![format!("http://{}", addr)];
2352            let downloader = AsyncRangeReaderBuilder::from(
2353                BaseRangeReaderBuilder::new(
2354                    "bucket".to_owned(),
2355                    "file".to_owned(),
2356                    get_credential(),
2357                    io_urls,
2358                )
2359                .use_getfile_api(false)
2360                .normalize_key(true),
2361            )
2362            .build();
2363
2364            let ranges = [(0, 4)];
2365            let have_tried = AtomicUsize::new(0);
2366            match downloader
2367                .read_multi_ranges(
2368                    &ranges,
2369                    "file",
2370                    0,
2371                    TriesInfo::new(&have_tried, 1),
2372                    &Default::default(),
2373                    |_| async {},
2374                )
2375                .await
2376            {
2377                Result3::Ok(parts) => {
2378                    assert_eq!(parts.len(), 1);
2379                    assert_eq!(&parts.first().unwrap().data, b"123");
2380                    assert_eq!(parts.first().unwrap().range, (0, 3));
2381                }
2382                _ => unreachable!(),
2383            }
2384        });
2385
2386        Ok(())
2387    }
2388
2389    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2390    async fn test_update_hosts() -> anyhow::Result<()> {
2391        env_logger::try_init().ok();
2392        clear_cache().await?;
2393
2394        let routes = { path!("file").map(move || Response::new("12345".into())) };
2395        starts_with_server!(io_addr, uc_addr, routes, {
2396            let io_urls = vec!["http://fakedomain:12345".to_owned()];
2397            let uc_urls = vec![format!("http://{}", uc_addr)];
2398            let downloader = AsyncRangeReaderBuilder::from(
2399                BaseRangeReaderBuilder::new(
2400                    "bucket".to_owned(),
2401                    "file".to_owned(),
2402                    get_credential(),
2403                    io_urls.to_owned(),
2404                )
2405                .uc_urls(uc_urls)
2406                .use_getfile_api(false)
2407                .normalize_key(true),
2408            )
2409            .build();
2410
2411            assert_eq!(downloader.io_urls().await, io_urls);
2412            assert!(downloader.update_urls().await);
2413            assert_eq!(
2414                downloader.io_urls().await,
2415                vec![format!("http://{}", io_addr)]
2416            );
2417            let have_tried = AtomicUsize::new(0);
2418            match downloader
2419                .download(
2420                    "file",
2421                    0,
2422                    TriesInfo::new(&have_tried, 1),
2423                    &Default::default(),
2424                    |_| async {},
2425                )
2426                .await
2427            {
2428                Result3::Ok(buf) => {
2429                    assert_eq!(&buf, b"12345")
2430                }
2431                _ => unreachable!(),
2432            }
2433        });
2434        Ok(())
2435    }
2436
2437    #[tokio::test]
2438    async fn test_sign_download_url_with_deadline() -> anyhow::Result<()> {
2439        env_logger::try_init().ok();
2440        clear_cache().await?;
2441
2442        let credential = Credential::new("abcdefghklmnopq", "1234567890");
2443        assert_eq!(
2444            sign_download_url_with_deadline(&credential,
2445                Url::parse("http://www.qiniu.com/?go=1")?,
2446                SystemTime::UNIX_EPOCH + Duration::from_secs(1_234_567_890 + 3600),
2447            )?,
2448            "http://www.qiniu.com/?go=1&e=1234571490&token=abcdefghklmnopq:KjQtlGAkEOhSwtFjJfYtYa2-reE=",
2449        );
2450        Ok(())
2451    }
2452
2453    fn get_credential() -> Credential {
2454        Credential::new("1234567890", "abcdefghijk")
2455    }
2456
2457    async fn clear_cache() -> IoResult<()> {
2458        let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME).await?;
2459        remove_file(&cache_file_path).await.or_else(|err| {
2460            if err.kind() == IoErrorKind::NotFound {
2461                Ok(())
2462            } else {
2463                Err(err)
2464            }
2465        })?;
2466        let dot_file_path = cache_dir_path_of(DOT_FILE_NAME).await?;
2467        remove_file(&dot_file_path).await.or_else(|err| {
2468            if err.kind() == IoErrorKind::NotFound {
2469                Ok(())
2470            } else {
2471                Err(err)
2472            }
2473        })?;
2474        Ok(())
2475    }
2476}