s3/
bucket.rs

1#[cfg(feature = "blocking")]
2use block_on_proc::block_on;
3#[cfg(feature = "tags")]
4use minidom::Element;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
9use crate::command::{Command, Multipart};
10use crate::creds::Credentials;
11use crate::region::Region;
12use crate::request::ResponseData;
13#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
14use crate::request::ResponseDataStream;
15use std::str::FromStr;
16use std::sync::{Arc, RwLock};
17
/// Query-string parameters sent with a request, as key/value pairs.
pub type Query = HashMap<String, String>;
19
20#[cfg(feature = "with-async-std")]
21use crate::request::async_std_backend::SurfRequest as RequestImpl;
22#[cfg(feature = "with-tokio")]
23use crate::request::tokio_backend::Reqwest as RequestImpl;
24
25#[cfg(feature = "with-async-std")]
26use futures_io::AsyncWrite;
27#[cfg(feature = "with-tokio")]
28use tokio::io::AsyncWrite;
29
30#[cfg(feature = "sync")]
31use crate::request::blocking::AttoRequest as RequestImpl;
32use std::io::Read;
33
34#[cfg(feature = "with-tokio")]
35use tokio::io::AsyncRead;
36
37#[cfg(feature = "with-async-std")]
38use futures::io::AsyncRead;
39
40use crate::error::S3Error;
41use crate::request::Request;
42use crate::serde_types::{
43    BucketLocationResult, CompleteMultipartUploadData, HeadObjectResult,
44    InitiateMultipartUploadResponse, ListBucketResult, ListMultipartUploadsResult, Part,
45};
46use crate::utils::error_from_response_data;
47use http::header::HeaderName;
48use http::HeaderMap;
49
/// Part size used for multipart uploads.
pub const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);

/// Default per-request timeout applied to newly constructed buckets.
const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
53
/// A single S3 object tag: a key/value pair attached to an object.
///
/// `Clone` is derived in addition to the comparison traits so callers can
/// duplicate tags without reassembling them from the getters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Tag {
    // Tag key; exposed through `Tag::key`.
    key: String,
    // Tag value; exposed through `Tag::value`.
    value: String,
}
59
60impl Tag {
61    pub fn key(&self) -> String {
62        self.key.to_owned()
63    }
64
65    pub fn value(&self) -> String {
66        self.value.to_owned()
67    }
68}
69
/// Instantiate an existing Bucket
///
/// # Example
///
/// ```no_run
/// use s3::bucket::Bucket;
/// use s3::creds::Credentials;
///
/// let bucket_name = "rust-s3-test";
/// let region = "us-east-1".parse().unwrap();
/// let credentials = Credentials::default().unwrap();
///
/// let bucket = Bucket::new(bucket_name, region, credentials);
/// ```
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Name of the bucket.
    pub name: String,
    /// Region (or S3-compatible endpoint) the bucket lives in.
    pub region: Region,
    /// Shared, refreshable credentials used to sign requests.
    pub credentials: Arc<RwLock<Credentials>>,
    /// Extra headers added to every request made through this bucket.
    pub extra_headers: HeaderMap,
    /// Extra query parameters added to every request.
    pub extra_query: Query,
    /// Per-request timeout; `None` disables the timeout.
    pub request_timeout: Option<Duration>,
    // When true, requests use path-style addressing; see `with_path_style`.
    path_style: bool,
    // When true, listings use ListObjectsV2; see `with_listobjects_v1`.
    listobjects_v2: bool,
}
95
96impl Bucket {
97    pub fn credentials_refresh(&self) -> Result<(), S3Error> {
98        Ok(self
99            .credentials
100            .try_write()
101            .map_err(|_| S3Error::WLCredentials)?
102            .refresh()?)
103    }
104}
105
106fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
107    if 604800 < expiry_secs {
108        return Err(S3Error::MaxExpiry(expiry_secs));
109    }
110    Ok(())
111}
112
113#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
114#[cfg_attr(
115    all(feature = "with-async-std", feature = "blocking"),
116    block_on("async-std")
117)]
118impl Bucket {
119    /// Get a presigned url for getting object on a given path
120    ///
121    /// # Example:
122    ///
123    /// ```no_run
124    /// use std::collections::HashMap;
125    /// use s3::bucket::Bucket;
126    /// use s3::creds::Credentials;
127    ///
128    /// let bucket_name = "rust-s3-test";
129    /// let region = "us-east-1".parse().unwrap();
130    /// let credentials = Credentials::default().unwrap();
131    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
132    ///
133    /// // Add optional custom queries
134    /// let mut custom_queries = HashMap::new();
135    /// custom_queries.insert(
136    ///    "response-content-disposition".into(),
137    ///    "attachment; filename=\"test.png\"".into(),
138    /// );
139    ///
140    /// let url = bucket.presign_get("/test.file", 86400, Some(custom_queries)).unwrap();
141    /// println!("Presigned url: {}", url);
142    /// ```
143    pub fn presign_get<S: AsRef<str>>(
144        &self,
145        path: S,
146        expiry_secs: u32,
147        custom_queries: Option<HashMap<String, String>>,
148    ) -> Result<String, S3Error> {
149        validate_expiry(expiry_secs)?;
150        let request = RequestImpl::new(
151            self,
152            path.as_ref(),
153            Command::PresignGet {
154                expiry_secs,
155                custom_queries,
156            },
157        )?;
158        request.presigned()
159    }
160
161    /// Get a presigned url for posting an object to a given path
162    ///
163    /// # Example:
164    ///
165    /// ```no_run
166    /// use s3::bucket::Bucket;
167    /// use s3::creds::Credentials;
168    /// use http::HeaderMap;
169    /// use http::header::HeaderName;
170    ///
171    /// let bucket_name = "rust-s3-test";
172    /// let region = "us-east-1".parse().unwrap();
173    /// let credentials = Credentials::default().unwrap();
174    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
175    ///
176    /// let post_policy = "eyAiZXhwaXJhdGlvbiI6ICIyMDE1LTEyLTMwVDEyOjAwOjAwLjAwMFoiLA0KICAiY29uZGl0aW9ucyI6IFsNCiAgICB7ImJ1Y2tldCI6ICJzaWd2NGV4YW1wbGVidWNrZXQifSwNCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci91c2VyMS8iXSwNCiAgICB7ImFjbCI6ICJwdWJsaWMtcmVhZCJ9LA0KICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL3NpZ3Y0ZXhhbXBsZWJ1Y2tldC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwNCiAgICBbInN0YXJ0cy13aXRoIiwgIiRDb250ZW50LVR5cGUiLCAiaW1hZ2UvIl0sDQogICAgeyJ4LWFtei1tZXRhLXV1aWQiOiAiMTQzNjUxMjM2NTEyNzQifSwNCiAgICB7IngtYW16LXNlcnZlci1zaWRlLWVuY3J5cHRpb24iOiAiQUVTMjU2In0sDQogICAgWyJzdGFydHMtd2l0aCIsICIkeC1hbXotbWV0YS10YWciLCAiIl0sDQoNCiAgICB7IngtYW16LWNyZWRlbnRpYWwiOiAiQUtJQUlPU0ZPRE5ON0VYQU1QTEUvMjAxNTEyMjkvdXMtZWFzdC0xL3MzL2F3czRfcmVxdWVzdCJ9LA0KICAgIHsieC1hbXotYWxnb3JpdGhtIjogIkFXUzQtSE1BQy1TSEEyNTYifSwNCiAgICB7IngtYW16LWRhdGUiOiAiMjAxNTEyMjlUMDAwMDAwWiIgfQ0KICBdDQp9";
177    ///
178    /// let url = bucket.presign_post("/test.file", 86400, post_policy.to_string()).unwrap();
179    /// println!("Presigned url: {}", url);
180    /// ```
181    pub fn presign_post<S: AsRef<str>>(
182        &self,
183        path: S,
184        expiry_secs: u32,
185        // base64 encoded post policy document -> https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-post-example.html
186        post_policy: String,
187    ) -> Result<String, S3Error> {
188        validate_expiry(expiry_secs)?;
189        let request = RequestImpl::new(
190            self,
191            path.as_ref(),
192            Command::PresignPost {
193                expiry_secs,
194                post_policy,
195            },
196        )?;
197        request.presigned()
198    }
199
200    /// Get a presigned url for putting object to a given path
201    ///
202    /// # Example:
203    ///
204    /// ```no_run
205    /// use s3::bucket::Bucket;
206    /// use s3::creds::Credentials;
207    /// use http::HeaderMap;
208    /// use http::header::HeaderName;
209    ///
210    /// let bucket_name = "rust-s3-test";
211    /// let region = "us-east-1".parse().unwrap();
212    /// let credentials = Credentials::default().unwrap();
213    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
214    ///
215    /// // Add optional custom headers
216    /// let mut custom_headers = HeaderMap::new();
217    /// custom_headers.insert(
218    ///    HeaderName::from_static("custom_header"),
219    ///    "custom_value".parse().unwrap(),
220    /// );
221    ///
222    /// let url = bucket.presign_put("/test.file", 86400, Some(custom_headers)).unwrap();
223    /// println!("Presigned url: {}", url);
224    /// ```
225    pub fn presign_put<S: AsRef<str>>(
226        &self,
227        path: S,
228        expiry_secs: u32,
229        custom_headers: Option<HeaderMap>,
230    ) -> Result<String, S3Error> {
231        validate_expiry(expiry_secs)?;
232        let request = RequestImpl::new(
233            self,
234            path.as_ref(),
235            Command::PresignPut {
236                expiry_secs,
237                custom_headers,
238            },
239        )?;
240        request.presigned()
241    }
242
243    /// Get a presigned url for deleting object on a given path
244    ///
245    /// # Example:
246    ///
247    /// ```no_run
248    /// use s3::bucket::Bucket;
249    /// use s3::creds::Credentials;
250    ///
251    /// let bucket_name = "rust-s3-test";
252    /// let region = "us-east-1".parse().unwrap();
253    /// let credentials = Credentials::default().unwrap();
254    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
255    ///
256    /// let url = bucket.presign_delete("/test.file", 86400).unwrap();
257    /// println!("Presigned url: {}", url);
258    /// ```
259    pub fn presign_delete<S: AsRef<str>>(
260        &self,
261        path: S,
262        expiry_secs: u32,
263    ) -> Result<String, S3Error> {
264        validate_expiry(expiry_secs)?;
265        let request =
266            RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs })?;
267        request.presigned()
268    }
269
270    /// Create a new `Bucket` and instantiate it
271    ///
272    /// ```no_run
273    /// use s3::{Bucket, BucketConfiguration};
274    /// use s3::creds::Credentials;
275    /// # use s3::region::Region;
276    /// use anyhow::Result;
277    ///
278    /// # #[tokio::main]
279    /// # async fn main() -> Result<()> {
280    /// let bucket_name = "rust-s3-test";
281    /// let region = "us-east-1".parse()?;
282    /// let credentials = Credentials::default()?;
283    /// let config = BucketConfiguration::default();
284    ///
285    /// // Async variant with `tokio` or `async-std` features
286    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config).await?;
287    ///
    /// // `sync` feature will produce an identical method
