1use super::{
2 callbacks::Callbacks, ConcurrencyProvider, ConcurrentMultiPartsUploaderScheduler, DataPartitionProvider,
3 FileSystemResumableRecorder, FixedConcurrencyProvider, FixedDataPartitionProvider, FixedThresholdResumablePolicy,
4 FormUploader, MultiPartsUploaderScheduler, MultiPartsUploaderSchedulerExt, MultiPartsUploaderWithCallbacks,
5 MultiPartsV1Uploader, MultiPartsV2Uploader, ObjectParams, ObjectParamsBuilder, ResumablePolicy,
6 ResumablePolicyProvider, ResumableRecorder, SerialMultiPartsUploaderScheduler, SinglePartUploader, UploadManager,
7 UploadedPart, UploaderWithCallbacks, UploadingProgressInfo,
8};
9use anyhow::Result as AnyResult;
10use assert_impl::assert_impl;
11use digest::Digest;
12use qiniu_apis::{
13 http::ResponseParts,
14 http_client::{ApiResult, RequestBuilderParts, ResponseError},
15};
16use serde_json::Value;
17use sha1::Sha1;
18use std::{
19 fmt::Debug,
20 fs::metadata,
21 io::Read,
22 ops::{Deref, DerefMut},
23 path::Path,
24 sync::Arc,
25};
26
27#[cfg(feature = "async")]
28use {async_std::fs::metadata as async_metadata, futures::AsyncRead};
29
30#[derive(Debug)]
86pub struct AutoUploader<H: Digest = Sha1> {
87 upload_manager: UploadManager,
88 callbacks: Callbacks<'static>,
89 concurrency_provider: Arc<dyn ConcurrencyProvider>,
90 data_partition_provider: Arc<dyn DataPartitionProvider>,
91 resumable_recorder: Arc<dyn ResumableRecorder<HashAlgorithm = H>>,
92 resumable_policy_provider: Arc<dyn ResumablePolicyProvider>,
93}
94
95impl<H: Digest + 'static> AutoUploader<H> {
96 #[inline]
98 pub fn new(upload_manager: UploadManager) -> Self {
99 Self {
100 upload_manager,
101 callbacks: Default::default(),
102 concurrency_provider: Arc::new(FixedConcurrencyProvider::default()),
103 data_partition_provider: Arc::new(FixedDataPartitionProvider::default()),
104 resumable_recorder: Arc::new(FileSystemResumableRecorder::<H>::default()),
105 resumable_policy_provider: Arc::new(FixedThresholdResumablePolicy::default()),
106 }
107 }
108
109 #[inline]
111 pub fn builder(upload_manager: UploadManager) -> AutoUploaderBuilder<H> {
112 AutoUploaderBuilder {
113 upload_manager,
114 callbacks: Default::default(),
115 concurrency_provider: Box::<FixedConcurrencyProvider>::default(),
116 data_partition_provider: Box::<FixedDataPartitionProvider>::default(),
117 resumable_recorder: Box::<FileSystemResumableRecorder<H>>::default(),
118 resumable_policy_provider: Box::<FixedThresholdResumablePolicy>::default(),
119 }
120 }
121}
122
123impl<H: Digest> UploaderWithCallbacks for AutoUploader<H> {
124 fn on_before_request<F: Fn(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'static>(
125 &mut self,
126 callback: F,
127 ) -> &mut Self {
128 self.callbacks.insert_before_request_callback(callback);
129 self
130 }
131
132 fn on_upload_progress<F: Fn(&UploadingProgressInfo) -> AnyResult<()> + Send + Sync + 'static>(
133 &mut self,
134 callback: F,
135 ) -> &mut Self {
136 self.callbacks.insert_upload_progress_callback(callback);
137 self
138 }
139
140 fn on_response_ok<F: Fn(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'static>(
141 &mut self,
142 callback: F,
143 ) -> &mut Self {
144 self.callbacks.insert_after_response_ok_callback(callback);
145 self
146 }
147
148 fn on_response_error<F: Fn(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'static>(
149 &mut self,
150 callback: F,
151 ) -> &mut Self {
152 self.callbacks.insert_after_response_error_callback(callback);
153 self
154 }
155}
156
157impl<H: Digest> MultiPartsUploaderWithCallbacks for AutoUploader<H> {
158 fn on_part_uploaded<F: Fn(&dyn UploadedPart) -> AnyResult<()> + Send + Sync + 'static>(
159 &mut self,
160 callback: F,
161 ) -> &mut Self {
162 self.callbacks.insert_part_uploaded_callback(callback);
163 self
164 }
165}
166
// Wrapper used for blocking calls: expands to the given block unchanged, so the
// block's value becomes the value of the macro invocation.
macro_rules! sync_block {
    ($body:block) => {
        $body
    };
}
172
// Wrapper used for asynchronous calls: evaluates the given block and awaits the
// value it produces, so it may only be expanded inside an async context.
#[cfg(feature = "async")]
macro_rules! async_block {
    ($body:block) => {
        $body.await
    };
}
179
// Builds the concrete uploader selected by the resumable policy and by the
// preferences stored in the object params, then calls `$method` on it.
//
// Arguments, in order:
// - `$uploader`: identifier of the `AutoUploader` whose upload manager, callbacks,
//   resumable recorder and providers are cloned into the concrete uploader;
// - `$resumable_policy`: expression yielding the `ResumablePolicy` to dispatch on;
// - `$params`: expression yielding the `AutoUploaderObjectParams` that holds the
//   uploader preferences;
// - `$wrapper`: name of the macro wrapping the final call (`sync_block` or
//   `async_block`);
// - `$method`: method invoked on the concrete uploader;
// - `$args`: comma-terminated arguments forwarded to `$method`.
macro_rules! with_uploader {
    // Internal rule shared by every multi-part combination: wraps a
    // `$parts_uploader` in a `$scheduler`, installs both providers and makes the call.
    (@multi_parts $scheduler:ident, $parts_uploader:ident, $uploader:ident, $wrapper:ident, $method:ident, $($args:expr,)*) => {{
        let mut uploader = $scheduler::new($parts_uploader::new_with_callbacks(
            $uploader.upload_manager.to_owned(),
            $uploader.callbacks.to_owned(),
            $uploader.resumable_recorder.to_owned(),
        ));
        uploader.set_concurrency_provider(Box::new($uploader.concurrency_provider.to_owned()));
        uploader.set_data_partition_provider(Box::new($uploader.data_partition_provider.to_owned()));
        $wrapper!({uploader.$method($($args),*)})
    }};
    ($uploader:ident, $resumable_policy:expr, $params:expr, $wrapper:ident, $method:ident, $($args:expr,)*) => {
        match $resumable_policy {
            ResumablePolicy::SinglePartUploading => match $params.single_part_uploader_prefer() {
                SinglePartUploaderPrefer::Form => {
                    let uploader = FormUploader::new_with_callbacks(
                        $uploader.upload_manager.to_owned(),
                        $uploader.callbacks.to_owned(),
                    );
                    $wrapper!({uploader.$method($($args),*)})
                }
            },
            ResumablePolicy::MultiPartsUploading => {
                match (
                    $params.multi_parts_uploader_prefer(),
                    $params.multi_parts_uploader_scheduler_prefer(),
                ) {
                    (MultiPartsUploaderPrefer::V1, MultiPartsUploaderSchedulerPrefer::Concurrent) => with_uploader!(
                        @multi_parts ConcurrentMultiPartsUploaderScheduler, MultiPartsV1Uploader,
                        $uploader, $wrapper, $method, $($args,)*
                    ),
                    (MultiPartsUploaderPrefer::V1, MultiPartsUploaderSchedulerPrefer::Serial) => with_uploader!(
                        @multi_parts SerialMultiPartsUploaderScheduler, MultiPartsV1Uploader,
                        $uploader, $wrapper, $method, $($args,)*
                    ),
                    (MultiPartsUploaderPrefer::V2, MultiPartsUploaderSchedulerPrefer::Concurrent) => with_uploader!(
                        @multi_parts ConcurrentMultiPartsUploaderScheduler, MultiPartsV2Uploader,
                        $uploader, $wrapper, $method, $($args,)*
                    ),
                    (MultiPartsUploaderPrefer::V2, MultiPartsUploaderSchedulerPrefer::Serial) => with_uploader!(
                        @multi_parts SerialMultiPartsUploaderScheduler, MultiPartsV2Uploader,
                        $uploader, $wrapper, $method, $($args,)*
                    ),
                }
            }
        }
    };
}
246
247impl<H: Digest + Send + 'static> AutoUploader<H> {
248 pub fn upload_path(&self, path: impl AsRef<Path>, params: impl Into<AutoUploaderObjectParams>) -> ApiResult<Value> {
252 let params = params.into();
253 let size = metadata(path.as_ref())?.len();
254 with_uploader!(
255 self,
256 self.resumable_policy_provider
257 .get_policy_from_size(size, Default::default()),
258 params,
259 sync_block,
260 upload_path,
261 path.as_ref(),
262 params.into(),
263 )
264 }
265
266 pub fn upload_reader<R: Read + Debug + Send + Sync + 'static>(
270 &self,
271 reader: R,
272 params: impl Into<AutoUploaderObjectParams>,
273 ) -> ApiResult<Value> {
274 let params = params.into();
275 let (policy, reader) = self
276 .resumable_policy_provider
277 .get_policy_from_reader(Box::new(reader), Default::default())?;
278 with_uploader!(self, policy, params, sync_block, upload_reader, reader, params.into(),)
279 }
280
281 #[cfg(feature = "async")]
283 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
284 pub async fn async_upload_path<'a>(
285 &'a self,
286 path: impl AsRef<Path> + Send + Sync + 'a,
287 params: impl Into<AutoUploaderObjectParams>,
288 ) -> ApiResult<Value> {
289 let params = params.into();
290 let size = async_metadata(path.as_ref()).await?.len();
291 with_uploader!(
292 self,
293 self.resumable_policy_provider
294 .get_policy_from_size(size, Default::default()),
295 params,
296 async_block,
297 async_upload_path,
298 path.as_ref(),
299 params.into(),
300 )
301 }
302
303 #[cfg(feature = "async")]
305 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
306 pub async fn async_upload_reader<R: AsyncRead + Unpin + Debug + Send + Sync + 'static>(
307 &self,
308 reader: R,
309 params: impl Into<AutoUploaderObjectParams>,
310 ) -> ApiResult<Value> {
311 let params = params.into();
312 let (policy, reader) = self
313 .resumable_policy_provider
314 .get_policy_from_async_reader(Box::new(reader), Default::default())
315 .await?;
316 with_uploader!(
317 self,
318 policy,
319 params,
320 async_block,
321 async_upload_reader,
322 reader,
323 params.into(),
324 )
325 }
326}
327
328impl<H: Digest> AutoUploader<H> {
329 #[allow(dead_code)]
330 fn assert() {
331 assert_impl!(Send: Self);
332 assert_impl!(Sync: Self);
333 }
334}
335
336impl<H: Digest> Clone for AutoUploader<H> {
337 #[inline]
338 fn clone(&self) -> Self {
339 Self {
340 upload_manager: self.upload_manager.to_owned(),
341 callbacks: self.callbacks.to_owned(),
342 concurrency_provider: self.concurrency_provider.to_owned(),
343 data_partition_provider: self.data_partition_provider.to_owned(),
344 resumable_recorder: self.resumable_recorder.to_owned(),
345 resumable_policy_provider: self.resumable_policy_provider.to_owned(),
346 }
347 }
348}
349
350#[derive(Debug, Default, Clone)]
352pub struct AutoUploaderObjectParams {
353 object_params: ObjectParams,
354 multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
355 single_part_uploader_prefer: SinglePartUploaderPrefer,
356 multi_parts_uploader_prefer: MultiPartsUploaderPrefer,
357}
358
/// Preferred scheduler for multi-part uploads.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MultiPartsUploaderSchedulerPrefer {
    /// Selects `SerialMultiPartsUploaderScheduler`.
    Serial,

    /// Selects `ConcurrentMultiPartsUploaderScheduler`; this is the default.
    #[default]
    Concurrent,
}
376
/// Preferred uploader for single-part uploads.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SinglePartUploaderPrefer {
    /// Selects `FormUploader`; this is the default and currently the only choice.
    #[default]
    Form,
}
387
/// Preferred uploader for multi-part uploads.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MultiPartsUploaderPrefer {
    /// Selects `MultiPartsV1Uploader`.
    V1,

    /// Selects `MultiPartsV2Uploader`; this is the default.
    #[default]
    V2,
}
403
404impl AutoUploaderObjectParams {
405 #[inline]
407 pub fn builder() -> AutoUploaderObjectParamsBuilder {
408 Default::default()
409 }
410
411 #[inline]
413 pub fn multi_parts_uploader_scheduler_prefer(&self) -> MultiPartsUploaderSchedulerPrefer {
414 self.multi_parts_uploader_scheduler_prefer
415 }
416
417 #[inline]
419 pub fn single_part_uploader_prefer(&self) -> SinglePartUploaderPrefer {
420 self.single_part_uploader_prefer
421 }
422
423 #[inline]
425 pub fn multi_parts_uploader_prefer(&self) -> MultiPartsUploaderPrefer {
426 self.multi_parts_uploader_prefer
427 }
428
429 #[allow(dead_code)]
430 fn assert() {
431 assert_impl!(Send: Self);
432 assert_impl!(Sync: Self);
433 }
434}
435
436impl Deref for AutoUploaderObjectParams {
437 type Target = ObjectParams;
438
439 #[inline]
440 fn deref(&self) -> &Self::Target {
441 &self.object_params
442 }
443}
444
445impl DerefMut for AutoUploaderObjectParams {
446 #[inline]
447 fn deref_mut(&mut self) -> &mut Self::Target {
448 &mut self.object_params
449 }
450}
451
452impl From<ObjectParams> for AutoUploaderObjectParams {
453 #[inline]
454 fn from(object_params: ObjectParams) -> Self {
455 Self {
456 object_params,
457 multi_parts_uploader_scheduler_prefer: Default::default(),
458 single_part_uploader_prefer: Default::default(),
459 multi_parts_uploader_prefer: Default::default(),
460 }
461 }
462}
463
464impl From<AutoUploaderObjectParams> for ObjectParams {
465 #[inline]
466 fn from(auto_uploader_object_params: AutoUploaderObjectParams) -> Self {
467 auto_uploader_object_params.object_params
468 }
469}
470
471#[derive(Debug, Default)]
473pub struct AutoUploaderObjectParamsBuilder {
474 object_params_builder: ObjectParamsBuilder,
475 multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
476 single_part_uploader_prefer: SinglePartUploaderPrefer,
477 multi_parts_uploader_prefer: MultiPartsUploaderPrefer,
478}
479
480impl Deref for AutoUploaderObjectParamsBuilder {
481 type Target = ObjectParamsBuilder;
482
483 #[inline]
484 fn deref(&self) -> &Self::Target {
485 &self.object_params_builder
486 }
487}
488
489impl DerefMut for AutoUploaderObjectParamsBuilder {
490 #[inline]
491 fn deref_mut(&mut self) -> &mut Self::Target {
492 &mut self.object_params_builder
493 }
494}
495
496impl AutoUploaderObjectParamsBuilder {
497 #[inline]
499 pub fn multi_parts_uploader_scheduler_prefer(
500 &mut self,
501 multi_parts_uploader_scheduler_prefer: MultiPartsUploaderSchedulerPrefer,
502 ) -> &mut Self {
503 self.multi_parts_uploader_scheduler_prefer = multi_parts_uploader_scheduler_prefer;
504 self
505 }
506
507 #[inline]
509 pub fn single_part_uploader_prefer(&mut self, single_part_uploader_prefer: SinglePartUploaderPrefer) -> &mut Self {
510 self.single_part_uploader_prefer = single_part_uploader_prefer;
511 self
512 }
513
514 #[inline]
516 pub fn multi_parts_uploader_prefer(&mut self, multi_parts_uploader_prefer: MultiPartsUploaderPrefer) -> &mut Self {
517 self.multi_parts_uploader_prefer = multi_parts_uploader_prefer;
518 self
519 }
520
521 #[inline]
523 pub fn build(&mut self) -> AutoUploaderObjectParams {
524 AutoUploaderObjectParams {
525 object_params: self.object_params_builder.build(),
526 multi_parts_uploader_scheduler_prefer: self.multi_parts_uploader_scheduler_prefer,
527 single_part_uploader_prefer: self.single_part_uploader_prefer,
528 multi_parts_uploader_prefer: self.multi_parts_uploader_prefer,
529 }
530 }
531
532 #[allow(dead_code)]
533 fn assert() {
534 assert_impl!(Send: Self);
535 assert_impl!(Sync: Self);
536 }
537}
538
539#[derive(Debug)]
541pub struct AutoUploaderBuilder<H: Digest = Sha1> {
542 upload_manager: UploadManager,
543 callbacks: Callbacks<'static>,
544 concurrency_provider: Box<dyn ConcurrencyProvider>,
545 data_partition_provider: Box<dyn DataPartitionProvider>,
546 resumable_recorder: Box<dyn ResumableRecorder<HashAlgorithm = H>>,
547 resumable_policy_provider: Box<dyn ResumablePolicyProvider>,
548}
549
550impl<H: Digest> AutoUploaderBuilder<H> {
551 #[inline]
553 pub fn concurrency_provider(&mut self, concurrency_provider: impl ConcurrencyProvider + 'static) -> &mut Self {
554 self.concurrency_provider = Box::new(concurrency_provider);
555 self
556 }
557
558 #[inline]
560 pub fn data_partition_provider(
561 &mut self,
562 data_partition_provider: impl DataPartitionProvider + 'static,
563 ) -> &mut Self {
564 self.data_partition_provider = Box::new(data_partition_provider);
565 self
566 }
567
568 #[inline]
570 pub fn resumable_recorder(
571 &mut self,
572 resumable_recorder: impl ResumableRecorder<HashAlgorithm = H> + 'static,
573 ) -> &mut Self {
574 self.resumable_recorder = Box::new(resumable_recorder);
575 self
576 }
577
578 #[inline]
580 pub fn resumable_policy_provider(
581 &mut self,
582 resumable_policy_provider: impl ResumablePolicyProvider + 'static,
583 ) -> &mut Self {
584 self.resumable_policy_provider = Box::new(resumable_policy_provider);
585 self
586 }
587}
588
589impl<H: Digest> AutoUploaderBuilder<H> {
590 #[inline]
592 pub fn build(&self) -> AutoUploader<H> {
593 let owned: AutoUploaderBuilder<H> = self.to_owned();
594 AutoUploader {
595 upload_manager: owned.upload_manager,
596 callbacks: owned.callbacks,
597 resumable_policy_provider: owned.resumable_policy_provider.into(),
598 concurrency_provider: owned.concurrency_provider.into(),
599 data_partition_provider: owned.data_partition_provider.into(),
600 resumable_recorder: owned.resumable_recorder.into(),
601 }
602 }
603
604 #[allow(dead_code)]
605 fn assert() {
606 assert_impl!(Send: Self);
607 assert_impl!(Sync: Self);
608 }
609}
610
611impl<H: Digest> Clone for AutoUploaderBuilder<H> {
612 #[inline]
613 fn clone(&self) -> Self {
614 Self {
615 upload_manager: self.upload_manager.to_owned(),
616 callbacks: self.callbacks.to_owned(),
617 concurrency_provider: self.concurrency_provider.to_owned(),
618 data_partition_provider: self.data_partition_provider.to_owned(),
619 resumable_recorder: self.resumable_recorder.to_owned(),
620 resumable_policy_provider: self.resumable_policy_provider.to_owned(),
621 }
622 }
623}