rustfs_rsc/client/
operate_object.rs

1use std::collections::HashMap;
2use std::ops::Add;
3use std::path::Path;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Stream, StreamExt};
8use hyper::{header, HeaderMap, Method};
9use reqwest::Response;
10
11use super::{BucketArgs, CopySource, KeyArgs, ObjectStat, SelectObjectReader, Tags};
12use crate::datatype::{AccessControlPolicy, LegalHold, Retention};
13use crate::datatype::{LegalHoldStatus, SelectRequest};
14use crate::error::{Error, Result, S3Error, ValueError};
15use crate::signer::{MAX_MULTIPART_OBJECT_SIZE, MIN_PART_SIZE};
16use crate::Minio;
17
18/// Operating the object
19impl Minio {
20    #[inline]
21    fn _object_executor(
22        &self,
23        method: Method,
24        bucket: BucketArgs,
25        key: KeyArgs,
26        with_sscs: bool,
27        with_content_type: bool,
28    ) -> Result<super::BaseExecutor> {
29        let is_put = method == Method::PUT;
30        let metadata_header = if is_put {
31            key.get_metadata_header()?
32        } else {
33            HeaderMap::new()
34        };
35        let executor = self
36            ._bucket_executor(bucket, method)
37            .object_name(key.name)
38            .headers_merge2(key.extra_headers)
39            .apply(|mut e| {
40                if let Some(version_id) = key.version_id {
41                    e = e.query("versionId", version_id)
42                }
43                if is_put {
44                    e = e.headers_merge(metadata_header);
45                }
46                if with_content_type {
47                    if let Some(content_type) = key.content_type {
48                        if is_put {
49                            e = e.header(header::CONTENT_TYPE, content_type);
50                        } else {
51                            e = e.header("response-content-type", content_type);
52                        }
53                    }
54                };
55                if with_sscs {
56                    e = e.headers_merge2(key.ssec_headers);
57                }
58                e
59            });
60        Ok(executor)
61    }
62
63    /// Creates a copy of an object that is already stored in Minio.
64    /// ## Exapmle
65    /// ``` rust
66    /// # use minio_rsc::Minio;
67    /// use minio_rsc::error::Result;
68    /// use minio_rsc::client::{CopySource, KeyArgs};
69    ///
70    /// # async fn example(minio: Minio)->Result<()>{
71    /// let src = CopySource::new("bucket","key1");
72    /// let response = minio.copy_object("bucket", "det", src).await?;
73    /// // modify content-type
74    /// let dst = KeyArgs::new("key2").content_type(Some("image/jpeg".to_string()));
75    /// let src = CopySource::new("bucket","key1").metadata_replace(true);
76    /// let response = minio.copy_object("bucket", dst, src).await?;
77    /// # Ok(())
78    /// # }
79    /// ```
80    #[inline]
81    pub async fn copy_object<B, K>(&self, bucket: B, key: K, src: CopySource) -> Result<()>
82    where
83        B: Into<BucketArgs>,
84        K: Into<KeyArgs>,
85    {
86        self._object_executor(Method::PUT, bucket.into(), key.into(), true, true)?
87            .headers_merge(src.args_headers())
88            .send_ok()
89            .await
90            .map(|_| ())
91    }
92
93    /// Downloads data of an object to file.
94    /// # Exapmle
95    /// ``` rust
96    /// # use minio_rsc::Minio;
97    /// # use minio_rsc::error::Result;
98    /// # async fn example(minio: Minio)->Result<()>{
99    /// let response = minio.fget_object("bucket", "file.txt", "local_file.txt").await?;
100    /// # Ok(())
101    /// # }
102    /// ```
103    #[cfg(feature = "fs-tokio")]
104    pub async fn fget_object<B, K, P>(&self, bucket: B, key: K, path: P) -> Result<()>
105    where
106        B: Into<BucketArgs>,
107        K: Into<KeyArgs>,
108        P: AsRef<Path>,
109    {
110        use tokio::{fs::File, io::AsyncWriteExt};
111
112        let res = self.get_object(bucket, key).await?;
113        if !res.status().is_success() {
114            let text = res.text().await?;
115            let s3err: S3Error = text.as_str().try_into()?;
116            Err(s3err)?
117        } else {
118            let mut stream = res.bytes_stream();
119            let mut file = File::create(path).await?;
120            while let Some(item) = stream.next().await {
121                if let Ok(datas) = item {
122                    file.write_all(&datas).await?;
123                }
124            }
125            Ok(())
126        }
127    }
128
129    /// Get [reqwest::Response] of an object.
130    /// ## Exapmle
131    /// ``` rust
132    /// use reqwest::Response;
133    /// # use minio_rsc::Minio;
134    /// # use minio_rsc::client::KeyArgs;
135    /// # use minio_rsc::error::Result;
136    /// # async fn example(minio: Minio)->Result<()>{
137    /// let response: Response = minio.get_object("bucket", "file.txt").await?;
138    /// let key = KeyArgs::new("file.txt").version_id(Some("cdabf31a-9752-4265-b137-6b3961fbaf9b".to_string()));
139    /// let response: Response = minio.get_object("bucket", key).await?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub async fn get_object<B, K>(&self, bucket: B, key: K) -> Result<Response>
144    where
145        B: Into<BucketArgs>,
146        K: Into<KeyArgs>,
147    {
148        let bucket: BucketArgs = bucket.into();
149        let key: KeyArgs = key.into();
150        let range = key.range();
151        self._object_executor(Method::GET, bucket, key, true, true)?
152            .apply(|e| {
153                if let Some(range) = range {
154                    e.header(header::RANGE, &range)
155                } else {
156                    e
157                }
158            })
159            .send_ok()
160            .await
161    }
162
163    /// Get torrent files from a bucket.
164    pub async fn get_object_torrent<B, K>(&self, bucket: B, key: K) -> Result<Response>
165    where
166        B: Into<BucketArgs>,
167        K: Into<KeyArgs>,
168    {
169        let bucket: BucketArgs = bucket.into();
170        let key: KeyArgs = key.into();
171        self._object_executor(Method::GET, bucket, key, true, true)?
172            .query("torrent", "")
173            .send_ok()
174            .await
175    }
176
177    /// Uploads data to an object in a bucket.
178    /// ## Exapmle
179    /// ``` rust
180    /// use reqwest::Response;
181    /// use std::collections::HashMap;
182    /// use minio_rsc::client::KeyArgs;
183    /// # use minio_rsc::Minio;
184    /// # use minio_rsc::error::Result;
185    ///
186    /// # async fn example(minio: Minio)->Result<()>{
187    /// let data = "hello minio";
188    /// minio.put_object("bucket", "file.txt", data.into()).await?;
189    ///
190    /// let metadata: HashMap<String, String> = [("filename".to_owned(), "file.txt".to_owned())].into();
191    /// let key = KeyArgs::new("file.txt")
192    ///             .content_type(Some("text/plain".to_string()))
193    ///             .metadata(metadata);
194    /// minio.put_object("bucket", key, data.into()).await?;
195    /// # Ok(())
196    /// # }
197    /// ```
198    pub async fn put_object<B, K>(&self, bucket: B, key: K, data: Bytes) -> Result<()>
199    where
200        B: Into<BucketArgs>,
201        K: Into<KeyArgs>,
202    {
203        let bucket: BucketArgs = bucket.into();
204        let key: KeyArgs = key.into();
205        self._object_executor(Method::PUT, bucket, key, true, true)?
206            .body(data)
207            .send_ok()
208            .await?;
209        Ok(())
210    }
211
212    /// Upload large payload in an efficient manner easily.
213    ///
214    /// - len: total byte length of stream.
215    /// If set None, the data will be transmitted through `multipart_upload`.
216    /// otherwise the data will be transmitted in multiple chunks through an HTTP request.
217    pub async fn put_object_stream<B, K>(
218        &self,
219        bucket: B,
220        key: K,
221        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Sync + Send>>,
222        len: Option<usize>,
223    ) -> Result<()>
224    where
225        B: Into<BucketArgs>,
226        K: Into<KeyArgs>,
227    {
228        let bucket: BucketArgs = bucket.into();
229        let key: KeyArgs = key.into();
230        if let Some(len) = len {
231            if len >= MAX_MULTIPART_OBJECT_SIZE {
232                return Err(ValueError::from("max object size is 5TiB").into());
233            }
234            if self.multi_chunked() || len < MIN_PART_SIZE {
235                self._object_executor(Method::PUT, bucket, key, true, true)?
236                    .body((stream, len))
237                    .send_ok()
238                    .await?;
239                return Ok(());
240            }
241        }
242        let mpu_args = self.create_multipart_upload(bucket, key).await?;
243
244        let mut parts = Vec::new();
245        let mut current = BytesMut::with_capacity(MIN_PART_SIZE);
246        while let Some(piece) = stream.next().await {
247            if current.len() >= MIN_PART_SIZE {
248                let part = match self
249                    .upload_part(&mpu_args, parts.len().add(1), current.freeze())
250                    .await
251                {
252                    Ok(pce) => pce,
253                    Err(e) => {
254                        return match self.abort_multipart_upload(&mpu_args).await {
255                            Ok(_) => Err(e),
256                            Err(err) => Err(err),
257                        }
258                    }
259                };
260                current = BytesMut::with_capacity(MIN_PART_SIZE);
261                parts.push(part);
262            }
263            match piece {
264                Ok(open_piece) => {
265                    current.extend_from_slice(&open_piece);
266                }
267                Err(e) => {
268                    self.abort_multipart_upload(&mpu_args).await?;
269                    return Err(e);
270                }
271            }
272        }
273        if current.len() != 0 {
274            let part = match self
275                .upload_part(&mpu_args, parts.len().add(1), current.freeze())
276                .await
277            {
278                Ok(pce) => pce,
279                Err(e) => {
280                    return match self.abort_multipart_upload(&mpu_args).await {
281                        Ok(_) => Err(e),
282                        Err(err) => Err(err),
283                    }
284                }
285            };
286            parts.push(part);
287        }
288
289        self.complete_multipart_upload(&mpu_args, parts, None)
290            .await
291            .map(|_| ())
292    }
293
294    /// Uploads data from a file to an object in a bucket.
295    /// ## Exapmle
296    /// ``` rust
297    /// # use minio_rsc::Minio;
298    /// # use minio_rsc::error::Result;
299    /// # async fn example(minio: Minio)->Result<()>{
300    /// minio.fput_object("bucket", "file.txt","localfile.txt").await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    #[cfg(feature = "fs-tokio")]
305    pub async fn fput_object<B, K, P>(&self, bucket: B, key: K, path: P) -> Result<()>
306    where
307        B: Into<BucketArgs>,
308        K: Into<KeyArgs>,
309        P: AsRef<Path>,
310    {
311        use crate::signer::RECOMMEND_CHUNK_SIZE;
312        use async_stream::stream;
313        use tokio::io::AsyncReadExt;
314
315        let mut file = tokio::fs::File::open(path).await?;
316        let meta = file.metadata().await?;
317        let len = meta.len() as usize;
318        let stm = Box::pin(stream! {
319            loop  {
320                let mut buf = BytesMut::with_capacity(RECOMMEND_CHUNK_SIZE);
321                let size = file.read_buf(&mut buf).await;
322                yield match size {
323                    Ok(d) if d > 0 => Ok(buf.freeze()),
324                    Ok(_) => break,
325                    Err(e) => Err(e.into())
326                }
327            }
328        });
329        self.put_object_stream(bucket, key, stm, Some(len)).await
330    }
331
332    /// Remove an object.
333    /// ## Exapmle
334    /// ``` rust
335    /// # use minio_rsc::Minio;
336    /// # use minio_rsc::error::Result;
337    /// # async fn example(minio: Minio)->Result<()>{
338    /// let response = minio.remove_object("bucket", "file.txt").await?;
339    /// # Ok(())
340    /// # }
341    /// ```
342    #[inline]
343    pub async fn remove_object<B, K>(&self, bucket: B, key: K) -> Result<()>
344    where
345        B: Into<BucketArgs>,
346        K: Into<KeyArgs>,
347    {
348        self._object_executor(Method::DELETE, bucket.into(), key.into(), true, false)?
349            .send_ok()
350            .await
351            .map(|_| ())
352    }
353
354    /// Get object information.
355    ///
356    /// return Ok(Some([ObjectStat])) if object exists and you have READ access to the object, otherwise return Ok([None])
357    /// ## Exapmle
358    /// ``` rust
359    /// # use minio_rsc::Minio;
360    /// # use minio_rsc::error::Result;
361    /// # async fn example(minio: Minio)->Result<()>{
362    /// let object_stat = minio.stat_object("bucket", "file.txt").await?;
363    /// # Ok(())
364    /// # }
365    /// ```
366    pub async fn stat_object<B, K>(&self, bucket: B, key: K) -> Result<Option<ObjectStat>>
367    where
368        B: Into<BucketArgs>,
369        K: Into<KeyArgs>,
370    {
371        let bucket: BucketArgs = bucket.into();
372        let key: KeyArgs = key.into();
373        let bucket_name = bucket.name.clone();
374        let object_name = key.name.clone();
375        let res = self
376            ._object_executor(Method::HEAD, bucket, key, true, false)?
377            .send()
378            .await?;
379        if !res.status().is_success() {
380            return Ok(None);
381        }
382        let res_header = res.headers();
383        let etag = res_header
384            .get(header::ETAG)
385            .map(|x| x.to_str().unwrap_or(""))
386            .unwrap_or("")
387            .replace("\"", "");
388        let size: usize = res_header
389            .get(header::CONTENT_LENGTH)
390            .map(|x| x.to_str().unwrap_or("0").parse().unwrap_or(0))
391            .unwrap_or(0);
392        let last_modified = res_header
393            .get(header::LAST_MODIFIED)
394            .map(|x| x.to_str().unwrap_or(""))
395            .unwrap_or("")
396            .to_owned();
397        let content_type = res_header
398            .get(header::CONTENT_TYPE)
399            .map(|x| x.to_str().unwrap_or(""))
400            .unwrap_or("")
401            .to_owned();
402        let version_id = res_header
403            .get("x-amz-version-id")
404            .map(|x| x.to_str().unwrap_or(""))
405            .unwrap_or("")
406            .to_owned();
407        let mut metadata = HashMap::new();
408        res_header.into_iter().for_each(|(k, v)| {
409            let key = k.as_str();
410            if key.starts_with("x-amz-meta-") {
411                if let Ok(value) = String::from_utf8(v.as_bytes().to_vec()) {
412                    metadata.insert(key[11..].to_string(), value.to_owned());
413                }
414            }
415        });
416        Ok(Some(ObjectStat {
417            bucket_name,
418            object_name,
419            last_modified,
420            etag,
421            content_type,
422            version_id,
423            size,
424            metadata,
425        }))
426    }
427
428    /// Get the access control list (ACL) of an object.
429    pub async fn get_object_acl<B, K>(&self, bucket: B, key: K) -> Result<AccessControlPolicy>
430    where
431        B: Into<BucketArgs>,
432        K: Into<KeyArgs>,
433    {
434        let bucket: BucketArgs = bucket.into();
435        let key: KeyArgs = key.into();
436        self._object_executor(Method::GET, bucket, key, false, false)?
437            .query("acl", "")
438            .send_xml_ok()
439            .await
440    }
441
442    /// Returns true if legal hold is enabled on an object.
443    pub async fn is_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<bool>
444    where
445        B: Into<BucketArgs>,
446        K: Into<KeyArgs>,
447    {
448        let bucket: BucketArgs = bucket.into();
449        let key: KeyArgs = key.into();
450        let result = self
451            ._object_executor(Method::GET, bucket, key, false, false)?
452            .query("legal-hold", "")
453            .send_xml_ok::<LegalHold>()
454            .await;
455        match result {
456            Ok(l) => Ok(l.status == LegalHoldStatus::ON),
457            // Ok(Err(err)) => Err(err.into()),
458            Err(Error::S3Error(s)) => {
459                if s.code == "NoSuchObjectLockConfiguration" {
460                    return Ok(false);
461                } else {
462                    Err(Error::S3Error(s))
463                }
464            }
465            Err(err) => Err(err),
466        }
467    }
468
469    /// Enables legal hold on an object.
470    pub async fn enable_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<()>
471    where
472        B: Into<BucketArgs>,
473        K: Into<KeyArgs>,
474    {
475        let bucket: BucketArgs = bucket.into();
476        let key: KeyArgs = key.into();
477        let legal_hold: LegalHold = LegalHold {
478            status: LegalHoldStatus::ON,
479        };
480        self._object_executor(Method::PUT, bucket, key, false, false)?
481            .query("legal-hold", "")
482            .xml(&legal_hold)
483            .send_ok()
484            .await
485            .map(|_| ())
486    }
487
488    /// Disables legal hold on an object.
489    pub async fn disable_object_legal_hold_enabled<B, K>(&self, bucket: B, key: K) -> Result<()>
490    where
491        B: Into<BucketArgs>,
492        K: Into<KeyArgs>,
493    {
494        let bucket: BucketArgs = bucket.into();
495        let key: KeyArgs = key.into();
496        let legal_hold: LegalHold = LegalHold {
497            status: LegalHoldStatus::OFF,
498        };
499        self._object_executor(Method::PUT, bucket, key, false, false)?
500            .query("legal-hold", "")
501            .xml(&legal_hold)
502            .send_ok()
503            .await
504            .map(|_| ())
505    }
506
507    /// Get [Tags] of an object.
508    /// ## Example
509    /// ```rust
510    /// # use minio_rsc::Minio;
511    /// # use minio_rsc::error::Result;
512    /// use minio_rsc::client::Tags;
513    ///
514    /// # async fn example(minio: Minio)->Result<()>{
515    /// let tags: Tags = minio.get_object_tags("bucket", "file.txt").await?;
516    /// # Ok(())
517    /// # }
518    /// ```
519    pub async fn get_object_tags<B, K>(&self, bucket: B, key: K) -> Result<Tags>
520    where
521        B: Into<BucketArgs>,
522        K: Into<KeyArgs>,
523    {
524        let bucket: BucketArgs = bucket.into();
525        let key: KeyArgs = key.into();
526        self._object_executor(Method::GET, bucket, key, false, false)?
527            .query("tagging", "")
528            .send_xml_ok()
529            .await
530    }
531
532    /// Set [Tags] of an object.
533    /// ## Example
534    /// ```rust
535    /// # use minio_rsc::Minio;
536    /// # use minio_rsc::error::Result;
537    /// use minio_rsc::client::Tags;
538    ///
539    /// # async fn example(minio: Minio)->Result<()>{
540    /// let mut tags: Tags = Tags::new();
541    /// tags.insert("key1", "value1")
542    ///     .insert("key2", "value2");
543    /// minio.set_object_tags("bucket", "file.txt", tags).await?;
544    /// # Ok(())
545    /// # }
546    /// ```
547    pub async fn set_object_tags<B, K, T>(&self, bucket: B, key: K, tags: T) -> Result<()>
548    where
549        B: Into<BucketArgs>,
550        K: Into<KeyArgs>,
551        T: Into<Tags>,
552    {
553        let bucket: BucketArgs = bucket.into();
554        let key: KeyArgs = key.into();
555        self._object_executor(Method::PUT, bucket, key, false, false)?
556            .query("tagging", "")
557            .xml(&tags.into())
558            .send_ok()
559            .await
560            .map(|_| ())
561    }
562
563    /// Delete tags of an object.
564    /// ## Example
565    /// ```rust
566    /// # use minio_rsc::Minio;
567    /// # use minio_rsc::error::Result;
568    /// # async fn example(minio: Minio)->Result<()>{
569    /// minio.del_object_tags("bucket", "file.txt").await?;
570    /// # Ok(())
571    /// # }
572    /// ```
573    pub async fn del_object_tags<B, K>(&self, bucket: B, key: K) -> Result<()>
574    where
575        B: Into<BucketArgs>,
576        K: Into<KeyArgs>,
577    {
578        let bucket: BucketArgs = bucket.into();
579        let key: KeyArgs = key.into();
580        self._object_executor(Method::DELETE, bucket, key, false, false)?
581            .query("tagging", "")
582            .send_ok()
583            .await
584            .map(|_| ())
585    }
586
587    /// Get [Retention] of an object.
588    pub async fn get_object_retention<B, K>(&self, bucket: B, key: K) -> Result<Retention>
589    where
590        B: Into<BucketArgs>,
591        K: Into<KeyArgs>,
592    {
593        let bucket: BucketArgs = bucket.into();
594        let key: KeyArgs = key.into();
595        self._object_executor(Method::GET, bucket, key, false, false)?
596            .query("retention", "")
597            .send_xml_ok()
598            .await
599    }
600
601    /// Set [Retention] of an object.
602    pub async fn set_object_retention<B, K>(
603        &self,
604        bucket: B,
605        key: K,
606        retention: Retention,
607    ) -> Result<()>
608    where
609        B: Into<BucketArgs>,
610        K: Into<KeyArgs>,
611    {
612        let bucket: BucketArgs = bucket.into();
613        let key: KeyArgs = key.into();
614        self._object_executor(Method::PUT, bucket, key, false, false)?
615            .query("retention", "")
616            .xml(&retention)
617            .send_ok()
618            .await
619            .map(|_| ())
620    }
621
622    /// Filters the contents of an object based on a simple structured query language (SQL) statement.
623    /// ## Example
624    /// ```rust
625    /// # use minio_rsc::Minio;
626    /// # use minio_rsc::error::Result;
627    /// use minio_rsc::datatype::{SelectRequest,InputSerialization,CsvInput,CompressionType,JsonOutput};
628    ///     # async fn example(client:Minio) -> Result<()>{
629    /// let input_serialization = InputSerialization::new(CsvInput::default(), CompressionType::NONE);
630    /// let output_serialization = JsonOutput::default().into();
631    /// let req = SelectRequest::new(
632    ///     "Select * from s3object where s3object._1>100".to_owned(),
633    ///     input_serialization,
634    ///     output_serialization,
635    ///     true,
636    ///     None,
637    ///     None);
638    /// let reader = client.select_object_content("bucket", "example.csv", req).await?;
639    /// let data = reader.read_all().await?;
640    /// # Ok(())
641    /// # }
642    /// ```
643    pub async fn select_object_content<B, K>(
644        &self,
645        bucket: B,
646        key: K,
647        request: SelectRequest,
648    ) -> Result<SelectObjectReader>
649    where
650        B: Into<BucketArgs>,
651        K: Into<KeyArgs>,
652    {
653        let bucket: BucketArgs = bucket.into();
654        let key: KeyArgs = key.into();
655        self._object_executor(Method::POST, bucket, key, true, false)?
656            .query_string("select&select-type=2")
657            .xml(&request)
658            .send_ok()
659            .await
660            .map(|res| SelectObjectReader::new(res, request.output_serialization))
661    }
662}