289    /// #[cfg(feature = "sync")]
290    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config)?;
291    ///
292    /// # let region: Region = "us-east-1".parse()?;
293    /// # let credentials = Credentials::default()?;
294    /// # let config = BucketConfiguration::default();
295    /// // Blocking variant, generated with `blocking` feature in combination
296    /// // with `tokio` or `async-std` features.
297    /// #[cfg(feature = "blocking")]
298    /// let create_bucket_response = Bucket::create_blocking(bucket_name, region, credentials, config)?;
299    /// # Ok(())
300    /// # }
301    /// ```
302    #[maybe_async::maybe_async]
303    pub async fn create(
304        name: &str,
305        region: Region,
306        credentials: Credentials,
307        config: BucketConfiguration,
308    ) -> Result<CreateBucketResponse, S3Error> {
309        let mut config = config;
310        config.set_region(region.clone());
311        let command = Command::CreateBucket { config };
312        let bucket = Bucket::new(name, region, credentials)?;
313        let request = RequestImpl::new(&bucket, "", command)?;
314        let response_data = request.response_data(false).await?;
315        let response_text = response_data.as_str()?;
316        Ok(CreateBucketResponse {
317            bucket,
318            response_text: response_text.to_string(),
319            response_code: response_data.status_code(),
320        })
321    }
322
323    /// Create a new `Bucket` with path style and instantiate it
324    ///
325    /// ```no_run
326    /// use s3::{Bucket, BucketConfiguration};
327    /// use s3::creds::Credentials;
328    /// # use s3::region::Region;
329    /// use anyhow::Result;
330    ///
331    /// # #[tokio::main]
332    /// # async fn main() -> Result<()> {
333    /// let bucket_name = "rust-s3-test";
334    /// let region = "us-east-1".parse()?;
335    /// let credentials = Credentials::default()?;
336    /// let config = BucketConfiguration::default();
337    ///
338    /// // Async variant with `tokio` or `async-std` features
339    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config).await?;
340    ///
    /// // `sync` feature will produce an identical method
342    /// #[cfg(feature = "sync")]
343    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config)?;
344    ///
345    /// # let region: Region = "us-east-1".parse()?;
346    /// # let credentials = Credentials::default()?;
347    /// # let config = BucketConfiguration::default();
348    /// // Blocking variant, generated with `blocking` feature in combination
349    /// // with `tokio` or `async-std` features.
350    /// #[cfg(feature = "blocking")]
351    /// let create_bucket_response = Bucket::create_with_path_style_blocking(bucket_name, region, credentials, config)?;
352    /// # Ok(())
353    /// # }
354    /// ```
355    #[maybe_async::maybe_async]
356    pub async fn create_with_path_style(
357        name: &str,
358        region: Region,
359        credentials: Credentials,
360        config: BucketConfiguration,
361    ) -> Result<CreateBucketResponse, S3Error> {
362        let mut config = config;
363        config.set_region(region.clone());
364        let command = Command::CreateBucket { config };
365        let bucket = Bucket::new(name, region, credentials)?.with_path_style();
366        let request = RequestImpl::new(&bucket, "", command)?;
367        let response_data = request.response_data(false).await?;
368        let response_text = response_data.to_string()?;
369        Ok(CreateBucketResponse {
370            bucket,
371            response_text,
372            response_code: response_data.status_code(),
373        })
374    }
375
376    /// Delete existing `Bucket`
377    ///
378    /// # Example
379    /// ```rust,no_run
380    /// use s3::Bucket;
381    /// use s3::creds::Credentials;
382    /// use anyhow::Result;
383    ///
384    /// # #[tokio::main]
385    /// # async fn main() -> Result<()> {
386    /// let bucket_name = "rust-s3-test";
387    /// let region = "us-east-1".parse().unwrap();
388    /// let credentials = Credentials::default().unwrap();
389    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
390    ///
391    /// // Async variant with `tokio` or `async-std` features
392    /// bucket.delete().await.unwrap();
    /// // `sync` feature will produce an identical method
394    ///
395    /// #[cfg(feature = "sync")]
396    /// bucket.delete().unwrap();
397    /// // Blocking variant, generated with `blocking` feature in combination
398    /// // with `tokio` or `async-std` features.
399    ///
400    /// #[cfg(feature = "blocking")]
401    /// bucket.delete_blocking().unwrap();
402    ///
403    /// # Ok(())
404    /// # }
405    /// ```
406    #[maybe_async::maybe_async]
407    pub async fn delete(&self) -> Result<u16, S3Error> {
408        let command = Command::DeleteBucket;
409        let request = RequestImpl::new(self, "", command)?;
410        let response_data = request.response_data(false).await?;
411        Ok(response_data.status_code())
412    }
413
414    /// Instantiate an existing `Bucket`.
415    ///
416    /// # Example
417    /// ```no_run
418    /// use s3::bucket::Bucket;
419    /// use s3::creds::Credentials;
420    ///
    /// // Fake credentials so we don't access user's real credentials in tests
