1use super::{
2 async_api::{
3 RangePart, RangeReader as AsyncRangeReader, RangeReaderBuilder as AsyncRangeReaderBuilder,
4 },
5 base::{credential::Credential, download::RangeReaderBuilder as BaseRangeReaderBuilder},
6 config::{
7 build_range_reader_builder_from_config, build_range_reader_builder_from_env,
8 with_current_qiniu_config, Config,
9 },
10 sync_api::{
11 RangeReader as SyncRangeReader, RangeReaderBuilder as SyncRangeReaderBuilder, WriteSeek,
12 },
13};
14use positioned_io::ReadAt;
15use std::{io::Result as IoResult, time::Duration};
16
17#[derive(Debug)]
18pub struct RangeReaderBuilder(BaseRangeReaderBuilder);
20
21impl RangeReaderBuilder {
22 pub fn new(
31 bucket: impl Into<String>,
32 key: impl Into<String>,
33 credential: Credential,
34 io_urls: Vec<String>,
35 ) -> Self {
36 Self(BaseRangeReaderBuilder::new(
37 bucket.into(),
38 key.into(),
39 credential,
40 io_urls,
41 ))
42 }
43
44 pub fn uc_urls(self, urls: Vec<String>) -> Self {
47 self.with_inner(|b| b.uc_urls(urls))
48 }
49
50 pub fn monitor_urls(self, urls: Vec<String>) -> Self {
53 self.with_inner(|b| b.monitor_urls(urls))
54 }
55
56 pub fn io_tries(self, tries: usize) -> Self {
59 self.with_inner(|b| b.io_tries(tries))
60 }
61
62 pub fn uc_tries(self, tries: usize) -> Self {
65 self.with_inner(|b| b.uc_tries(tries))
66 }
67
68 pub fn dot_tries(self, tries: usize) -> Self {
71 self.with_inner(|b| b.dot_tries(tries))
72 }
73
74 pub fn update_interval(self, interval: Duration) -> Self {
77 self.with_inner(|b| b.update_interval(interval))
78 }
79
80 pub fn punish_duration(self, duration: Duration) -> Self {
83 self.with_inner(|b| b.punish_duration(duration))
84 }
85
86 pub fn base_timeout(self, timeout: Duration) -> Self {
89 self.with_inner(|b| b.base_timeout(timeout))
90 }
91
92 pub fn connect_timeout(self, timeout: Duration) -> Self {
95 self.with_inner(|b| b.connect_timeout(timeout))
96 }
97
98 pub fn max_punished_times(self, max_times: usize) -> Self {
103 self.with_inner(|b| b.max_punished_times(max_times))
104 }
105
106 pub fn max_punished_hosts_percent(self, percent: u8) -> Self {
111 self.with_inner(|b| b.max_punished_hosts_percent(percent))
112 }
113
114 pub fn use_getfile_api(self, use_getfile_api: bool) -> Self {
117 self.with_inner(|b| b.use_getfile_api(use_getfile_api))
118 }
119
120 pub fn normalize_key(self, normalize_key: bool) -> Self {
123 self.with_inner(|b| b.normalize_key(normalize_key))
124 }
125
126 pub fn private_url_lifetime(self, private_url_lifetime: Option<Duration>) -> Self {
129 self.with_inner(|b| b.private_url_lifetime(private_url_lifetime))
130 }
131
132 pub fn dot_interval(self, dot_interval: Duration) -> Self {
135 self.with_inner(|b| b.dot_interval(dot_interval))
136 }
137
138 pub fn max_dot_buffer_size(self, max_dot_buffer_size: u64) -> Self {
141 self.with_inner(|b| b.max_dot_buffer_size(max_dot_buffer_size))
142 }
143
144 pub fn max_retry_concurrency(self, max_retry_concurrency: u32) -> Self {
146 self.with_inner(|b| b.max_retry_concurrency(max_retry_concurrency))
147 }
148
149 pub fn use_https(self, use_https: bool) -> Self {
152 self.with_inner(|b| b.use_https(use_https))
153 }
154
155 fn with_inner(
156 mut self,
157 f: impl FnOnce(BaseRangeReaderBuilder) -> BaseRangeReaderBuilder,
158 ) -> Self {
159 self.0 = f(self.0);
160 self
161 }
162
163 pub fn build(self) -> RangeReader {
165 if self.0.max_retry_concurrency == Some(0) {
166 RangeReader(RangeReaderImpl::Sync(
167 SyncRangeReaderBuilder::from(self.0).build(),
168 ))
169 } else {
170 RangeReader(RangeReaderImpl::Async(
171 AsyncRangeReaderBuilder::from(self.0).build(),
172 ))
173 }
174 }
175
176 pub fn from_config(key: impl Into<String>, config: &Config) -> Self {
183 Self(build_range_reader_builder_from_config(key.into(), config))
184 }
185
186 pub fn from_env(key: impl Into<String>) -> Option<Self> {
192 build_range_reader_builder_from_env(key.into(), false).map(Self)
193 }
194}
195
196#[derive(Debug)]
198pub struct RangeReader(RangeReaderImpl);
199
200#[derive(Debug)]
201enum RangeReaderImpl {
202 Sync(SyncRangeReader),
203 Async(AsyncRangeReader),
204}
205
206impl RangeReader {
207 pub fn builder(
210 bucket: impl Into<String>,
211 key: impl Into<String>,
212 credential: Credential,
213 io_urls: Vec<String>,
214 ) -> RangeReaderBuilder {
215 RangeReaderBuilder::new(bucket, key, credential, io_urls)
216 }
217
218 pub fn from_config(key: impl Into<String>, config: &Config) -> Self {
224 if config.max_retry_concurrency() == Some(0) {
225 Self(RangeReaderImpl::Sync(SyncRangeReader::from_config(
226 key.into(),
227 config,
228 )))
229 } else {
230 Self(RangeReaderImpl::Async(AsyncRangeReader::from_config(
231 key.into(),
232 config,
233 )))
234 }
235 }
236
237 pub fn from_env(key: impl Into<String>) -> Option<Self> {
242 let key = key.into();
243 with_current_qiniu_config(|config| {
244 config.and_then(|config| {
245 config.with_key(&key.to_owned(), |config| {
246 if config.max_retry_concurrency() == Some(0) {
247 SyncRangeReader::from_env(key)
248 .map(RangeReaderImpl::Sync)
249 .map(Self)
250 } else {
251 AsyncRangeReader::from_env(key)
252 .map(RangeReaderImpl::Async)
253 .map(Self)
254 }
255 })
256 })
257 })
258 .flatten()
259 }
260
261 pub fn update_urls(&self) -> bool {
265 match &self.0 {
266 RangeReaderImpl::Sync(range_reader) => range_reader.update_urls(),
267 RangeReaderImpl::Async(range_reader) => range_reader.update_urls(),
268 }
269 }
270
271 pub fn io_urls(&self) -> Vec<String> {
273 match &self.0 {
274 RangeReaderImpl::Sync(range_reader) => range_reader.io_urls(),
275 RangeReaderImpl::Async(range_reader) => range_reader.io_urls(),
276 }
277 }
278
279 pub fn read_multi_ranges(&self, ranges: &[(u64, u64)]) -> IoResult<Vec<RangePart>> {
283 match &self.0 {
284 RangeReaderImpl::Sync(range_reader) => range_reader.read_multi_ranges(ranges),
285 RangeReaderImpl::Async(range_reader) => range_reader.read_multi_ranges(ranges),
286 }
287 }
288
289 pub fn exist(&self) -> IoResult<bool> {
291 match &self.0 {
292 RangeReaderImpl::Sync(range_reader) => range_reader.exist(),
293 RangeReaderImpl::Async(range_reader) => range_reader.exist(),
294 }
295 }
296
297 pub fn file_size(&self) -> IoResult<u64> {
299 match &self.0 {
300 RangeReaderImpl::Sync(range_reader) => range_reader.file_size(),
301 RangeReaderImpl::Async(range_reader) => range_reader.file_size(),
302 }
303 }
304
305 pub fn download(&self) -> IoResult<Vec<u8>> {
307 match &self.0 {
308 RangeReaderImpl::Sync(range_reader) => range_reader.download(),
309 RangeReaderImpl::Async(range_reader) => range_reader.download(),
310 }
311 }
312
313 pub fn download_to(&self, writer: &mut dyn WriteSeek) -> IoResult<u64> {
315 match &self.0 {
316 RangeReaderImpl::Sync(range_reader) => range_reader.download_to(writer),
317 RangeReaderImpl::Async(range_reader) => range_reader.download_to(writer),
318 }
319 }
320
321 pub fn read_last_bytes(&self, buf: &mut [u8]) -> IoResult<(u64, u64)> {
323 match &self.0 {
324 RangeReaderImpl::Sync(range_reader) => range_reader.read_last_bytes(buf),
325 RangeReaderImpl::Async(range_reader) => range_reader.read_last_bytes(buf),
326 }
327 }
328
329 #[cfg(test)]
330 pub(crate) fn is_async(&self) -> bool {
331 matches!(&self.0, RangeReaderImpl::Async(_))
332 }
333}
334
335impl ReadAt for RangeReader {
336 fn read_at(&self, pos: u64, buf: &mut [u8]) -> IoResult<usize> {
337 match &self.0 {
338 RangeReaderImpl::Sync(range_reader) => range_reader.read_at(pos, buf),
339 RangeReaderImpl::Async(range_reader) => range_reader.read_at(pos, buf),
340 }
341 }
342}