1#![allow(clippy::too_many_arguments)]
2
3use super::{
4 super::{
5 base::{credential::Credential, download::RangeReaderBuilder as BaseRangeReaderBuilder},
6 config::{build_range_reader_builder_from_config, Config, Timeouts},
7 },
8 dot::{ApiName, DotType, Dotter},
9 host_selector::{HostInfo, HostSelector, HostSelectorBuilder},
10 query::HostsQuerier,
11 req_id::{get_req_id2, REQUEST_ID_HEADER},
12};
13use async_once_cell::Lazy as AsyncLazy;
14use futures::{AsyncReadExt, TryStreamExt};
15use hyper::HeaderMap;
16use log::{debug, info, warn};
17use mime::{Mime, BOUNDARY};
18use multer::Multipart;
19use reqwest::{
20 header::{HeaderValue, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, RANGE},
21 Client as HttpClient, Error as ReqwestError, Method, RequestBuilder as HttpRequestBuilder,
22 Response as HttpResponse, StatusCode, Url,
23};
24use std::{
25 collections::HashSet,
26 error::Error as StdError,
27 fmt::{self, Debug},
28 future::Future,
29 io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult},
30 mem::take,
31 ops::Deref,
32 sync::{
33 atomic::{AtomicUsize, Ordering::Relaxed},
34 Arc,
35 },
36 time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH},
37};
38use tap::prelude::*;
39use text_io::{try_scan as try_scan_text, Error as TextIoError};
40use tokio::{
41 io::{copy as io_copy, AsyncWrite},
42 spawn,
43 sync::Mutex,
44};
45use tokio_util::{compat::FuturesAsyncReadCompatExt, either::Either};
46
47pub fn sign_download_url_with_deadline(
54 c: &Credential,
55 url: Url,
56 deadline: SystemTime,
57) -> Result<String, SystemTimeError> {
58 let mut signed_url = url.to_string();
59
60 if signed_url.contains('?') {
61 signed_url.push_str("&e=");
62 } else {
63 signed_url.push_str("?e=");
64 }
65
66 let deadline = deadline.duration_since(UNIX_EPOCH)?.as_secs().to_string();
67 signed_url.push_str(&deadline);
68 let signature = c.sign(signed_url.as_bytes());
69 signed_url.push_str("&token=");
70 signed_url.push_str(&signature);
71 Ok(signed_url)
72}
73
74pub fn sign_download_url_with_lifetime(
81 c: &Credential,
82 url: Url,
83 lifetime: Duration,
84) -> Result<String, SystemTimeError> {
85 let deadline = SystemTime::now() + lifetime;
86 sign_download_url_with_deadline(c, url, deadline)
87}
88
/// Builder for [`AsyncRangeReader`]; a newtype over the shared (blocking)
/// `BaseRangeReaderBuilder` so both sync and async readers use one config.
#[derive(Debug)]
pub(super) struct AsyncRangeReaderBuilder(BaseRangeReaderBuilder);
91
/// Wraps a base builder for async use (zero-cost newtype conversion).
impl From<BaseRangeReaderBuilder> for AsyncRangeReaderBuilder {
    fn from(builder: BaseRangeReaderBuilder) -> Self {
        Self(builder)
    }
}
97
/// Unwraps back to the base builder (zero-cost newtype conversion).
impl From<AsyncRangeReaderBuilder> for BaseRangeReaderBuilder {
    fn from(builder: AsyncRangeReaderBuilder) -> Self {
        builder.0
    }
}
103
impl AsyncRangeReaderBuilder {
    /// Moves the object key out of the builder, leaving an empty string behind.
    pub(super) fn take_key(&mut self) -> String {
        take(&mut self.0.key)
    }

    /// Consumes the builder and produces a reader whose shared state is
    /// constructed lazily — and only once — on first use of the reader.
    pub(super) fn build(self) -> AsyncRangeReader {
        AsyncRangeReader(Arc::new(AsyncLazy::new(Box::pin(async move {
            self.build_inner().await
        }))))
    }

    /// Constructs the shared reader state: HTTP client, dotter (metrics
    /// reporter), optional UC hosts querier, and the IO host selector.
    async fn build_inner(self) -> Arc<AsyncRangeReaderInner> {
        let builder = self.0;
        let http_client =
            Timeouts::new(builder.base_timeout, builder.dial_timeout).async_http_client();
        let dotter = Dotter::new(
            http_client.to_owned(),
            builder.credential.to_owned(),
            builder.bucket.to_owned(),
            builder.monitor_urls,
            builder.dot_interval,
            builder.max_dot_buffer_size,
            builder.dot_tries,
            builder.punish_duration,
            builder.max_punished_times,
            builder.max_punished_hosts_percent,
            builder.base_timeout,
        )
        .await;

        // Optional knobs shared by both host selectors built below.
        let params = HostSelectorParams {
            update_interval: builder.update_interval,
            punish_duration: builder.punish_duration,
            max_punished_times: builder.max_punished_times,
            max_punished_hosts_percent: builder.max_punished_hosts_percent,
            base_timeout: builder.base_timeout,
        };

        // Without UC urls there is nobody to ask for fresh IO urls, so no querier.
        let io_querier = if builder.uc_urls.is_empty() {
            None
        } else {
            Some(HostsQuerier::new(
                make_uc_host_selector(builder.uc_urls, &params).await,
                builder.uc_tries,
                dotter.to_owned(),
                http_client.to_owned(),
            ))
        };
        let io_selector = make_io_selector(
            builder.io_urls,
            io_querier,
            builder.credential.access_key().to_owned(),
            builder.bucket.to_owned(),
            builder.use_https,
            &params,
        )
        .await;

        return Arc::new(AsyncRangeReaderInner {
            io_selector,
            dotter,
            http_client,
            credential: builder.credential,
            bucket: builder.bucket,
            use_getfile_api: builder.use_getfile_api,
            normalize_key: builder.normalize_key,
            use_https: builder.use_https,
            private_url_lifetime: builder.private_url_lifetime,
        });

        // Bundle of optional selector settings; only the `Some` ones are applied.
        #[derive(Clone, Debug)]
        struct HostSelectorParams {
            update_interval: Option<Duration>,
            punish_duration: Option<Duration>,
            max_punished_times: Option<usize>,
            max_punished_hosts_percent: Option<u8>,
            base_timeout: Option<Duration>,
        }

        impl HostSelectorParams {
            // Applies every configured option onto the given selector builder.
            fn set_builder(&self, mut builder: HostSelectorBuilder) -> HostSelectorBuilder {
                if let Some(update_interval) = self.update_interval {
                    builder = builder.update_interval(update_interval);
                }
                if let Some(punish_duration) = self.punish_duration {
                    builder = builder.punish_duration(punish_duration);
                }
                if let Some(max_punished_times) = self.max_punished_times {
                    builder = builder.max_punished_times(max_punished_times);
                }
                if let Some(max_punished_hosts_percent) = self.max_punished_hosts_percent {
                    builder = builder.max_punished_hosts_percent(max_punished_hosts_percent);
                }
                if let Some(base_timeout) = self.base_timeout {
                    builder = builder.base_timeout(base_timeout);
                }
                builder
            }
        }

        // Selector over the UC (config service) hosts.
        async fn make_uc_host_selector(
            uc_urls: Vec<String>,
            params: &HostSelectorParams,
        ) -> HostSelector {
            params
                .set_builder(HostSelector::builder(uc_urls))
                .build()
                .await
        }

        // Selector over the IO hosts. Its update callback refreshes the host
        // list through the querier (if any); its punish callback skips
        // punishment for `InvalidData` errors (this module maps HTTP 4xx to
        // `InvalidData` in `unexpected_status_code`).
        async fn make_io_selector(
            io_urls: Vec<String>,
            io_querier: Option<HostsQuerier>,
            access_key: String,
            bucket: String,
            use_https: bool,
            params: &HostSelectorParams,
        ) -> HostSelector {
            let builder = HostSelector::builder(io_urls)
                .update_callback(Some(Box::new(move || {
                    let io_querier = io_querier.to_owned();
                    let access_key = access_key.to_owned();
                    let bucket = bucket.to_owned();
                    Box::pin(async move {
                        if let Some(io_querier) = io_querier.as_ref() {
                            io_querier
                                .query_for_io_urls(&access_key, &bucket, use_https)
                                .await
                        } else {
                            Ok(vec![])
                        }
                    })
                })))
                .should_punish_callback(Some(Box::new(|error| {
                    let kind = error.kind();
                    Box::pin(async move { !matches!(kind, IoErrorKind::InvalidData) })
                })));
            params.set_builder(builder).build().await
        }
    }

