rustfs_rsc/client/
multipart_upload.rs

use bytes::Bytes;
use hyper::{header, HeaderMap, Method};

use super::args::MultipartUploadTask;
use super::{BucketArgs, CopySource, KeyArgs, ListMultipartUploadsArgs};
use crate::datatype::Part;
use crate::datatype::{
    CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
    InitiateMultipartUploadResult, ListMultipartUploadsResult, ListPartsResult,
};
use crate::error::{Result, S3Error, ValueError};
use crate::signer::{MAX_MULTIPART_COUNT, MAX_PART_SIZE};
use crate::Minio;

/// Multipart upload operations.
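///
/// # Example
///
/// A minimal end-to-end sketch, not taken from the crate's own docs: it
/// assumes `minio` is an already-configured [`Minio`] client, that the crate
/// is consumed as `rustfs_rsc`, and that `&str` converts into the
/// `BucketArgs`/`KeyArgs` generics; bucket and object names are placeholders.
///
/// ```no_run
/// use bytes::Bytes;
///
/// # async fn run(minio: &rustfs_rsc::Minio, chunks: Vec<Bytes>) -> rustfs_rsc::error::Result<()> {
/// let task = minio.create_multipart_upload("my-bucket", "my-object").await?;
/// let mut parts = Vec::new();
/// for (i, chunk) in chunks.into_iter().enumerate() {
///     // Part numbers are 1-based.
///     match minio.upload_part(&task, i + 1, chunk).await {
///         Ok(part) => parts.push(part),
///         Err(e) => {
///             // Abort so the server drops the partially uploaded parts.
///             minio.abort_multipart_upload(&task).await?;
///             return Err(e);
///         }
///     }
/// }
/// minio.complete_multipart_upload(&task, parts, None).await?;
/// # Ok(())
/// # }
/// ```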
impl Minio {
    /// Aborts a multipart upload.
    pub async fn abort_multipart_upload(&self, task: &MultipartUploadTask) -> Result<()> {
        let res = self
            .executor(Method::DELETE)
            .bucket_name(task.bucket())
            .object_name(task.key())
            .query("uploadId", task.upload_id())
            .apply(|e| {
                if let Some(bucket) = task.bucket_owner() {
                    e.header("x-amz-expected-bucket-owner", bucket)
                } else {
                    e
                }
            })
            .send()
            .await?;
        if res.status() == 204 {
            Ok(())
        } else {
            let text = res.text().await?;
            let s: S3Error = text.as_str().try_into()?;
            Err(s)?
        }
    }

    /// Completes a multipart upload by assembling previously uploaded parts.
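    ///
    /// Note (standard S3 behavior, not stated in the original docs): the
    /// server expects `parts` sorted in ascending `part_number` order.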
    pub async fn complete_multipart_upload(
        &self,
        task: &MultipartUploadTask,
        parts: Vec<Part>,
        extra_header: Option<HeaderMap>,
    ) -> Result<CompleteMultipartUploadResult> {
        let body = CompleteMultipartUpload { parts };
        self.executor(Method::POST)
            .bucket_name(task.bucket())
            .object_name(task.key())
            .query("uploadId", task.upload_id())
            .apply(|e| {
                if let Some(bucket) = task.bucket_owner() {
                    e.header("x-amz-expected-bucket-owner", bucket)
                } else {
                    e
                }
            })
            .headers_merge2(extra_header)
            .headers_merge2(task.ssec_header().cloned())
            .xml(&body)
            .send_xml_ok()
            .await
    }

    /// Initiates a multipart upload with an explicit `versionId` query
    /// parameter and returns a [`MultipartUploadTask`].
    pub async fn create_multipart_upload_with_versionid<B, K>(
        &self,
        bucket: B,
        key: K,
        version_id: String,
    ) -> Result<MultipartUploadTask>
    where
        B: Into<BucketArgs>,
        K: Into<KeyArgs>,
    {
        let bucket: BucketArgs = bucket.into();
        let key: KeyArgs = key.into();
        let metadata_header: HeaderMap = key.get_metadata_header()?;
        let expected_bucket_owner = bucket.expected_bucket_owner.clone();
        let mut result: MultipartUploadTask = self
            ._bucket_executor(bucket, Method::POST)
            .object_name(key.name.as_str())
            .query_string("uploads")
            .query("versionId", version_id)
            .header(
                header::CONTENT_TYPE,
                &key.content_type
                    .map_or("binary/octet-stream".to_string(), |f| f),
            )
            .headers_merge(metadata_header)
            .headers_merge2(key.extra_headers)
            .headers_merge2(key.ssec_headers.clone())
            .send_xml_ok::<InitiateMultipartUploadResult>()
            .await
            .map(Into::into)?;
        result.set_ssec_header(key.ssec_headers);
        result.set_bucket_owner(expected_bucket_owner);
        Ok(result)
    }

    /// Initiates a multipart upload and returns a [`MultipartUploadTask`].
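    ///
    /// See the end-to-end sketch on this `impl` block for usage; the returned
    /// task carries the `uploadId` consumed by the other multipart methods.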
    pub async fn create_multipart_upload<B, K>(
        &self,
        bucket: B,
        key: K,
    ) -> Result<MultipartUploadTask>
    where
        B: Into<BucketArgs>,
        K: Into<KeyArgs>,
    {
        let bucket: BucketArgs = bucket.into();
        let key: KeyArgs = key.into();
        let metadata_header: HeaderMap = key.get_metadata_header()?;
        let expected_bucket_owner = bucket.expected_bucket_owner.clone();
        let mut result: MultipartUploadTask = self
            ._bucket_executor(bucket, Method::POST)
            .object_name(key.name.as_str())
            .query_string("uploads")
            .header(
                header::CONTENT_TYPE,
                &key.content_type
                    .map_or("binary/octet-stream".to_string(), |f| f),
            )
            .headers_merge(metadata_header)
            .headers_merge2(key.extra_headers)
            .headers_merge2(key.ssec_headers.clone())
            .send_xml_ok::<InitiateMultipartUploadResult>()
            .await
            .map(Into::into)?;
        result.set_ssec_header(key.ssec_headers);
        result.set_bucket_owner(expected_bucket_owner);
        Ok(result)
    }

    /// Lists in-progress multipart uploads.
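    ///
    /// A hedged sketch; the `ListMultipartUploadsArgs` import path is an
    /// assumption, and its construction is elided here:
    ///
    /// ```no_run
    /// # use rustfs_rsc::client::ListMultipartUploadsArgs;
    /// # async fn run(minio: &rustfs_rsc::Minio, args: ListMultipartUploadsArgs) -> rustfs_rsc::error::Result<()> {
    /// let in_progress = minio.list_multipart_uploads(args).await?;
    /// # Ok(())
    /// # }
    /// ```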
    pub async fn list_multipart_uploads(
        &self,
        args: ListMultipartUploadsArgs,
    ) -> Result<ListMultipartUploadsResult> {
        self.executor(Method::GET)
            .bucket_name(args.bucket_name())
            .querys(args.args_query_map())
            .headers(args.args_headers())
            .send_xml_ok()
            .await
    }

    /// Lists the parts that have been uploaded for a specific multipart upload.
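    ///
    /// A minimal sketch, assuming `minio` is a configured client and `task`
    /// was returned by [`Minio::create_multipart_upload`]:
    ///
    /// ```no_run
    /// # async fn run(minio: &rustfs_rsc::Minio) -> rustfs_rsc::error::Result<()> {
    /// let task = minio.create_multipart_upload("my-bucket", "my-object").await?;
    /// // First page of up to 1000 parts; pass a `part_number_marker` from the
    /// // previous response to fetch the next page.
    /// let listed = minio.list_parts(&task, None, None).await?;
    /// # Ok(())
    /// # }
    /// ```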
    pub async fn list_parts(
        &self,
        task: &MultipartUploadTask,
        max_parts: Option<usize>,
        part_number_marker: Option<usize>,
    ) -> Result<ListPartsResult> {
        self.executor(Method::GET)
            .bucket_name(task.bucket())
            .object_name(task.key())
            .query("uploadId", task.upload_id())
            .query("max-parts", max_parts.unwrap_or(1000).to_string())
            .apply(|e| {
                let e = if let Some(n) = part_number_marker {
                    e.query("part-number-marker", n.to_string())
                } else {
                    e
                };
                if let Some(bucket) = task.bucket_owner() {
                    e.header("x-amz-expected-bucket-owner", bucket)
                } else {
                    e
                }
            })
            .headers_merge2(task.ssec_header().cloned())
            .send_xml_ok()
            .await
    }

    /// Uploads a part in a multipart upload.
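    ///
    /// A minimal single-part sketch with placeholder names; in practice each
    /// chunk would be one slice of a larger object:
    ///
    /// ```no_run
    /// use bytes::Bytes;
    ///
    /// # async fn run(minio: &rustfs_rsc::Minio) -> rustfs_rsc::error::Result<()> {
    /// let task = minio.create_multipart_upload("my-bucket", "my-object").await?;
    /// // Part numbers are 1-based; every part except the last should meet the
    /// // server's minimum part size (5 MiB on S3-compatible stores).
    /// let part = minio.upload_part(&task, 1, Bytes::from_static(b"hello")).await?;
    /// minio.complete_multipart_upload(&task, vec![part], None).await?;
    /// # Ok(())
    /// # }
    /// ```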
    pub async fn upload_part(
        &self,
        task: &MultipartUploadTask,
        part_number: usize,
        body: Bytes,
    ) -> Result<Part> {
        if part_number < 1 || part_number > MAX_MULTIPART_COUNT {
            return Err(ValueError::from(
                "part_number must be a positive integer between 1 and 10,000.",
            ))?;
        }
        if body.len() > MAX_PART_SIZE {
            return Err(ValueError::from("part size must not exceed 5 GiB."))?;
        }
        let res = self
            .executor(Method::PUT)
            .bucket_name(task.bucket())
            .object_name(task.key())
            .query("uploadId", task.upload_id())
            .query("partNumber", part_number.to_string())
            .apply(|e| {
                if let Some(bucket) = task.bucket_owner() {
                    e.header("x-amz-expected-bucket-owner", bucket)
                } else {
                    e
                }
            })
            .headers_merge2(task.ssec_header().cloned())
            .body(body)
            .send()
            .await?;
        if res.status() == 200 {
            if let Some(s) = res
                .headers()
                .get(header::ETAG)
                .map(|x| x.to_str().unwrap_or(""))
            {
                Ok(Part {
                    e_tag: s.to_string(),
                    part_number,
                })
            } else {
                Err(res.into())
            }
        } else {
            let text = res.text().await?;
            let s: S3Error = text.as_str().try_into()?;
            Err(s)?
        }
    }

    /// Uploads a part by copying data from an existing object as the data source.
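    ///
    /// A hedged sketch; the `CopySource` import path is an assumption, and its
    /// construction is elided here:
    ///
    /// ```no_run
    /// # use rustfs_rsc::client::CopySource;
    /// # async fn run(minio: &rustfs_rsc::Minio, src: CopySource) -> rustfs_rsc::error::Result<()> {
    /// let task = minio.create_multipart_upload("my-bucket", "dst-object").await?;
    /// // Copy the source object's bytes server-side as part 1.
    /// let part = minio.upload_part_copy(&task, 1, src).await?;
    /// minio.complete_multipart_upload(&task, vec![part], None).await?;
    /// # Ok(())
    /// # }
    /// ```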
    pub async fn upload_part_copy(
        &self,
        task: &MultipartUploadTask,
        part_number: usize,
        copy_source: CopySource,
    ) -> Result<Part> {
        if part_number < 1 || part_number > MAX_MULTIPART_COUNT {
            return Err(ValueError::from(
                "part_number must be a positive integer between 1 and 10,000.",
            ))?;
        }
        self.executor(Method::PUT)
            .bucket_name(task.bucket())
            .object_name(task.key())
            .query("uploadId", task.upload_id())
            .query("partNumber", part_number.to_string())
            .apply(|e| {
                if let Some(bucket) = task.bucket_owner() {
                    e.header("x-amz-expected-bucket-owner", bucket)
                } else {
                    e
                }
            })
            .headers_merge2(task.ssec_header().cloned())
            .headers_merge(copy_source.args_headers())
            .send_xml_ok()
            .await
            .map(|CopyPartResult { e_tag }| Part { e_tag, part_number })
    }
}