qiniu_upload_manager/
auto_uploader.rs

1use super::{
2    callbacks::Callbacks, ConcurrencyProvider, ConcurrentMultiPartsUploaderScheduler, DataPartitionProvider,
3    FileSystemResumableRecorder, FixedConcurrencyProvider, FixedDataPartitionProvider, FixedThresholdResumablePolicy,
4    FormUploader, MultiPartsUploaderScheduler, MultiPartsUploaderSchedulerExt, MultiPartsUploaderWithCallbacks,
5    MultiPartsV1Uploader, MultiPartsV2Uploader, ObjectParams, ObjectParamsBuilder, ResumablePolicy,
6    ResumablePolicyProvider, ResumableRecorder, SerialMultiPartsUploaderScheduler, SinglePartUploader, UploadManager,
7    UploadedPart, UploaderWithCallbacks, UploadingProgressInfo,
8};
9use anyhow::Result as AnyResult;
10use assert_impl::assert_impl;
11use digest::Digest;
12use qiniu_apis::{
13    http::ResponseParts,
14    http_client::{ApiResult, RequestBuilderParts, ResponseError},
15};
16use serde_json::Value;
17use sha1::Sha1;
18use std::{
19    fmt::Debug,
20    fs::metadata,
21    io::Read,
22    ops::{Deref, DerefMut},
23    path::Path,
24    sync::Arc,
25};
26
27#[cfg(feature = "async")]
28use {async_std::fs::metadata as async_metadata, futures::AsyncRead};
29
30/// 自动上传器
31///
32/// 使用设置的各种提供者,将文件或是二进制流数据上传。
33///
34/// ### 用自动上传器上传文件
35///
36/// ##### 阻塞代码示例
37///
38/// ```
39/// use qiniu_upload_manager::{
40///     apis::credential::Credential, AutoUploader, AutoUploaderObjectParams, UploadManager,
41///     UploadTokenSigner,
42/// };
43/// use std::time::Duration;
44///
45/// # fn example() -> anyhow::Result<()> {
46/// let bucket_name = "test-bucket";
47/// let object_name = "test-object";
48/// let upload_manager = UploadManager::builder(UploadTokenSigner::new_credential_provider(
49///     Credential::new("abcdefghklmnopq", "1234567890"),
50///     bucket_name,
51///     Duration::from_secs(3600),
52/// ))
53/// .build();
54/// let params = AutoUploaderObjectParams::builder().object_name(object_name).file_name(object_name).build();
55/// let mut uploader: AutoUploader = upload_manager.auto_uploader();
56/// uploader.upload_path("/home/qiniu/test.png", params)?;
57/// # Ok(())
58/// # }
59/// ```
60///
61/// ##### 异步代码示例
62///
63/// ```
64/// use qiniu_upload_manager::{
65///     apis::credential::Credential, AutoUploader, AutoUploaderObjectParams, UploadManager,
66///     UploadTokenSigner,
67/// };
68/// use std::time::Duration;
69///
70/// # async fn example() -> anyhow::Result<()> {
71/// let bucket_name = "test-bucket";
72/// let object_name = "test-object";
73/// let upload_manager = UploadManager::builder(UploadTokenSigner::new_credential_provider(
74///     Credential::new("abcdefghklmnopq", "1234567890"),
75///     bucket_name,
76///     Duration::from_secs(3600),
77/// ))
78/// .build();
79/// let params = AutoUploaderObjectParams::builder().object_name(object_name).file_name(object_name).build();
80/// let mut uploader: AutoUploader = upload_manager.auto_uploader();
81/// uploader.async_upload_path("/home/qiniu/test.png", params).await?;
82/// # Ok(())
83/// # }
84/// ```
85#[derive(Debug)]
86pub struct AutoUploader<H: Digest = Sha1> {
87    upload_manager: UploadManager,
88    callbacks: Callbacks<'static>,
89    concurrency_provider: Arc<dyn ConcurrencyProvider>,
90    data_partition_provider: Arc<dyn DataPartitionProvider>,
91    resumable_recorder: Arc<dyn ResumableRecorder<HashAlgorithm = H>>,
92    resumable_policy_provider: Arc<dyn ResumablePolicyProvider>,
93}
94
95impl<H: Digest + 'static> AutoUploader<H> {
96    /// 创建自动上传器
97    #[inline]
98    pub fn new(upload_manager: UploadManager) -> Self {
99        Self {
100            upload_manager,
101            callbacks: Default::default(),
102            concurrency_provider: Arc::new(FixedConcurrencyProvider::default()),
103            data_partition_provider: Arc::new(FixedDataPartitionProvider::default()),
104            resumable_recorder: Arc::new(FileSystemResumableRecorder::<H>::default()),
105            resumable_policy_provider: Arc::new(FixedThresholdResumablePolicy::default()),
106        }
107    }
108
109    /// 构建自动上传构建器
110    #[inline]
111    pub fn builder(upload_manager: UploadManager) -> AutoUploaderBuilder<H> {
112        AutoUploaderBuilder {
113            upload_manager,
114            callbacks: Default::default(),
115            concurrency_provider: Box::<FixedConcurrencyProvider>::default(),
116            data_partition_provider: Box::<FixedDataPartitionProvider>::default(),
117            resumable_recorder: Box::<FileSystemResumableRecorder<H>>::default(),
118            resumable_policy_provider: Box::<FixedThresholdResumablePolicy>::default(),
119        }
120    }
121}
122
123impl<H: Digest> UploaderWithCallbacks for AutoUploader<H> {
124    fn on_before_request<F: Fn(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'static>(
125        &mut self,
126        callback: F,
127    ) -> &mut Self {
128        self.callbacks.insert_before_request_callback(callback);
129        self
130    }
131
132    fn on_upload_progress<F: Fn(&UploadingProgressInfo) -> AnyResult<()> + Send + Sync + 'static>(
133        &mut self,
134        callback: F,
135    ) -> &mut Self {
136        self.callbacks.insert_upload_progress_callback(callback);
137        self
138    }
139
140    fn on_response_ok<F: Fn(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'static>(
141        &mut self,
142        callback: F,
143    ) -> &mut Self {
144        self.callbacks.insert_after_response_ok_callback(callback);
145        self
146    }
147
148    fn on_response_error<F: Fn(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'static>(
149        &mut self,
150        callback: F,
151    ) -> &mut Self {
152        self.callbacks.insert_after_response_error_callback(callback);
153        self
154    }
155}
156
157impl<H: Digest> MultiPartsUploaderWithCallbacks for AutoUploader<H> {
158    fn on_part_uploaded<F: Fn(&dyn UploadedPart) -> AnyResult<()> + Send + Sync + 'static>(
159        &mut self,
160        callback: F,
161    ) -> &mut Self {
162        self.callbacks.insert_part_uploaded_callback(callback);
163        self
164    }
165}
166
167macro_rules! sync_block {
168    ($code:block) => {
169        $code
170    };
171}
172
173#[cfg(feature = "async")]
174macro_rules! async_block {
175    ($code:block) => {
176        $code.await
177    };
178}
179
180macro_rules! with_uploader {
181    ($uploader:ident, $resumable_policy:expr, $params:expr, $wrapper:ident, $method:ident, $($args:expr,)*) => {
182        match $resumable_policy {
183            ResumablePolicy::SinglePartUploading => match $params.single_part_uploader_prefer() {
184                SinglePartUploaderPrefer::Form => {
185                    let uploader = FormUploader::new_with_callbacks(
186                        $uploader.upload_manager.to_owned(),
187                        $uploader.callbacks.to_owned(),
188                    );
189                    $wrapper!({uploader.$method($($args),*)})
190                }
191            },
192            ResumablePolicy::MultiPartsUploading => {
193                match (
194                    $params.multi_parts_uploader_prefer(),
195                    $params.multi_parts_uploader_scheduler_prefer(),
196                ) {
197                    (MultiPartsUploaderPrefer::V1, MultiPartsUploaderSchedulerPrefer::Concurrent) => {
198                        let mut uploader =
199                            ConcurrentMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new_with_callbacks(
200                                $uploader.upload_manager.to_owned(),
201                                $uploader.callbacks.to_owned(),
202                                $uploader.resumable_recorder.to_owned(),
203                            ));
204                        uploader.set_concurrency_provider(Box::new($uploader.concurrency_provider.to_owned()));
205                        uploader.set_data_partition_provider(Box::new($uploader.data_partition_provider.to_owned()));
206                        $wrapper!({uploader.$method($($args),*)})
207                    }
208                    (MultiPartsUploaderPrefer::V1, MultiPartsUploaderSchedulerPrefer::Serial) => {
209                        let mut uploader =
210                            SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new_with_callbacks(
211                                $uploader.upload_manager.to_owned(),
212                                $uploader.callbacks.to_owned(),
213                                $uploader.resumable_recorder.to_owned(),
214                            ));
215                        uploader.set_concurrency_provider(Box::new($uploader.concurrency_provider.to_owned()));
216                        uploader.set_data_partition_provider(Box::new($uploader.data_partition_provider.to_owned()));
217                        $wrapper!({uploader.$method($($args),*)})
218                    }
219                    (MultiPartsUploaderPrefer::V2, MultiPartsUploaderSchedulerPrefer::Concurrent) => {
220                        let mut uploader =
221                            ConcurrentMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new_with_callbacks(
222                                $uploader.upload_manager.to_owned(),
223                                $uploader.callbacks.to_owned(),
224                                $uploader.resumable_recorder.to_owned(),
225                            ));
226                        uploader.set_concurrency_provider(Box::new($uploader.concurrency_provider.to_owned()));
227                        uploader.set_data_partition_provider(Box::new($uploader.data_partition_provider.to_owned()));
228                        $wrapper!({uploader.$method($($args),*)})
229                    }
230                    (MultiPartsUploaderPrefer::V2, MultiPartsUploaderSchedulerPrefer::Serial) => {
231                        let mut uploader =
232                            SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new_with_callbacks(
233                                $uploader.upload_manager.to_owned(),
234                                $uploader.callbacks.to_owned(),
235                                $uploader.resumable_recorder.to_owned(),
236                            ));
237                        uploader.set_concurrency_provider(Box::new($uploader.concurrency_provider.to_owned()));
238                        uploader.set_data_partition_provider(Box::new($uploader.data_partition_provider.to_owned()));
239                        $wrapper!({uploader.$method($($args),*)})
240                    }
241                }
242            }
243        }
244    };
245}
246
247impl<H: Digest + Send + 'static> AutoUploader<H> {
248    /// 阻塞上传指定路径的文件
249    ///
250    /// 该方法的异步版本为 [`Self::async_upload_path`]。
251    pub fn upload_path(&self, path: impl AsRef<Path>, params: impl Into<AutoUploaderObjectParams>) -> ApiResult<Value> {
252        let params = params.into();
253        let size = metadata(path.as_ref())?.len();
254        with_uploader!(
255            self,
256            self.resumable_policy_provider
257                .get_policy_from_size(size, Default::default()),
258            params,
259            sync_block,
260            upload_path,
261            path.as_ref(),
262            params.into(),
263        )
264    }
265
266    /// 阻塞上传阅读器的数据
267    ///
268    /// 该方法的异步版本为 [`Self::async_upload_reader`]。
269    pub fn upload_reader<R: Read + Debug + Send + Sync + 'static>(
270        &self,
271        reader: R,
272        params: impl Into<AutoUploaderObjectParams>,
273    ) -> ApiResult<Value> {
274        let params = params.into();
275        let (policy, reader) = self
276            .resumable_policy_provider
277            .get_policy_from_reader(Box::new(reader), Default::default())?;
278        with_uploader!(self, policy, params, sync_block, upload_reader, reader, params.into(),)
279    }
280
281    /// 异步上传指定路径的文件
282    #[cfg(feature = "async")]
283    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
284    pub async fn async_upload_path<'a>(
285        &'a self,
286        path: impl AsRef<Path> + Send + Sync + 'a,
287        params: impl Into<AutoUploaderObjectParams>,
288    ) -> ApiResult<Value> {
289        let params = params.into();
290        let size = async_metadata(path.as_ref()).await?.len();
291        with_uploader!(
292            self,
293            self.resumable_policy_provider
294                .get_policy_from_size(size, Default::default()),
295            params,
296            async_block,
297            async_upload_path,
298            path.as_ref(),
299            params.into(),
300        )
301    }
302
303    /// 异步上传阅读器的数据
304    #[cfg(feature = "async")]
305    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
306    pub async fn async_upload_reader<R: AsyncRead + Unpin + Debug + Send + Sync + 'static>(
307        &self,
308        reader: R,
309        params: impl Into<AutoUploaderObjectParams>,
310    ) -> ApiResult<Value> {
311        let params = params.into();
312        let (policy, reader) = self
313            .resumable_policy_provider
314            .get_policy_from_async_reader(Box::new(reader), Default::default())
315            .await?;
316        with_uploader!(
317            self,
318            policy,
319            params,
320            async_block,
321            async_upload_reader,
322            reader,
323            params.into(),
324        )
325    }
326}
327
328impl<H: Digest> AutoUploader<H> {
329    #[allow(dead_code)]
330    fn assert() {
331        assert_impl!(Send: Self);
332        assert_impl!(Sync: Self);
333    }
334}
335
336impl<H: Digest> Clone for AutoUploader<H> {
337    #[inline]
338    fn clone(&self) -> Self {
339        Self {
340            upload_manager: self.upload_manager.to_owned(),
341            callbacks: self.callbacks.to_owned(),
342            concurrency_provider: self.concurrency_provider.to_owned(),
343            data_partition_provider: self.data_partition_provider.to_owned(),
344            resumable_recorder: self.resumable_recorder.to_owned(),
345            resumable_policy_provider: self.resumable_policy_provider.to_owned(),
346        }
347    }
348}
349
350/// 自动上传器对象参数
351#[derive(Debug, Default, Clone)]
352pub struct AutoUploaderObjectParams {
353    object_params: ObjectParams,
354    multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
355    single_part_uploader_prefer: SinglePartUploaderPrefer,
356    multi_parts_uploader_prefer: MultiPartsUploaderPrefer,
357}
358
359/// 期望的分片上传调度器
360#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
361#[non_exhaustive]
362pub enum MultiPartsUploaderSchedulerPrefer {
363    /// 串行上传调度器
364    ///
365    /// 即 [`crate::SerialMultiPartsUploaderScheduler`]。
366    ///
367    /// 使用该方式,则始终使用单并发上传,不会使用 [`crate::DataPartitionProvider`] 的值。
368    Serial,
369
370    /// 并行上传调度器
371    ///
372    /// 即 [`crate::ConcurrentMultiPartsUploaderScheduler`]。
373    #[default]
374    Concurrent,
375}
376
377/// 期望的对象单请求上传器
378#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
379#[non_exhaustive]
380pub enum SinglePartUploaderPrefer {
381    /// 表单上传器
382    ///
383    /// 即 [`crate::FormUploader`]。
384    #[default]
385    Form,
386}
387
388/// 期望的对象分片上传器
389#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
390#[non_exhaustive]
391pub enum MultiPartsUploaderPrefer {
392    /// 分片上传器 V1
393    ///
394    /// 即 [`crate::MultiPartsV1Uploader`]。
395    V1,
396
397    /// 分片上传器 V2
398    ///
399    /// 即 [`crate::MultiPartsV2Uploader`]。
400    #[default]
401    V2,
402}
403
404impl AutoUploaderObjectParams {
405    /// 创建自动上传器对象参数构建器
406    #[inline]
407    pub fn builder() -> AutoUploaderObjectParamsBuilder {
408        Default::default()
409    }
410
411    /// 获取期望的分片上传调度器
412    #[inline]
413    pub fn multi_parts_uploader_scheduler_prefer(&self) -> MultiPartsUploaderSchedulerPrefer {
414        self.multi_parts_uploader_scheduler_prefer
415    }
416
417    /// 期望的对象单请求上传器
418    #[inline]
419    pub fn single_part_uploader_prefer(&self) -> SinglePartUploaderPrefer {
420        self.single_part_uploader_prefer
421    }
422
423    /// 期望的对象分片上传器
424    #[inline]
425    pub fn multi_parts_uploader_prefer(&self) -> MultiPartsUploaderPrefer {
426        self.multi_parts_uploader_prefer
427    }
428
429    #[allow(dead_code)]
430    fn assert() {
431        assert_impl!(Send: Self);
432        assert_impl!(Sync: Self);
433    }
434}
435
436impl Deref for AutoUploaderObjectParams {
437    type Target = ObjectParams;
438
439    #[inline]
440    fn deref(&self) -> &Self::Target {
441        &self.object_params
442    }
443}
444
445impl DerefMut for AutoUploaderObjectParams {
446    #[inline]
447    fn deref_mut(&mut self) -> &mut Self::Target {
448        &mut self.object_params
449    }
450}
451
452impl From<ObjectParams> for AutoUploaderObjectParams {
453    #[inline]
454    fn from(object_params: ObjectParams) -> Self {
455        Self {
456            object_params,
457            multi_parts_uploader_scheduler_prefer: Default::default(),
458            single_part_uploader_prefer: Default::default(),
459            multi_parts_uploader_prefer: Default::default(),
460        }
461    }
462}
463
464impl From<AutoUploaderObjectParams> for ObjectParams {
465    #[inline]
466    fn from(auto_uploader_object_params: AutoUploaderObjectParams) -> Self {
467        auto_uploader_object_params.object_params
468    }
469}
470
471/// 自动上传器对象参数构建器
472#[derive(Debug, Default)]
473pub struct AutoUploaderObjectParamsBuilder {
474    object_params_builder: ObjectParamsBuilder,
475    multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
476    single_part_uploader_prefer: SinglePartUploaderPrefer,
477    multi_parts_uploader_prefer: MultiPartsUploaderPrefer,
478}
479
480impl Deref for AutoUploaderObjectParamsBuilder {
481    type Target = ObjectParamsBuilder;
482
483    #[inline]
484    fn deref(&self) -> &Self::Target {
485        &self.object_params_builder
486    }
487}
488
489impl DerefMut for AutoUploaderObjectParamsBuilder {
490    #[inline]
491    fn deref_mut(&mut self) -> &mut Self::Target {
492        &mut self.object_params_builder
493    }
494}
495
496impl AutoUploaderObjectParamsBuilder {
497    /// 设置期望的分片上传调度器
498    #[inline]
499    pub fn multi_parts_uploader_scheduler_prefer(
500        &mut self,
501        multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
502    ) -> &mut Self {
503        self.multi_parts_uploader_scheduler_prefer = multi_parts_uploader_scheduler_prefer;
504        self
505    }
506
507    /// 设置对象单请求上传器
508    #[inline]
509    pub fn single_part_uploader_prefer(&mut self, single_part_uploader_prefer: SinglePartUploaderPrefer) -> &mut Self {
510        self.single_part_uploader_prefer = single_part_uploader_prefer;
511        self
512    }
513
514    /// 设置对象分片上传器
515    #[inline]
516    pub fn multi_parts_uploader_prefer(&mut self, multi_parts_uploader_prefer: MultiPartsUploaderPrefer) -> &mut Self {
517        self.multi_parts_uploader_prefer = multi_parts_uploader_prefer;
518        self
519    }
520
521    /// 构建自动上传器对象参数
522    #[inline]
523    pub fn build(&mut self) -> AutoUploaderObjectParams {
524        AutoUploaderObjectParams {
525            object_params: self.object_params_builder.build(),
526            multi_parts_uploader_scheduler_prefer: self.multi_parts_uploader_scheduler_prefer,
527            single_part_uploader_prefer: self.single_part_uploader_prefer,
528            multi_parts_uploader_prefer: self.multi_parts_uploader_prefer,
529        }
530    }
531
532    #[allow(dead_code)]
533    fn assert() {
534        assert_impl!(Send: Self);
535        assert_impl!(Sync: Self);
536    }
537}
538
539/// 自动上传构建器
540#[derive(Debug)]
541pub struct AutoUploaderBuilder<H: Digest = Sha1> {
542    upload_manager: UploadManager,
543    callbacks: Callbacks<'static>,
544    concurrency_provider: Box<dyn ConcurrencyProvider>,
545    data_partition_provider: Box<dyn DataPartitionProvider>,
546    resumable_recorder: Box<dyn ResumableRecorder<HashAlgorithm = H>>,
547    resumable_policy_provider: Box<dyn ResumablePolicyProvider>,
548}
549
550impl<H: Digest> AutoUploaderBuilder<H> {
551    /// 设置并发数提供者
552    #[inline]
553    pub fn concurrency_provider(&mut self, concurrency_provider: impl ConcurrencyProvider + 'static) -> &mut Self {
554        self.concurrency_provider = Box::new(concurrency_provider);
555        self
556    }
557
558    /// 设置分片大小提供者
559    #[inline]
560    pub fn data_partition_provider(
561        &mut self,
562        data_partition_provider: impl DataPartitionProvider + 'static,
563    ) -> &mut Self {
564        self.data_partition_provider = Box::new(data_partition_provider);
565        self
566    }
567
568    /// 设置断点恢复记录器
569    #[inline]
570    pub fn resumable_recorder(
571        &mut self,
572        resumable_recorder: impl ResumableRecorder<HashAlgorithm = H> + 'static,
573    ) -> &mut Self {
574        self.resumable_recorder = Box::new(resumable_recorder);
575        self
576    }
577
578    /// 设置可恢复策略
579    #[inline]
580    pub fn resumable_policy_provider(
581        &mut self,
582        resumable_policy_provider: impl ResumablePolicyProvider + 'static,
583    ) -> &mut Self {
584        self.resumable_policy_provider = Box::new(resumable_policy_provider);
585        self
586    }
587}
588
589impl<H: Digest> AutoUploaderBuilder<H> {
590    /// 构建上传提供者
591    #[inline]
592    pub fn build(&self) -> AutoUploader<H> {
593        let owned: AutoUploaderBuilder<H> = self.to_owned();
594        AutoUploader {
595            upload_manager: owned.upload_manager,
596            callbacks: owned.callbacks,
597            resumable_policy_provider: owned.resumable_policy_provider.into(),
598            concurrency_provider: owned.concurrency_provider.into(),
599            data_partition_provider: owned.data_partition_provider.into(),
600            resumable_recorder: owned.resumable_recorder.into(),
601        }
602    }
603
604    #[allow(dead_code)]
605    fn assert() {
606        assert_impl!(Send: Self);
607        assert_impl!(Sync: Self);
608    }
609}
610
611impl<H: Digest> Clone for AutoUploaderBuilder<H> {
612    #[inline]
613    fn clone(&self) -> Self {
614        Self {
615            upload_manager: self.upload_manager.to_owned(),
616            callbacks: self.callbacks.to_owned(),
617            concurrency_provider: self.concurrency_provider.to_owned(),
618            data_partition_provider: self.data_partition_provider.to_owned(),
619            resumable_recorder: self.resumable_recorder.to_owned(),
620            resumable_policy_provider: self.resumable_policy_provider.to_owned(),
621        }
622    }
623}