    /// Builds a reader builder from the global configuration.
    pub(crate) fn from_config(key: String, config: &Config) -> Self {
        build_range_reader_builder_from_config(key, config).into()
    }
}
249
/// Cloneable async range reader; the heavy shared state is initialized
/// lazily on first use and shared between all clones via `Arc`.
#[derive(Clone)]
pub(super) struct AsyncRangeReader(Arc<AsyncLazy<Arc<AsyncRangeReaderInner>>>);
252
/// Shared state behind an [`AsyncRangeReader`], built once by
/// `AsyncRangeReaderBuilder::build_inner()`.
#[derive(Debug)]
struct AsyncRangeReaderInner {
    // Chooses (and rewards/punishes) IO hosts for each request.
    io_selector: HostSelector,
    // Reports request metrics ("dots") to the monitor service.
    dotter: Dotter,
    // Used to sign private download URLs and to build the getfile path.
    credential: Credential,
    http_client: Arc<HttpClient>,
    bucket: String,
    // When true, URLs take the `/getfile/<access_key>/<bucket>/<key>` form.
    use_getfile_api: bool,
    // When true, exactly one `/` is enforced between base URL and key.
    normalize_key: bool,
    use_https: bool,
    // `Some` => download URLs are signed with this lifetime (private bucket).
    private_url_lifetime: Option<Duration>,
}
265
266impl AsyncRangeReader {
267 pub(super) async fn dot(
268 &self,
269 dot_type: DotType,
270 api_name: ApiName,
271 successful: bool,
272 elapsed_duration: Duration,
273 ) -> IoResult<()> {
274 let inner = self.0.get().await;
275 inner
276 .dotter
277 .dot(dot_type, api_name, successful, elapsed_duration)
278 .await
279 }
280
281 pub(super) async fn update_urls(&self) -> bool {
282 self.inner().await.io_selector.update_hosts().await
283 }
284
285 pub(super) async fn io_urls(&self) -> Vec<String> {
286 let inner = self.inner().await;
287 return inner
288 .io_selector
289 .hosts()
290 .await
291 .iter()
292 .map(|host| normalize_host(host, inner.use_https))
293 .collect();
294
295 fn normalize_host(host: &str, use_https: bool) -> String {
296 if host.contains("://") {
297 host.to_string()
298 } else if use_https {
299 "https://".to_owned() + host
300 } else {
301 "http://".to_owned() + host
302 }
303 }
304 }
305
306 pub(super) async fn base_timeout(&self) -> Duration {
307 self.inner().await.io_selector.base_timeout()
308 }
309
310 pub(super) async fn increase_timeout_power_by(&self, host: &str, timeout_power: usize) {
311 self.inner()
312 .await
313 .io_selector
314 .increase_timeout_power_by(host, timeout_power)
315 .await
316 }
317
    /// Reads `size` bytes starting at `pos` from the object named `key`,
    /// retrying across IO hosts via `with_retries()`.
    ///
    /// A zero-length request short-circuits to an empty buffer without any
    /// HTTP traffic. Both 206 (ranged) and 200 (full body) responses are
    /// accepted; at most `size` bytes of the body are read either way.
    pub(super) async fn read_at<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
        &self,
        pos: u64,
        size: u64,
        key: &str,
        async_task_id: u32,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        on_host_selected: F,
    ) -> IoResult3<Vec<u8>> {
        if size == 0 {
            return Ok(Default::default()).into();
        }
        return self.with_retries(
            key,
            Method::GET,
            async_task_id,
            tries_info,
            trying_hosts,
            on_host_selected,
            |tries, request_builder, req_id, download_url, host_info| {
                async move {
                    let range = generate_range_header(pos, size);
                    debug!(
                        "{{{}}} [{}] read_at url: {}, req_id: {:?}, range: {}",
                        async_task_id, tries, download_url, req_id, &range
                    );
                    let begin_at = Instant::now();
                    let result = request_builder
                        .header(RANGE, &range)
                        .send()
                        .await;
                    // Transport-level failures may punish the host
                    // (timeout / connect errors).
                    if let Err(err) = &result {
                        self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
                    }
                    let result = result
                        .map_err(io_error_from(IoErrorKind::ConnectionAborted))
                        .and_then(|resp| {
                            // Only 206 and 200 are acceptable outcomes.
                            if resp.status() != StatusCode::PARTIAL_CONTENT && resp.status() != StatusCode::OK {
                                return Err(unexpected_status_code(&resp));
                            }
                            Ok(resp)
                        })
                        .map(|resp| {
                            // Never read more than requested, even on a 200
                            // response carrying the whole object.
                            let max_size = parse_content_length(&resp).min(size);
                            (resp, max_size)
                        });
                    match result {
                        Ok((resp, max_size)) => {
                            read_response_body(resp, Some(max_size)).await
                        }
                        Err(err) => Err(err),
                    }
                    .tap_ok(|_| {
                        info!(
                            "{{{}}} [{}] read_at ok url: {}, range: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id,
                            tries,
                            download_url,
                            range,
                            req_id,
                            begin_at.elapsed(),
                        );
                    })
                    .tap_err(|err| {
                        warn!(
                            "{{{}}} [{}] read_at error url: {}, range: {}, error: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id, tries, download_url, range, err, req_id, begin_at.elapsed(),
                        );
                    })
                }
            },
        )
        .await;

        // HTTP Range header for [pos, pos + size) — inclusive end, hence the -1.
        fn generate_range_header(pos: u64, size: u64) -> String {
            format!("bytes={}-{}", pos, pos + size - 1)
        }
    }
397
398 pub(super) async fn read_multi_ranges<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
399 &self,
400 ranges: &[(u64, u64)],
401 key: &str,
402 async_task_id: u32,
403 tries_info: TriesInfo<'_>,
404 trying_hosts: &TryingHosts,
405 on_host_selected: F,
406 ) -> IoResult3<Vec<RangePart>> {
407 return self
408 .with_retries(
409 key,
410 Method::GET,
411 async_task_id,
412 tries_info,
413 trying_hosts,
414 on_host_selected,
415 |tries, request_builder, req_id, download_url, host_info| async move {
416 debug!(
417 "{{{}}} [{}] read_multi_ranges url: {}, req_id: {:?}, range counts: {}",
418 async_task_id,
419 tries,
420 download_url,
421 req_id,
422 ranges.len(),
423 );
424 let range = generate_range_header(ranges);
425 let begin_at = Instant::now();
426 let result = request_builder
427 .header(RANGE, &range)
428 .send()
429 .await;
430 if let Err(err) = &result {
431 self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
432 }
433 let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted));
434 match result {
435 Ok(resp) => {
436 let mut parts = Vec::with_capacity(ranges.len());
437 match resp.status() {
438 StatusCode::OK => {
439 let body = read_response_body(resp, None).await?;
440 for &(from, len) in ranges.iter() {
441 let from = (from as usize).min(body.len());
442 let len = (len as usize).min(body.len() - from);
443 if len > 0 {
444 parts.push(RangePart {
445 data: body[from..(from + len)].to_vec(),
446 range: (from as u64, len as u64),
447 });
448 }
449 }
450 }
451 StatusCode::PARTIAL_CONTENT if ranges.len() > 1 => {
452 let content_type = resp
453 .headers()
454 .get(CONTENT_TYPE)
455 .ok_or_else(new_io_error(
456 IoErrorKind::InvalidInput,
457 "Content-Type must be existed",
458 ))?;
459 let content_type: Mime = content_type
460 .to_str()
461 .map_err(io_error_from(IoErrorKind::InvalidInput))?
462 .parse()
463 .map_err(io_error_from(IoErrorKind::InvalidInput))?;
464 let boundary = content_type.get_param(BOUNDARY).unwrap();
465 let mut multipart =
466 Multipart::new(resp.bytes_stream(), boundary.as_str());
467 while let Some(field) = multipart
468 .next_field()
469 .await
470 .map_err(io_error_from(IoErrorKind::BrokenPipe))?
471 {
472 let (from, to, _) = extract_range_header(field.headers())?;
473 let len = to - from + 1;
474 parts.push(RangePart {
475 data: field
476 .bytes()
477 .await
478 .map(|b| b.to_vec())
479 .map_err(io_error_from(IoErrorKind::BrokenPipe))?,
480 range: (from, len),
481 });
482 }
483 }
484 StatusCode::PARTIAL_CONTENT => {
485 let (from, to, _) = extract_range_header(resp.headers())?;
486 let len = to - from + 1;
487 parts.push(RangePart {
488 data: read_response_body(resp, None).await?,
489 range: (from, len),
490 });
491 }
492 _ => {
493 return Err(unexpected_status_code(&resp));
494 }
495 }
496 Ok(parts)
497 }
498 Err(err) => Err(err),
499 }
500 .tap_ok(|_| {
501 info!(
502 "{{{}}} [{}] read_multi_ranges ok url: {}, req_id: {:?}, elapsed: {:?}",
503 async_task_id, tries, download_url, req_id, begin_at.elapsed(),
504 );
505 })
506 .tap_err(|err| {
507 warn!(
508 "{{{}}} [{}] read_multi_ranges error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
509 async_task_id, tries, download_url, err, req_id, begin_at.elapsed(),
510 );
511 })
512 },
513 )
514 .await;
515
516 fn generate_range_header(ranges: &[(u64, u64)]) -> String {
517 let range = ranges
518 .iter()
519 .map(|range| {
520 let start = range.0;
521 let end = start + range.1 - 1;
522 format!("{}-{}", start, end)
523 })
524 .collect::<Vec<_>>()
525 .join(",");
526 format!("bytes={}", range)
527 }
528 }
529
    /// Checks whether the object named `key` exists by issuing a HEAD request
    /// with retries: 200 => exists, 404 => does not exist, any other status
    /// is an error.
    pub(super) async fn exist<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
        &self,
        key: &str,
        async_task_id: u32,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        on_host_selected: F,
    ) -> IoResult3<bool> {
        self.with_retries(
            key,
            Method::HEAD,
            async_task_id,
            tries_info,
            trying_hosts,
            on_host_selected,
            |tries, request_builder, req_id, download_url, host_info| async move {
                debug!(
                    "{{{}}} [{}] exist url: {}, req_id: {:?}",
                    async_task_id, tries, download_url, req_id
                );
                let begin_at = Instant::now();
                let result = request_builder.send().await;
                // Transport failures may punish the host (timeout / connect).
                if let Err(err) = &result {
                    self.punish_if_needed(host_info.host(), host_info.timeout_power(), err)
                        .await;
                }
                result
                    .map_err(io_error_from(IoErrorKind::ConnectionAborted))
                    .and_then(|resp| match resp.status() {
                        StatusCode::OK => Ok(true),
                        StatusCode::NOT_FOUND => Ok(false),
                        _ => Err(unexpected_status_code(&resp)),
                    })
                    .tap_ok(|_| {
                        info!(
                            "{{{}}} [{}] exist ok url: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id,
                            tries,
                            download_url,
                            req_id,
                            begin_at.elapsed(),
                        );
                    })
                    .tap_err(|err| {
                        warn!(
                            "{{{}}} [{}] exist error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id,
                            tries,
                            download_url,
                            err,
                            req_id,
                            begin_at.elapsed(),
                        );
                    })
            },
        )
        .await
    }