422    /// let bucket_name = "rust-s3-test";
423    /// let region = "us-east-1".parse().unwrap();
424    /// let credentials = Credentials::default().unwrap();
425    ///
426    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
427    /// ```
428    pub fn new(name: &str, region: Region, credentials: Credentials) -> Result<Bucket, S3Error> {
429        Ok(Bucket {
430            name: name.into(),
431            region,
432            credentials: Arc::new(RwLock::new(credentials)),
433            extra_headers: HeaderMap::new(),
434            extra_query: HashMap::new(),
435            request_timeout: DEFAULT_REQUEST_TIMEOUT,
436            path_style: false,
437            listobjects_v2: true,
438        })
439    }
440
441    /// Instantiate a public existing `Bucket`.
442    ///
443    /// # Example
444    /// ```no_run
445    /// use s3::bucket::Bucket;
446    ///
447    /// let bucket_name = "rust-s3-test";
448    /// let region = "us-east-1".parse().unwrap();
449    ///
450    /// let bucket = Bucket::new_public(bucket_name, region).unwrap();
451    /// ```
452    pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
453        Ok(Bucket {
454            name: name.into(),
455            region,
456            credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
457            extra_headers: HeaderMap::new(),
458            extra_query: HashMap::new(),
459            request_timeout: DEFAULT_REQUEST_TIMEOUT,
460            path_style: false,
461            listobjects_v2: true,
462        })
463    }
464
465    pub fn with_path_style(&self) -> Bucket {
466        Bucket {
467            name: self.name.clone(),
468            region: self.region.clone(),
469            credentials: self.credentials.clone(),
470            extra_headers: self.extra_headers.clone(),
471            extra_query: self.extra_query.clone(),
472            request_timeout: self.request_timeout,
473            path_style: true,
474            listobjects_v2: self.listobjects_v2,
475        }
476    }
477
478    pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Bucket {
479        Bucket {
480            name: self.name.clone(),
481            region: self.region.clone(),
482            credentials: self.credentials.clone(),
483            extra_headers,
484            extra_query: self.extra_query.clone(),
485            request_timeout: self.request_timeout,
486            path_style: self.path_style,
487            listobjects_v2: self.listobjects_v2,
488        }
489    }
490
491    pub fn with_extra_query(&self, extra_query: HashMap<String, String>) -> Bucket {
492        Bucket {
493            name: self.name.clone(),
494            region: self.region.clone(),
495            credentials: self.credentials.clone(),
496            extra_headers: self.extra_headers.clone(),
497            extra_query,
498            request_timeout: self.request_timeout,
499            path_style: self.path_style,
500            listobjects_v2: self.listobjects_v2,
501        }
502    }
503
504    pub fn with_request_timeout(&self, request_timeout: Duration) -> Bucket {
505        Bucket {
506            name: self.name.clone(),
507            region: self.region.clone(),
508            credentials: self.credentials.clone(),
509            extra_headers: self.extra_headers.clone(),
510            extra_query: self.extra_query.clone(),
511            request_timeout: Some(request_timeout),
512            path_style: self.path_style,
513            listobjects_v2: self.listobjects_v2,
514        }
515    }
516
517    pub fn with_listobjects_v1(&self) -> Bucket {
518        Bucket {
519            name: self.name.clone(),
520            region: self.region.clone(),
521            credentials: self.credentials.clone(),
522            extra_headers: self.extra_headers.clone(),
523            extra_query: self.extra_query.clone(),
524            request_timeout: self.request_timeout,
525            path_style: self.path_style,
526            listobjects_v2: false,
527        }
528    }
529
530    /// Copy file from an S3 path, internally within the same bucket.
531    ///
532    /// # Example:
533    ///
534    /// ```rust,no_run
535    /// use s3::bucket::Bucket;
536    /// use s3::creds::Credentials;
537    /// use anyhow::Result;
538    ///
539    /// # #[tokio::main]
540    /// # async fn main() -> Result<()> {
541    ///
542    /// let bucket_name = "rust-s3-test";
543    /// let region = "us-east-1".parse()?;
544    /// let credentials = Credentials::default()?;
545    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
546    ///
547    /// // Async variant with `tokio` or `async-std` features
548    /// let code = bucket.copy_object_internal("/from.file", "/to.file").await?;
549    ///
550    /// // `sync` feature will produce an identical method
551    /// #[cfg(feature = "sync")]
552    /// let code = bucket.copy_object_internal("/from.file", "/to.file")?;
553    ///
554    /// # Ok(())
555    /// # }
556    /// ```
557    #[maybe_async::maybe_async]
558    pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
559        &self,
560        from: F,
561        to: T,
562    ) -> Result<u16, S3Error> {
563        let fq_from = {
564            let from = from.as_ref();
565            let from = from.strip_prefix('/').unwrap_or(from);
566            format!("{bucket}/{path}", bucket = self.name(), path = from)
567        };
568        self.copy_object(fq_from, to).await
569    }
570
571    #[maybe_async::maybe_async]
572    async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
573        &self,
574        from: F,
575        to: T,
576    ) -> Result<u16, S3Error> {
577        let command = Command::CopyObject {
578            from: from.as_ref(),
579        };
580        let request = RequestImpl::new(self, to.as_ref(), command)?;
581        let response_data = request.response_data(false).await?;
582        Ok(response_data.status_code())
583    }
584
585    /// Gets file from an S3 path.
586    ///
587    /// # Example:
588    ///
589    /// ```rust,no_run
590    /// use s3::bucket::Bucket;
591    /// use s3::creds::Credentials;
592    /// use anyhow::Result;
593    ///
594    /// # #[tokio::main]
595    /// # async fn main() -> Result<()> {
596    ///
597    /// let bucket_name = "rust-s3-test";
598    /// let region = "us-east-1".parse()?;
599    /// let credentials = Credentials::default()?;
600    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
601    ///
602    /// // Async variant with `tokio` or `async-std` features
603    /// let response_data = bucket.get_object("/test.file").await?;
604    ///
605    /// // `sync` feature will produce an identical method
606    /// #[cfg(feature = "sync")]
607    /// let response_data = bucket.get_object("/test.file")?;
608    ///
609    /// // Blocking variant, generated with `blocking` feature in combination
610    /// // with `tokio` or `async-std` features.
611    /// #[cfg(feature = "blocking")]
612    /// let response_data = bucket.get_object_blocking("/test.file")?;
613    /// # Ok(())
614    /// # }
615    /// ```
616    #[maybe_async::maybe_async]
617    pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
618        let command = Command::GetObject;
619        let request = RequestImpl::new(self, path.as_ref(), command)?;
620        request.response_data(false).await
621    }
622
623    /// Gets torrent from an S3 path.
624    ///
625    /// # Example:
626    ///
627    /// ```rust,no_run
628    /// use s3::bucket::Bucket;
629    /// use s3::creds::Credentials;
630    /// use anyhow::Result;
631    ///
632    /// # #[tokio::main]
633    /// # async fn main() -> Result<()> {
634    ///
635    /// let bucket_name = "rust-s3-test";
636    /// let region = "us-east-1".parse()?;
637    /// let credentials = Credentials::default()?;
638    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
639    ///
640    /// // Async variant with `tokio` or `async-std` features
641    /// let response_data = bucket.get_object_torrent("/test.file").await?;
642    ///
643    /// // `sync` feature will produce an identical method
644    /// #[cfg(feature = "sync")]
645    /// let response_data = bucket.get_object_torrent("/test.file")?;
646    ///
647    /// // Blocking variant, generated with `blocking` feature in combination
648    /// // with `tokio` or `async-std` features.
649    /// #[cfg(feature = "blocking")]
650    /// let response_data = bucket.get_object_torrent_blocking("/test.file")?;
651    /// # Ok(())
652    /// # }
653    /// ```
654    #[maybe_async::maybe_async]
655    pub async fn get_object_torrent<S: AsRef<str>>(
656        &self,
657        path: S,
658    ) -> Result<ResponseData, S3Error> {
659        let command = Command::GetObjectTorrent;
660        let request = RequestImpl::new(self, path.as_ref(), command)?;
661        request.response_data(false).await
662    }
663
664    /// Gets specified inclusive byte range of file from an S3 path.
665    ///
666    /// # Example:
667    ///
668    /// ```rust,no_run
669    /// use s3::bucket::Bucket;
670    /// use s3::creds::Credentials;
671    /// use anyhow::Result;
672    ///
673    /// # #[tokio::main]
674    /// # async fn main() -> Result<()> {
675    ///
676    /// let bucket_name = "rust-s3-test";
677    /// let region = "us-east-1".parse()?;
678    /// let credentials = Credentials::default()?;
679    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
680    ///
681    /// // Async variant with `tokio` or `async-std` features
682    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31)).await?;
683    ///
684    /// // `sync` feature will produce an identical method
685    /// #[cfg(feature = "sync")]
686    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31))?;
687    ///
688    /// // Blocking variant, generated with `blocking` feature in combination
689    /// // with `tokio` or `async-std` features.
690    /// #[cfg(feature = "blocking")]
691    /// let response_data = bucket.get_object_range_blocking("/test.file", 0, Some(31))?;
692    /// #
693    /// # Ok(())
694    /// # }
695    /// ```
696    #[maybe_async::maybe_async]
697    pub async fn get_object_range<S: AsRef<str>>(
698        &self,
699        path: S,
700        start: u64,
701        end: Option<u64>,
702    ) -> Result<ResponseData, S3Error> {
703        if let Some(end) = end {
704            assert!(start < end);
705        }
706
707        let command = Command::GetObjectRange { start, end };
708        let request = RequestImpl::new(self, path.as_ref(), command)?;
709        request.response_data(false).await
710    }
711
712    /// Stream range of bytes from S3 path to a local file, generic over T: Write.
713    ///
714    /// # Example:
715    ///
716    /// ```rust,no_run
717    /// use s3::bucket::Bucket;
718    /// use s3::creds::Credentials;
719    /// use anyhow::Result;
720    /// use std::fs::File;
721    ///
722    /// # #[tokio::main]
723    /// # async fn main() -> Result<()> {
724    ///
725    /// let bucket_name = "rust-s3-test";
726    /// let region = "us-east-1".parse()?;
727    /// let credentials = Credentials::default()?;
728    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
729    /// let mut output_file = File::create("output_file").expect("Unable to create file");
730    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
731    /// #[cfg(feature = "with-async-std")]
732    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
733    ///
734    /// let start = 0;
735    /// let end = Some(1024);
736    ///
737    /// // Async variant with `tokio` or `async-std` features
738    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut async_output_file).await?;
739    ///
740    /// // `sync` feature will produce an identical method
741    /// #[cfg(feature = "sync")]
742    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut output_file)?;
743    ///
744    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features. Based off the async branch
746    /// #[cfg(feature = "blocking")]
747    /// let status_code = bucket.get_object_range_to_writer_blocking("/test.file", start, end, &mut async_output_file)?;
748    /// #
749    /// # Ok(())
750    /// # }
751    /// ```
752    #[maybe_async::async_impl]
753    pub async fn get_object_range_to_writer<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
754        &self,
755        path: S,
756        start: u64,
757        end: Option<u64>,
758        writer: &mut T,
759    ) -> Result<u16, S3Error> {
760        if let Some(end) = end {
761            assert!(start < end);
762        }
763
764        let command = Command::GetObjectRange { start, end };
765        let request = RequestImpl::new(self, path.as_ref(), command)?;
766        request.response_data_to_writer(writer).await
767    }
768
769    #[maybe_async::sync_impl]
770    pub async fn get_object_range_to_writer<T: std::io::Write + Send, S: AsRef<str>>(
771        &self,
772        path: S,
773        start: u64,
774        end: Option<u64>,
775        writer: &mut T,
776    ) -> Result<u16, S3Error> {
777        if let Some(end) = end {
778            assert!(start < end);
779        }
780
781        let command = Command::GetObjectRange { start, end };
782        let request = RequestImpl::new(self, path.as_ref(), command)?;
783        request.response_data_to_writer(writer)
784    }
785
786    /// Stream file from S3 path to a local file, generic over T: Write.
787    ///
788    /// # Example:
789    ///
790    /// ```rust,no_run
791    /// use s3::bucket::Bucket;
792    /// use s3::creds::Credentials;
793    /// use anyhow::Result;
794    /// use std::fs::File;
795    ///
796    /// # #[tokio::main]
797    /// # async fn main() -> Result<()> {
798    ///
799    /// let bucket_name = "rust-s3-test";
800    /// let region = "us-east-1".parse()?;
801    /// let credentials = Credentials::default()?;
802    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
803    /// let mut output_file = File::create("output_file").expect("Unable to create file");
804    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
805    /// #[cfg(feature = "with-async-std")]
806    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
807    ///
808    /// // Async variant with `tokio` or `async-std` features
809    /// let status_code = bucket.get_object_to_writer("/test.file", &mut async_output_file).await?;
810    ///
811    /// // `sync` feature will produce an identical method
812    /// #[cfg(feature = "sync")]
813    /// let status_code = bucket.get_object_to_writer("/test.file", &mut output_file)?;
814    ///
815    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features. Based off the async branch
817    /// #[cfg(feature = "blocking")]
818    /// let status_code = bucket.get_object_to_writer_blocking("/test.file", &mut async_output_file)?;
819    /// #
820    /// # Ok(())
821    /// # }
822    /// ```
823    #[maybe_async::async_impl]
824    pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
825        &self,
826        path: S,
827        writer: &mut T,
828    ) -> Result<u16, S3Error> {
829        let command = Command::GetObject;
830        let request = RequestImpl::new(self, path.as_ref(), command)?;
831        request.response_data_to_writer(writer).await
832    }
833
834    #[maybe_async::sync_impl]
835    pub fn get_object_to_writer<T: std::io::Write + Send, S: AsRef<str>>(
836        &self,
837        path: S,
838        writer: &mut T,
839    ) -> Result<u16, S3Error> {
840        let command = Command::GetObject;
841        let request = RequestImpl::new(self, path.as_ref(), command)?;
842        request.response_data_to_writer(writer)
843    }
844
845    /// Stream file from S3 path to a local file using an async stream.
846    ///
847    /// # Example
848    ///
849    /// ```rust,no_run
850    /// use s3::bucket::Bucket;
851    /// use s3::creds::Credentials;
852    /// use anyhow::Result;
853    /// #[cfg(feature = "with-tokio")]
854    /// use tokio_stream::StreamExt;
855    /// #[cfg(feature = "with-tokio")]
856    /// use tokio::io::AsyncWriteExt;
857    /// #[cfg(feature = "with-async-std")]
858    /// use futures_util::StreamExt;
859    /// #[cfg(feature = "with-async-std")]
860    /// use futures_util::AsyncWriteExt;
861    ///
862    /// # #[tokio::main]
863    /// # async fn main() -> Result<()> {
864    ///
865    /// let bucket_name = "rust-s3-test";
866    /// let region = "us-east-1".parse()?;
867    /// let credentials = Credentials::default()?;
868    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
869    /// let path = "path";
870    ///
871    /// let mut response_data_stream = bucket.get_object_stream(path).await?;
872    ///
873    /// #[cfg(feature = "with-tokio")]
874    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
875    /// #[cfg(feature = "with-async-std")]
876    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
877    ///
878    /// while let Some(chunk) = response_data_stream.bytes().next().await {
879    ///     async_output_file.write_all(&chunk).await?;
880    /// }
881    ///
882    /// #
883    /// # Ok(())
884    /// # }
885    /// ```
886    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
887    pub async fn get_object_stream<S: AsRef<str>>(
888        &self,
889        path: S,
890    ) -> Result<ResponseDataStream, S3Error> {
891        let command = Command::GetObject;
892        let request = RequestImpl::new(self, path.as_ref(), command)?;
893        request.response_data_to_stream().await
894    }
895
896    /// Stream file from local path to s3, generic over T: Write.
897    ///
898    /// # Example:
899    ///
900    /// ```rust,no_run
901    /// use s3::bucket::Bucket;
902    /// use s3::creds::Credentials;
903    /// use anyhow::Result;
904    /// use std::fs::File;
905    /// use std::io::Write;
906    ///
907    /// # #[tokio::main]
908    /// # async fn main() -> Result<()> {
909    ///
910    /// let bucket_name = "rust-s3-test";
911    /// let region = "us-east-1".parse()?;
912    /// let credentials = Credentials::default()?;
913    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
914    /// let path = "path";
915    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
916    /// let mut file = File::create(path)?;
917    /// // tokio open file
918    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
919    /// file.write_all(&test)?;
920    ///
921    /// // Generic over std::io::Read
922    /// #[cfg(feature = "with-tokio")]
923    /// let status_code = bucket.put_object_stream(&mut async_output_file, "/path").await?;
924    ///
925    ///
926    /// #[cfg(feature = "with-async-std")]
927    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
928    ///
929    /// // `sync` feature will produce an identical method
930    /// #[cfg(feature = "sync")]
931    /// // Generic over std::io::Read
932    /// let status_code = bucket.put_object_stream(&mut path, "/path")?;
933    ///
934    /// // Blocking variant, generated with `blocking` feature in combination
935    /// // with `tokio` or `async-std` features.
936    /// #[cfg(feature = "blocking")]
937    /// let status_code = bucket.put_object_stream_blocking(&mut path, "/path")?;
938    /// #
939    /// # Ok(())
940    /// # }
941    /// ```
942    #[maybe_async::async_impl]
943    pub async fn put_object_stream<R: AsyncRead + Unpin>(
944        &self,
945        reader: &mut R,
946        s3_path: impl AsRef<str>,
947    ) -> Result<u16, S3Error> {
948        self._put_object_stream_with_content_type(
949            reader,
950            s3_path.as_ref(),
951            "application/octet-stream",
952        )
953        .await
954    }
955
956    #[maybe_async::async_impl]
957    pub async fn mc_put_object_stream<R: AsyncRead + Unpin>(
958        &self,
959        reader: &mut R,
960        s3_path: impl AsRef<str>,
961    ) -> Result<u16, S3Error> {
962        self._put_object_stream_with_content_type_in_sequence(
963            reader,
964            s3_path.as_ref(),
965            "application/octet-stream",
966        )
967        .await
968    }
969
970
971    #[maybe_async::sync_impl]
972    pub fn put_object_stream<R: Read>(
973        &self,
974        reader: &mut R,
975        s3_path: impl AsRef<str>,
976    ) -> Result<u16, S3Error> {
977        self._put_object_stream_with_content_type(
978            reader,
979            s3_path.as_ref(),
980            "application/octet-stream",
981        )
982    }
983
984    /// Stream file from local path to s3, generic over T: Write with explicit content type.
985    ///
986    /// # Example:
987    ///
988    /// ```rust,no_run
989    /// use s3::bucket::Bucket;
990    /// use s3::creds::Credentials;
991    /// use anyhow::Result;
992    /// use std::fs::File;
993    /// use std::io::Write;
994    ///
995    /// # #[tokio::main]
996    /// # async fn main() -> Result<()> {
997    ///
998    /// let bucket_name = "rust-s3-test";
999    /// let region = "us-east-1".parse()?;
1000    /// let credentials = Credentials::default()?;
1001    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1002    /// let path = "path";
1003    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
1004    /// let mut file = File::create(path)?;
1005    /// file.write_all(&test)?;
1006    ///
1007    /// #[cfg(feature = "with-tokio")]
1008    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1009    ///
1010    /// #[cfg(feature = "with-async-std")]
1011    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1012    ///
1013    /// // Async variant with `tokio` or `async-std` features
1014    /// // Generic over std::io::Read
1015    /// let status_code = bucket
1016    ///     .put_object_stream_with_content_type(&mut async_output_file, "/path", "application/octet-stream")
1017    ///     .await?;
1018    ///
1019    /// // `sync` feature will produce an identical method
1020    /// #[cfg(feature = "sync")]
1021    /// // Generic over std::io::Read
1022    /// let status_code = bucket
1023    ///     .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?;
1024    ///
1025    /// // Blocking variant, generated with `blocking` feature in combination
1026    /// // with `tokio` or `async-std` features.
1027    /// #[cfg(feature = "blocking")]
1028    /// let status_code = bucket
1029    ///     .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?;
1030    /// #
1031    /// # Ok(())
1032    /// # }
1033    /// ```
1034    #[maybe_async::async_impl]
1035    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1036        &self,
1037        reader: &mut R,
1038        s3_path: impl AsRef<str>,
1039        content_type: impl AsRef<str>,
1040    ) -> Result<u16, S3Error> {
1041        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1042            .await
1043    }
1044
1045    #[maybe_async::sync_impl]
1046    pub fn put_object_stream_with_content_type<R: Read>(
1047        &self,
1048        reader: &mut R,
1049        s3_path: impl AsRef<str>,
1050        content_type: impl AsRef<str>,
1051    ) -> Result<u16, S3Error> {
1052        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1053    }
1054
1055    #[maybe_async::async_impl]
1056    async fn make_multipart_request(
1057        &self,
1058        path: &str,
1059        chunk: Vec<u8>,
1060        part_number: u32,
1061        upload_id: &str,
1062        content_type: &str,
1063    ) -> Result<ResponseData, S3Error> {
1064        let command = Command::PutObject {
1065            content: &chunk,
1066            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1067            content_type,
1068        };
1069        let request = RequestImpl::new(self, path, command)?;
1070        request.response_data(true).await
1071    }
1072
1073    #[maybe_async::async_impl]
1074    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1075        &self,
1076        reader: &mut R,
1077        s3_path: &str,
1078        content_type: &str,
1079    ) -> Result<u16, S3Error> {
1080        // If the file is smaller CHUNK_SIZE, just do a regular upload.
1081        // Otherwise perform a multi-part upload.
1082        let first_chunk = crate::utils::read_chunk_async(reader).await?;
1083        if first_chunk.len() < CHUNK_SIZE {
1084            let response_data = self
1085                .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
1086                .await?;
1087            if response_data.status_code() >= 300 {
1088                return Err(error_from_response_data(response_data)?);
1089            }
1090            return Ok(response_data.status_code());
1091        }
1092
1093        let msg = self
1094            .initiate_multipart_upload(s3_path, content_type)
1095            .await?;
1096        let path = msg.key;
1097        let upload_id = &msg.upload_id;
1098
1099        let mut part_number: u32 = 0;
1100        let mut etags = Vec::new();
1101
1102        // Collect request handles
1103        let mut handles = vec![];
1104        loop {
1105            let chunk = if part_number == 0 {
1106                first_chunk.clone()
1107            } else {
1108                crate::utils::read_chunk_async(reader).await?
1109            };
1110
1111            let done = chunk.len() < CHUNK_SIZE;
1112
1113            // Start chunk upload
1114            part_number += 1;
1115            handles.push(self.make_multipart_request(
1116                &path,
1117                chunk,
1118                part_number,
1119                upload_id,
1120                content_type,
1121            ));
1122
1123            if done {
1124                break;
1125            }
1126        }
1127
1128        // Wait for all chunks to finish (or fail)
1129        let responses = futures::future::join_all(handles).await;
1130
1131        for response in responses {
1132            let response_data = response?;
1133            if !(200..300).contains(&response_data.status_code()) {
1134                // if chunk upload failed - abort the upload
1135                match self.abort_upload(&path, upload_id).await {
1136                    Ok(_) => {
1137                        return Err(error_from_response_data(response_data)?);
1138                    }
1139                    Err(error) => {
1140                        return Err(error);
1141                    }
1142                }
1143            }
1144
1145            let etag = response_data.as_str()?;
1146            etags.push(etag.to_string());
1147        }
1148
1149        // Finish the upload
1150        let inner_data = etags
1151            .clone()
1152            .into_iter()
1153            .enumerate()
1154            .map(|(i, x)| Part {
1155                etag: x,
1156                part_number: i as u32 + 1,
1157            })
1158            .collect::<Vec<Part>>();
1159        let response_data = self
1160            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
1161            .await?;
1162
1163        Ok(response_data.status_code())
1164    }
1165
1166    #[maybe_async::async_impl]
1167    async fn _put_object_stream_with_content_type_in_sequence<R: AsyncRead + Unpin>(
1168        &self,
1169        reader: &mut R,
1170        s3_path: &str,
1171        content_type: &str,
1172    ) -> Result<u16, S3Error> {
1173        // If the file is smaller CHUNK_SIZE, just do a regular upload.
1174        // Otherwise perform a multi-part upload.
1175        let first_chunk = crate::utils::read_chunk_async(reader).await?;
1176        if first_chunk.len() < CHUNK_SIZE {
1177            let response_data = self
1178                .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
1179                .await?;
1180            if response_data.status_code() >= 300 {
1181                return Err(error_from_response_data(response_data)?);
1182            }
1183            return Ok(response_data.status_code());
1184        }
1185
1186        let msg = self
1187            .initiate_multipart_upload(s3_path, content_type)
1188            .await?;
1189        let path = msg.key;
1190        let upload_id = &msg.upload_id;
1191
1192        let mut part_number: u32 = 0;
1193        let mut etags = Vec::new();
1194
1195        // Collect request handles
1196        let mut responses = vec![];
1197        loop {
1198            let chunk = if part_number == 0 {
1199                first_chunk.clone()
1200            } else {
1201                crate::utils::read_chunk_async(reader).await?
1202            };
1203
1204            let done = chunk.len() < CHUNK_SIZE;
1205
1206            // Start chunk upload
1207            part_number += 1;
1208            responses.push(self.make_multipart_request(
1209                &path,
1210                chunk,
1211                part_number,
1212                upload_id,
1213                content_type,
1214            ).await);
1215
1216            if done {
1217                break;
1218            }
1219        }
1220
1221        for response in responses {
1222            let response_data = response?;
1223            if !(200..300).contains(&response_data.status_code()) {
1224                // if chunk upload failed - abort the upload
1225                match self.abort_upload(&path, upload_id).await {
1226                    Ok(_) => {
1227                        return Err(error_from_response_data(response_data)?);
1228                    }
1229                    Err(error) => {
1230                        return Err(error);
1231                    }
1232                }
1233            }
1234
1235            let etag = response_data.as_str()?;
1236            etags.push(etag.to_string());
1237        }
1238
1239        // Finish the upload
1240        let inner_data = etags
1241            .clone()
1242            .into_iter()
1243            .enumerate()
1244            .map(|(i, x)| Part {
1245                etag: x,
1246                part_number: i as u32 + 1,
1247            })
1248            .collect::<Vec<Part>>();
1249        let response_data = self
1250            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
1251            .await?;
1252
1253        Ok(response_data.status_code())
1254    }
1255
1256    #[maybe_async::sync_impl]
1257    fn _put_object_stream_with_content_type<R: Read>(
1258        &self,
1259        reader: &mut R,
1260        s3_path: &str,
1261        content_type: &str,
1262    ) -> Result<u16, S3Error> {
1263        let msg = self.initiate_multipart_upload(s3_path, content_type)?;
1264        let path = msg.key;
1265        let upload_id = &msg.upload_id;
1266
1267        let mut part_number: u32 = 0;
1268        let mut etags = Vec::new();
1269        loop {
1270            let chunk = crate::utils::read_chunk(reader)?;
1271
1272            if chunk.len() < CHUNK_SIZE {
1273                if part_number == 0 {
1274                    // Files is not big enough for multipart upload, going with regular put_object
1275                    self.abort_upload(&path, upload_id)?;
1276
1277                    self.put_object(s3_path, chunk.as_slice())?;
1278                } else {
1279                    part_number += 1;
1280                    let part = self.put_multipart_chunk(
1281                        chunk,
1282                        &path,
1283                        part_number,
1284                        upload_id,
1285                        content_type,
1286                    )?;
1287                    etags.push(part.etag);
1288                    let inner_data = etags
1289                        .into_iter()
1290                        .enumerate()
1291                        .map(|(i, x)| Part {
1292                            etag: x,
1293                            part_number: i as u32 + 1,
1294                        })
1295                        .collect::<Vec<Part>>();
1296                    return Ok(self
1297                        .complete_multipart_upload(&path, upload_id, inner_data)?
1298                        .status_code());
1299                    // let response = std::str::from_utf8(data.as_slice())?;
1300                }
1301            } else {
1302                part_number += 1;
1303                let part =
1304                    self.put_multipart_chunk(chunk, &path, part_number, upload_id, content_type)?;
1305                etags.push(part.etag.to_string());
1306            }
1307        }
1308    }
1309
1310    /// Initiate multipart upload to s3.
1311    #[maybe_async::async_impl]
1312    pub async fn initiate_multipart_upload(
1313        &self,
1314        s3_path: &str,
1315        content_type: &str,
1316    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1317        let command = Command::InitiateMultipartUpload { content_type };
1318        let request = RequestImpl::new(self, s3_path, command)?;
1319        let response_data = request.response_data(false).await?;
1320        if response_data.status_code() >= 300 {
1321            return Err(error_from_response_data(response_data)?);
1322        }
1323
1324        let msg: InitiateMultipartUploadResponse =
1325            quick_xml::de::from_str(response_data.as_str()?)?;
1326        Ok(msg)
1327    }
1328
1329    #[maybe_async::sync_impl]
1330    pub fn initiate_multipart_upload(
1331        &self,
1332        s3_path: &str,
1333        content_type: &str,
1334    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1335        let command = Command::InitiateMultipartUpload { content_type };
1336        let request = RequestImpl::new(self, s3_path, command)?;
1337        let response_data = request.response_data(false)?;
1338        if response_data.status_code() >= 300 {
1339            return Err(error_from_response_data(response_data)?);
1340        }
1341
1342        let msg: InitiateMultipartUploadResponse =
1343            quick_xml::de::from_str(response_data.as_str()?)?;
1344        Ok(msg)
1345    }
1346
    /// Upload a streamed multipart chunk to s3 using a previously initiated multipart upload
    ///
    /// Reads at most one `CHUNK_SIZE` chunk from `reader` and uploads it as
    /// part `part_number` of `upload_id`, returning the resulting [`Part`]
    /// (etag + part number) to pass to `complete_multipart_upload`.
    ///
    /// NOTE(review): `reader` is a blocking `std::io::Read` and
    /// `crate::utils::read_chunk` is synchronous, so this async fn blocks the
    /// executor thread while reading — confirm this is intended.
    #[maybe_async::async_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
            .await
    }
