qiniu_upload_manager/resumable_policy/
fixed.rs1use super::{DynRead, GetPolicyOptions, ResumablePolicy, ResumablePolicyProvider};
2use std::{
3 fmt::Debug,
4 io::{Cursor, Read, Result as IoResult},
5};
6
7#[cfg(feature = "async")]
8use {
9 super::DynAsyncRead,
10 futures::{future::BoxFuture, io::Cursor as AsyncCursor, AsyncReadExt},
11};
12
/// Resumable policy provider driven by a single, fixed size threshold.
///
/// Sources whose size is strictly greater than the threshold are uploaded
/// in multiple parts; everything else is uploaded as a single part.
#[derive(Clone, Copy, Debug)]
pub struct FixedThresholdResumablePolicy {
    /// Size threshold, in bytes, above which multi-part uploading is chosen.
    threshold: u64,
}
18
19impl FixedThresholdResumablePolicy {
20 #[inline]
22 pub fn new(threshold: u64) -> Self {
23 Self::from(threshold)
24 }
25}
26
27impl Default for FixedThresholdResumablePolicy {
28 #[inline]
29 fn default() -> Self {
30 Self::from(1 << 22)
31 }
32}
33
34impl From<u64> for FixedThresholdResumablePolicy {
35 #[inline]
36 fn from(threshold: u64) -> Self {
37 Self { threshold }
38 }
39}
40
41impl From<FixedThresholdResumablePolicy> for u64 {
42 #[inline]
43 fn from(policy: FixedThresholdResumablePolicy) -> Self {
44 policy.threshold
45 }
46}
47
48impl ResumablePolicyProvider for FixedThresholdResumablePolicy {
49 #[inline]
50 fn get_policy_from_size(&self, source_size: u64, _opts: GetPolicyOptions) -> ResumablePolicy {
51 get_policy_from_size(self.threshold, source_size)
52 }
53
54 fn get_policy_from_reader<'a>(
55 &self,
56 mut reader: Box<dyn DynRead + 'a>,
57 opts: GetPolicyOptions,
58 ) -> IoResult<(ResumablePolicy, Box<dyn DynRead + 'a>)> {
59 let mut first_chunk = Vec::new();
60 (&mut reader).take(self.threshold + 1).read_to_end(&mut first_chunk)?;
61 let policy = self.get_policy_from_size(first_chunk.len() as u64, opts);
62 Ok((policy, Box::new(Cursor::new(first_chunk).chain(reader))))
63 }
64
65 #[cfg(feature = "async")]
66 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
67 fn get_policy_from_async_reader<'a>(
68 &self,
69 mut reader: Box<dyn DynAsyncRead + 'a>,
70 _opts: GetPolicyOptions,
71 ) -> BoxFuture<'a, IoResult<(ResumablePolicy, Box<dyn DynAsyncRead + 'a>)>> {
72 let threshold = self.threshold;
73 Box::pin(async move {
74 let mut first_chunk = Vec::new();
75 (&mut reader).take(threshold + 1).read_to_end(&mut first_chunk).await?;
76 let policy = get_policy_from_size(threshold, first_chunk.len() as u64);
77 Ok((
78 policy,
79 Box::new(AsyncCursor::new(first_chunk).chain(reader)) as Box<dyn DynAsyncRead>,
80 ))
81 })
82 }
83}
84
85fn get_policy_from_size(threshold: u64, source_size: u64) -> ResumablePolicy {
86 if threshold < source_size {
87 ResumablePolicy::MultiPartsUploading
88 } else {
89 ResumablePolicy::SinglePartUploading
90 }
91}