588
    /// Queries the object's size with a HEAD request (with retries); only a
    /// 200 response is accepted and its Content-Length is returned.
    pub(super) async fn file_size<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
        &self,
        key: &str,
        async_task_id: u32,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        on_host_selected: F,
    ) -> IoResult3<u64> {
        self.with_retries(
            key,
            Method::HEAD,
            async_task_id,
            tries_info,
            trying_hosts,
            on_host_selected,
            |tries, request_builder, req_id, download_url, host_info| async move {
                debug!(
                    "{{{}}} [{}] file_size url: {}, req_id: {:?}",
                    async_task_id, tries, download_url, req_id
                );
                let begin_at = Instant::now();
                let result = request_builder.send().await;
                // Transport failures may punish the host (timeout / connect).
                if let Err(err) = &result {
                    self.punish_if_needed(host_info.host(), host_info.timeout_power(), err)
                        .await;
                }
                result
                    .map_err(io_error_from(IoErrorKind::ConnectionAborted))
                    .and_then(|resp| {
                        if resp.status() == StatusCode::OK {
                            Ok(parse_content_length(&resp))
                        } else {
                            Err(unexpected_status_code(&resp))
                        }
                    })
                    .tap_ok(|_| {
                        info!(
                            "{{{}}} [{}] file_size ok url: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id,
                            tries,
                            download_url,
                            req_id,
                            begin_at.elapsed(),
                        );
                    })
                    .tap_err(|err| {
                        warn!(
                            "{{{}}} [{}] file_size error url: {}, error: {}, req_id: {:?}, elapsed: {:?}",
                            async_task_id,
                            tries,
                            download_url,
                            err,
                            req_id,
                            begin_at.elapsed(),
                        );
                    })
            },
        )
        .await
    }
649
    /// Downloads the whole object, restarting with a ranged GET from the
    /// current offset whenever the server closes the body early
    /// ("early EOF"). Loops until `_download` reports completion or the
    /// shared retry budget runs out.
    pub(super) async fn download<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
        &self,
        key: &str,
        async_task_id: u32,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        mut on_host_selected: F,
    ) -> IoResult3<Vec<u8>> {
        let mut result = Vec::new();
        loop {
            // `_download` resumes from the number of bytes already collected.
            let (chunk, mut completed) = match self
                ._download(
                    key,
                    async_task_id,
                    result.len() as u64,
                    tries_info,
                    trying_hosts,
                    &mut on_host_selected,
                )
                .await
            {
                Result3::Ok(result) => result,
                Result3::Err(err) => return Result3::Err(err),
                Result3::NoMoreTries(err) => return Result3::NoMoreTries(err),
            };
            if result.is_empty() {
                // First chunk: adopt the buffer without copying.
                result = chunk;
            } else if chunk.is_empty() {
                // Nothing more came back: treat the body as complete.
                completed = true;
            } else {
                result.extend(chunk);
            }
            if completed {
                return Result3::Ok(result);
            } else {
                info!("Early EOF Response Body is detected in {}::download(), will start a new GET request for the rest body", module_path!());
            }
        }
    }
689
    /// Performs one retried GET starting at `init_from`, streaming the body
    /// into a fresh buffer. Returns the buffer plus a "completed" flag that
    /// is `false` when the connection delivered fewer bytes than
    /// Content-Length promised, in which case `download()` resumes.
    async fn _download<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
        &self,
        key: &str,
        async_task_id: u32,
        init_from: u64,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        on_host_selected: F,
    ) -> IoResult3<(Vec<u8>, bool)> {
        let mut buf = Vec::new();
        // The cursor is shared with the retry closure; its position survives
        // across retry attempts, so each attempt resumes from where the
        // previous one stopped writing.
        let buf_cursor = Arc::new(Mutex::new(Cursor::new(&mut buf)));
        let result = self
            .with_retries(
                key,
                Method::GET,
                async_task_id,
                tries_info,
                trying_hosts,
                on_host_selected,
                move |tries, mut request_builder, req_id, download_url, host_info| {
                    let buf_cursor = buf_cursor.to_owned();
                    async move {
                        let mut buf_cursor = buf_cursor.lock().await;
                        let start_from = init_from + buf_cursor.position();
                        debug!(
                            "{{{}}} [{}] download_to url: {}, req_id: {:?}, start_from: {}",
                            async_task_id, tries, download_url, req_id, start_from
                        );
                        let begin_at = Instant::now();
                        // Only send a Range header when resuming mid-object.
                        if start_from > 0 {
                            request_builder =
                                request_builder.header(RANGE, format!("bytes={}-", start_from));
                        }
                        let result = request_builder
                            .send()
                            .await;
                        if let Err(err) = &result {
                            self.punish_if_needed(
                                host_info.host(),
                                host_info.timeout_power(),
                                err,
                            ).await;
                        }
                        let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted));
                        match result {
                            Ok(resp) => {
                                let content_length = parse_content_length(&resp);
                                write_to_writer(resp, &mut *buf_cursor).await.map(|actually_downloaded| {
                                    if let Some(actually_downloaded) = actually_downloaded {
                                        // Fewer bytes than promised => body is
                                        // incomplete; caller will resume.
                                        (actually_downloaded, actually_downloaded < content_length)
                                    } else {
                                        // 416: requested start is at/past EOF,
                                        // nothing left to fetch.
                                        (0, false)
                                    }
                                })
                            },
                            Err(err) => Err(err),
                        }
                        .tap_ok(|(downloaded, incompleted)| {
                            info!(
                                "{{{}}} [{}] download ok url: {}, start_from: {}, downloaded: {}, completed: {:?}, req_id: {:?}, elapsed: {:?}",
                                async_task_id, tries, download_url, start_from, downloaded, !incompleted, req_id, begin_at.elapsed(),
                            );
                        })
                        .tap_err(|err| {
                            warn!(
                                "{{{}}} [{}] download error url: {}, start_from: {}, error: {}, req_id: {:?}, elapsed: {:?}",
                                async_task_id, tries, download_url, start_from, err, req_id, begin_at.elapsed(),
                            );
                        })
                    }
                },
            )
            .await;
        return match result {
            Result3::Ok((_, incompleted)) => Ok((buf, !incompleted)).into(),
            Result3::Err(err) => Result3::Err(err),
            Result3::NoMoreTries(err) => Result3::NoMoreTries(err),
        };

        // Streams the response body into `writer`. `None` signals 416 Range
        // Not Satisfiable (nothing written); any status other than 200/206
        // is an error.
        async fn write_to_writer<W: AsyncWrite + Unpin>(
            resp: HttpResponse,
            mut writer: W,
        ) -> IoResult<Option<u64>> {
            if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE {
                Ok(None)
            } else if resp.status() != StatusCode::OK
                && resp.status() != StatusCode::PARTIAL_CONTENT
            {
                Err(unexpected_status_code(&resp))
            } else {
                let body = resp
                    .bytes_stream()
                    .map_err(io_error_from(IoErrorKind::BrokenPipe));
                io_copy(&mut body.into_async_read().compat(), &mut writer)
                    .await
                    .map(Some)
            }
        }
    }