1361
1362    #[maybe_async::sync_impl]
1363    pub async fn put_multipart_stream<R: Read + Unpin>(
1364        &self,
1365        reader: &mut R,
1366        path: &str,
1367        part_number: u32,
1368        upload_id: &str,
1369        content_type: &str,
1370    ) -> Result<Part, S3Error> {
1371        let chunk = crate::utils::read_chunk(reader)?;
1372        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
1373    }
1374
1375    /// Upload a buffered multipart chunk to s3 using a previously initiated multipart upload
1376    #[maybe_async::async_impl]
1377    pub async fn put_multipart_chunk(
1378        &self,
1379        chunk: Vec<u8>,
1380        path: &str,
1381        part_number: u32,
1382        upload_id: &str,
1383        content_type: &str,
1384    ) -> Result<Part, S3Error> {
1385        let command = Command::PutObject {
1386            // part_number,
1387            content: &chunk,
1388            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1389            content_type,
1390        };
1391        let request = RequestImpl::new(self, path, command)?;
1392        let response_data = request.response_data(true).await?;
1393        if !(200..300).contains(&response_data.status_code()) {
1394            // if chunk upload failed - abort the upload
1395            match self.abort_upload(path, upload_id).await {
1396                Ok(_) => {
1397                    return Err(error_from_response_data(response_data)?);
1398                }
1399                Err(error) => {
1400                    return Err(error);
1401                }
1402            }
1403        }
1404        let etag = response_data.as_str()?;
1405        Ok(Part {
1406            etag: etag.to_string(),
1407            part_number,
1408        })
1409    }
1410
1411    #[maybe_async::sync_impl]
1412    pub fn put_multipart_chunk(
1413        &self,
1414        chunk: Vec<u8>,
1415        path: &str,
1416        part_number: u32,
1417        upload_id: &str,
1418        content_type: &str,
1419    ) -> Result<Part, S3Error> {
1420        let command = Command::PutObject {
1421            // part_number,
1422            content: &chunk,
1423            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1424            content_type,
1425        };
1426        let request = RequestImpl::new(self, path, command)?;
1427        let response_data = request.response_data(true)?;
1428        if !(200..300).contains(&response_data.status_code()) {
1429            // if chunk upload failed - abort the upload
1430            match self.abort_upload(path, upload_id) {
1431                Ok(_) => {
1432                    return Err(error_from_response_data(response_data)?);
1433                }
1434                Err(error) => {
1435                    return Err(error);
1436                }
1437            }
1438        }
1439        let etag = response_data.as_str()?;
1440        Ok(Part {
1441            etag: etag.to_string(),
1442            part_number,
1443        })
1444    }
1445
1446    /// Completes a previously initiated multipart upload, with optional final data chunks
1447    #[maybe_async::async_impl]
1448    pub async fn complete_multipart_upload(
1449        &self,
1450        path: &str,
1451        upload_id: &str,
1452        parts: Vec<Part>,
1453    ) -> Result<ResponseData, S3Error> {
1454        let data = CompleteMultipartUploadData { parts };
1455        let complete = Command::CompleteMultipartUpload { upload_id, data };
1456        let complete_request = RequestImpl::new(self, path, complete)?;
1457        complete_request.response_data(false).await
1458    }
1459
1460    #[maybe_async::sync_impl]
1461    pub fn complete_multipart_upload(
1462        &self,
1463        path: &str,
1464        upload_id: &str,
1465        parts: Vec<Part>,
1466    ) -> Result<ResponseData, S3Error> {
1467        let data = CompleteMultipartUploadData { parts };
1468        let complete = Command::CompleteMultipartUpload { upload_id, data };
1469        let complete_request = RequestImpl::new(self, path, complete)?;
1470        complete_request.response_data(false)
1471    }
1472
1473    /// Get Bucket location.
1474    ///
1475    /// # Example:
1476    ///
1477    /// ```no_run
1478    /// use s3::bucket::Bucket;
1479    /// use s3::creds::Credentials;
1480    /// use anyhow::Result;
1481    ///
1482    /// # #[tokio::main]
1483    /// # async fn main() -> Result<()> {
1484    ///
1485    /// let bucket_name = "rust-s3-test";
1486    /// let region = "us-east-1".parse()?;
1487    /// let credentials = Credentials::default()?;
1488    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1489    ///
1490    /// // Async variant with `tokio` or `async-std` features
1491    /// let (region, status_code) = bucket.location().await?;
1492    ///
1493    /// // `sync` feature will produce an identical method
1494    /// #[cfg(feature = "sync")]
1495    /// let (region, status_code) = bucket.location()?;
1496    ///
1497    /// // Blocking variant, generated with `blocking` feature in combination
1498    /// // with `tokio` or `async-std` features.
1499    /// #[cfg(feature = "blocking")]
1500    /// let (region, status_code) = bucket.location_blocking()?;
1501    /// #
1502    /// # Ok(())
1503    /// # }
1504    /// ```
1505    #[maybe_async::maybe_async]
1506    pub async fn location(&self) -> Result<(Region, u16), S3Error> {
1507        let request = RequestImpl::new(self, "?location", Command::GetBucketLocation)?;
1508        let response_data = request.response_data(false).await?;
1509        let region_string = String::from_utf8_lossy(response_data.as_slice());
1510        let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
1511            Ok(r) => {
1512                let location_result: BucketLocationResult = r;
1513                location_result.region.parse()?
1514            }
1515            Err(e) => {
1516                if response_data.status_code() == 200 {
1517                    Region::Custom {
1518                        region: "Custom".to_string(),
1519                        endpoint: "".to_string(),
1520                    }
1521                } else {
1522                    Region::Custom {
1523                        region: format!("Error encountered : {}", e),
1524                        endpoint: "".to_string(),
1525                    }
1526                }
1527            }
1528        };
1529        Ok((region, response_data.status_code()))
1530    }
1531
1532    /// Delete file from an S3 path.
1533    ///
1534    /// # Example:
1535    ///
1536    /// ```no_run
1537    /// use s3::bucket::Bucket;
1538    /// use s3::creds::Credentials;
1539    /// use anyhow::Result;
1540    ///
1541    /// # #[tokio::main]
1542    /// # async fn main() -> Result<()> {
1543    ///
1544    /// let bucket_name = "rust-s3-test";
1545    /// let region = "us-east-1".parse()?;
1546    /// let credentials = Credentials::default()?;
1547    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1548    ///
1549    /// // Async variant with `tokio` or `async-std` features
1550    /// let response_data = bucket.delete_object("/test.file").await?;
1551    ///
1552    /// // `sync` feature will produce an identical method
1553    /// #[cfg(feature = "sync")]
1554    /// let response_data = bucket.delete_object("/test.file")?;
1555    ///
1556    /// // Blocking variant, generated with `blocking` feature in combination
1557    /// // with `tokio` or `async-std` features.
1558    /// #[cfg(feature = "blocking")]
1559    /// let response_data = bucket.delete_object_blocking("/test.file")?;
1560    /// #
1561    /// # Ok(())
1562    /// # }
1563    /// ```
1564    #[maybe_async::maybe_async]
1565    pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
1566        let command = Command::DeleteObject;
1567        let request = RequestImpl::new(self, path.as_ref(), command)?;
1568        request.response_data(false).await
1569    }
1570
1571    /// Head object from S3.
1572    ///
1573    /// # Example:
1574    ///
1575    /// ```no_run
1576    /// use s3::bucket::Bucket;
1577    /// use s3::creds::Credentials;
1578    /// use anyhow::Result;
1579    ///
1580    /// # #[tokio::main]
1581    /// # async fn main() -> Result<()> {
1582    ///
1583    /// let bucket_name = "rust-s3-test";
1584    /// let region = "us-east-1".parse()?;
1585    /// let credentials = Credentials::default()?;
1586    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1587    ///
1588    /// // Async variant with `tokio` or `async-std` features
1589    /// let (head_object_result, code) = bucket.head_object("/test.png").await?;
1590    ///
1591    /// // `sync` feature will produce an identical method
1592    /// #[cfg(feature = "sync")]
1593    /// let (head_object_result, code) = bucket.head_object("/test.png")?;
1594    ///
1595    /// // Blocking variant, generated with `blocking` feature in combination
1596    /// // with `tokio` or `async-std` features.
1597    /// #[cfg(feature = "blocking")]
1598    /// let (head_object_result, code) = bucket.head_object_blocking("/test.png")?;
1599    /// #
1600    /// # Ok(())
1601    /// # }
1602    /// ```
1603    #[maybe_async::maybe_async]
1604    pub async fn head_object<S: AsRef<str>>(
1605        &self,
1606        path: S,
1607    ) -> Result<(HeadObjectResult, u16), S3Error> {
1608        let command = Command::HeadObject;
1609        let request = RequestImpl::new(self, path.as_ref(), command)?;
1610        let (headers, status) = request.response_header().await?;
1611        let header_object = HeadObjectResult::from(&headers);
1612        Ok((header_object, status))
1613    }
1614
1615    /// Put into an S3 bucket, with explicit content-type.
1616    ///
1617    /// # Example:
1618    ///
1619    /// ```no_run
1620    /// use s3::bucket::Bucket;
1621    /// use s3::creds::Credentials;
1622    /// use anyhow::Result;
1623    ///
1624    /// # #[tokio::main]
1625    /// # async fn main() -> Result<()> {
1626    ///
1627    /// let bucket_name = "rust-s3-test";
1628    /// let region = "us-east-1".parse()?;
1629    /// let credentials = Credentials::default()?;
1630    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1631    /// let content = "I want to go to S3".as_bytes();
1632    ///
1633    /// // Async variant with `tokio` or `async-std` features
1634    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain").await?;
1635    ///
1636    /// // `sync` feature will produce an identical method
1637    /// #[cfg(feature = "sync")]
1638    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain")?;
1639    ///
1640    /// // Blocking variant, generated with `blocking` feature in combination
1641    /// // with `tokio` or `async-std` features.
1642    /// #[cfg(feature = "blocking")]
1643    /// let response_data = bucket.put_object_with_content_type_blocking("/test.file", content, "text/plain")?;
1644    /// #
1645    /// # Ok(())
1646    /// # }
1647    /// ```
1648    #[maybe_async::maybe_async]
1649    pub async fn put_object_with_content_type<S: AsRef<str>>(
1650        &self,
1651        path: S,
1652        content: &[u8],
1653        content_type: &str,
1654    ) -> Result<ResponseData, S3Error> {
1655        let command = Command::PutObject {
1656            content,
1657            content_type,
1658            multipart: None,
1659        };
1660        let request = RequestImpl::new(self, path.as_ref(), command)?;
1661        request.response_data(true).await
1662    }
1663
1664    /// Put into an S3 bucket.
1665    ///
1666    /// # Example:
1667    ///
1668    /// ```no_run
1669    /// use s3::bucket::Bucket;
1670    /// use s3::creds::Credentials;
1671    /// use anyhow::Result;
1672    ///
1673    /// # #[tokio::main]
1674    /// # async fn main() -> Result<()> {
1675    ///
1676    /// let bucket_name = "rust-s3-test";
1677    /// let region = "us-east-1".parse()?;
1678    /// let credentials = Credentials::default()?;
1679    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1680    /// let content = "I want to go to S3".as_bytes();
1681    ///
1682    /// // Async variant with `tokio` or `async-std` features
1683    /// let response_data = bucket.put_object("/test.file", content).await?;
1684    ///
1685    /// // `sync` feature will produce an identical method
1686    /// #[cfg(feature = "sync")]
1687    /// let response_data = bucket.put_object("/test.file", content)?;
1688    ///
1689    /// // Blocking variant, generated with `blocking` feature in combination
1690    /// // with `tokio` or `async-std` features.
1691    /// #[cfg(feature = "blocking")]
1692    /// let response_data = bucket.put_object_blocking("/test.file", content)?;
1693    /// #
1694    /// # Ok(())
1695    /// # }
1696    /// ```
1697    #[maybe_async::maybe_async]
1698    pub async fn put_object<S: AsRef<str>>(
1699        &self,
1700        path: S,
1701        content: &[u8],
1702    ) -> Result<ResponseData, S3Error> {
1703        self.put_object_with_content_type(path, content, "application/octet-stream")
1704            .await
1705    }
1706
1707    fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
1708        let mut s = String::new();
1709        let content = tags
1710            .iter()
1711            .map(|(name, value)| {
1712                format!(
1713                    "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
1714                    name.as_ref(),
1715                    value.as_ref()
1716                )
1717            })
1718            .fold(String::new(), |mut a, b| {
1719                a.push_str(b.as_str());
1720                a
1721            });
1722        s.push_str("<Tagging><TagSet>");
1723        s.push_str(&content);
1724        s.push_str("</TagSet></Tagging>");
1725        s
1726    }
1727
1728    /// Tag an S3 object.
1729    ///
1730    /// # Example:
1731    ///
1732    /// ```no_run
1733    /// use s3::bucket::Bucket;
1734    /// use s3::creds::Credentials;
1735    /// use anyhow::Result;
1736    ///
1737    /// # #[tokio::main]
1738    /// # async fn main() -> Result<()> {
1739    ///
1740    /// let bucket_name = "rust-s3-test";
1741    /// let region = "us-east-1".parse()?;
1742    /// let credentials = Credentials::default()?;
1743    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1744    ///
1745    /// // Async variant with `tokio` or `async-std` features
1746    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")]).await?;
1747    ///
1748    /// // `sync` feature will produce an identical method
1749    /// #[cfg(feature = "sync")]
1750    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
1751    ///
1752    /// // Blocking variant, generated with `blocking` feature in combination
1753    /// // with `tokio` or `async-std` features.
1754    /// #[cfg(feature = "blocking")]
1755    /// let response_data = bucket.put_object_tagging_blocking("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
1756    /// #
1757    /// # Ok(())
1758    /// # }
1759    /// ```
1760    #[maybe_async::maybe_async]
1761    pub async fn put_object_tagging<S: AsRef<str>>(
1762        &self,
1763        path: &str,
1764        tags: &[(S, S)],
1765    ) -> Result<ResponseData, S3Error> {
1766        let content = self._tags_xml(tags);
1767        let command = Command::PutObjectTagging { tags: &content };
1768        let request = RequestImpl::new(self, path, command)?;
1769        request.response_data(false).await
1770    }
1771
1772    /// Delete tags from an S3 object.
1773    ///
1774    /// # Example:
1775    ///
1776    /// ```no_run
1777    /// use s3::bucket::Bucket;
1778    /// use s3::creds::Credentials;
1779    /// use anyhow::Result;
1780    ///
1781    /// # #[tokio::main]
1782    /// # async fn main() -> Result<()> {
1783    ///
1784    /// let bucket_name = "rust-s3-test";
1785    /// let region = "us-east-1".parse()?;
1786    /// let credentials = Credentials::default()?;
1787    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1788    ///
1789    /// // Async variant with `tokio` or `async-std` features
1790    /// let response_data = bucket.delete_object_tagging("/test.file").await?;
1791    ///
1792    /// // `sync` feature will produce an identical method
1793    /// #[cfg(feature = "sync")]
1794    /// let response_data = bucket.delete_object_tagging("/test.file")?;
1795    ///
1796    /// // Blocking variant, generated with `blocking` feature in combination
1797    /// // with `tokio` or `async-std` features.
1798    /// #[cfg(feature = "blocking")]
1799    /// let response_data = bucket.delete_object_tagging_blocking("/test.file")?;
1800    /// #
1801    /// # Ok(())
1802    /// # }
1803    /// ```
1804    #[maybe_async::maybe_async]
1805    pub async fn delete_object_tagging<S: AsRef<str>>(
1806        &self,
1807        path: S,
1808    ) -> Result<ResponseData, S3Error> {
1809        let command = Command::DeleteObjectTagging;
1810        let request = RequestImpl::new(self, path.as_ref(), command)?;
1811        request.response_data(false).await
1812    }
1813
    /// Retrieve an S3 object list of tags.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    /// use anyhow::Result;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse()?;
    /// let credentials = Credentials::default()?;
    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
    ///
    /// // Async variant with `tokio` or `async-std` features
    /// let response_data = bucket.get_object_tagging("/test.file").await?;
    ///
    /// // `sync` feature will produce an identical method
    /// #[cfg(feature = "sync")]
    /// let response_data = bucket.get_object_tagging("/test.file")?;
    ///
    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features.
    /// #[cfg(feature = "blocking")]
    /// let response_data = bucket.get_object_tagging_blocking("/test.file")?;
    /// #
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        let result = request.response_data(false).await?;

        let mut tags = Vec::new();

        // Tags are only parsed from a 200 response; any other status yields
        // an empty Vec together with that status code.
        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // Add namespace if it doesn't exist
            // (minidom requires one; NOTE(review): some S3-compatible
            // providers apparently omit it — confirmed only by this
            // MissingNamespace workaround).
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            // Walk <Tagging><TagSet><Tag><Key/><Value/></Tag>…; any other
            // parse failure is silently ignored and leaves `tags` empty.
            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                // Missing children produce sentinel text, not
                                // an error — callers see it as the tag value.
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        Ok((tags, result.status_code()))
    }
