//! Ranged HTTP downloads with optional retry and receive timeout, exposed as a chunk stream or a single buffer.
use async_stream::try_stream; use futures_core::stream::Stream; use futures_util::{pin_mut, StreamExt}; use reqwest::Url; use std::time::Duration; use tokio::time::delay_for; use crate::error::Error; pub struct Builder { url: Url, size: u64, offset: u64, retry_delay: Option<Duration>, receive_timeout: Option<Duration>, retry_count: u32, client: reqwest::Client, } impl Builder { pub fn new(url: Url, offset: u64, size: u64) -> Self { Self { url, offset, size, retry_delay: None, retry_count: 0, receive_timeout: None, client: reqwest::Client::new(), } } pub fn retry(mut self, retry_count: u32, retry_delay: Option<Duration>) -> Self { self.retry_delay = retry_delay; self.retry_count = retry_count; self } pub fn receive_timeout(mut self, timeout: Option<Duration>) -> Self { self.receive_timeout = timeout; self } fn stream_fail( client: reqwest::Client, offset: u64, size: u64, url: Url, timeout: Option<Duration>, ) -> impl Stream<Item = Result<bytes::Bytes, Error>> { try_stream! { log::info!("Requesting {}, starting at offset {} with size {}...", url, offset, size); let end_offset = offset + size - 1; let mut request = client.get(url).header( reqwest::header::RANGE, format!("bytes={}-{}", offset, end_offset), ); if let Some(timeout) = timeout { request = request.timeout(timeout); } let response = request.send().await.map_err(|err| ("request failed", err))?; let mut stream = response.bytes_stream(); while let Some(item) = stream.next().await { let chunk = item.map_err(|err| ("receive failed", err))?; yield chunk; } } } pub fn stream(mut self) -> impl Stream<Item = Result<bytes::Bytes, Error>> { try_stream! { loop { let stream = Self::stream_fail( self.client.clone(), self.offset, self.size, self.url.clone(), self.receive_timeout.clone(), ); pin_mut!(stream); while let Some(result) = stream.next().await { match result { Ok(item) => { self.offset += item.len() as u64; self.size -= item.len() as u64; yield item }, Err(err) => if self.retry_count == 0 { Err(err)? 
} else { log::warn!("request for {} failed (retrying soon): {}", self.url, err); self.retry_count -= 1; } }; } if let Some(retry_delay) = self.retry_delay { delay_for(retry_delay).await; } } } } pub async fn single_fail( client: reqwest::Client, offset: u64, size: u64, url: Url, timeout: Option<Duration>, ) -> Result<bytes::Bytes, Error> { let end_offset = offset + size - 1; let mut request = client.get(url).header( reqwest::header::RANGE, format!("bytes={}-{}", offset, end_offset), ); if let Some(timeout) = timeout { request = request.timeout(timeout); } let response = request .send() .await .map_err(|err| ("request failed", err))?; response .bytes() .await .map_err(|err| ("receive failed", err).into()) } pub async fn single(mut self) -> Result<bytes::Bytes, Error> { loop { match Self::single_fail( self.client.clone(), self.offset, self.size, self.url.clone(), self.receive_timeout, ) .await { Ok(item) => return Ok(item), Err(err) => { if self.retry_count == 0 { return Err(err); } else { log::warn!("request for {} failed (retrying soon): {}", self.url, err); self.retry_count -= 1; } } } if let Some(retry_delay) = self.retry_delay { delay_for(retry_delay).await; } } } }