789
790 pub(super) async fn read_last_bytes<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = ()>>(
791 &self,
792 size: u64,
793 key: &str,
794 async_task_id: u32,
795 tries_info: TriesInfo<'_>,
796 trying_hosts: &TryingHosts,
797 on_host_selected: F,
798 ) -> IoResult3<(Vec<u8>, u64)> {
799 return self.with_retries(
800 key,
801 Method::GET,
802 async_task_id,
803 tries_info,
804 trying_hosts,
805 on_host_selected,
806 move |tries, request_builder, req_id, download_url, host_info| async move {
807 debug!(
808 "{{{}}} [{}] read_last_bytes url: {}, req_id: {:?}, len: {}",
809 async_task_id, tries, download_url, req_id, size,
810 );
811 let begin_at = Instant::now();
812 let result = request_builder
813 .header(RANGE, format!("bytes=-{}", size))
814 .send()
815 .await;
816 if let Err(err) = &result {
817 self.punish_if_needed(host_info.host(), host_info.timeout_power(), err).await;
818 }
819 let result = result.map_err(io_error_from(IoErrorKind::ConnectionAborted))
820 .and_then(|resp| {
821 if resp.status() == StatusCode::PARTIAL_CONTENT {
822 Ok(resp)
823 } else {
824 Err(unexpected_status_code(&resp))
825 }
826 });
827 match result {
828 Ok(resp) => get_response_body_and_total_size(resp, size).await,
829 Err(err) => Err(err),
830 }
831 .tap_ok(|_| {
832 info!(
833 "{{{}}} [{}] download ok url: {}, len: {}, req_id: {:?}, elapsed: {:?}",
834 async_task_id, tries, download_url, size, req_id, begin_at.elapsed(),
835 );
836 })
837 .tap_err(|err| {
838 warn!(
839 "{{{}}} [{}] download error url: {}, len: {}, error: {}, req_id: {:?}, elapsed: {:?}",
840 async_task_id, tries, download_url, size, err, req_id, begin_at.elapsed(),
841 );
842 })
843 }
844 )
845 .await;
846
847 async fn get_response_body_and_total_size(
848 resp: HttpResponse,
849 limit: u64,
850 ) -> IoResult<(Vec<u8>, u64)> {
851 let (_, _, total_size) = extract_range_header(resp.headers())?;
852 let last_bytes = read_response_body(resp, Some(limit)).await?;
853 Ok((last_bytes, total_size))
854 }
855 }
856
    /// Resolves the lazily-initialized shared state, building it on first use.
    async fn inner(&self) -> &Arc<AsyncRangeReaderInner> {
        self.0.get().await
    }
860
    /// Core retry driver shared by every request method.
    ///
    /// Each iteration consumes one unit of the shared try budget, selects an
    /// IO host that no concurrent attempt is currently using, builds a
    /// (possibly signed) download URL plus a request id, and delegates the
    /// actual HTTP exchange to `for_each_url`. A successful attempt rewards
    /// the host and returns; a failed attempt punishes the host and either
    /// retries (if the selector punished the host) or aborts with the error.
    /// Every attempt is dotted as an `IoGetfile` HTTP record.
    async fn with_retries<
        T,
        F: FnMut(usize, HttpRequestBuilder, HeaderValue, Url, HostInfo) -> Fut,
        Fut: Future<Output = IoResult<T>>,
        F2: FnMut(HostInfo) -> Fut2,
        Fut2: Future<Output = ()>,
    >(
        &self,
        key: &str,
        method: Method,
        async_task_id: u32,
        tries_info: TriesInfo<'_>,
        trying_hosts: &TryingHosts,
        mut on_host_selected: F2,
        mut for_each_url: F,
    ) -> IoResult3<T> {
        let begin_at = SystemTime::now();
        let mut last_error: Option<IoError> = None;
        let inner = self.inner().await;

        loop {
            // The attempt counter is shared across tasks (AtomicUsize);
            // exhausting it ends the whole logical operation.
            let tries = tries_info.have_tried.fetch_add(1, Relaxed);
            if tries >= tries_info.total_tries {
                return IoResult3::NoMoreTries(last_error);
            }

            // Register the chosen host in `trying_hosts` while holding the
            // lock, so concurrent attempts never pick the same host. The
            // returned guard removes the entry again when dropped.
            let chosen_io_info = {
                let mut guard = trying_hosts.lock().await;
                if let Some(chosen) = inner.io_selector.select_host(&guard).await {
                    guard.insert(chosen.host().to_owned());
                    drop(guard);
                    TryingHostInfo {
                        host_info: chosen,
                        trying_hosts: trying_hosts.to_owned(),
                    }
                } else {
                    // No usable host left in the pool.
                    return IoResult3::NoMoreTries(last_error);
                }
            };
            on_host_selected(chosen_io_info.to_owned()).await;
            let download_url = sign_download_url_if_needed(
                &make_download_url(
                    chosen_io_info.host(),
                    inner.credential.access_key(),
                    &inner.bucket,
                    key,
                    inner.use_getfile_api,
                    inner.normalize_key,
                ),
                inner.private_url_lifetime,
                &inner.credential,
            );
            let req_id = get_req_id2(
                begin_at,
                tries,
                async_task_id,
                chosen_io_info.host_info.timeout(),
            );
            let request_begin_at_instant = Instant::now();
            let request_builder = inner
                .http_client
                .request(method.to_owned(), download_url.to_owned())
                .header(REQUEST_ID_HEADER, req_id.to_owned());
            match for_each_url(
                tries,
                request_builder,
                req_id,
                download_url,
                chosen_io_info.to_owned(),
            )
            .await
            {
                Ok(result) => {
                    inner.io_selector.reward(chosen_io_info.host()).await;
                    inner
                        .dotter
                        .dot(
                            DotType::Http,
                            ApiName::IoGetfile,
                            true,
                            request_begin_at_instant.elapsed(),
                        )
                        .await
                        .ok();
                    return Ok(result).into();
                }
                Err(err) => {
                    let punished = inner
                        .io_selector
                        .punish(chosen_io_info.host(), &err, &inner.dotter)
                        .await;
                    inner
                        .dotter
                        .dot(
                            DotType::Http,
                            ApiName::IoGetfile,
                            false,
                            request_begin_at_instant.elapsed(),
                        )
                        .await
                        .ok();
                    // A punished host means the error is likely host-local:
                    // remember it and try another host. Otherwise it is fatal.
                    if punished {
                        last_error = Some(err);
                    } else {
                        return Err(err).into();
                    }
                }
            }
        }

        // Builds either a getfile-style URL (`/getfile/<ak>/<bucket><key>`)
        // or plain `<io_url><key>`, optionally normalizing to exactly one
        // slash between base and key.
        fn make_download_url(
            io_url: &str,
            access_key: &str,
            bucket: &str,
            key: &str,
            use_getfile_api: bool,
            normalize_key: bool,
        ) -> String {
            let mut url = if use_getfile_api {
                format!("{}/getfile/{}/{}", io_url, access_key, bucket)
            } else {
                io_url.to_owned()
            };
            if normalize_key {
                if url.ends_with('/') && key.starts_with('/') {
                    url.truncate(url.len() - 1);
                } else if !url.ends_with('/') && !key.starts_with('/') {
                    url.push('/');
                }
            }
            url.push_str(key);
            url
        }

        // Signs the URL when a private-URL lifetime is configured; public
        // buckets pass through unchanged.
        // NOTE(review): the parses and signing `unwrap`s assume the URL
        // generated above is always valid.
        fn sign_download_url_if_needed(
            url: &str,
            private_url_lifetime: Option<Duration>,
            credential: &Credential,
        ) -> Url {
            if let Some(private_url_lifetime) = private_url_lifetime {
                Url::parse(
                    &sign_download_url_with_lifetime(
                        credential,
                        Url::parse(url).unwrap(),
                        private_url_lifetime,
                    )
                    .unwrap(),
                )
                .unwrap()
            } else {
                Url::parse(url).unwrap()
            }
        }
    }