1897
    /// Fetch a single page of bucket listing results.
    ///
    /// Uses ListObjectsV2 unless the bucket was switched to the v1 API via
    /// [`Bucket::set_listobjects_v1`]. Returns the parsed page together with
    /// the HTTP status code.
    #[maybe_async::maybe_async]
    pub async fn list_page(
        &self,
        prefix: String,
        delimiter: Option<String>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<(ListBucketResult, u16), S3Error> {
        let command = if self.listobjects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // In the v1 ListObjects request, there is only one "marker"
            // field that serves as both the initial starting position,
            // and as the continuation token.
            Command::ListObjects {
                prefix,
                delimiter,
                // `Option`'s ordering (`None < Some(_)`) makes `max` pick
                // whichever of the two is set. NOTE(review): if BOTH are
                // `Some`, the lexicographically larger string wins — callers
                // in this file pass at most one; confirm before relying on
                // passing both.
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };
        let request = RequestImpl::new(self, "/", command)?;
        let response_data = request.response_data(false).await?;
        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;

        Ok((list_bucket_result, response_data.status_code()))
    }
1932
1933    /// List the contents of an S3 bucket.
1934    ///
1935    /// # Example:
1936    ///
1937    /// ```no_run
1938    /// use s3::bucket::Bucket;
1939    /// use s3::creds::Credentials;
1940    /// use anyhow::Result;
1941    ///
1942    /// # #[tokio::main]
1943    /// # async fn main() -> Result<()> {
1944    ///
1945    /// let bucket_name = "rust-s3-test";
1946    /// let region = "us-east-1".parse()?;
1947    /// let credentials = Credentials::default()?;
1948    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1949    ///
1950    /// // Async variant with `tokio` or `async-std` features
1951    /// let results = bucket.list("/".to_string(), Some("/".to_string())).await?;
1952    ///
1953    /// // `sync` feature will produce an identical method
1954    /// #[cfg(feature = "sync")]
1955    /// let results = bucket.list("/".to_string(), Some("/".to_string()))?;
1956    ///
1957    /// // Blocking variant, generated with `blocking` feature in combination
1958    /// // with `tokio` or `async-std` features.
1959    /// #[cfg(feature = "blocking")]
1960    /// let results = bucket.list_blocking("/".to_string(), Some("/".to_string()))?;
1961    /// #
1962    /// # Ok(())
1963    /// # }
1964    /// ```
1965    #[maybe_async::maybe_async]
1966    pub async fn list(
1967        &self,
1968        prefix: String,
1969        delimiter: Option<String>,
1970    ) -> Result<Vec<ListBucketResult>, S3Error> {
1971        let the_bucket = self.to_owned();
1972        let mut results = Vec::new();
1973        let mut continuation_token = None;
1974
1975        loop {
1976            let (list_bucket_result, _) = the_bucket
1977                .list_page(
1978                    prefix.clone(),
1979                    delimiter.clone(),
1980                    continuation_token,
1981                    None,
1982                    None,
1983                )
1984                .await?;
1985            continuation_token = list_bucket_result.next_continuation_token.clone();
1986            results.push(list_bucket_result);
1987            if continuation_token.is_none() {
1988                break;
1989            }
1990        }
1991
1992        Ok(results)
1993    }
1994
1995    #[maybe_async::maybe_async]
1996    pub async fn list_multiparts_uploads_page(
1997        &self,
1998        prefix: Option<&str>,
1999        delimiter: Option<&str>,
2000        key_marker: Option<String>,
2001        max_uploads: Option<usize>,
2002    ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2003        let command = Command::ListMultipartUploads {
2004            prefix,
2005            delimiter,
2006            key_marker,
2007            max_uploads,
2008        };
2009        let request = RequestImpl::new(self, "/", command)?;
2010        let response_data = request.response_data(false).await?;
2011        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2012
2013        Ok((list_bucket_result, response_data.status_code()))
2014    }
2015
2016    /// List the ongoing multipart uploads of an S3 bucket. This may be useful to cleanup failed
2017    /// uploads, together with [`crate::bucket::Bucket::abort_upload`].
2018    ///
2019    /// # Example:
2020    ///
2021    /// ```no_run
2022    /// use s3::bucket::Bucket;
2023    /// use s3::creds::Credentials;
2024    /// use anyhow::Result;
2025    ///
2026    /// # #[tokio::main]
2027    /// # async fn main() -> Result<()> {
2028    ///
2029    /// let bucket_name = "rust-s3-test";
2030    /// let region = "us-east-1".parse()?;
2031    /// let credentials = Credentials::default()?;
2032    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2033    ///
2034    /// // Async variant with `tokio` or `async-std` features
2035    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/")).await?;
2036    ///
2037    /// // `sync` feature will produce an identical method
2038    /// #[cfg(feature = "sync")]
2039    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/"))?;
2040    ///
2041    /// // Blocking variant, generated with `blocking` feature in combination
2042    /// // with `tokio` or `async-std` features.
2043    /// #[cfg(feature = "blocking")]
2044    /// let results = bucket.list_multiparts_uploads_blocking(Some("/"), Some("/"))?;
2045    /// #
2046    /// # Ok(())
2047    /// # }
2048    /// ```
2049    #[maybe_async::maybe_async]
2050    pub async fn list_multiparts_uploads(
2051        &self,
2052        prefix: Option<&str>,
2053        delimiter: Option<&str>,
2054    ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2055        let the_bucket = self.to_owned();
2056        let mut results = Vec::new();
2057        let mut next_marker: Option<String> = None;
2058
2059        loop {
2060            let (list_multiparts_uploads_result, _) = the_bucket
2061                .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2062                .await?;
2063
2064            let is_truncated = list_multiparts_uploads_result.is_truncated;
2065            next_marker = list_multiparts_uploads_result.next_marker.clone();
2066            results.push(list_multiparts_uploads_result);
2067
2068            if !is_truncated {
2069                break;
2070            }
2071        }
2072
2073        Ok(results)
2074    }
2075
2076    /// Abort a running multipart upload.
2077    ///
2078    /// # Example:
2079    ///
2080    /// ```no_run
2081    /// use s3::bucket::Bucket;
2082    /// use s3::creds::Credentials;
2083    /// use anyhow::Result;
2084    ///
2085    /// # #[tokio::main]
2086    /// # async fn main() -> Result<()> {
2087    ///
2088    /// let bucket_name = "rust-s3-test";
2089    /// let region = "us-east-1".parse()?;
2090    /// let credentials = Credentials::default()?;
2091    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2092    ///
2093    /// // Async variant with `tokio` or `async-std` features
2094    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2").await?;
2095    ///
2096    /// // `sync` feature will produce an identical method
2097    /// #[cfg(feature = "sync")]
2098    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2099    ///
2100    /// // Blocking variant, generated with `blocking` feature in combination
2101    /// // with `tokio` or `async-std` features.
2102    /// #[cfg(feature = "blocking")]
2103    /// let results = bucket.abort_upload_blocking("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2104    /// #
2105    /// # Ok(())
2106    /// # }
2107    /// ```
2108    #[maybe_async::maybe_async]
2109    pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2110        let abort = Command::AbortMultipartUpload { upload_id };
2111        let abort_request = RequestImpl::new(self, key, abort)?;
2112        let response_data = abort_request.response_data(false).await?;
2113
2114        if (200..300).contains(&response_data.status_code()) {
2115            Ok(())
2116        } else {
2117            let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2118            Err(S3Error::Http(response_data.status_code(), utf8_content))
2119        }
2120    }
2121
    /// Get path_style field of the Bucket struct
    /// (`true` when requests address the bucket via the URL path).
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }
2126
    /// Get negated path_style field of the Bucket struct
    /// (`true` when requests address the bucket as a host subdomain).
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }
2131
    /// Configure bucket to use path-style urls and headers
    /// (bucket name in the URL path instead of the hostname).
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }
2136
    /// Configure bucket to use subdomain style urls and headers \[default\]
    /// (bucket name as a subdomain of the endpoint host).
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }
2141
    /// Configure bucket to apply this request timeout to all HTTP
    /// requests, or no (infinity) timeout if `None`.  Defaults to
    /// 60 seconds (`DEFAULT_REQUEST_TIMEOUT`).
    ///
    /// Only the attohttpc and the Reqwest backends obey this option;
    /// async code may instead await with a timeout.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }
