1use std::collections::HashMap;
2use std::ops::Add;
3use std::path::Path;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Stream, StreamExt};
8use hyper::{header, HeaderMap, Method};
9use reqwest::Response;
10
11use super::{BucketArgs, CopySource, KeyArgs, ObjectStat, SelectObjectReader, Tags};
12use crate::datatype::{AccessControlPolicy, LegalHold, Retention};
13use crate::datatype::{LegalHoldStatus, SelectRequest};
14use crate::error::{Error, Result, S3Error, ValueError};
15use crate::signer::{MAX_MULTIPART_OBJECT_SIZE, MIN_PART_SIZE};
16use crate::Minio;
17
18impl Minio {
20 #[inline]
21 fn _object_executor(
22 &self,
23 method: Method,
24 bucket: BucketArgs,
25 key: KeyArgs,
26 with_sscs: bool,
27 with_content_type: bool,
28 ) -> Result<super::BaseExecutor> {
29 let is_put = method == Method::PUT;
30 let metadata_header = if is_put {
31 key.get_metadata_header()?
32 } else {
33 HeaderMap::new()
34 };
35 let executor = self
36 ._bucket_executor(bucket, method)
37 .object_name(key.name)
38 .headers_merge2(key.extra_headers)
39 .apply(|mut e| {
40 if let Some(version_id) = key.version_id {
41 e = e.query("versionId", version_id)
42 }
43 if is_put {
44 e = e.headers_merge(metadata_header);
45 }
46 if with_content_type {
47 if let Some(content_type) = key.content_type {
48 if is_put {
49 e = e.header(header::CONTENT_TYPE, content_type);
50 } else {
51 e = e.header("response-content-type", content_type);
52 }
53 }
54 };
55 if with_sscs {
56 e = e.headers_merge2(key.ssec_headers);
57 }
58 e
59 });
60 Ok(executor)
61 }
62
63 #[inline]
81 pub async fn copy_object<B, K>(&self, bucket: B, key: K, src: CopySource) -> Result<()>
82 where
83 B: Into<BucketArgs>,
84 K: Into<KeyArgs>,
85 {
86 self._object_executor(Method::PUT, bucket.into(), key.into(), true, true)?
87 .headers_merge(src.args_headers())
88 .send_ok()
89 .await
90 .map(|_| ())
91 }
92
/// Downloads an object and writes its body to the file at `path`.
///
/// The file is created (or truncated) with `File::create` and the response
/// body is written to it chunk by chunk as it arrives.
///
/// # Errors
///
/// Returns an error when the request fails, when the response status is not
/// a success (the body is then parsed into an [`S3Error`]), when the file
/// cannot be created or written, or when reading the response body fails
/// part-way through. In the last cases the file may be left partially
/// written.
#[cfg(feature = "fs-tokio")]
pub async fn fget_object<B, K, P>(&self, bucket: B, key: K, path: P) -> Result<()>
where
    B: Into<BucketArgs>,
    K: Into<KeyArgs>,
    P: AsRef<Path>,
{
    use tokio::{fs::File, io::AsyncWriteExt};

    let res = self.get_object(bucket, key).await?;
    if !res.status().is_success() {
        let text = res.text().await?;
        let s3err: S3Error = text.as_str().try_into()?;
        return Err(s3err.into());
    }

    let mut stream = res.bytes_stream();
    let mut file = File::create(path).await?;
    while let Some(chunk) = stream.next().await {
        // A failed chunk must abort the download: skipping it would leave a
        // truncated file behind while still reporting success.
        let chunk = chunk?;
        file.write_all(&chunk).await?;
    }
    // Flush explicitly so buffered data reaches the file and any write error
    // is reported here instead of being lost when the handle is dropped.
    file.flush().await?;
    Ok(())
}
128
129 pub async fn get_object<B, K>(&self, bucket: B, key: K) -> Result<Response>
144 where
145 B: Into<BucketArgs>,
146 K: Into<KeyArgs>,
147 {
148 let bucket: BucketArgs = bucket.into();
149 let key: KeyArgs = key.into();
150 let range = key.range();
151 self._object_executor(Method::GET, bucket, key, true, true)?
152 .apply(|e| {
153 if let Some(range) = range {
154 e.header(header::RANGE, &range)
155 } else {
156 e
157 }
158 })
159 .send_ok()
160 .await
161 }
162
163 pub async fn get_object_torrent<B, K>(&self, bucket: B, key: K) -> Result<Response>
165 where
166 B: Into<BucketArgs>,
167 K: Into<KeyArgs>,
168 {
169 let bucket: BucketArgs = bucket.into();
170 let key: KeyArgs = key.into();
171 self._object_executor(Method::GET, bucket, key, true, true)?
172 .query("torrent", "")
173 .send_ok()
174 .await
175 }
176
177 pub async fn put_object<B, K>(&self, bucket: B, key: K, data: Bytes) -> Result<()>
199 where
200 B: Into<BucketArgs>,
201 K: Into<KeyArgs>,
202 {
203 let bucket: BucketArgs = bucket.into();
204 let key: KeyArgs = key.into();
205 self._object_executor(Method::PUT, bucket, key, true, true)?
206 .body(data)
207 .send_ok()
208 .await?;
209 Ok(())
210 }
211
212 pub async fn put_object_stream<B, K>(
218 &self,
219 bucket: B,
220 key: K,
221 mut stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Sync + Send>>,
222 len: Option<usize>,
223 ) -> Result<()>
224 where
225 B: Into<BucketArgs>,
226 K: Into<KeyArgs>,
227 {
228 let bucket: BucketArgs = bucket.into();
229 let key: KeyArgs = key.into();
230 if let Some(len) = len {
231 if len >= MAX_MULTIPART_OBJECT_SIZE {
232 return Err(ValueError::from("max object size is 5TiB").into());
233 }
234 if self.multi_chunked() || len < MIN_PART_SIZE {
235 self._object_executor(Method::PUT, bucket, key, true, true)?
236 .body((stream, len))
237 .send_ok()
238 .await?;
239 return Ok(());
240 }
241 }
242 let mpu_args = self.create_multipart_upload(bucket, key).await?;
243
244 let mut parts = Vec::new();
245 let mut current = BytesMut::with_capacity(MIN_PART_SIZE);
246 while let Some(piece) = stream.next().await {
247 if current.len() >= MIN_PART_SIZE {
248 let part = match self
249 .upload_part(&mpu_args, parts.len().add(1), current.freeze())
250 .await
251 {
252 Ok(pce) => pce,
253 Err(e) => {
254 return match self.abort_multipart_upload(&mpu_args).await {
255 Ok(_) => Err(e),
256 Err(err) => Err(err),
257 }
258 }
259 };
260 current = BytesMut::with_capacity(MIN_PART_SIZE);
261 parts.push(part);
262 }
263 match piece {
264 Ok(open_piece) => {
265 current.extend_from_slice(&open_piece);
266 }
267 Err(e) => {
268 self.abort_multipart_upload(&mpu_args).await?;
269 return Err(e);
270 }
271 }
272 }
273 if current.len() != 0 {
274 let part = match self
275 .upload_part(&mpu_args, parts.len().add(1), current.freeze())
276 .await
277 {
278 Ok(pce) => pce,
279 Err(e) => {
280 return match self.abort_multipart_upload(&mpu_args).await {
281 Ok(_) => Err(e),
282 Err(err) => Err(err),
283 }
284 }
285 };
286 parts.push(part);
287 }
288
289 self.complete_multipart_upload(&mpu_args, parts, None)
290 .await
291 .map(|_| ())
292 }
293
/// Uploads the file at `path` as the object `key` in `bucket`.
///
/// The file is read in chunks of up to `RECOMMEND_CHUNK_SIZE` bytes and
/// handed to [`Minio::put_object_stream`] together with the file length taken
/// from its metadata.
///
/// # Errors
///
/// Returns an error when the file cannot be opened or its metadata cannot be
/// read, when the file length does not fit in `usize` on the current
/// platform, when reading the file fails, or when `put_object_stream` fails.
#[cfg(feature = "fs-tokio")]
pub async fn fput_object<B, K, P>(&self, bucket: B, key: K, path: P) -> Result<()>
where
    B: Into<BucketArgs>,
    K: Into<KeyArgs>,
    P: AsRef<Path>,
{
    use crate::signer::RECOMMEND_CHUNK_SIZE;
    use async_stream::stream;
    use tokio::io::AsyncReadExt;

    let mut file = tokio::fs::File::open(path).await?;
    let meta = file.metadata().await?;
    // A plain `as usize` cast would silently truncate on 32-bit targets and
    // make the upload declare a wrong length.
    let len = usize::try_from(meta.len())
        .map_err(|_| ValueError::from("file size does not fit in usize"))?;
    let stm = Box::pin(stream! {
        loop {
            let mut buf = BytesMut::with_capacity(RECOMMEND_CHUNK_SIZE);
            match file.read_buf(&mut buf).await {
                // Zero bytes read: end of file.
                Ok(0) => break,
                Ok(_) => {
                    yield Ok(buf.freeze());
                }
                Err(e) => {
                    // End the stream after reporting the failure instead of
                    // polling the broken file again.
                    yield Err(e.into());
                    break;
                }
            }
        }
    });
    self.put_object_stream(bucket, key, stm, Some(len)).await
}
331
332 #[inline]
343 pub async fn remove_object<B, K>(&self, bucket: B, key: K) -> Result<()>
344 where
345 B: Into<BucketArgs>,
346 K: Into<KeyArgs>,
347 {
348 self._object_executor(Method::DELETE, bucket.into(), key.into(), true, false)?
349 .send_ok()
350 .await
351 .map(|_| ())
352 }
353
354 pub async fn stat_object<B, K>(&self, bucket: B, key: K) -> Result<Option<ObjectStat>>
367 where
368 B: Into<BucketArgs>,
369 K: Into<KeyArgs>,
370 {
371 let bucket: BucketArgs = bucket.into();
372 let key: KeyArgs = key.into();
373 let bucket_name = bucket.name.clone();
374 let object_name = key.name.clone();
375 let res = self
376 ._object_executor(Method::HEAD, bucket, key, true, false)?
377 .send()
378 .await?;
379 if !res.status().is_success() {
380 return Ok(None);
381 }
382 let res_header = res.headers();
383 let etag = res_header
384 .get(header::ETAG)
385 .map(|x| x.to_str().unwrap_or(""))
386 .unwrap_or("")
387 .replace("\"", "");
388 let size: usize = res_header
389 .get(header::CONTENT_LENGTH)
390 .map(|x| x.to_str().unwrap_or("0").parse().unwrap_or(0))
391 .unwrap_or(0);
392 let last_modified = res_header
393 .get(header::LAST_MODIFIED)
394 .map(|x| x.to_str().unwrap_or(""))
395 .unwrap_or("")
396 .to_owned();
397 let content_type = res_header
398 .get(header::CONTENT_TYPE)
399 .map(|x| x.to_str().unwrap_or(""))
400 .unwrap_or("")
401 .to_owned();
402 let version_id = res_header
403 .get("x-amz-version-id")
404 .map(|x| x.to_str().unwrap_or(""))
405 .unwrap_or("")
406 .to_owned();
407 let mut metadata = HashMap::new();
408 res_header.into_iter().for_each(|(k, v)| {
409 let key = k.as_str();
410 if key.starts_with("x-amz-meta-") {
411 if let Ok(value) = String::from_utf8(v.as_bytes().to_vec()) {
412 metadata.insert(key[11..].to_string(), value.to_owned());
413 }
414 }
415 });
416 Ok(Some(ObjectStat {
417 bucket_name,
418 object_name,
419 last_modified,
420 etag,
421 content_type,
422 version_id,
423 size,
424 metadata,
425 }))
426 }
427
428 pub async fn get_object_acl<B, K>(&self, bucket: B, key: K) -> Result<AccessControlPolicy>
430 where
431 B: Into<BucketArgs>,
432 K: Into<KeyArgs>,
433 {
434 let bucket: BucketArgs = bucket.into();
435 let key: KeyArgs = key.into();
436 self._object_executor(Method::GET, bucket, key, false, false)?
437 .query("acl", "")
438 .send_xml_ok()
439 .await
440 }
441
442 pub async fn is_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<bool>
444 where
445 B: Into<BucketArgs>,
446 K: Into<KeyArgs>,
447 {
448 let bucket: BucketArgs = bucket.into();
449 let key: KeyArgs = key.into();
450 let result = self
451 ._object_executor(Method::GET, bucket, key, false, false)?
452 .query("legal-hold", "")
453 .send_xml_ok::<LegalHold>()
454 .await;
455 match result {
456 Ok(l) => Ok(l.status == LegalHoldStatus::ON),
457 Err(Error::S3Error(s)) => {
459 if s.code == "NoSuchObjectLockConfiguration" {
460 return Ok(false);
461 } else {
462 Err(Error::S3Error(s))
463 }
464 }
465 Err(err) => Err(err),
466 }
467 }
468
469 pub async fn enable_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<()>
471 where
472 B: Into<BucketArgs>,
473 K: Into<KeyArgs>,
474 {
475 let bucket: BucketArgs = bucket.into();
476 let key: KeyArgs = key.into();
477 let legal_hold: LegalHold = LegalHold {
478 status: LegalHoldStatus::ON,
479 };
480 self._object_executor(Method::PUT, bucket, key, false, false)?
481 .query("legal-hold", "")
482 .xml(&legal_hold)
483 .send_ok()
484 .await
485 .map(|_| ())
486 }
487
488 pub async fn disable_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<()>
490 where
491 B: Into<BucketArgs>,
492 K: Into<KeyArgs>,
493 {
494 let bucket: BucketArgs = bucket.into();
495 let key: KeyArgs = key.into();
496 let legal_hold: LegalHold = LegalHold {
497 status: LegalHoldStatus::OFF,
498 };
499 self._object_executor(Method::PUT, bucket, key, false, false)?
500 .query("legal-hold", "")
501 .xml(&legal_hold)
502 .send_ok()
503 .await
504 .map(|_| ())
505 }
506
507 pub async fn get_object_tags<B, K>(&self, bucket: B, key: K) -> Result<Tags>
520 where
521 B: Into<BucketArgs>,
522 K: Into<KeyArgs>,
523 {
524 let bucket: BucketArgs = bucket.into();
525 let key: KeyArgs = key.into();
526 self._object_executor(Method::GET, bucket, key, false, false)?
527 .query("tagging", "")
528 .send_xml_ok()
529 .await
530 }
531
532 pub async fn set_object_tags<B, K, T>(&self, bucket: B, key: K, tags: T) -> Result<()>
548 where
549 B: Into<BucketArgs>,
550 K: Into<KeyArgs>,
551 T: Into<Tags>,
552 {
553 let bucket: BucketArgs = bucket.into();
554 let key: KeyArgs = key.into();
555 self._object_executor(Method::PUT, bucket, key, false, false)?
556 .query("tagging", "")
557 .xml(&tags.into())
558 .send_ok()
559 .await
560 .map(|_| ())
561 }
562
563 pub async fn del_object_tags<B, K>(&self, bucket: B, key: K) -> Result<()>
574 where
575 B: Into<BucketArgs>,
576 K: Into<KeyArgs>,
577 {
578 let bucket: BucketArgs = bucket.into();
579 let key: KeyArgs = key.into();
580 self._object_executor(Method::DELETE, bucket, key, false, false)?
581 .query("tagging", "")
582 .send_ok()
583 .await
584 .map(|_| ())
585 }
586
587 pub async fn get_object_retention<B, K>(&self, bucket: B, key: K) -> Result<Retention>
589 where
590 B: Into<BucketArgs>,
591 K: Into<KeyArgs>,
592 {
593 let bucket: BucketArgs = bucket.into();
594 let key: KeyArgs = key.into();
595 self._object_executor(Method::GET, bucket, key, false, false)?
596 .query("retention", "")
597 .send_xml_ok()
598 .await
599 }
600
601 pub async fn set_object_retention<B, K>(
603 &self,
604 bucket: B,
605 key: K,
606 retention: Retention,
607 ) -> Result<()>
608 where
609 B: Into<BucketArgs>,
610 K: Into<KeyArgs>,
611 {
612 let bucket: BucketArgs = bucket.into();
613 let key: KeyArgs = key.into();
614 self._object_executor(Method::PUT, bucket, key, false, false)?
615 .query("retention", "")
616 .xml(&retention)
617 .send_ok()
618 .await
619 .map(|_| ())
620 }
621
622 pub async fn select_object_content<B, K>(
644 &self,
645 bucket: B,
646 key: K,
647 request: SelectRequest,
648 ) -> Result<SelectObjectReader>
649 where
650 B: Into<BucketArgs>,
651 K: Into<KeyArgs>,
652 {
653 let bucket: BucketArgs = bucket.into();
654 let key: KeyArgs = key.into();
655 self._object_executor(Method::POST, bucket, key, true, false)?
656 .query_string("select&select-type=2")
657 .xml(&request)
658 .send_ok()
659 .await
660 .map(|res| SelectObjectReader::new(res, request.output_serialization))
661 }
662}