1015
1016 async fn punish_if_needed(&self, host: &str, timeout_power: usize, err: &ReqwestError) {
1017 if err.is_timeout() {
1018 self.inner()
1019 .await
1020 .io_selector
1021 .increase_timeout_power_by(host, timeout_power)
1022 .await
1023 } else if err.is_connect() {
1024 self.inner()
1025 .await
1026 .io_selector
1027 .mark_connection_as_failed(host)
1028 .await
1029 }
1030 }
1031}
1032
1033impl Debug for AsyncRangeReader {
1034 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1035 f.debug_tuple("AsyncRangeReader")
1036 .field(&self.0.try_get())
1037 .finish()
1038 }
1039}
1040
/// One contiguous piece returned by `read_multi_ranges`.
#[derive(Debug, Clone)]
pub struct RangePart {
    /// The bytes of this part.
    pub data: Vec<u8>,
    /// `(offset, length)` of this part within the object.
    pub range: (u64, u64),
}
1049
/// Three-way request outcome: success, a fatal error, or "gave up retrying"
/// (`NoMoreTries`) optionally carrying the last error observed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) enum Result3<T, E> {
    Ok(T),
    Err(E),
    NoMoreTries(Option<E>),
}

/// `Result3` specialized to I/O errors, the module's common return shape.
pub(super) type IoResult3<T> = Result3<T, IoError>;
1058
1059impl<T, E> From<Result<T, E>> for Result3<T, E> {
1060 fn from(r: Result<T, E>) -> Self {
1061 match r {
1062 Ok(r) => Result3::Ok(r),
1063 Err(e) => Result3::Err(e),
1064 }
1065 }
1066}
1067
/// Set of hosts currently being tried, shared between concurrent attempts so
/// the selector can avoid handing the same host to two attempts at once.
pub(super) type TryingHosts = Arc<Mutex<HashSet<String>>>;

/// RAII guard pairing a selected host with the shared set it is registered
/// in; the entry is removed in `Drop`.
struct TryingHostInfo {
    host_info: HostInfo,
    trying_hosts: TryingHosts,
}
1074
/// Lets a `TryingHostInfo` be used anywhere a `HostInfo` is expected.
impl Deref for TryingHostInfo {
    type Target = HostInfo;

    fn deref(&self) -> &Self::Target {
        &self.host_info
    }
}
1082
impl Drop for TryingHostInfo {
    // Deregisters the host from the shared `trying_hosts` set.
    fn drop(&mut self) {
        // Fast path: the lock is free, remove synchronously.
        if let Ok(mut trying_hosts) = self.trying_hosts.try_lock() {
            trying_hosts.remove(self.host_info.host());
            return;
        }
        // Slow path: the lock is contended and `drop` cannot await, so move
        // the fields out (leaving default placeholders behind) and finish the
        // removal on a spawned task.
        // NOTE(review): relies on `HostInfo: Default` for `take`; the
        // leftover default value is never read again.
        let trying_hosts = take(&mut self.trying_hosts);
        let host_info = take(&mut self.host_info);
        spawn(async move {
            trying_hosts.lock().await.remove(host_info.host());
        });
    }
}
1096
/// Shared retry budget: `have_tried` counts attempts across concurrent
/// tasks, `total_tries` caps them.
#[derive(Clone, Copy, Debug)]
pub(super) struct TriesInfo<'a> {
    have_tried: &'a AtomicUsize,
    total_tries: usize,
}
1102
impl<'a> TriesInfo<'a> {
    /// Wraps a shared attempt counter together with the total attempt budget.
    pub(super) fn new(have_tried: &'a AtomicUsize, total_tries: usize) -> Self {
        Self {
            have_tried,
            total_tries,
        }
    }
}
1111
1112fn unexpected_status_code(resp: &HttpResponse) -> IoError {
1113 let error_kind = if resp.status().is_client_error() {
1114 IoErrorKind::InvalidData
1115 } else {
1116 IoErrorKind::Other
1117 };
1118 IoError::new(
1119 error_kind,
1120 format!("Unexpected status code {}", resp.status().as_u16()),
1121 )
1122}
1123
1124fn parse_content_length(resp: &HttpResponse) -> u64 {
1125 resp.content_length()
1126 .and_then(|s| if s > 0 { Some(s) } else { None })
1127 .or_else(|| {
1128 resp.headers()
1129 .get(CONTENT_LENGTH)
1130 .and_then(|length| length.to_str().ok())
1131 .and_then(|length| length.parse().ok())
1132 })
1133 .expect("Content-Length must be existed")
1134}
1135
1136fn extract_range_header(headers: &HeaderMap) -> IoResult<(u64, u64, u64)> {
1137 let content_range = headers
1138 .get(CONTENT_RANGE)
1139 .ok_or_else(new_io_error(
1140 IoErrorKind::InvalidInput,
1141 "Content-Range must be existed",
1142 ))?
1143 .to_str()
1144 .map_err(io_error_from(IoErrorKind::InvalidInput))?;
1145 let (from, to, total_size) =
1146 parse_range_header(content_range).map_err(io_error_from(IoErrorKind::InvalidInput))?;
1147 Ok((from, to, total_size))
1148}
1149
/// Parses a `Content-Range` value of the form `bytes {from}-{to}/{total}`
/// via the `text_io` scan macro; any mismatch surfaces as `TextIoError`.
fn parse_range_header(range: &str) -> Result<(u64, u64, u64), TextIoError> {
    let from: u64;
    let to: u64;
    let total_size: u64;
    try_scan_text!(range.bytes() => "bytes {}-{}/{}", from, to, total_size);
    Ok((from, to, total_size))
}
1157
1158async fn read_response_body(resp: HttpResponse, limit: Option<u64>) -> IoResult<Vec<u8>> {
1159 let mut buf_cursor = Cursor::new(Vec::<u8>::new());
1160 let body = resp
1161 .bytes_stream()
1162 .map_err(io_error_from(IoErrorKind::BrokenPipe))
1163 .into_async_read();
1164 let mut copy_from = if let Some(limit) = limit {
1165 Either::Left(body.take(limit).compat())
1166 } else {
1167 Either::Right(body.compat())
1168 };
1169 io_copy(&mut copy_from, &mut buf_cursor).await?;
1170 Ok(buf_cursor.into_inner())
1171}
1172
/// Returns a reusable converter that wraps any boxable error into an
/// `IoError` of the given `kind` (handy inside `map_err` chains).
fn io_error_from<E>(kind: IoErrorKind) -> impl Fn(E) -> IoError
where
    E: Into<Box<dyn StdError + Send + Sync>>,
{
    move |source| IoError::new(kind, source)
}
1178
/// Returns a deferred `IoError` constructor with the given `kind` and
/// payload, suited to `ok_or_else`-style lazy-error call sites.
fn new_io_error<E>(kind: IoErrorKind, err: E) -> impl FnOnce() -> IoError
where
    E: Into<Box<dyn StdError + Send + Sync>>,
{
    move || IoError::new(kind, err)
}
1185
1186#[cfg(test)]
1187mod tests {
1188 use super::{
1189 super::{
1190 cache_dir::cache_dir_path_of,
1191 dot::{AsyncDotRecordsMap, DotRecordKey, DotRecords, DOT_FILE_NAME},
1192 query::CACHE_FILE_NAME,
1193 },
1194 *,
1195 };
1196 use futures::channel::oneshot::channel;
1197 use multipart::client::lazy::Multipart as LazyMultipart;
1198 use serde_json::{json, to_vec as json_to_vec};
1199 use std::{
1200 io::Read,
1201 sync::{
1202 atomic::{AtomicUsize, Ordering::Relaxed},
1203 Arc,
1204 },
1205 };
1206 use tokio::{fs::remove_file, task::spawn, time::sleep};
1207 use warp::{
1208 header,
1209 http::{header::AUTHORIZATION, HeaderValue, StatusCode},
1210 hyper::Body,
1211 path,
1212 reply::Response,
1213 Filter,
1214 };
1215
    // Spins up ephemeral warp servers on 127.0.0.1 with random ports, binds
    // their socket addresses to the given identifiers, runs `$code`, and then
    // shuts every server down via a oneshot channel.
    //
    // Three arms:
    //   1. one IO server;
    //   2. an IO server plus a monitor ("dot") server whose `v1/stat`
    //      endpoint merges posted `DotRecords` into `$records_map`;
    //   3. an IO server plus a UC server whose `v4/query` endpoint returns
    //      the IO server's address as the only IO domain.
    macro_rules! starts_with_server {
        ($addr:ident, $routes:ident, $code:block) => {{
            let (tx, rx) = channel();
            let ($addr, server) =
                warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    rx.await.unwrap();
                });
            spawn(server);
            $code;
            // Dropping out of $code: signal the server to shut down.
            tx.send(()).unwrap();
        }};
        ($io_addr:ident, $monitor_addr:ident, $io_routes:ident, $records_map:ident, $code:block) => {{
            let (io_tx, io_rx) = channel();
            let (monitor_tx, monitor_rx) = channel();
            let ($io_addr, io_server) = warp::serve($io_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    io_rx.await.unwrap();
                },
            );
            let $records_map = Arc::new(AsyncDotRecordsMap::default());
            let monitor_routes = {
                let records_map = $records_map.to_owned();
                path!("v1" / "stat")
                    .and(warp::header::value(AUTHORIZATION.as_str()))
                    .and(warp::body::json())
                    .then(move |authorization: HeaderValue, records: DotRecords| {
                        // Dot uploads must be authenticated with an upload token.
                        assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                        let records_map = records_map.to_owned();
                        async move {
                            records_map.merge_with_records(records).await;
                            Response::new(Body::empty())
                        }
                    })
            };
            let ($monitor_addr, monitor_server) = warp::serve(monitor_routes)
                .bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    monitor_rx.await.unwrap();
                });
            spawn(io_server);
            spawn(monitor_server);
            $code;
            io_tx.send(()).unwrap();
            monitor_tx.send(()).unwrap();
        }};
        ($io_addr:ident, $uc_addr:ident, $io_routes:ident, $code:block) => {{
            let (io_tx, io_rx) = channel();
            let (uc_tx, uc_rx) = channel();
            let ($io_addr, io_server) = warp::serve($io_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    io_rx.await.unwrap();
                },
            );
            let io_addr = $io_addr.to_owned();
            let uc_routes = {
                // The UC endpoint advertises the just-started IO server as the
                // sole IO domain, with a long TTL so it is not refreshed.
                path!("v4" / "query")
                    .map(move || {
                        Response::new(json_to_vec(&json!({
                            "hosts": [{
                                "ttl": 86400,
                                "io": {
                                    "domains": [io_addr]
                                },
                                "uc": {
                                    "domains": []
                                }
                            }]
                        })).unwrap().into())
                    })
            };
            let ($uc_addr, uc_server) = warp::serve(uc_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    uc_rx.await.unwrap();
                },
            );
            spawn(io_server);
            spawn(uc_server);
            $code;
            io_tx.send(()).unwrap();
            uc_tx.send(()).unwrap();
        }};
    }