2151
    /// Configure bucket to use the older ListObjects API
    ///
    /// If your provider doesn't support the ListObjectsV2 interface, set this to
    /// use the v1 ListObjects interface instead. This is currently needed at least
    /// for Google Cloud Storage. Affects [`Bucket::list_page`] and everything
    /// built on top of it.
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }
2160
    /// Configure bucket to use the newer ListObjectsV2 API \[default\]
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }
2165
    /// Get a reference to the name of the S3 bucket.
    /// (Returns an owned copy of the name.)
    pub fn name(&self) -> String {
        self.name.to_string()
    }
2170
    /// Get a reference to the hostname of the S3 API endpoint,
    /// honoring the configured path-style/subdomain-style addressing.
    pub fn host(&self) -> String {
        if self.path_style {
            self.path_style_host()
        } else {
            self.subdomain_style_host()
        }
    }
2179
2180    pub fn url(&self) -> String {
2181        if self.path_style {
2182            format!(
2183                "{}://{}/{}",
2184                self.scheme(),
2185                self.path_style_host(),
2186                self.name()
2187            )
2188        } else {
2189            format!("{}://{}", self.scheme(), self.subdomain_style_host())
2190        }
2191    }
2192
    /// Get a paths-style reference to the hostname of the S3 API endpoint.
    /// (The bucket name is carried in the URL path, so this is just the
    /// region's host.)
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }
2197
    /// Subdomain-style hostname: `<bucket>.<region-host>`.
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }
2201
2202    // pub fn self_host(&self) -> String {
2203    //     format!("{}.{}", self.name, self.region.host())
2204    // }
2205
    /// URL scheme of the endpoint, as reported by the configured region.
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }
2209
    /// Get the region this object will connect to.
    /// (Returns a clone of the configured region.)
    pub fn region(&self) -> Region {
        self.region.clone()
    }
