qiniu_download/
download.rs

1use super::{
2    async_api::{
3        RangePart, RangeReader as AsyncRangeReader, RangeReaderBuilder as AsyncRangeReaderBuilder,
4    },
5    base::{credential::Credential, download::RangeReaderBuilder as BaseRangeReaderBuilder},
6    config::{
7        build_range_reader_builder_from_config, build_range_reader_builder_from_env,
8        with_current_qiniu_config, Config,
9    },
10    sync_api::{
11        RangeReader as SyncRangeReader, RangeReaderBuilder as SyncRangeReaderBuilder, WriteSeek,
12    },
13};
14use positioned_io::ReadAt;
15use std::{io::Result as IoResult, time::Duration};
16
17#[derive(Debug)]
18/// 对象范围下载构建器
19pub struct RangeReaderBuilder(BaseRangeReaderBuilder);
20
21impl RangeReaderBuilder {
22    /// 创建对象范围下载构建器
23    /// # Arguments
24    ///
25    /// * `bucket` - 存储空间
26    /// * `key` - 对象名称
27    /// * `credential` - 存储空间所在账户的凭证
28    /// * `io_urls` - 七牛 IO 服务器 URL 列表
29
30    pub fn new(
31        bucket: impl Into<String>,
32        key: impl Into<String>,
33        credential: Credential,
34        io_urls: Vec<String>,
35    ) -> Self {
36        Self(BaseRangeReaderBuilder::new(
37            bucket.into(),
38            key.into(),
39            credential,
40            io_urls,
41        ))
42    }
43
44    /// 设置七牛 UC 服务器 URL 列表
45
46    pub fn uc_urls(self, urls: Vec<String>) -> Self {
47        self.with_inner(|b| b.uc_urls(urls))
48    }
49
50    /// 设置七牛监控服务器 URL 列表
51
52    pub fn monitor_urls(self, urls: Vec<String>) -> Self {
53        self.with_inner(|b| b.monitor_urls(urls))
54    }
55
56    /// 设置对象下载最大尝试次数
57
58    pub fn io_tries(self, tries: usize) -> Self {
59        self.with_inner(|b| b.io_tries(tries))
60    }
61
62    /// 设置 UC 查询的最大尝试次数
63
64    pub fn uc_tries(self, tries: usize) -> Self {
65        self.with_inner(|b| b.uc_tries(tries))
66    }
67
68    /// 设置打点记录上传的最大尝试次数
69
70    pub fn dot_tries(self, tries: usize) -> Self {
71        self.with_inner(|b| b.dot_tries(tries))
72    }
73
74    /// 设置 UC 查询的频率
75
76    pub fn update_interval(self, interval: Duration) -> Self {
77        self.with_inner(|b| b.update_interval(interval))
78    }
79
80    /// 设置域名访问失败后的惩罚时长
81
82    pub fn punish_duration(self, duration: Duration) -> Self {
83        self.with_inner(|b| b.punish_duration(duration))
84    }
85
86    /// 设置域名访问的基础超时时长
87
88    pub fn base_timeout(self, timeout: Duration) -> Self {
89        self.with_inner(|b| b.base_timeout(timeout))
90    }
91
92    /// 设置域名访问的连接时长
93
94    pub fn connect_timeout(self, timeout: Duration) -> Self {
95        self.with_inner(|b| b.connect_timeout(timeout))
96    }
97
98    /// 设置失败域名的最大重试次数
99    ///
100    /// 一旦一个域名的被惩罚次数超过限制,则域名选择器不会选择该域名,除非被惩罚的域名比例超过上限,或惩罚时长超过指定时长
101
102    pub fn max_punished_times(self, max_times: usize) -> Self {
103        self.with_inner(|b| b.max_punished_times(max_times))
104    }
105
106    /// 设置被惩罚的域名最大比例
107    ///
108    /// 域名选择器在搜索域名时,一旦被跳过的域名比例大于该值,则下一个域名将被选中,不管该域名是否也被惩罚。一旦该域名成功,则惩罚将立刻被取消
109
110    pub fn max_punished_hosts_percent(self, percent: u8) -> Self {
111        self.with_inner(|b| b.max_punished_hosts_percent(percent))
112    }
113
114    /// 设置是否使用 getfile API 下载
115
116    pub fn use_getfile_api(self, use_getfile_api: bool) -> Self {
117        self.with_inner(|b| b.use_getfile_api(use_getfile_api))
118    }
119
120    /// 设置是否对 key 进行格式化
121
122    pub fn normalize_key(self, normalize_key: bool) -> Self {
123        self.with_inner(|b| b.normalize_key(normalize_key))
124    }
125
126    /// 设置私有空间下载 URL 有效期,如果为 None,则使用公开空间下载 URL
127
128    pub fn private_url_lifetime(self, private_url_lifetime: Option<Duration>) -> Self {
129        self.with_inner(|b| b.private_url_lifetime(private_url_lifetime))
130    }
131
132    /// 设置打点记录上传频率
133
134    pub fn dot_interval(self, dot_interval: Duration) -> Self {
135        self.with_inner(|b| b.dot_interval(dot_interval))
136    }
137
138    /// 设置打点记录本地缓存文件尺寸上限
139
140    pub fn max_dot_buffer_size(self, max_dot_buffer_size: u64) -> Self {
141        self.with_inner(|b| b.max_dot_buffer_size(max_dot_buffer_size))
142    }
143
144    /// 设置最大并行重试次数,如果设置为 0 则表示禁止并行重试功能
145    pub fn max_retry_concurrency(self, max_retry_concurrency: u32) -> Self {
146        self.with_inner(|b| b.max_retry_concurrency(max_retry_concurrency))
147    }
148
149    /// 设置是否使用 HTTPS 协议来访问 IO 服务器
150
151    pub fn use_https(self, use_https: bool) -> Self {
152        self.with_inner(|b| b.use_https(use_https))
153    }
154
155    fn with_inner(
156        mut self,
157        f: impl FnOnce(BaseRangeReaderBuilder) -> BaseRangeReaderBuilder,
158    ) -> Self {
159        self.0 = f(self.0);
160        self
161    }
162
163    /// 构建范围下载器
164    pub fn build(self) -> RangeReader {
165        if self.0.max_retry_concurrency == Some(0) {
166            RangeReader(RangeReaderImpl::Sync(
167                SyncRangeReaderBuilder::from(self.0).build(),
168            ))
169        } else {
170            RangeReader(RangeReaderImpl::Async(
171                AsyncRangeReaderBuilder::from(self.0).build(),
172            ))
173        }
174    }
175
176    /// 从配置创建范围下载构建器
177    /// # Arguments
178    ///
179    /// * `key` - 对象名称
180    /// * `config` - 下载配置
181
182    pub fn from_config(key: impl Into<String>, config: &Config) -> Self {
183        Self(build_range_reader_builder_from_config(key.into(), config))
184    }
185
186    /// 从环境变量创建范围下载构建器
187    /// # Arguments
188    ///
189    /// * `key` - 对象名称
190
191    pub fn from_env(key: impl Into<String>) -> Option<Self> {
192        build_range_reader_builder_from_env(key.into(), false).map(Self)
193    }
194}
195
196/// 对象范围下载器
197#[derive(Debug)]
198pub struct RangeReader(RangeReaderImpl);
199
200#[derive(Debug)]
201enum RangeReaderImpl {
202    Sync(SyncRangeReader),
203    Async(AsyncRangeReader),
204}
205
206impl RangeReader {
207    /// 创建范围下载构建器
208
209    pub fn builder(
210        bucket: impl Into<String>,
211        key: impl Into<String>,
212        credential: Credential,
213        io_urls: Vec<String>,
214    ) -> RangeReaderBuilder {
215        RangeReaderBuilder::new(bucket, key, credential, io_urls)
216    }
217
218    /// 从配置创建范围下载器
219    /// # Arguments
220    ///
221    /// * `key` - 对象名称
222    /// * `config` - 下载配置
223    pub fn from_config(key: impl Into<String>, config: &Config) -> Self {
224        if config.max_retry_concurrency() == Some(0) {
225            Self(RangeReaderImpl::Sync(SyncRangeReader::from_config(
226                key.into(),
227                config,
228            )))
229        } else {
230            Self(RangeReaderImpl::Async(AsyncRangeReader::from_config(
231                key.into(),
232                config,
233            )))
234        }
235    }
236
237    /// 从环境变量创建范围下载器
238    /// # Arguments
239    ///
240    /// * `key` - 对象名称
241    pub fn from_env(key: impl Into<String>) -> Option<Self> {
242        let key = key.into();
243        with_current_qiniu_config(|config| {
244            config.and_then(|config| {
245                config.with_key(&key.to_owned(), |config| {
246                    if config.max_retry_concurrency() == Some(0) {
247                        SyncRangeReader::from_env(key)
248                            .map(RangeReaderImpl::Sync)
249                            .map(Self)
250                    } else {
251                        AsyncRangeReader::from_env(key)
252                            .map(RangeReaderImpl::Async)
253                            .map(Self)
254                    }
255                })
256            })
257        })
258        .flatten()
259    }
260
261    /// 主动更新域名列表
262    ///
263    /// 如果返回为 true 表示更新成功,否则返回 false
264    pub fn update_urls(&self) -> bool {
265        match &self.0 {
266            RangeReaderImpl::Sync(range_reader) => range_reader.update_urls(),
267            RangeReaderImpl::Async(range_reader) => range_reader.update_urls(),
268        }
269    }
270
271    /// 获取当前可用的 IO 节点的域名
272    pub fn io_urls(&self) -> Vec<String> {
273        match &self.0 {
274            RangeReaderImpl::Sync(range_reader) => range_reader.io_urls(),
275            RangeReaderImpl::Async(range_reader) => range_reader.io_urls(),
276        }
277    }
278
279    /// 读取文件的多个区域,返回每个区域对应的数据
280    /// # Arguments
281    /// * `range` - 区域列表,每个区域有开始偏移量和区域长度组成
282    pub fn read_multi_ranges(&self, ranges: &[(u64, u64)]) -> IoResult<Vec<RangePart>> {
283        match &self.0 {
284            RangeReaderImpl::Sync(range_reader) => range_reader.read_multi_ranges(ranges),
285            RangeReaderImpl::Async(range_reader) => range_reader.read_multi_ranges(ranges),
286        }
287    }
288
289    /// 判定当前对象是否存在
290    pub fn exist(&self) -> IoResult<bool> {
291        match &self.0 {
292            RangeReaderImpl::Sync(range_reader) => range_reader.exist(),
293            RangeReaderImpl::Async(range_reader) => range_reader.exist(),
294        }
295    }
296
297    /// 获取当前对象的文件大小
298    pub fn file_size(&self) -> IoResult<u64> {
299        match &self.0 {
300            RangeReaderImpl::Sync(range_reader) => range_reader.file_size(),
301            RangeReaderImpl::Async(range_reader) => range_reader.file_size(),
302        }
303    }
304
305    /// 下载当前对象到内存缓冲区中
306    pub fn download(&self) -> IoResult<Vec<u8>> {
307        match &self.0 {
308            RangeReaderImpl::Sync(range_reader) => range_reader.download(),
309            RangeReaderImpl::Async(range_reader) => range_reader.download(),
310        }
311    }
312
313    /// 下载当前对象到指定输出流中
314    pub fn download_to(&self, writer: &mut dyn WriteSeek) -> IoResult<u64> {
315        match &self.0 {
316            RangeReaderImpl::Sync(range_reader) => range_reader.download_to(writer),
317            RangeReaderImpl::Async(range_reader) => range_reader.download_to(writer),
318        }
319    }
320
321    /// 下载对象的最后指定个字节到缓冲区中,返回实际下载的字节数和整个文件的大小
322    pub fn read_last_bytes(&self, buf: &mut [u8]) -> IoResult<(u64, u64)> {
323        match &self.0 {
324            RangeReaderImpl::Sync(range_reader) => range_reader.read_last_bytes(buf),
325            RangeReaderImpl::Async(range_reader) => range_reader.read_last_bytes(buf),
326        }
327    }
328
329    #[cfg(test)]
330    pub(crate) fn is_async(&self) -> bool {
331        matches!(&self.0, RangeReaderImpl::Async(_))
332    }
333}
334
335impl ReadAt for RangeReader {
336    fn read_at(&self, pos: u64, buf: &mut [u8]) -> IoResult<usize> {
337        match &self.0 {
338            RangeReaderImpl::Sync(range_reader) => range_reader.read_at(pos, buf),
339            RangeReaderImpl::Async(range_reader) => range_reader.read_at(pos, buf),
340        }
341    }
342}