1300
    // `read_at` happy path: a successful ranged read returns exactly the
    // requested bytes, a short server response is passed through as-is, and
    // both attempts are reported to the monitor as successes.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_routes = {
            let action_1 =
                path!("file")
                    .and(header::value(RANGE.as_str()))
                    .map(|range: HeaderValue| {
                        assert_eq!(range.to_str().unwrap(), "bytes=5-10");
                        Response::new("1234567890".into())
                    });
            let action_2 =
                path!("file2")
                    .and(header::value(RANGE.as_str()))
                    .map(|range: HeaderValue| {
                        assert_eq!(range.to_str().unwrap(), "bytes=5-16");
                        Response::new("1234567890".into())
                    });
            action_1.or(action_2)
        };

        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let io_urls = vec![format!("http://{}", io_addr)];
            {
                let have_tried = AtomicUsize::new(0);
                let io_urls = io_urls.to_owned();
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "file".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false)
                    .normalize_key(true)
                    .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                    .dot_interval(Duration::from_millis(0))
                    .max_dot_buffer_size(1),
                )
                .build();

                // Request 6 bytes starting at offset 5; server serves 10 bytes,
                // so the result is truncated to the requested size.
                match downloader
                    .read_at(
                        5,
                        6,
                        "file",
                        0,
                        TriesInfo::new(&have_tried, 1),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::Ok(buf) => {
                        assert_eq!(&buf, b"123456")
                    }
                    _ => unreachable!(),
                }
            }
            {
                let have_tried = AtomicUsize::new(0);
                let io_urls = io_urls.to_owned();
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "file2".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false)
                    .normalize_key(true)
                    .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                    .dot_interval(Duration::from_millis(0))
                    .max_dot_buffer_size(1),
                )
                .build();

                // Request 12 bytes but the server only has 10: the first 10
                // bytes of the buffer must match what was served.
                match downloader
                    .read_at(
                        5,
                        12,
                        "file2",
                        0,
                        TriesInfo::new(&have_tried, 1),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::Ok(buf) => {
                        assert_eq!(&buf[..10], b"1234567890")
                    }
                    _ => unreachable!(),
                }
            }

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(2));
                assert_eq!(record.failed_count(), Some(0));
            }
        });
        Ok(())
    }
1414
    // `read_at` retry path: a 501 response is retried until the try budget
    // (3) is exhausted, yielding `NoMoreTries` and three failure dots.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at_2() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_called = Arc::new(AtomicUsize::new(0));
        let io_routes = {
            let io_called = io_called.to_owned();
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=1-5");
                    io_called.fetch_add(1, Relaxed);
                    let mut resp = Response::new("12345".into());
                    // 501 is treated as retryable by the downloader.
                    *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
                    resp
                })
        };
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_at(
                    1,
                    5,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::NoMoreTries(..) => {}
                _ => unreachable!(),
            }
            // All three tries must have reached the server.
            assert_eq!(io_called.load(Relaxed), 3);

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(0));
                assert_eq!(record.failed_count(), Some(3));
            }
        });
        Ok(())
    }
1483
    // `read_at` non-retryable error: a 400 response is NOT retried — the call
    // fails immediately with `Err` after a single request and one failure dot.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_at_3() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_called = Arc::new(AtomicUsize::new(0));
        let io_routes = {
            let io_called = io_called.to_owned();
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=1-5");
                    io_called.fetch_add(1, Relaxed);
                    let mut resp = Response::new("12345".into());
                    // 400 is a client error: retrying cannot help.
                    *resp.status_mut() = StatusCode::BAD_REQUEST;
                    resp
                })
        };
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_at(
                    1,
                    5,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Err(..) => {}
                _ => unreachable!(),
            }
            assert_eq!(io_called.load(Relaxed), 1);

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(0));
                assert_eq!(record.failed_count(), Some(1));
            }
        });
        Ok(())
    }
1552
    // `read_last_bytes`: a suffix-range request (`bytes=-10`) returns the
    // final bytes plus the total file size parsed from `Content-Range`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_read_last_bytes() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_routes =
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(|range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=-10");
                    let mut resp = Response::new("1234567890".into());
                    *resp.status_mut() = StatusCode::PARTIAL_CONTENT;
                    // Total size (157286400) is what read_last_bytes reports back.
                    resp.headers_mut().insert(
                        CONTENT_RANGE,
                        "bytes 157286390-157286399/157286400".parse().unwrap(),
                    );
                    resp
                });
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let have_tried = AtomicUsize::new(0);
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            match downloader
                .read_last_bytes(
                    10,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok((buf, total_size)) => {
                    assert_eq!(&buf, b"1234567890");
                    assert_eq!(total_size, 157286400);
                }
                _ => unreachable!(),
            }

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(1));
                assert_eq!(record.failed_count(), Some(0));
            }
        });
        Ok(())
    }