2214
2215    /// Get a reference to the AWS access key.
2216    pub fn access_key(&self) -> Result<Option<String>, S3Error> {
2217        Ok(self
2218            .credentials()
2219            .try_read()
2220            .map_err(|_| S3Error::RLCredentials)?
2221            .access_key
2222            .clone()
2223            .map(|key| key.replace('\n', "")))
2224    }
2225
2226    /// Get a reference to the AWS secret key.
2227    pub fn secret_key(&self) -> Result<Option<String>, S3Error> {
2228        Ok(self
2229            .credentials()
2230            .try_read()
2231            .map_err(|_| S3Error::RLCredentials)?
2232            .secret_key
2233            .clone()
2234            .map(|key| key.replace('\n', "")))
2235    }
2236
2237    /// Get a reference to the AWS security token.
2238    pub fn security_token(&self) -> Result<Option<String>, S3Error> {
2239        Ok(self
2240            .credentials()
2241            .try_read()
2242            .map_err(|_| S3Error::RLCredentials)?
2243            .security_token
2244            .clone())
2245    }
2246
2247    /// Get a reference to the AWS session token.
2248    pub fn session_token(&self) -> Result<Option<String>, S3Error> {
2249        Ok(self
2250            .credentials()
2251            .try_read()
2252            .map_err(|_| S3Error::RLCredentials)?
2253            .session_token
2254            .clone())
2255    }
2256
    /// Get a reference to the full [`Credentials`](struct.Credentials.html)
    /// object used by this `Bucket`.
    /// (Clones the `Arc`, so the returned handle shares the same lock.)
    pub fn credentials(&self) -> Arc<RwLock<Credentials>> {
        self.credentials.clone()
    }
2262
    /// Change the credentials used by the Bucket.
    /// Installs a fresh `Arc`, so handles obtained earlier via
    /// [`Bucket::credentials`] keep seeing the old credentials.
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }
2267
    /// Add an extra header to send with requests to S3.
    ///
    /// Add an extra header to send with requests. Note that the library
    /// already sets a number of headers - headers set with this method will be
    /// overridden by the library headers:
    ///   * Host
    ///   * Content-Type
    ///   * Date
    ///   * Content-Length
    ///   * Authorization
    ///   * X-Amz-Content-Sha256
    ///   * X-Amz-Date
    ///
    /// # Panics
    ///
    /// Panics if `key` is not a valid HTTP header name or `value` is not a
    /// valid header value (both are `unwrap`ped below).
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.extra_headers
            .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap());
    }
2284
    /// Get a reference to the extra headers to be passed to the S3 API.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }
2289
    /// Get a mutable reference to the extra headers to be passed to the S3
    /// API. Unlike [`Bucket::add_header`], editing the map directly cannot
    /// panic on invalid names/values.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }
2295
    /// Add an extra query pair to the URL used for S3 API access.
    /// (An existing value under the same key is replaced.)
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }
2300
    /// Get a reference to the extra query pairs to be passed to the S3 API.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }
2305
    /// Get a mutable reference to the extra query pairs to be passed to the S3
    /// API.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }
2311
    /// Get the configured per-request timeout (`None` means no timeout).
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
2315}
2316
2317#[cfg(test)]
2318mod test {
2319
2320    use crate::creds::Credentials;
2321    use crate::region::Region;
2322    use crate::Bucket;
2323    use crate::BucketConfiguration;
2324    use crate::Tag;
2325    use http::header::HeaderName;
2326    use http::HeaderMap;
2327    use std::env;
2328
    /// Initialise `env_logger` for tests; a second call (logger already set)
    /// is deliberately ignored.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
2332
    /// Credentials for the AWS test account; panics if the
    /// `EU_AWS_ACCESS_KEY_ID` / `EU_AWS_SECRET_ACCESS_KEY` env vars are unset.
    fn test_aws_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
2343
    /// Credentials for the Google Cloud Storage test account; panics if the
    /// `GC_ACCESS_KEY_ID` / `GC_SECRET_ACCESS_KEY` env vars are unset.
    fn test_gc_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
2354
    /// Credentials for the Wasabi test account; panics if the
    /// `WASABI_ACCESS_KEY_ID` / `WASABI_SECRET_ACCESS_KEY` env vars are unset.
    fn test_wasabi_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
2365
    /// Static credentials matching the local MinIO test deployment.
    fn test_minio_credentials() -> Credentials {
        Credentials::new(Some("test"), Some("test1234"), None, None, None).unwrap()
    }
2369
    /// Credentials for the DigitalOcean Spaces test account; panics if the
    /// `DIGITAL_OCEAN_*` env vars are unset.
    fn test_digital_ocean_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
2380
    /// Credentials for the Cloudflare R2 test account; panics if the
    /// `R2_ACCESS_KEY_ID` / `R2_SECRET_ACCESS_KEY` env vars are unset.
    fn test_r2_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
2391
    /// Test bucket on AWS (eu-central-1).
    fn test_aws_bucket() -> Bucket {
        Bucket::new(
            "rust-s3-test",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
    }
2400
    /// Test bucket on Wasabi (wa-eu-central-1).
    fn test_wasabi_bucket() -> Bucket {
        Bucket::new(
            "rust-s3",
            "wa-eu-central-1".parse().unwrap(),
            test_wasabi_credentials(),
        )
        .unwrap()
    }
2409
    /// Test bucket on Google Cloud Storage's S3-compatible endpoint.
    /// GCS needs the v1 ListObjects API, hence `set_listobjects_v1`.
    fn test_gc_bucket() -> Bucket {
        let mut bucket = Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east1".to_owned(),
                endpoint: "https://storage.googleapis.com".to_owned(),
            },
            test_gc_credentials(),
        )
        .unwrap();
        bucket.set_listobjects_v1();
        bucket
    }
2423
    /// Test bucket on a local MinIO instance (localhost:9000),
    /// using path-style addressing.
    fn test_minio_bucket() -> Bucket {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "eu-central-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            test_minio_credentials(),
        )
        .unwrap()
        .with_path_style()
    }
2436
    /// Test bucket on DigitalOcean Spaces (FRA1).
    fn test_digital_ocean_bucket() -> Bucket {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }
2440
    /// Test bucket on Cloudflare R2, addressed by account id.
    fn test_r2_bucket() -> Bucket {
        Bucket::new(
            "rust-s3",
            Region::R2 {
                account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
            },
            test_r2_credentials(),
        )
        .unwrap()
    }
2451
2452    fn object(size: u32) -> Vec<u8> {
2453        (0..size).map(|_| 33).collect()
2454    }
2455
2456    #[maybe_async::maybe_async]
2457    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
2458        let s3_path = "/+test.file";
2459        let test: Vec<u8> = object(3072);
2460
2461        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
2462        assert_eq!(response_data.status_code(), 200);
2463        let response_data = bucket.get_object(s3_path).await.unwrap();
2464        assert_eq!(response_data.status_code(), 200);
2465        assert_eq!(test, response_data.as_slice());
2466
2467        let response_data = bucket
2468            .get_object_range(s3_path, 100, Some(1000))
2469            .await
2470            .unwrap();
2471        assert_eq!(response_data.status_code(), 206);
2472        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
2473        if head {
2474            let (head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
2475            assert_eq!(code, 200);
2476            assert_eq!(
2477                head_object_result.content_type.unwrap(),
2478                "application/octet-stream".to_owned()
2479            );
2480        }
2481
2482        // println!("{:?}", head_object_result);
2483        let response_data = bucket.delete_object(s3_path).await.unwrap();
2484        assert_eq!(response_data.status_code(), 204);
2485    }
2486
    // End-to-end tagging round-trip against AWS: upload, read the (empty)
    // tag set, put two tags, read again, delete. Ignored by default because
    // it needs real credentials and network access.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
2527
    // Same tagging round-trip as `test_tagging_aws`, but against a local
    // MinIO deployment. Ignored by default (needs a running MinIO).
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
2568
    // Integration test (ignored by default): multipart streaming round-trip
    // against a live AWS bucket; requires credentials in the environment.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_aws_bucket()).await;
    }
2581
    // Integration test (ignored by default): multipart streaming round-trip
    // against Google Cloud Storage. Skipped under `tokio-rustls-tls` (the
    // tokio test entry point is gated on its absence).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_gc_bucket()).await;
    }
2601
    // Integration test (ignored by default): multipart streaming round-trip
    // against a local/remote MinIO instance.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_minio_bucket()).await;
    }
2614
2615    // Test multi-part upload
2616    #[maybe_async::maybe_async]
2617    async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
2618        #[cfg(feature = "with-async-std")]
2619        use async_std::fs::File;
2620        #[cfg(feature = "with-async-std")]
2621        use async_std::io::WriteExt;
2622        #[cfg(feature = "with-async-std")]
2623        use async_std::stream::StreamExt;
2624        #[cfg(feature = "with-tokio")]
2625        use futures::StreamExt;
2626        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
2627        use std::fs::File;
2628        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
2629        use std::io::Write;
2630        #[cfg(feature = "with-tokio")]
2631        use tokio::fs::File;
2632        #[cfg(feature = "with-tokio")]
2633        use tokio::io::AsyncWriteExt;
2634
2635        init();
2636        let remote_path = "+stream_test_big";
2637        let local_path = "+stream_test_big";
2638        std::fs::remove_file(remote_path).unwrap_or_else(|_| {});
2639        let content: Vec<u8> = object(20_000_000);
2640
2641        let mut file = File::create(local_path).await.unwrap();
2642        file.write_all(&content).await.unwrap();
2643        let mut reader = File::open(local_path).await.unwrap();
2644
2645        let code = bucket
2646            .put_object_stream(&mut reader, remote_path)
2647            .await
2648            .unwrap();
2649        assert_eq!(code, 200);
2650        let mut writer = Vec::new();
2651        let code = bucket
2652            .get_object_to_writer(remote_path, &mut writer)
2653            .await
2654            .unwrap();
2655        assert_eq!(code, 200);
2656        // assert_eq!(content, writer);
2657        assert_eq!(content.len(), writer.len());
2658        assert_eq!(content.len(), 20_000_000);
2659
2660        #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
2661        {
2662            let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();
2663
2664            let mut bytes = vec![];
2665
2666            while let Some(chunk) = response_data_stream.bytes().next().await {
2667                bytes.push(chunk)
2668            }
2669            assert_ne!(bytes.len(), 0);
2670        }
2671
2672        let response_data = bucket.delete_object(remote_path).await.unwrap();
2673        assert_eq!(response_data.status_code(), 204);
2674        std::fs::remove_file(local_path).unwrap_or_else(|_| {});
2675    }
2676
    // Integration test (ignored by default): small-object streaming
    // round-trip against a live AWS bucket.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
    }
