rustfs_rsc/client/
mutilpart_upload.rs1use bytes::Bytes;
2use hyper::{header, HeaderMap, Method};
3
4use super::args::MultipartUploadTask;
5use super::{BucketArgs, CopySource, KeyArgs, ListMultipartUploadsArgs};
6use crate::datatype::Part;
7use crate::datatype::{
8 CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
9 InitiateMultipartUploadResult, ListMultipartUploadsResult, ListPartsResult,
10};
11use crate::error::{Result, S3Error, ValueError};
12use crate::signer::{MAX_MULTIPART_COUNT, MAX_PART_SIZE};
13use crate::Minio;
14
15impl Minio {
17 pub async fn abort_multipart_upload(&self, task: &MultipartUploadTask) -> Result<()> {
19 let res = self
20 .executor(Method::DELETE)
21 .bucket_name(task.bucket())
22 .object_name(task.key())
23 .query("uploadId", task.upload_id())
24 .apply(|e| {
25 if let Some(bucket) = task.bucket_owner() {
26 e.header("x-amz-expected-bucket-owner", bucket)
27 } else {
28 e
29 }
30 })
31 .send()
32 .await?;
33 if res.status() == 204 {
34 Ok(())
35 } else {
36 let text = res.text().await?;
37 let s: S3Error = text.as_str().try_into()?;
38 Err(s)?
39 }
40 }
41
42 pub async fn complete_multipart_upload(
44 &self,
45 task: &MultipartUploadTask,
46 parts: Vec<Part>,
47 extra_header: Option<HeaderMap>,
48 ) -> Result<CompleteMultipartUploadResult> {
49 let body = CompleteMultipartUpload { parts };
50 self.executor(Method::POST)
51 .bucket_name(task.bucket())
52 .object_name(task.key())
53 .query("uploadId", task.upload_id())
54 .apply(|e| {
55 if let Some(bucket) = task.bucket_owner() {
56 e.header("x-amz-expected-bucket-owner", bucket)
57 } else {
58 e
59 }
60 })
61 .headers_merge2(extra_header)
62 .headers_merge2(task.ssec_header().cloned())
63 .xml(&body)
64 .send_xml_ok()
65 .await
66 }
67
68
69 pub async fn create_multipart_upload_with_versionid<B, K>(
70 &self,
71 bucket: B,
72 key: K,
73 version_id: String,
74 ) -> Result<MultipartUploadTask>
75 where
76 B: Into<BucketArgs>,
77 K: Into<KeyArgs>,
78 {
79 let bucket: BucketArgs = bucket.into();
80 let key: KeyArgs = key.into();
81 let metadata_header: HeaderMap = key.get_metadata_header()?;
82 let expected_bucket_owner = bucket.expected_bucket_owner.clone();
83 let mut result: MultipartUploadTask = self
84 ._bucket_executor(bucket, Method::POST)
85 .object_name(key.name.as_str())
86 .query_string("uploads")
87 .query("versionId", version_id)
88 .header(
89 header::CONTENT_TYPE,
90 &key.content_type
91 .map_or("binary/octet-stream".to_string(), |f| f),
92 )
93 .headers_merge(metadata_header)
94 .headers_merge2(key.extra_headers)
95 .headers_merge2(key.ssec_headers.clone())
96 .send_xml_ok::<InitiateMultipartUploadResult>()
97 .await
98 .map(Into::into)?;
99 result.set_ssec_header(key.ssec_headers);
100 result.set_bucket_owner(expected_bucket_owner);
101 Ok(result)
102 }
103
104 pub async fn create_multipart_upload<B, K>(
106 &self,
107 bucket: B,
108 key: K,
109 ) -> Result<MultipartUploadTask>
110 where
111 B: Into<BucketArgs>,
112 K: Into<KeyArgs>,
113 {
114 let bucket: BucketArgs = bucket.into();
115 let key: KeyArgs = key.into();
116 let metadata_header: HeaderMap = key.get_metadata_header()?;
117 let expected_bucket_owner = bucket.expected_bucket_owner.clone();
118 let mut result: MultipartUploadTask = self
119 ._bucket_executor(bucket, Method::POST)
120 .object_name(key.name.as_str())
121 .query_string("uploads")
122 .header(
123 header::CONTENT_TYPE,
124 &key.content_type
125 .map_or("binary/octet-stream".to_string(), |f| f),
126 )
127 .headers_merge(metadata_header)
128 .headers_merge2(key.extra_headers)
129 .headers_merge2(key.ssec_headers.clone())
130 .send_xml_ok::<InitiateMultipartUploadResult>()
131 .await
132 .map(Into::into)?;
133 result.set_ssec_header(key.ssec_headers);
134 result.set_bucket_owner(expected_bucket_owner);
135 Ok(result)
136 }
137
138 pub async fn list_multipart_uploads(
140 &self,
141 args: ListMultipartUploadsArgs,
142 ) -> Result<ListMultipartUploadsResult> {
143 self.executor(Method::GET)
144 .bucket_name(args.bucket_name())
145 .querys(args.args_query_map())
146 .headers(args.args_headers())
147 .send_xml_ok()
148 .await
149 }
150
151 pub async fn list_parts(
153 &self,
154 task: &MultipartUploadTask,
155 max_parts: Option<usize>,
156 part_number_marker: Option<usize>,
157 ) -> Result<ListPartsResult> {
158 self.executor(Method::GET)
159 .bucket_name(task.bucket())
160 .object_name(task.key())
161 .query("uploadId", task.upload_id())
162 .query("max-parts", max_parts.unwrap_or(1000).to_string())
163 .apply(|e| {
164 let e = if let Some(n) = part_number_marker {
165 e.query("part-number-marker", n.to_string())
166 } else {
167 e
168 };
169 if let Some(bucket) = task.bucket_owner() {
170 e.header("x-amz-expected-bucket-owner", bucket)
171 } else {
172 e
173 }
174 })
175 .headers_merge2(task.ssec_header().cloned())
176 .send_xml_ok()
177 .await
178 }
179
180 pub async fn upload_part(
182 &self,
183 task: &MultipartUploadTask,
184 part_number: usize,
185 body: Bytes,
186 ) -> Result<Part> {
187 if part_number < 1 || part_number > MAX_MULTIPART_COUNT {
188 return Err(ValueError::from(
189 "part_number is a positive integer between 1 and 10,000.",
190 ))?;
191 }
192 if body.len() > MAX_PART_SIZE {
193 return Err(ValueError::from("part size must be less then 5GiB."))?;
194 }
195 let res = self
196 .executor(Method::PUT)
197 .bucket_name(task.bucket())
198 .object_name(task.key())
199 .query("uploadId", task.upload_id())
200 .query("partNumber", part_number.to_string())
201 .apply(|e| {
202 if let Some(bucket) = task.bucket_owner() {
203 e.header("x-amz-expected-bucket-owner", bucket)
204 } else {
205 e
206 }
207 })
208 .headers_merge2(task.ssec_header().cloned())
209 .body(body)
210 .send()
211 .await?;
212 if res.status() == 200 {
213 if let Some(s) = res
214 .headers()
215 .get(header::ETAG)
216 .map(|x| x.to_str().unwrap_or(""))
217 {
218 Ok(Part {
219 e_tag: s.to_string(),
220 part_number,
221 })
222 } else {
223 Err(res.into())
224 }
225 } else {
226 let text = res.text().await?;
227 let s: S3Error = text.as_str().try_into()?;
228 Err(s)?
229 }
230 }
231
232 pub async fn upload_part_copy(
234 &self,
235 task: &MultipartUploadTask,
236 part_number: usize,
237 copy_source: CopySource,
238 ) -> Result<Part> {
239 if part_number < 1 || part_number > MAX_MULTIPART_COUNT {
240 return Err(ValueError::from(
241 "part_number is a positive integer between 1 and 10,000.",
242 ))?;
243 }
244 self.executor(Method::PUT)
245 .bucket_name(task.bucket())
246 .object_name(task.key())
247 .query("uploadId", task.upload_id())
248 .query("partNumber", part_number.to_string())
249 .apply(|e| {
250 if let Some(bucket) = task.bucket_owner() {
251 e.header("x-amz-expected-bucket-owner", bucket)
252 } else {
253 e
254 }
255 })
256 .headers_merge2(task.ssec_header().cloned())
257 .headers_merge(copy_source.args_headers())
258 .send_xml_ok()
259 .await
260 .map(|CopyPartResult { e_tag }| Part { e_tag, part_number })
261 }
262}