1622
    // Happy path for `exist`, `file_size` and `download` against a server
    // that always returns a 10-byte body; all three calls dot as successes.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_file() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let io_routes = { path!("file").map(|| Response::new("1234567890".into())) };
        starts_with_server!(io_addr, monitor_addr, io_routes, records_map, {
            let io_urls = vec![format!("http://{}", io_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true)
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            let have_tried = AtomicUsize::new(0);
            match downloader
                .exist(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(existed) => {
                    assert!(existed);
                }
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .file_size(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(file_size) => {
                    assert_eq!(file_size, 10);
                }
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .download(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(buf) => {
                    assert_eq!(&buf, b"1234567890");
                }
                _ => unreachable!(),
            }

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                // One success per API call above.
                assert_eq!(record.success_count(), Some(3));
                assert_eq!(record.failed_count(), Some(0));
            }
        });
        Ok(())
    }
1712
    // Retryable failure (501) for `exist`/`file_size`/`download`: each API
    // exhausts its 3 tries (9 requests total), producing 9 failure dots and
    // punishment records for the failing host.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_file_2() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let counter = Arc::new(AtomicUsize::new(0));
        let routes = {
            let counter = counter.to_owned();
            path!("file").map(move || {
                counter.fetch_add(1, Relaxed);
                let mut resp = Response::new("12345".into());
                *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
                resp
            })
        };

        starts_with_server!(addr, monitor_addr, routes, records_map, {
            let io_urls = vec![format!("http://{}", addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .monitor_urls(vec!["http://".to_owned() + &monitor_addr.to_string()])
                .use_getfile_api(false)
                .normalize_key(true)
                .dot_interval(Duration::from_millis(0))
                .max_dot_buffer_size(1),
            )
            .build();

            let have_tried = AtomicUsize::new(0);
            match downloader
                .exist(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::NoMoreTries(_) => {}
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .file_size(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::NoMoreTries(_) => {}
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .download(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::NoMoreTries(_) => {}
                _ => unreachable!(),
            }

            // 3 APIs x 3 tries each.
            assert_eq!(counter.load(Relaxed), 3 * 3);

            // Give the dotter time to flush its buffer to the monitor server.
            sleep(Duration::from_secs(5)).await;
            {
                let record = records_map
                    .read_async(
                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
                        |_, record| record.to_owned(),
                    )
                    .await
                    .unwrap();
                assert_eq!(record.success_count(), Some(0));
                assert_eq!(record.failed_count(), Some(9));
            }
            {
                let record = records_map
                    .read_async(&DotRecordKey::punished(), |_, record| record.to_owned())
                    .await
                    .unwrap();
                assert_eq!(record.punished_count(), Some(4));
            }
        });
        Ok(())
    }
1815
    // Non-retryable failure (400) for `exist`/`file_size`/`download`: each
    // API gives up after a single request even with a budget of 3 tries.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_file_3() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let counter = Arc::new(AtomicUsize::new(0));
        let routes = {
            let counter = counter.to_owned();
            path!("file").map(move || {
                counter.fetch_add(1, Relaxed);
                let mut resp = Response::new("12345".into());
                *resp.status_mut() = StatusCode::BAD_REQUEST;
                resp
            })
        };
        starts_with_server!(addr, routes, {
            let io_urls = vec![format!("http://{}", addr)];

            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            let have_tried = AtomicUsize::new(0);
            match downloader
                .exist(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Err(_) => {}
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .file_size(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Err(_) => {}
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .download(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 3),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Err(_) => {}
                _ => unreachable!(),
            }
            // One request per API, no retries on a 400.
            assert_eq!(counter.load(Relaxed), 3);
        });
        Ok(())
    }
1894
    // Same happy path as `test_download_file` but without a monitor server:
    // the APIs must work identically when dotting is not configured.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_file_4() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let routes = { path!("file").map(|| Response::new("1234567890".into())) };
        starts_with_server!(addr, routes, {
            let io_urls = vec![format!("http://{}", addr)];

            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            let have_tried = AtomicUsize::new(0);
            match downloader
                .exist(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(existed) => {
                    assert!(existed);
                }
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .file_size(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(file_size) => {
                    assert_eq!(file_size, 10);
                }
                _ => unreachable!(),
            }

            let have_tried = AtomicUsize::new(0);
            match downloader
                .download(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(buf) => {
                    assert_eq!(buf, b"1234567890");
                }
                _ => unreachable!(),
            }
        });
        Ok(())
    }
1969
1970 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1971 async fn test_download_range() -> anyhow::Result<()> {
1972 env_logger::try_init().ok();
1973 clear_cache().await?;
1974
1975 let routes = {
1976 path!("file")
1977 .and(header::value(RANGE.as_str()))
1978 .map(move |range: HeaderValue| {
1979 assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
1980 let mut response_body = LazyMultipart::new();
1981 response_body.add_stream(
1982 "",
1983 Cursor::new(b"12345"),
1984 None,
1985 None,
1986 Some("bytes 0-4/10"),
1987 );
1988 response_body.add_stream(
1989 "",
1990 Cursor::new(b"67890"),
1991 None,
1992 None,
1993 Some("bytes 5-9/19"),
1994 );
1995 let mut fields = response_body.prepare().unwrap();
1996 let mut buffer = Vec::new();
1997 fields.read_to_end(&mut buffer).unwrap();
1998 let mut response = Response::new(buffer.into());
1999 *response.status_mut() = StatusCode::PARTIAL_CONTENT;
2000 response.headers_mut().insert(
2001 CONTENT_TYPE,
2002 ("multipart/form-data; boundary=".to_owned() + fields.boundary())
2003 .parse()
2004 .unwrap(),
2005 );
2006 response
2007 })
2008 };
2009
2010 starts_with_server!(addr, routes, {
2011 let io_urls = vec![format!("http://{}", addr)];
2012 let downloader = AsyncRangeReaderBuilder::from(
2013 BaseRangeReaderBuilder::new(
2014 "bucket".to_owned(),
2015 "file".to_owned(),
2016 get_credential(),
2017 io_urls,
2018 )
2019 .use_getfile_api(false)
2020 .normalize_key(true),
2021 )
2022 .build();
2023
2024 let ranges = [(0, 5), (5, 5)];
2025 let have_tried = AtomicUsize::new(0);
2026 match downloader
2027 .read_multi_ranges(
2028 &ranges,
2029 "file",
2030 0,
2031 TriesInfo::new(&have_tried, 1),
2032 &Default::default(),
2033 |_| async {},
2034 )
2035 .await
2036 {
2037 Result3::Ok(parts) => {
2038 assert_eq!(parts.len(), 2);
2039 assert_eq!(&parts.get(1).unwrap().data, b"12345");
2040 assert_eq!(parts.get(1).unwrap().range, (0, 5));
2041 assert_eq!(&parts.first().unwrap().data, b"67890");
2042 assert_eq!(parts.first().unwrap().range, (5, 5));
2043 }
2044 _ => unreachable!(),
2045 }
2046 });
2047 Ok(())
2048 }
2049
    // `read_multi_ranges` with a plain (non-multipart) 200 body: the
    // downloader slices the single body sequentially into the requested
    // ranges, in request order.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_range_2() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let routes = {
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
                    // Full body, longer than the requested ranges combined.
                    "12345678901357924680"
                })
        };

        starts_with_server!(addr, routes, {
            let io_urls = vec![format!("http://{}", addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            let ranges = [(0, 5), (5, 5)];
            let have_tried = AtomicUsize::new(0);
            match downloader
                .read_multi_ranges(
                    &ranges,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(parts) => {
                    assert_eq!(parts.len(), 2);
                    assert_eq!(&parts.first().unwrap().data, b"12345");
                    assert_eq!(parts.first().unwrap().range, (0, 5));
                    assert_eq!(&parts.get(1).unwrap().data, b"67890");
                    assert_eq!(parts.get(1).unwrap().range, (5, 5));
                }
                _ => unreachable!(),
            }
        });

        Ok(())
    }
2104
    // `read_multi_ranges` retry behavior on 501: the try budget (3) is
    // exhausted for a normalized key, then again for a non-normalized
    // slash-prefixed key (cumulative 6 requests).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_range_3() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let counter = Arc::new(AtomicUsize::new(0));
        let routes = {
            let counter = counter.to_owned();
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    counter.fetch_add(1, Relaxed);
                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
                    let mut resp = Response::new("12345".into());
                    *resp.status_mut() = StatusCode::NOT_IMPLEMENTED;
                    resp
                })
        };

        starts_with_server!(addr, routes, {
            // First round: normalized key "file".
            let c = counter.to_owned();
            spawn(async move {
                let io_urls = vec![format!("http://{}", addr)];
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "file".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false)
                    .normalize_key(true),
                )
                .build();

                let ranges = [(0, 5), (5, 5)];
                let have_tried = AtomicUsize::new(0);
                match downloader
                    .read_multi_ranges(
                        &ranges,
                        "file",
                        0,
                        TriesInfo::new(&have_tried, 3),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::NoMoreTries(..) => {}
                    _ => unreachable!(),
                }
                assert_eq!(c.load(Relaxed), 3);
            })
            .await?;

            // Second round: key "/file" without normalization hits the same
            // "file" route, adding another 3 requests.
            let c = counter.to_owned();
            spawn(async move {
                let io_urls = vec![format!("http://{}", addr)];
                let downloader = AsyncRangeReaderBuilder::from(
                    BaseRangeReaderBuilder::new(
                        "bucket".to_owned(),
                        "/file".to_owned(),
                        get_credential(),
                        io_urls,
                    )
                    .use_getfile_api(false),
                )
                .build();

                let ranges = [(0, 5), (5, 5)];
                let have_tried = AtomicUsize::new(0);
                match downloader
                    .read_multi_ranges(
                        &ranges,
                        "/file",
                        0,
                        TriesInfo::new(&have_tried, 3),
                        &Default::default(),
                        |_| async {},
                    )
                    .await
                {
                    Result3::NoMoreTries(..) => {}
                    _ => unreachable!(),
                }
                assert_eq!(c.load(Relaxed), 6);
            })
            .await?;
        });

        Ok(())
    }