2689
    // Integration test (ignored by default): small-object streaming
    // round-trip against Google Cloud Storage; skipped under
    // `tokio-rustls-tls`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
    }
2709
    // Integration test (ignored by default): small-object streaming
    // round-trip against Cloudflare R2.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
    }
2722
    // Integration test (ignored by default): small-object streaming
    // round-trip against MinIO.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
    }
2735
2736    #[maybe_async::maybe_async]
2737    async fn streaming_test_put_get_delete_small_object(bucket: Bucket) {
2738        init();
2739        let remote_path = "+stream_test_small";
2740        let content: Vec<u8> = object(1000);
2741        #[cfg(feature = "with-tokio")]
2742        let mut reader = std::io::Cursor::new(&content);
2743        #[cfg(feature = "with-async-std")]
2744        let mut reader = async_std::io::Cursor::new(&content);
2745
2746        let code = bucket
2747            .put_object_stream(&mut reader, remote_path)
2748            .await
2749            .unwrap();
2750        assert_eq!(code, 200);
2751        let mut writer = Vec::new();
2752        let code = bucket
2753            .get_object_to_writer(remote_path, &mut writer)
2754            .await
2755            .unwrap();
2756        assert_eq!(code, 200);
2757        assert_eq!(content, writer);
2758
2759        let response_data = bucket.delete_object(remote_path).await.unwrap();
2760        assert_eq!(response_data.status_code(), 204);
2761    }
2762
2763    #[cfg(feature = "blocking")]
2764    fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
2765        let s3_path = "/test_blocking.file";
2766        let s3_path_2 = "/test_blocking.file2";
2767        let s3_path_3 = "/test_blocking.file3";
2768        let test: Vec<u8> = object(3072);
2769
2770        // Test PutObject
2771        let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
2772        assert_eq!(response_data.status_code(), 200);
2773
2774        // Test GetObject
2775        let response_data = bucket.get_object_blocking(s3_path).unwrap();
2776        assert_eq!(response_data.status_code(), 200);
2777        assert_eq!(test, response_data.as_slice());
2778
2779        // Test GetObject with a range
2780        let response_data = bucket
2781            .get_object_range_blocking(s3_path, 100, Some(1000))
2782            .unwrap();
2783        assert_eq!(response_data.status_code(), 206);
2784        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
2785
2786        // Test HeadObject
2787        let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
2788        assert_eq!(code, 200);
2789        assert_eq!(
2790            head_object_result.content_type.unwrap(),
2791            "application/octet-stream".to_owned()
2792        );
2793        // println!("{:?}", head_object_result);
2794
2795        // Put some additional objects, so that we can test ListObjects
2796        let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
2797        assert_eq!(response_data.status_code(), 200);
2798        let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
2799        assert_eq!(response_data.status_code(), 200);
2800
2801        // Test ListObjects, with continuation
2802        let (result, code) = bucket
2803            .list_page_blocking(
2804                "test_blocking.".to_string(),
2805                Some("/".to_string()),
2806                None,
2807                None,
2808                Some(2),
2809            )
2810            .unwrap();
2811        assert_eq!(code, 200);
2812        assert_eq!(result.contents.len(), 2);
2813        assert_eq!(result.contents[0].key, s3_path[1..]);
2814        assert_eq!(result.contents[1].key, s3_path_2[1..]);
2815
2816        let cont_token = result.next_continuation_token.unwrap();
2817
2818        let (result, code) = bucket
2819            .list_page_blocking(
2820                "test_blocking.".to_string(),
2821                Some("/".to_string()),
2822                Some(cont_token),
2823                None,
2824                Some(2),
2825            )
2826            .unwrap();
2827        assert_eq!(code, 200);
2828        assert_eq!(result.contents.len(), 1);
2829        assert_eq!(result.contents[0].key, s3_path_3[1..]);
2830        assert!(result.next_continuation_token.is_none());
2831
2832        // cleanup (and test Delete)
2833        let response_data = bucket.delete_object_blocking(s3_path).unwrap();
2834        assert_eq!(code, 200);
2835        let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
2836        assert_eq!(code, 200);
2837        let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
2838        assert_eq!(code, 200);
2839    }
2840
    // Integration test (ignored by default): blocking-API round-trip against
    // a live AWS bucket; needs the blocking feature plus an async backend.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn aws_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_aws_bucket())
    }
2850
    // Integration test (ignored by default): blocking-API round-trip against
    // Google Cloud Storage.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn gc_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_gc_bucket())
    }
2860
    // Integration test (ignored by default): blocking-API round-trip against
    // Wasabi.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn wasabi_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_wasabi_bucket())
    }
2870
    // Integration test (ignored by default): blocking-API round-trip against
    // MinIO.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn minio_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_minio_bucket())
    }
2880
    // Integration test (ignored by default): blocking-API round-trip against
    // DigitalOcean Spaces.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn digital_ocean_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_digital_ocean_bucket())
    }
2890
    // Integration test (ignored by default): async put/head/get/delete
    // round-trip against AWS. The bool flag is forwarded to the shared helper
    // (defined elsewhere in this file) — presumably toggling a provider-
    // specific check; confirm against `put_head_get_delete_object`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn aws_put_head_get_delete_object() {
        put_head_get_delete_object(test_aws_bucket(), true).await;
    }
2903
    // Integration test (ignored by default): async put/head/get/delete
    // round-trip against Google Cloud Storage; skipped under
    // `tokio-rustls-tls`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(any(feature = "sync", feature = "tokio-rustls-tls")),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn gc_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_gc_bucket(), true).await;
    }
2922
    // Integration test (ignored by default): async put/head/get/delete
    // round-trip against Wasabi.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn wasabi_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_wasabi_bucket(), true).await;
    }
2935
    // Integration test (ignored by default): async put/head/get/delete
    // round-trip against MinIO.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn minio_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_minio_bucket(), true).await;
    }
2948
2949    // Keeps failing on tokio-rustls-tls
2950    // #[ignore]
2951    // #[maybe_async::test(
2952    //     feature = "sync",
2953    //     async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
2954    //     async(
2955    //         all(not(feature = "sync"), feature = "with-async-std"),
2956    //         async_std::test
2957    //     )
2958    // )]
2959    // async fn digital_ocean_test_put_head_get_delete_object() {
2960    //     put_head_get_delete_object(test_digital_ocean_bucket(), true).await;
2961    // }
2962
    // Integration test (ignored by default): async put/head/get/delete
    // round-trip against Cloudflare R2. Passes `false` where the other
    // providers pass `true` — presumably skipping a check R2 does not
    // support; confirm against `put_head_get_delete_object`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn r2_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_r2_bucket(), false).await;
    }
2975
2976    #[test]
2977    #[ignore]
2978    fn test_presign_put() {
2979        let s3_path = "/test/test.file";
2980        let bucket = test_aws_bucket();
2981
2982        let mut custom_headers = HeaderMap::new();
2983        custom_headers.insert(
2984            HeaderName::from_static("custom_header"),
2985            "custom_value".parse().unwrap(),
2986        );
2987
2988        let url = bucket
2989            .presign_put(s3_path, 86400, Some(custom_headers))
2990            .unwrap();
2991
2992        assert!(url.contains("host%3Bcustom_header"));
2993        assert!(url.contains("/test/test.file"))
2994    }
2995
2996    #[test]
2997    #[ignore]
2998    fn test_presign_get() {
2999        let s3_path = "/test/test.file";
3000        let bucket = test_aws_bucket();
3001
3002        let url = bucket.presign_get(s3_path, 86400, None).unwrap();
3003        assert!(url.contains("/test/test.file?"))
3004    }
3005
3006    #[test]
3007    #[ignore]
3008    fn test_presign_delete() {
3009        let s3_path = "/test/test.file";
3010        let bucket = test_aws_bucket();
3011
3012        let url = bucket.presign_delete(s3_path, 86400).unwrap();
3013        assert!(url.contains("/test/test.file?"))
3014    }
3015
3016    #[maybe_async::test(
3017        feature = "sync",
3018        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3019        async(
3020            all(not(feature = "sync"), feature = "with-async-std"),
3021            async_std::test
3022        )
3023    )]
3024    #[ignore]
3025    async fn test_bucket_create_delete_default_region() {
3026        let config = BucketConfiguration::default();
3027        let response = Bucket::create(
3028            &uuid::Uuid::new_v4().to_string(),
3029            "us-east-1".parse().unwrap(),
3030            test_aws_credentials(),
3031            config,
3032        )
3033        .await
3034        .unwrap();
3035
3036        assert_eq!(&response.response_text, "");
3037
3038        assert_eq!(response.response_code, 200);
3039
3040        let response_code = response.bucket.delete().await.unwrap();
3041        assert!(response_code < 300);
3042    }
3043
3044    #[ignore]
3045    #[maybe_async::test(
3046        feature = "sync",
3047        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3048        async(
3049            all(not(feature = "sync"), feature = "with-async-std"),
3050            async_std::test
3051        )
3052    )]
3053    async fn test_bucket_create_delete_non_default_region() {
3054        let config = BucketConfiguration::default();
3055        let response = Bucket::create(
3056            &uuid::Uuid::new_v4().to_string(),
3057            "eu-central-1".parse().unwrap(),
3058            test_aws_credentials(),
3059            config,
3060        )
3061        .await
3062        .unwrap();
3063
3064        assert_eq!(&response.response_text, "");
3065
3066        assert_eq!(response.response_code, 200);
3067
3068        let response_code = response.bucket.delete().await.unwrap();
3069        assert!(response_code < 300);
3070    }
3071
3072    #[ignore]
3073    #[maybe_async::test(
3074        feature = "sync",
3075        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3076        async(
3077            all(not(feature = "sync"), feature = "with-async-std"),
3078            async_std::test
3079        )
3080    )]
3081    async fn test_bucket_create_delete_non_default_region_public() {
3082        let config = BucketConfiguration::public();
3083        let response = Bucket::create(
3084            &uuid::Uuid::new_v4().to_string(),
3085            "eu-central-1".parse().unwrap(),
3086            test_aws_credentials(),
3087            config,
3088        )
3089        .await
3090        .unwrap();
3091
3092        assert_eq!(&response.response_text, "");
3093
3094        assert_eq!(response.response_code, 200);
3095
3096        let response_code = response.bucket.delete().await.unwrap();
3097        assert!(response_code < 300);
3098    }
3099
3100    #[test]
3101    fn test_tag_has_key_and_value_functions() {
3102        let key = "key".to_owned();
3103        let value = "value".to_owned();
3104        let tag = Tag { key, value };
3105        assert_eq!["key", tag.key()];
3106        assert_eq!["value", tag.value()];
3107    }
3108
3109    #[test]
3110    #[ignore]
3111    fn test_builder_composition() {
3112        use std::time::Duration;
3113
3114        let bucket = Bucket::new(
3115            "test-bucket",
3116            "eu-central-1".parse().unwrap(),
3117            test_aws_credentials(),
3118        )
3119        .unwrap()
3120        .with_request_timeout(Duration::from_secs(10));
3121
3122        assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
3123    }
3124}