// qiniu_upload_manager/scheduler/mod.rs

1use super::{ConcurrencyProvider, DataPartitionProvider, DataSource, ObjectParams};
2use crate::{data_source::FileDataSource, data_source::UnseekableDataSource};
3use auto_impl::auto_impl;
4use digest::Digest;
5use dyn_clonable::clonable;
6use qiniu_apis::http_client::ApiResult;
7use serde_json::Value;
8use std::{fmt::Debug, io::Read, path::Path};
9
10#[cfg(feature = "async")]
11use {
12    crate::data_source::{AsyncDataSource, AsyncFileDataSource, AsyncUnseekableDataSource},
13    futures::{future::BoxFuture, AsyncRead},
14};
15
16/// 分片上传调度器接口
17///
18/// 负责分片上传的调度,包括初始化分片信息、上传分片、完成分片上传。
19#[clonable]
20#[auto_impl(&mut, Box)]
21pub trait MultiPartsUploaderScheduler<A: Digest>: Clone + Send + Sync + Debug {
22    /// 设置并发数提供者
23    fn set_concurrency_provider(&mut self, concurrency_provider: Box<dyn ConcurrencyProvider>);
24
25    /// 设置分片大小提供者
26    fn set_data_partition_provider(&mut self, data_partition_provider: Box<dyn DataPartitionProvider>);
27
28    /// 上传数据源
29    ///
30    /// 该方法的异步版本为 [`Self::async_upload`]。
31    fn upload(&self, source: Box<dyn DataSource<A>>, params: ObjectParams) -> ApiResult<Value>;
32
33    /// 异步上传数据源
34    #[cfg(feature = "async")]
35    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
36    fn async_upload(&self, source: Box<dyn AsyncDataSource<A>>, params: ObjectParams) -> BoxFuture<ApiResult<Value>>;
37}
38
39/// 分片上传调度器扩展接口
40pub trait MultiPartsUploaderSchedulerExt<A: Digest + Send + 'static>: MultiPartsUploaderScheduler<A> {
41    /// 上传指定路径的文件
42    fn upload_path(&self, path: impl AsRef<Path>, params: ObjectParams) -> ApiResult<Value> {
43        self.upload(Box::new(FileDataSource::new(path.as_ref())), params)
44    }
45
46    /// 上传输入流的数据
47    fn upload_reader<R: Read + Debug + Send + Sync + 'static>(
48        &self,
49        reader: R,
50        params: ObjectParams,
51    ) -> ApiResult<Value> {
52        self.upload(Box::new(UnseekableDataSource::new(reader)), params)
53    }
54
55    /// 异步上传指定路径的文件
56    #[cfg(feature = "async")]
57    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
58    fn async_upload_path<'a>(
59        &'a self,
60        path: impl AsRef<Path> + Send + Sync + 'a,
61        params: ObjectParams,
62    ) -> BoxFuture<'a, ApiResult<Value>> {
63        Box::pin(async move {
64            self.async_upload(Box::new(AsyncFileDataSource::new(path.as_ref())), params)
65                .await
66        })
67    }
68
69    /// 异步上传输入流的数据
70    #[cfg(feature = "async")]
71    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
72    fn async_upload_reader<R: AsyncRead + Unpin + Debug + Send + Sync + 'static>(
73        &self,
74        reader: R,
75        params: ObjectParams,
76    ) -> BoxFuture<ApiResult<Value>> {
77        Box::pin(async move {
78            self.async_upload(Box::new(AsyncUnseekableDataSource::new(reader)), params)
79                .await
80        })
81    }
82}
83
84impl<A: Digest + Send + 'static, T: MultiPartsUploaderScheduler<A>> MultiPartsUploaderSchedulerExt<A> for T {}
85
86mod serial_multi_parts_uploader_scheduler;
87pub use serial_multi_parts_uploader_scheduler::SerialMultiPartsUploaderScheduler;
88mod concurrent_multi_parts_uploader_scheduler;
89pub use concurrent_multi_parts_uploader_scheduler::ConcurrentMultiPartsUploaderScheduler;
90
91mod utils;