2197
    // `read_multi_ranges` where the file is shorter than the requested
    // ranges: the server's multipart parts describe a 6-byte file, so the
    // second requested range (5, 5) comes back truncated to (5, 1).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_range_4() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let routes = {
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-9");
                    let mut response_body = LazyMultipart::new();
                    response_body.add_stream(
                        "",
                        Cursor::new(b"12345"),
                        None,
                        None,
                        Some("bytes 0-4/6"),
                    );
                    response_body.add_stream(
                        "",
                        Cursor::new(b"6"),
                        None,
                        None,
                        Some("bytes 5-5/6"),
                    );
                    let mut fields = response_body.prepare().unwrap();
                    let mut buffer = Vec::new();
                    fields.read_to_end(&mut buffer).unwrap();
                    let mut response = Response::new(buffer.into());
                    *response.status_mut() = StatusCode::PARTIAL_CONTENT;
                    response.headers_mut().insert(
                        CONTENT_TYPE,
                        ("multipart/form-data; boundary=".to_owned() + fields.boundary())
                            .parse()
                            .unwrap(),
                    );
                    response
                })
        };

        starts_with_server!(addr, routes, {
            let io_urls = vec![format!("http://{}", addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            let ranges = [(0, 5), (5, 5)];
            let have_tried = AtomicUsize::new(0);
            match downloader
                .read_multi_ranges(
                    &ranges,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(parts) => {
                    assert_eq!(parts.len(), 2);
                    // Parts are returned in reverse multipart order.
                    assert_eq!(&parts.get(1).unwrap().data, b"12345");
                    assert_eq!(parts.get(1).unwrap().range, (0, 5));
                    assert_eq!(&parts.first().unwrap().data, b"6");
                    assert_eq!(parts.first().unwrap().range, (5, 1));
                }
                _ => unreachable!(),
            }
        });

        Ok(())
    }
2278
2279 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2280 async fn test_download_range_5() -> anyhow::Result<()> {
2281 env_logger::try_init().ok();
2282 clear_cache().await?;
2283
2284 let routes = {
2285 path!("file")
2286 .and(header::value(RANGE.as_str()))
2287 .map(move |range: HeaderValue| {
2288 assert_eq!(range.to_str().unwrap(), "bytes=0-4,5-5");
2289 "1234"
2290 })
2291 };
2292
2293 starts_with_server!(addr, routes, {
2294 let io_urls = vec![format!("http://{}", addr)];
2295 let downloader = AsyncRangeReaderBuilder::from(
2296 BaseRangeReaderBuilder::new(
2297 "bucket".to_owned(),
2298 "file".to_owned(),
2299 get_credential(),
2300 io_urls,
2301 )
2302 .use_getfile_api(false)
2303 .normalize_key(true),
2304 )
2305 .build();
2306
2307 let ranges = [(0, 5), (5, 1)];
2308 let have_tried = AtomicUsize::new(0);
2309 match downloader
2310 .read_multi_ranges(
2311 &ranges,
2312 "file",
2313 0,
2314 TriesInfo::new(&have_tried, 1),
2315 &Default::default(),
2316 |_| async {},
2317 )
2318 .await
2319 {
2320 IoResult3::Ok(parts) => {
2321 assert_eq!(parts.len(), 1);
2322 assert_eq!(&parts.first().unwrap().data, b"1234");
2323 assert_eq!(parts.first().unwrap().range, (0, 4));
2324 }
2325 _ => unreachable!(),
2326 }
2327 });
2328
2329 Ok(())
2330 }
2331
    // `read_multi_ranges` for a single range where the server replies with a
    // short body and a `Content-Range` describing a smaller file: the part
    // is truncated to the bytes actually served, (0, 3).
    // NOTE(review): the fixture's "bytes 0-3/3" header is intentionally
    // malformed-looking (end index equals total) to exercise truncation.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_download_range_6() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let routes = {
            path!("file")
                .and(header::value(RANGE.as_str()))
                .map(move |range: HeaderValue| {
                    assert_eq!(range.to_str().unwrap(), "bytes=0-3");
                    let mut response = Response::new("123".into());
                    response
                        .headers_mut()
                        .insert(CONTENT_RANGE, "bytes 0-3/3".parse().unwrap());
                    response
                })
        };

        starts_with_server!(addr, routes, {
            let io_urls = vec![format!("http://{}", addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls,
                )
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            let ranges = [(0, 4)];
            let have_tried = AtomicUsize::new(0);
            match downloader
                .read_multi_ranges(
                    &ranges,
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(parts) => {
                    assert_eq!(parts.len(), 1);
                    assert_eq!(&parts.first().unwrap().data, b"123");
                    assert_eq!(parts.first().unwrap().range, (0, 3));
                }
                _ => unreachable!(),
            }
        });

        Ok(())
    }
2388
    // Host refresh: starting from an unreachable IO domain, `update_urls`
    // queries the UC server (which advertises the live IO server — see the
    // macro's UC arm) and replaces the IO URL list, after which `download`
    // succeeds.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_hosts() -> anyhow::Result<()> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let routes = { path!("file").map(move || Response::new("12345".into())) };
        starts_with_server!(io_addr, uc_addr, routes, {
            // Deliberately dead IO domain; only UC knows the real one.
            let io_urls = vec!["http://fakedomain:12345".to_owned()];
            let uc_urls = vec![format!("http://{}", uc_addr)];
            let downloader = AsyncRangeReaderBuilder::from(
                BaseRangeReaderBuilder::new(
                    "bucket".to_owned(),
                    "file".to_owned(),
                    get_credential(),
                    io_urls.to_owned(),
                )
                .uc_urls(uc_urls)
                .use_getfile_api(false)
                .normalize_key(true),
            )
            .build();

            assert_eq!(downloader.io_urls().await, io_urls);
            assert!(downloader.update_urls().await);
            assert_eq!(
                downloader.io_urls().await,
                vec![format!("http://{}", io_addr)]
            );
            let have_tried = AtomicUsize::new(0);
            match downloader
                .download(
                    "file",
                    0,
                    TriesInfo::new(&have_tried, 1),
                    &Default::default(),
                    |_| async {},
                )
                .await
            {
                Result3::Ok(buf) => {
                    assert_eq!(&buf, b"12345")
                }
                _ => unreachable!(),
            }
        });
        Ok(())
    }
2436
2437 #[tokio::test]
2438 async fn test_sign_download_url_with_deadline() -> anyhow::Result<()> {
2439 env_logger::try_init().ok();
2440 clear_cache().await?;
2441
2442 let credential = Credential::new("abcdefghklmnopq", "1234567890");
2443 assert_eq!(
2444 sign_download_url_with_deadline(&credential,
2445 Url::parse("http://www.qiniu.com/?go=1")?,
2446 SystemTime::UNIX_EPOCH + Duration::from_secs(1_234_567_890 + 3600),
2447 )?,
2448 "http://www.qiniu.com/?go=1&e=1234571490&token=abcdefghklmnopq:KjQtlGAkEOhSwtFjJfYtYa2-reE=",
2449 );
2450 Ok(())
2451 }
2452
2453 fn get_credential() -> Credential {
2454 Credential::new("1234567890", "abcdefghijk")
2455 }
2456
2457 async fn clear_cache() -> IoResult<()> {
2458 let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME).await?;
2459 remove_file(&cache_file_path).await.or_else(|err| {
2460 if err.kind() == IoErrorKind::NotFound {
2461 Ok(())
2462 } else {
2463 Err(err)
2464 }
2465 })?;
2466 let dot_file_path = cache_dir_path_of(DOT_FILE_NAME).await?;
2467 remove_file(&dot_file_path).await.or_else(|err| {
2468 if err.kind() == IoErrorKind::NotFound {
2469 Ok(())
2470 } else {
2471 Err(err)
2472 }
2473 })?;
2474 Ok(())
2475 }
2476}