1#[cfg(feature = "blocking")]
37use block_on_proc::block_on;
38#[cfg(feature = "tags")]
39use minidom::Element;
40use std::collections::HashMap;
41use std::time::Duration;
42
43use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
44use crate::command::{Command, Multipart};
45use crate::creds::Credentials;
46use crate::region::Region;
47#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
48use crate::request::ResponseDataStream;
49#[cfg(feature = "with-tokio")]
50use crate::request::tokio_backend::ClientOptions;
51#[cfg(feature = "with-tokio")]
52use crate::request::tokio_backend::client;
53use crate::request::{Request as _, ResponseData};
54use std::str::FromStr;
55use std::sync::Arc;
56
57#[cfg(feature = "with-tokio")]
58use tokio::sync::RwLock;
59
60#[cfg(feature = "with-async-std")]
61use async_std::sync::RwLock;
62
63#[cfg(feature = "sync")]
64use std::sync::RwLock;
65
66pub type Query = HashMap<String, String>;
67
68#[cfg(feature = "with-async-std")]
69use crate::request::async_std_backend::SurfRequest as RequestImpl;
70#[cfg(feature = "with-tokio")]
71use crate::request::tokio_backend::ReqwestRequest as RequestImpl;
72
73#[cfg(feature = "with-async-std")]
74use async_std::io::Write as AsyncWrite;
75#[cfg(feature = "with-tokio")]
76use tokio::io::AsyncWrite;
77
78#[cfg(feature = "sync")]
79use crate::request::blocking::AttoRequest as RequestImpl;
80use std::io::Read;
81
82#[cfg(feature = "with-tokio")]
83use tokio::io::AsyncRead;
84
85#[cfg(feature = "with-async-std")]
86use async_std::io::Read as AsyncRead;
87
88use crate::PostPolicy;
89use crate::error::S3Error;
90use crate::post_policy::PresignedPost;
91use crate::serde_types::{
92 BucketLifecycleConfiguration, BucketLocationResult, CompleteMultipartUploadData,
93 CorsConfiguration, GetObjectAttributesOutput, HeadObjectResult,
94 InitiateMultipartUploadResponse, ListBucketResult, ListMultipartUploadsResult, Part,
95};
96#[allow(unused_imports)]
97use crate::utils::{PutStreamResponse, error_from_response_data};
98use http::HeaderMap;
99use http::header::HeaderName;
100#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
101use sysinfo::{MemoryRefreshKind, System};
102
/// Size of a single multipart-upload chunk: 8 MiB (8,388,608 bytes).
pub const CHUNK_SIZE: usize = 8_388_608;

/// Default per-request timeout applied to newly constructed buckets.
const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
106
/// A single S3 object tag: an owned key/value pair.
#[derive(Debug, PartialEq, Eq)]
pub struct Tag {
    key: String,
    value: String,
}

impl Tag {
    /// Returns an owned copy of the tag's key.
    pub fn key(&self) -> String {
        self.key.clone()
    }

    /// Returns an owned copy of the tag's value.
    pub fn value(&self) -> String {
        self.value.clone()
    }
}
122
/// Handle to a single S3 bucket: the bucket coordinates (name, region,
/// credentials) plus per-instance request configuration.
///
/// Cloning is cheap relative to the data involved: credentials are shared
/// behind `Arc<RwLock<_>>`, and (with tokio) the reqwest client clones by
/// reference count.
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Bucket name as used in the request host or path.
    pub name: String,
    /// Region (and therefore endpoint) requests are signed for.
    pub region: Region,
    // Shared, refreshable credentials; see `credentials_refresh`.
    credentials: Arc<RwLock<Credentials>>,
    /// Headers added to every request made through this handle.
    pub extra_headers: HeaderMap,
    /// Query parameters added to every request made through this handle.
    pub extra_query: Query,
    /// Per-request timeout; `None` disables the timeout.
    pub request_timeout: Option<Duration>,
    // When true, use path-style addressing (`host/bucket/key`).
    path_style: bool,
    // When true, list objects via the ListObjectsV2 API.
    listobjects_v2: bool,
    #[cfg(feature = "with-tokio")]
    // Reused reqwest client built from `client_options`.
    http_client: reqwest::Client,
    #[cfg(feature = "with-tokio")]
    // Options the client was built with; kept so derived buckets can rebuild it.
    client_options: crate::request::tokio_backend::ClientOptions,
}
152
impl Bucket {
    /// Refreshes the shared credentials in place (async flavor).
    ///
    /// Takes the write lock on the credential store and delegates to
    /// `Credentials::refresh`.
    #[maybe_async::async_impl]
    pub async fn credentials_refresh(&self) -> Result<(), S3Error> {
        Ok(self.credentials.write().await.refresh()?)
    }

    /// Refreshes the shared credentials in place (sync flavor).
    ///
    /// Fails with `S3Error::CredentialsWriteLock` when the std `RwLock`
    /// is poisoned.
    #[maybe_async::sync_impl]
    pub fn credentials_refresh(&self) -> Result<(), S3Error> {
        match self.credentials.write() {
            Ok(mut credentials) => Ok(credentials.refresh()?),
            Err(_) => Err(S3Error::CredentialsWriteLock),
        }
    }

    /// Returns a clone of the underlying reqwest client
    /// (cheap: reference-counted).
    #[cfg(feature = "with-tokio")]
    pub fn http_client(&self) -> reqwest::Client {
        self.http_client.clone()
    }
}
174
175fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
176 if 604800 < expiry_secs {
177 return Err(S3Error::MaxExpiry(expiry_secs));
178 }
179 Ok(())
180}
181
182#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
183#[cfg_attr(
184 all(feature = "with-async-std", feature = "blocking"),
185 block_on("async-std")
186)]
187impl Bucket {
188 #[maybe_async::maybe_async]
216 pub async fn presign_get<S: AsRef<str>>(
217 &self,
218 path: S,
219 expiry_secs: u32,
220 custom_queries: Option<HashMap<String, String>>,
221 ) -> Result<String, S3Error> {
222 validate_expiry(expiry_secs)?;
223 let request = RequestImpl::new(
224 self,
225 path.as_ref(),
226 Command::PresignGet {
227 expiry_secs,
228 custom_queries,
229 },
230 )
231 .await?;
232 request.presigned().await
233 }
234
    /// Signs a `PostPolicy`, producing the URL and form fields needed for a
    /// browser-style presigned POST upload.
    #[maybe_async::maybe_async]
    #[allow(clippy::needless_lifetimes)]
    pub async fn presign_post<'a>(
        &self,
        post_policy: PostPolicy<'a>,
    ) -> Result<PresignedPost, S3Error> {
        post_policy.sign(Box::new(self.clone())).await
    }
269
270 #[maybe_async::maybe_async]
298 pub async fn presign_put<S: AsRef<str>>(
299 &self,
300 path: S,
301 expiry_secs: u32,
302 custom_headers: Option<HeaderMap>,
303 custom_queries: Option<HashMap<String, String>>,
304 ) -> Result<String, S3Error> {
305 validate_expiry(expiry_secs)?;
306 let request = RequestImpl::new(
307 self,
308 path.as_ref(),
309 Command::PresignPut {
310 expiry_secs,
311 custom_headers,
312 custom_queries,
313 },
314 )
315 .await?;
316 request.presigned().await
317 }
318
    /// Builds a presigned DELETE URL for `path`, valid for `expiry_secs`
    /// seconds (at most seven days).
    #[maybe_async::maybe_async]
    pub async fn presign_delete<S: AsRef<str>>(
        &self,
        path: S,
        expiry_secs: u32,
    ) -> Result<String, S3Error> {
        validate_expiry(expiry_secs)?;
        let request =
            RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs }).await?;
        request.presigned().await
    }
350
351 #[maybe_async::maybe_async]
384 pub async fn create(
385 name: &str,
386 region: Region,
387 credentials: Credentials,
388 config: BucketConfiguration,
389 ) -> Result<CreateBucketResponse, S3Error> {
390 let mut config = config;
391
392 let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
396 .unwrap_or_default()
397 .to_lowercase();
398
399 if skip_constraint != "true" && skip_constraint != "1" {
400 config.set_region(region.clone());
401 }
402
403 let command = Command::CreateBucket { config };
404 let bucket = Bucket::new(name, region, credentials)?;
405 let request = RequestImpl::new(&bucket, "", command).await?;
406 let response_data = request.response_data(false).await?;
407 let response_text = response_data.as_str()?;
408 Ok(CreateBucketResponse {
409 bucket,
410 response_text: response_text.to_string(),
411 response_code: response_data.status_code(),
412 })
413 }
414
    /// Lists all buckets visible to `credentials` in `region`.
    ///
    /// Uses a throwaway unnamed bucket handle (path-style, since there is no
    /// bucket name to put in the host) to issue the account-level request.
    #[maybe_async::maybe_async]
    pub async fn list_buckets(
        region: Region,
        credentials: Credentials,
    ) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
        let dummy_bucket = Bucket::new("", region, credentials)?.with_path_style();
        dummy_bucket._list_buckets().await
    }
456
457 #[maybe_async::maybe_async]
460 async fn _list_buckets(&self) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
461 let request = RequestImpl::new(self, "", Command::ListBuckets).await?;
462 let response = request.response_data(false).await?;
463
464 Ok(quick_xml::de::from_str::<
465 crate::bucket_ops::ListBucketsResponse,
466 >(response.as_str()?)?)
467 }
468
    /// Reports whether this bucket exists, by listing all buckets for the
    /// account and checking for this bucket's name.
    ///
    /// NOTE(review): this requires ListBuckets permission and enumerates the
    /// whole account; a HEAD on the bucket itself would be cheaper, but the
    /// error surface differs — confirm before changing.
    #[maybe_async::maybe_async]
    pub async fn exists(&self) -> Result<bool, S3Error> {
        // The ListBuckets call is account-level, so clear the name to build
        // the request URL without a bucket component.
        let mut dummy_bucket = self.clone();
        dummy_bucket.name = "".into();

        let response = dummy_bucket._list_buckets().await?;

        Ok(response
            .bucket_names()
            .collect::<std::collections::HashSet<String>>()
            .contains(&self.name))
    }
512
    /// Creates a bucket named `name` in `region`, with the returned handle
    /// configured for path-style addressing.
    ///
    /// Unless the `RUST_S3_SKIP_LOCATION_CONSTRAINT` environment variable is
    /// set to `true`/`1`, the region is written into the configuration's
    /// location constraint before the request is sent.
    #[maybe_async::maybe_async]
    pub async fn create_with_path_style(
        name: &str,
        region: Region,
        credentials: Credentials,
        config: BucketConfiguration,
    ) -> Result<CreateBucketResponse, S3Error> {
        let mut config = config;

        // Escape hatch for providers that reject a LocationConstraint body.
        let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
            .unwrap_or_default()
            .to_lowercase();

        if skip_constraint != "true" && skip_constraint != "1" {
            config.set_region(region.clone());
        }

        let command = Command::CreateBucket { config };
        let bucket = Bucket::new(name, region, credentials)?.with_path_style();
        let request = RequestImpl::new(&bucket, "", command).await?;
        let response_data = request.response_data(false).await?;
        let response_text = response_data.to_string()?;

        Ok(CreateBucketResponse {
            bucket,
            response_text,
            response_code: response_data.status_code(),
        })
    }
577
578 #[maybe_async::maybe_async]
609 pub async fn delete(&self) -> Result<u16, S3Error> {
610 let command = Command::DeleteBucket;
611 let request = RequestImpl::new(self, "", command).await?;
612 let response_data = request.response_data(false).await?;
613 Ok(response_data.status_code())
614 }
615
    /// Instantiates a boxed `Bucket` with default settings: 60 s request
    /// timeout, virtual-host addressing, and the ListObjectsV2 listing API.
    ///
    /// # Errors
    /// Fails if the HTTP client cannot be built (tokio backend only).
    pub fn new(
        name: &str,
        region: Region,
        credentials: Credentials,
    ) -> Result<Box<Bucket>, S3Error> {
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Box::new(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(credentials)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        }))
    }
653
    /// Instantiates a `Bucket` with anonymous (unsigned) credentials, for
    /// accessing public buckets; otherwise uses the same defaults as `new`.
    pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        })
    }
684
685 pub fn with_path_style(&self) -> Box<Bucket> {
686 Box::new(Bucket {
687 name: self.name.clone(),
688 region: self.region.clone(),
689 credentials: self.credentials.clone(),
690 extra_headers: self.extra_headers.clone(),
691 extra_query: self.extra_query.clone(),
692 request_timeout: self.request_timeout,
693 path_style: true,
694 listobjects_v2: self.listobjects_v2,
695 #[cfg(feature = "with-tokio")]
696 http_client: self.http_client(),
697 #[cfg(feature = "with-tokio")]
698 client_options: self.client_options.clone(),
699 })
700 }
701
702 pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
703 Ok(Bucket {
704 name: self.name.clone(),
705 region: self.region.clone(),
706 credentials: self.credentials.clone(),
707 extra_headers,
708 extra_query: self.extra_query.clone(),
709 request_timeout: self.request_timeout,
710 path_style: self.path_style,
711 listobjects_v2: self.listobjects_v2,
712 #[cfg(feature = "with-tokio")]
713 http_client: self.http_client(),
714 #[cfg(feature = "with-tokio")]
715 client_options: self.client_options.clone(),
716 })
717 }
718
719 pub fn with_extra_query(
720 &self,
721 extra_query: HashMap<String, String>,
722 ) -> Result<Bucket, S3Error> {
723 Ok(Bucket {
724 name: self.name.clone(),
725 region: self.region.clone(),
726 credentials: self.credentials.clone(),
727 extra_headers: self.extra_headers.clone(),
728 extra_query,
729 request_timeout: self.request_timeout,
730 path_style: self.path_style,
731 listobjects_v2: self.listobjects_v2,
732 #[cfg(feature = "with-tokio")]
733 http_client: self.http_client(),
734 #[cfg(feature = "with-tokio")]
735 client_options: self.client_options.clone(),
736 })
737 }
738
    /// Returns a boxed copy of this bucket that applies `request_timeout`
    /// to every request (non-tokio backends).
    #[cfg(not(feature = "with-tokio"))]
    pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
        Ok(Box::new(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: Some(request_timeout),
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
        }))
    }
752
753 #[cfg(feature = "with-tokio")]
754 pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
755 let options = ClientOptions {
756 request_timeout: Some(request_timeout),
757 ..Default::default()
758 };
759
760 Ok(Box::new(Bucket {
761 name: self.name.clone(),
762 region: self.region.clone(),
763 credentials: self.credentials.clone(),
764 extra_headers: self.extra_headers.clone(),
765 extra_query: self.extra_query.clone(),
766 request_timeout: Some(request_timeout),
767 path_style: self.path_style,
768 listobjects_v2: self.listobjects_v2,
769 #[cfg(feature = "with-tokio")]
770 http_client: client(&options)?,
771 #[cfg(feature = "with-tokio")]
772 client_options: options,
773 }))
774 }
775
776 pub fn with_listobjects_v1(&self) -> Bucket {
777 Bucket {
778 name: self.name.clone(),
779 region: self.region.clone(),
780 credentials: self.credentials.clone(),
781 extra_headers: self.extra_headers.clone(),
782 extra_query: self.extra_query.clone(),
783 request_timeout: self.request_timeout,
784 path_style: self.path_style,
785 listobjects_v2: false,
786 #[cfg(feature = "with-tokio")]
787 http_client: self.http_client(),
788 #[cfg(feature = "with-tokio")]
789 client_options: self.client_options.clone(),
790 }
791 }
792
    /// Rebuilds the HTTP client so it accepts invalid TLS certificates
    /// and/or hostnames. Dangerous: only use against endpoints you control.
    ///
    /// NOTE(review): the name is misspelled ("dangereous") but is public
    /// API, so it cannot be renamed without a breaking change.
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    pub fn set_dangereous_config(
        &self,
        accept_invalid_certs: bool,
        accept_invalid_hostnames: bool,
    ) -> Result<Bucket, S3Error> {
        let mut options = self.client_options.clone();
        options.accept_invalid_certs = accept_invalid_certs;
        options.accept_invalid_hostnames = accept_invalid_hostnames;

        Ok(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: self.request_timeout,
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
            // Client must be rebuilt for the new TLS options to take effect.
            http_client: client(&options)?,
            client_options: options,
        })
    }
848
849 #[cfg(feature = "with-tokio")]
850 pub fn set_proxy(&self, proxy: reqwest::Proxy) -> Result<Bucket, S3Error> {
851 let mut options = self.client_options.clone();
852 options.proxy = Some(proxy);
853
854 Ok(Bucket {
855 name: self.name.clone(),
856 region: self.region.clone(),
857 credentials: self.credentials.clone(),
858 extra_headers: self.extra_headers.clone(),
859 extra_query: self.extra_query.clone(),
860 request_timeout: self.request_timeout,
861 path_style: self.path_style,
862 listobjects_v2: self.listobjects_v2,
863 http_client: client(&options)?,
864 client_options: options,
865 })
866 }
867
868 #[maybe_async::maybe_async]
896 pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
897 &self,
898 from: F,
899 to: T,
900 ) -> Result<u16, S3Error> {
901 let fq_from = {
902 let from = from.as_ref();
903 let from = from.strip_prefix('/').unwrap_or(from);
904 format!("{bucket}/{path}", bucket = self.name(), path = from)
905 };
906 self.copy_object(fq_from, to).await
907 }
908
    /// Issues the CopyObject request: `from` is the fully qualified
    /// `bucket/key` source, `to` the destination key in this bucket.
    /// Returns the HTTP status code of the response.
    #[maybe_async::maybe_async]
    async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
        &self,
        from: F,
        to: T,
    ) -> Result<u16, S3Error> {
        let command = Command::CopyObject {
            from: from.as_ref(),
        };
        let request = RequestImpl::new(self, to.as_ref(), command).await?;
        let response_data = request.response_data(false).await?;
        Ok(response_data.status_code())
    }
922
923 #[maybe_async::maybe_async]
955 pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
956 let command = Command::GetObject;
957 let request = RequestImpl::new(self, path.as_ref(), command).await?;
958 request.response_data(false).await
959 }
960
    /// Fetches object metadata via GetObjectAttributes for `path`,
    /// optionally pinned to `version_id`, and deserializes the XML body.
    #[maybe_async::maybe_async]
    pub async fn get_object_attributes<S: AsRef<str>>(
        &self,
        path: S,
        expected_bucket_owner: &str,
        version_id: Option<String>,
    ) -> Result<GetObjectAttributesOutput, S3Error> {
        let command = Command::GetObjectAttributes {
            expected_bucket_owner: expected_bucket_owner.to_string(),
            version_id,
        };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;

        let response = request.response_data(false).await?;

        Ok(quick_xml::de::from_str::<GetObjectAttributesOutput>(
            response.as_str()?,
        )?)
    }
980
981 #[maybe_async::maybe_async]
1024 pub async fn object_exists<S: AsRef<str>>(&self, path: S) -> Result<bool, S3Error> {
1025 let command = Command::HeadObject;
1026 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1027 let response_data = match request.response_data(false).await {
1028 Ok(response_data) => response_data,
1029 Err(S3Error::HttpFailWithBody(status_code, error)) => {
1030 if status_code == 404 {
1031 return Ok(false);
1032 }
1033 return Err(S3Error::HttpFailWithBody(status_code, error));
1034 }
1035 Err(e) => return Err(e),
1036 };
1037 Ok(response_data.status_code() != 404)
1038 }
1039
    /// Installs `cors_config` as this bucket's CORS configuration.
    #[maybe_async::maybe_async]
    pub async fn put_bucket_cors(
        &self,
        expected_bucket_owner: &str,
        cors_config: &CorsConfiguration,
    ) -> Result<ResponseData, S3Error> {
        let command = Command::PutBucketCors {
            expected_bucket_owner: expected_bucket_owner.to_string(),
            configuration: cors_config.clone(),
        };
        let request = RequestImpl::new(self, "", command).await?;
        request.response_data(false).await
    }
1053
1054 #[maybe_async::maybe_async]
1055 pub async fn get_bucket_cors(
1056 &self,
1057 expected_bucket_owner: &str,
1058 ) -> Result<CorsConfiguration, S3Error> {
1059 let command = Command::GetBucketCors {
1060 expected_bucket_owner: expected_bucket_owner.to_string(),
1061 };
1062 let request = RequestImpl::new(self, "", command).await?;
1063 let response = request.response_data(false).await?;
1064 Ok(quick_xml::de::from_str::<CorsConfiguration>(
1065 response.as_str()?,
1066 )?)
1067 }
1068
    /// Removes this bucket's CORS configuration.
    #[maybe_async::maybe_async]
    pub async fn delete_bucket_cors(
        &self,
        expected_bucket_owner: &str,
    ) -> Result<ResponseData, S3Error> {
        let command = Command::DeleteBucketCors {
            expected_bucket_owner: expected_bucket_owner.to_string(),
        };
        let request = RequestImpl::new(self, "", command).await?;
        request.response_data(false).await
    }
1080
    /// Fetches this bucket's lifecycle configuration and deserializes the
    /// XML body.
    #[maybe_async::maybe_async]
    pub async fn get_bucket_lifecycle(&self) -> Result<BucketLifecycleConfiguration, S3Error> {
        let request = RequestImpl::new(self, "", Command::GetBucketLifecycle).await?;
        let response = request.response_data(false).await?;
        Ok(quick_xml::de::from_str::<BucketLifecycleConfiguration>(
            response.as_str()?,
        )?)
    }
1089
1090 #[maybe_async::maybe_async]
1091 pub async fn put_bucket_lifecycle(
1092 &self,
1093 lifecycle_config: BucketLifecycleConfiguration,
1094 ) -> Result<ResponseData, S3Error> {
1095 let command = Command::PutBucketLifecycle {
1096 configuration: lifecycle_config,
1097 };
1098 let request = RequestImpl::new(self, "", command).await?;
1099 request.response_data(false).await
1100 }
1101
    /// Removes this bucket's lifecycle configuration.
    #[maybe_async::maybe_async]
    pub async fn delete_bucket_lifecycle(&self) -> Result<ResponseData, S3Error> {
        let request = RequestImpl::new(self, "", Command::DeleteBucketLifecycle).await?;
        request.response_data(false).await
    }
1107
1108 #[maybe_async::maybe_async]
1140 pub async fn get_object_torrent<S: AsRef<str>>(
1141 &self,
1142 path: S,
1143 ) -> Result<ResponseData, S3Error> {
1144 let command = Command::GetObjectTorrent;
1145 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1146 request.response_data(false).await
1147 }
1148
    /// Downloads the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path`.
    ///
    /// # Panics
    /// Panics if `end` is `Some` and `start > end`.
    #[maybe_async::maybe_async]
    pub async fn get_object_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<ResponseData, S3Error> {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data(false).await
    }
1196
    /// Streams the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path` into `writer`, returning the HTTP
    /// status code.
    ///
    /// # Panics
    /// Panics if `end` is `Some` and `start > end`.
    #[maybe_async::async_impl]
    pub async fn get_object_range_to_writer<T, S>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error>
    where
        T: AsyncWrite + Send + Unpin + ?Sized,
        S: AsRef<str>,
    {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data_to_writer(writer).await
    }
1257
    /// Streams the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path` into `writer` (blocking flavor),
    /// returning the HTTP status code.
    ///
    /// # Panics
    /// Panics if `end` is `Some` and `start > end`.
    #[maybe_async::sync_impl]
    pub fn get_object_range_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
1274
1275 #[maybe_async::async_impl]
1313 pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin + ?Sized, S: AsRef<str>>(
1314 &self,
1315 path: S,
1316 writer: &mut T,
1317 ) -> Result<u16, S3Error> {
1318 let command = Command::GetObject;
1319 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1320 request.response_data_to_writer(writer).await
1321 }
1322
    /// Streams the object at `path` into `writer` (blocking flavor),
    /// returning the HTTP status code.
    #[maybe_async::sync_impl]
    pub fn get_object_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
        &self,
        path: S,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
1333
    /// Returns the object at `path` as an async byte stream, letting the
    /// caller consume the body incrementally.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    pub async fn get_object_stream<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<ResponseDataStream, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data_to_stream().await
    }
1384
    /// Streams `reader` to `s3_path` with content type
    /// `application/octet-stream`; payloads of `CHUNK_SIZE` bytes or more
    /// are uploaded via multipart upload.
    #[maybe_async::async_impl]
    pub async fn put_object_stream<R: AsyncRead + Unpin + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type(
            reader,
            s3_path.as_ref(),
            "application/octet-stream",
        )
        .await
    }
1444
    /// Returns a builder for a streaming put to `path`, allowing content
    /// type and custom headers to be configured before execution.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    pub fn put_object_stream_builder<S: AsRef<str>>(
        &self,
        path: S,
    ) -> crate::put_object_request::PutObjectStreamRequest<'_> {
        crate::put_object_request::PutObjectStreamRequest::new(self, path)
    }
1484
    /// Streams `reader` to `s3_path` with content type
    /// `application/octet-stream` (blocking flavor), returning the HTTP
    /// status code.
    #[maybe_async::sync_impl]
    pub fn put_object_stream<R: Read>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
    ) -> Result<u16, S3Error> {
        self._put_object_stream_with_content_type(
            reader,
            s3_path.as_ref(),
            "application/octet-stream",
        )
    }
1497
    /// Streams `reader` to `s3_path` with the given `content_type`;
    /// delegates to the internal streaming implementation.
    #[maybe_async::async_impl]
    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
        content_type: impl AsRef<str>,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
            .await
    }
1558
    /// Streams `reader` to `s3_path` with the given `content_type`
    /// (blocking flavor), returning the HTTP status code.
    #[maybe_async::sync_impl]
    pub fn put_object_stream_with_content_type<R: Read>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
        content_type: impl AsRef<str>,
    ) -> Result<u16, S3Error> {
        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
    }
1568
1569 #[maybe_async::async_impl]
1570 async fn make_multipart_request(
1571 &self,
1572 path: &str,
1573 chunk: Vec<u8>,
1574 part_number: u32,
1575 upload_id: &str,
1576 content_type: &str,
1577 ) -> Result<ResponseData, S3Error> {
1578 let command = Command::PutObject {
1579 content: &chunk,
1580 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1582 content_type,
1583 };
1584 let request = RequestImpl::new(self, path, command).await?;
1585 request.response_data(true).await
1586 }
1587
    /// Internal streaming put without custom headers; forwards to the
    /// headers-aware implementation with `None`.
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
            .await
    }
1598
1599 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1602 fn calculate_max_concurrent_chunks() -> usize {
1603 let mut system = System::new();
1605 system.refresh_memory_specifics(MemoryRefreshKind::everything());
1606
1607 let available_memory = system.available_memory();
1609
1610 if available_memory == 0 {
1612 return 3;
1613 }
1614
1615 let safety_factor = 3;
1619 let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;
1620
1621 let calculated_chunks = (available_memory / memory_per_chunk) as usize;
1623
1624 calculated_chunks.clamp(2, 100)
1628 }
1629
    /// Core streaming upload: reads `reader` in `CHUNK_SIZE` chunks and
    /// writes them to `s3_path`.
    ///
    /// Small payloads (first chunk shorter than `CHUNK_SIZE`) are sent with
    /// a single PutObject. Larger payloads use a multipart upload with up to
    /// `calculate_max_concurrent_chunks()` parts in flight; on any part
    /// failure the upload is aborted before the error is returned.
    #[maybe_async::async_impl]
    pub(crate) async fn _put_object_stream_with_content_type_and_headers<
        R: AsyncRead + Unpin + ?Sized,
    >(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
        custom_headers: Option<http::HeaderMap>,
    ) -> Result<PutStreamResponse, S3Error> {
        let first_chunk = crate::utils::read_chunk_async(reader).await?;
        // Small-object fast path: one ordinary PutObject, no multipart.
        if first_chunk.len() < CHUNK_SIZE {
            let total_size = first_chunk.len();
            let mut builder = self
                .put_object_builder(s3_path, first_chunk.as_slice())
                .with_content_type(content_type);

            if let Some(headers) = custom_headers {
                builder = builder.with_headers(headers);
            }

            let response_data = builder.execute().await?;
            if response_data.status_code() >= 300 {
                return Err(error_from_response_data(response_data)?);
            }
            return Ok(PutStreamResponse::new(
                response_data.status_code(),
                total_size,
            ));
        }

        let msg = self
            .initiate_multipart_upload(s3_path, content_type)
            .await?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();

        use futures_util::FutureExt;
        use futures_util::stream::{FuturesUnordered, StreamExt};

        let mut part_number: u32 = 0;
        let mut total_size = 0;
        // (part number, ETag) pairs, collected in completion order.
        let mut etags = Vec::new();
        // In-flight part uploads, resolved in completion (not submission) order.
        let mut active_uploads: FuturesUnordered<
            futures_util::future::BoxFuture<'_, (u32, Result<ResponseData, S3Error>)>,
        > = FuturesUnordered::new();
        let mut reading_done = false;

        // Part 1 is the chunk already read above.
        part_number += 1;
        total_size += first_chunk.len();
        // NOTE(review): this condition looks unreachable — the small-object
        // path above already returned when the first chunk is short; confirm.
        if first_chunk.len() < CHUNK_SIZE {
            reading_done = true;
        }

        let path_clone = path.clone();
        let upload_id_clone = upload_id.clone();
        let content_type_clone = content_type.to_string();
        let bucket_clone = self.clone();

        active_uploads.push(
            async move {
                let result = bucket_clone
                    .make_multipart_request(
                        &path_clone,
                        first_chunk,
                        1,
                        &upload_id_clone,
                        &content_type_clone,
                    )
                    .await;
                (1, result)
            }
            .boxed(),
        );

        while !active_uploads.is_empty() || !reading_done {
            // Keep the pipeline full: read and submit parts until the
            // concurrency budget is reached or the input is exhausted.
            while active_uploads.len() < max_concurrent_chunks && !reading_done {
                let chunk = crate::utils::read_chunk_async(reader).await?;
                let chunk_len = chunk.len();

                if chunk_len == 0 {
                    reading_done = true;
                    break;
                }

                total_size += chunk_len;
                part_number += 1;

                // A short chunk marks the final part of the stream.
                if chunk_len < CHUNK_SIZE {
                    reading_done = true;
                }

                let current_part = part_number;
                let path_clone = path.clone();
                let upload_id_clone = upload_id.clone();
                let content_type_clone = content_type.to_string();
                let bucket_clone = self.clone();

                active_uploads.push(
                    async move {
                        let result = bucket_clone
                            .make_multipart_request(
                                &path_clone,
                                chunk,
                                current_part,
                                &upload_id_clone,
                                &content_type_clone,
                            )
                            .await;
                        (current_part, result)
                    }
                    .boxed(),
                );
            }

            // Harvest one completed part; abort the whole upload on failure.
            if let Some((part_num, result)) = active_uploads.next().await {
                let response_data = result?;
                if !(200..300).contains(&response_data.status_code()) {
                    match self.abort_upload(&path, upload_id).await {
                        Ok(_) => {
                            return Err(error_from_response_data(response_data)?);
                        }
                        Err(error) => {
                            return Err(error);
                        }
                    }
                }

                let etag = response_data.as_str()?;
                etags.push((part_num, etag.to_string()));
            }
        }

        // Parts completed out of order; restore part-number order before
        // assembling the CompleteMultipartUpload body.
        etags.sort_by_key(|k| k.0);
        let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();

        let inner_data = etags
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, x)| Part {
                etag: x,
                part_number: i as u32 + 1,
            })
            .collect::<Vec<Part>>();
        let response_data = self
            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
            .await?;

        Ok(PutStreamResponse::new(
            response_data.status_code(),
            total_size,
        ))
    }
1801
    /// Blocking streaming upload: reads `reader` in `CHUNK_SIZE` chunks and
    /// writes them to `s3_path` as a multipart upload, returning the final
    /// HTTP status code.
    ///
    /// If the very first chunk is already short (payload smaller than one
    /// chunk), the just-initiated multipart upload is aborted and a single
    /// PutObject is issued instead.
    #[maybe_async::sync_impl]
    fn _put_object_stream_with_content_type<R: Read + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<u16, S3Error> {
        let msg = self.initiate_multipart_upload(s3_path, content_type)?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let mut part_number: u32 = 0;
        let mut etags = Vec::new();
        loop {
            let chunk = crate::utils::read_chunk(reader)?;

            // A short chunk is the last chunk of the stream.
            if chunk.len() < CHUNK_SIZE {
                if part_number == 0 {
                    // Entire payload fits in one chunk: abandon multipart
                    // and fall back to a plain PutObject.
                    self.abort_upload(&path, upload_id)?;

                    return Ok(self.put_object(s3_path, chunk.as_slice())?.status_code());
                } else {
                    // Upload the final (short) part, then complete the upload.
                    part_number += 1;
                    let part = self.put_multipart_chunk(
                        &chunk,
                        &path,
                        part_number,
                        upload_id,
                        content_type,
                    )?;
                    etags.push(part.etag);
                    let inner_data = etags
                        .into_iter()
                        .enumerate()
                        .map(|(i, x)| Part {
                            etag: x,
                            part_number: i as u32 + 1,
                        })
                        .collect::<Vec<Part>>();
                    return Ok(self
                        .complete_multipart_upload(&path, upload_id, inner_data)?
                        .status_code());
                }
            } else {
                // Full-size intermediate part; keep reading.
                part_number += 1;
                let part =
                    self.put_multipart_chunk(&chunk, &path, part_number, upload_id, content_type)?;
                etags.push(part.etag.to_string());
            }
        }
    }
1855
    /// Starts a multipart upload for `s3_path` and returns the parsed
    /// response (upload id and key). Responses with status >= 300 are
    /// converted to errors carrying the response body.
    #[maybe_async::async_impl]
    pub async fn initiate_multipart_upload(
        &self,
        s3_path: &str,
        content_type: &str,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let command = Command::InitiateMultipartUpload { content_type };
        let request = RequestImpl::new(self, s3_path, command).await?;
        let response_data = request.response_data(false).await?;
        if response_data.status_code() >= 300 {
            return Err(error_from_response_data(response_data)?);
        }

        let msg: InitiateMultipartUploadResponse =
            quick_xml::de::from_str(response_data.as_str()?)?;
        Ok(msg)
    }
1874
    /// Starts a multipart upload for `s3_path` (blocking flavor) and returns
    /// the parsed response (upload id and key). Responses with status >= 300
    /// are converted to errors carrying the response body.
    #[maybe_async::sync_impl]
    pub fn initiate_multipart_upload(
        &self,
        s3_path: &str,
        content_type: &str,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let command = Command::InitiateMultipartUpload { content_type };
        let request = RequestImpl::new(self, s3_path, command)?;
        let response_data = request.response_data(false)?;
        if response_data.status_code() >= 300 {
            return Err(error_from_response_data(response_data)?);
        }

        let msg: InitiateMultipartUploadResponse =
            quick_xml::de::from_str(response_data.as_str()?)?;
        Ok(msg)
    }
1892
    /// Read one chunk from `reader` and upload it as part `part_number` of
    /// the multipart upload identified by `upload_id`.
    ///
    /// NOTE(review): the bound is the blocking `std::io::Read` even in the
    /// async impl, so `read_chunk` runs synchronously on the executor
    /// thread — confirm this is intentional for large chunk sizes.
    #[maybe_async::async_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        // Reads at most one chunk; the caller drives the loop over parts.
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
            .await
    }
1907
    /// Read one chunk from `reader` and upload it as part `part_number` of
    /// the multipart upload identified by `upload_id` (blocking variant).
    ///
    /// NOTE(review): declared `async fn` but gated by
    /// `maybe_async::sync_impl`; the attribute is expected to strip the
    /// `async` when building for the sync feature — confirm against the
    /// `maybe_async` crate documentation.
    #[maybe_async::sync_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        // Reads at most one chunk; the caller drives the loop over parts.
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(&chunk, path, part_number, upload_id, content_type)
    }
1920
1921 #[maybe_async::async_impl]
1923 pub async fn put_multipart_chunk(
1924 &self,
1925 chunk: Vec<u8>,
1926 path: &str,
1927 part_number: u32,
1928 upload_id: &str,
1929 content_type: &str,
1930 ) -> Result<Part, S3Error> {
1931 let command = Command::PutObject {
1932 content: &chunk,
1934 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1936 content_type,
1937 };
1938 let request = RequestImpl::new(self, path, command).await?;
1939 let response_data = request.response_data(true).await?;
1940 if !(200..300).contains(&response_data.status_code()) {
1941 match self.abort_upload(path, upload_id).await {
1943 Ok(_) => {
1944 return Err(error_from_response_data(response_data)?);
1945 }
1946 Err(error) => {
1947 return Err(error);
1948 }
1949 }
1950 }
1951 let etag = response_data.as_str()?;
1952 Ok(Part {
1953 etag: etag.to_string(),
1954 part_number,
1955 })
1956 }
1957
1958 #[maybe_async::sync_impl]
1959 pub fn put_multipart_chunk(
1960 &self,
1961 chunk: &[u8],
1962 path: &str,
1963 part_number: u32,
1964 upload_id: &str,
1965 content_type: &str,
1966 ) -> Result<Part, S3Error> {
1967 let command = Command::PutObject {
1968 content: chunk,
1970 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1972 content_type,
1973 };
1974 let request = RequestImpl::new(self, path, command)?;
1975 let response_data = request.response_data(true)?;
1976 if !(200..300).contains(&response_data.status_code()) {
1977 match self.abort_upload(path, upload_id) {
1979 Ok(_) => {
1980 return Err(error_from_response_data(response_data)?);
1981 }
1982 Err(error) => {
1983 return Err(error);
1984 }
1985 }
1986 }
1987 let etag = response_data.as_str()?;
1988 Ok(Part {
1989 etag: etag.to_string(),
1990 part_number,
1991 })
1992 }
1993
1994 #[maybe_async::async_impl]
1996 pub async fn complete_multipart_upload(
1997 &self,
1998 path: &str,
1999 upload_id: &str,
2000 parts: Vec<Part>,
2001 ) -> Result<ResponseData, S3Error> {
2002 let data = CompleteMultipartUploadData { parts };
2003 let complete = Command::CompleteMultipartUpload { upload_id, data };
2004 let complete_request = RequestImpl::new(self, path, complete).await?;
2005 complete_request.response_data(false).await
2006 }
2007
2008 #[maybe_async::sync_impl]
2009 pub fn complete_multipart_upload(
2010 &self,
2011 path: &str,
2012 upload_id: &str,
2013 parts: Vec<Part>,
2014 ) -> Result<ResponseData, S3Error> {
2015 let data = CompleteMultipartUploadData { parts };
2016 let complete = Command::CompleteMultipartUpload { upload_id, data };
2017 let complete_request = RequestImpl::new(self, path, complete)?;
2018 complete_request.response_data(false)
2019 }
2020
2021 #[maybe_async::maybe_async]
2054 pub async fn location(&self) -> Result<(Region, u16), S3Error> {
2055 let request = RequestImpl::new(self, "?location", Command::GetBucketLocation).await?;
2056 let response_data = request.response_data(false).await?;
2057 let region_string = String::from_utf8_lossy(response_data.as_slice());
2058 let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
2059 Ok(r) => {
2060 let location_result: BucketLocationResult = r;
2061 location_result.region.parse()?
2062 }
2063 Err(e) => {
2064 if response_data.status_code() == 200 {
2065 Region::Custom {
2066 region: "Custom".to_string(),
2067 endpoint: "".to_string(),
2068 }
2069 } else {
2070 Region::Custom {
2071 region: format!("Error encountered : {}", e),
2072 endpoint: "".to_string(),
2073 }
2074 }
2075 }
2076 };
2077 Ok((region, response_data.status_code()))
2078 }
2079
2080 #[maybe_async::maybe_async]
2113 pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
2114 let command = Command::DeleteObject;
2115 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2116 request.response_data(false).await
2117 }
2118
2119 #[maybe_async::maybe_async]
2152 pub async fn head_object<S: AsRef<str>>(
2153 &self,
2154 path: S,
2155 ) -> Result<(HeadObjectResult, u16), S3Error> {
2156 let command = Command::HeadObject;
2157 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2158 let (headers, status) = request.response_header().await?;
2159 let header_object = HeadObjectResult::from(&headers);
2160 Ok((header_object, status))
2161 }
2162
2163 #[maybe_async::maybe_async]
2197 pub async fn put_object_with_content_type<S: AsRef<str>>(
2198 &self,
2199 path: S,
2200 content: &[u8],
2201 content_type: &str,
2202 ) -> Result<ResponseData, S3Error> {
2203 let command = Command::PutObject {
2204 content,
2205 content_type,
2206 custom_headers: None,
2207 multipart: None,
2208 };
2209 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2210 request.response_data(true).await
2211 }
2212
2213 #[maybe_async::maybe_async]
2256 pub async fn put_object_with_content_type_and_headers<S: AsRef<str>>(
2257 &self,
2258 path: S,
2259 content: &[u8],
2260 content_type: &str,
2261 custom_headers: Option<HeaderMap>,
2262 ) -> Result<ResponseData, S3Error> {
2263 let command = Command::PutObject {
2264 content,
2265 content_type,
2266 custom_headers,
2267 multipart: None,
2268 };
2269 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2270 request.response_data(true).await
2271 }
2272
2273 #[maybe_async::maybe_async]
2313 pub async fn put_object_with_headers<S: AsRef<str>>(
2314 &self,
2315 path: S,
2316 content: &[u8],
2317 custom_headers: Option<HeaderMap>,
2318 ) -> Result<ResponseData, S3Error> {
2319 self.put_object_with_content_type_and_headers(
2320 path,
2321 content,
2322 "application/octet-stream",
2323 custom_headers,
2324 )
2325 .await
2326 }
2327
2328 #[maybe_async::maybe_async]
2362 pub async fn put_object<S: AsRef<str>>(
2363 &self,
2364 path: S,
2365 content: &[u8],
2366 ) -> Result<ResponseData, S3Error> {
2367 self.put_object_with_content_type(path, content, "application/octet-stream")
2368 .await
2369 }
2370
    /// Create a [`PutObjectRequest`](crate::put_object_request::PutObjectRequest)
    /// builder for uploading `content` to `path`, allowing extra options to
    /// be configured before the request is executed.
    pub fn put_object_builder<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
    ) -> crate::put_object_request::PutObjectRequest<'_> {
        crate::put_object_request::PutObjectRequest::new(self, path, content)
    }
2406
2407 fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
2408 let mut s = String::new();
2409 let content = tags
2410 .iter()
2411 .map(|(name, value)| {
2412 format!(
2413 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2414 name.as_ref(),
2415 value.as_ref()
2416 )
2417 })
2418 .fold(String::new(), |mut a, b| {
2419 a.push_str(b.as_str());
2420 a
2421 });
2422 s.push_str("<Tagging><TagSet>");
2423 s.push_str(&content);
2424 s.push_str("</TagSet></Tagging>");
2425 s
2426 }
2427
2428 #[maybe_async::maybe_async]
2461 pub async fn put_object_tagging<S: AsRef<str>>(
2462 &self,
2463 path: &str,
2464 tags: &[(S, S)],
2465 ) -> Result<ResponseData, S3Error> {
2466 let content = self._tags_xml(tags);
2467 let command = Command::PutObjectTagging { tags: &content };
2468 let request = RequestImpl::new(self, path, command).await?;
2469 request.response_data(false).await
2470 }
2471
2472 #[maybe_async::maybe_async]
2505 pub async fn delete_object_tagging<S: AsRef<str>>(
2506 &self,
2507 path: S,
2508 ) -> Result<ResponseData, S3Error> {
2509 let command = Command::DeleteObjectTagging;
2510 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2511 request.response_data(false).await
2512 }
2513
    /// Fetch the tag set of the object at `path`.
    ///
    /// Returns the parsed tags together with the HTTP status code; on a
    /// non-200 response the tag list is empty rather than an error.
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        let result = request.response_data(false).await?;

        let mut tags = Vec::new();

        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // minidom requires a namespace on the root element; some servers
            // omit it, so inject the standard S3 namespace and re-parse.
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            // Walk <Tagging>/<TagSet>/<Tag>; missing Key/Value children are
            // replaced with placeholder text instead of failing the call.
            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        Ok((tags, result.status_code()))
    }
2597
    /// Fetch a single page of the bucket listing.
    ///
    /// Uses ListObjectsV2 unless `listobjects_v2` has been disabled (see
    /// `set_listobjects_v1`), in which case the legacy ListObjects (v1)
    /// command is issued.
    #[maybe_async::maybe_async]
    pub async fn list_page(
        &self,
        prefix: String,
        delimiter: Option<String>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<(ListBucketResult, u16), S3Error> {
        let command = if self.listobjects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // V1 has only a single `marker` cursor. `max` on `Option`s
            // treats `None` as smallest and otherwise compares the strings
            // lexicographically, so the "later" of the two V2-style cursors
            // wins. NOTE(review): confirm this collapse matches the intended
            // v1 pagination semantics.
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };
        let request = RequestImpl::new(self, "/", command).await?;
        let response_data = request.response_data(false).await?;
        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;

        Ok((list_bucket_result, response_data.status_code()))
    }
2632
2633 #[maybe_async::maybe_async]
2666 #[allow(clippy::assigning_clones)]
2667 pub async fn list(
2668 &self,
2669 prefix: String,
2670 delimiter: Option<String>,
2671 ) -> Result<Vec<ListBucketResult>, S3Error> {
2672 let the_bucket = self.to_owned();
2673 let mut results = Vec::new();
2674 let mut continuation_token = None;
2675
2676 loop {
2677 let (list_bucket_result, _) = the_bucket
2678 .list_page(
2679 prefix.clone(),
2680 delimiter.clone(),
2681 continuation_token,
2682 None,
2683 None,
2684 )
2685 .await?;
2686 continuation_token = list_bucket_result.next_continuation_token.clone();
2687 results.push(list_bucket_result);
2688 if continuation_token.is_none() {
2689 break;
2690 }
2691 }
2692
2693 Ok(results)
2694 }
2695
2696 #[maybe_async::maybe_async]
2697 pub async fn list_multiparts_uploads_page(
2698 &self,
2699 prefix: Option<&str>,
2700 delimiter: Option<&str>,
2701 key_marker: Option<String>,
2702 max_uploads: Option<usize>,
2703 ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2704 let command = Command::ListMultipartUploads {
2705 prefix,
2706 delimiter,
2707 key_marker,
2708 max_uploads,
2709 };
2710 let request = RequestImpl::new(self, "/", command).await?;
2711 let response_data = request.response_data(false).await?;
2712 let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2713
2714 Ok((list_bucket_result, response_data.status_code()))
2715 }
2716
2717 #[maybe_async::maybe_async]
2751 #[allow(clippy::assigning_clones)]
2752 pub async fn list_multiparts_uploads(
2753 &self,
2754 prefix: Option<&str>,
2755 delimiter: Option<&str>,
2756 ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2757 let the_bucket = self.to_owned();
2758 let mut results = Vec::new();
2759 let mut next_marker: Option<String> = None;
2760
2761 loop {
2762 let (list_multiparts_uploads_result, _) = the_bucket
2763 .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2764 .await?;
2765
2766 let is_truncated = list_multiparts_uploads_result.is_truncated;
2767
2768 next_marker = list_multiparts_uploads_result.next_marker.clone();
2769 results.push(list_multiparts_uploads_result);
2770
2771 if !is_truncated {
2772 break;
2773 }
2774 }
2775
2776 Ok(results)
2777 }
2778
2779 #[maybe_async::maybe_async]
2812 pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2813 let abort = Command::AbortMultipartUpload { upload_id };
2814 let abort_request = RequestImpl::new(self, key, abort).await?;
2815 let response_data = abort_request.response_data(false).await?;
2816
2817 if (200..300).contains(&response_data.status_code()) {
2818 Ok(())
2819 } else {
2820 let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2821 Err(S3Error::HttpFailWithBody(
2822 response_data.status_code(),
2823 utf8_content,
2824 ))
2825 }
2826 }
2827
    /// Returns `true` if the bucket uses path-style addressing (`host/bucket`).
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }
2832
    /// Returns `true` if the bucket uses subdomain-style addressing (`bucket.host`).
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }
2837
    /// Switch this bucket to path-style addressing (`host/bucket`).
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }
2842
    /// Switch this bucket to subdomain-style addressing (`bucket.host`).
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }
2847
    /// Set the per-request timeout, or clear it with `None`.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }
2857
    /// Use the legacy ListObjects (v1) API for listings (needed by some
    /// providers, e.g. Google Cloud Storage).
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }
2866
    /// Use the ListObjectsV2 API for listings (the default).
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }
2871
    /// The bucket name, as an owned `String`.
    pub fn name(&self) -> String {
        self.name.to_string()
    }
2876
    /// The request host, chosen according to the configured addressing style.
    pub fn host(&self) -> String {
        if self.path_style {
            self.path_style_host()
        } else {
            self.subdomain_style_host()
        }
    }
2885
    /// The full `scheme://host` URL of the bucket; path-style URLs append
    /// the bucket name as the first path segment.
    pub fn url(&self) -> String {
        if self.path_style {
            format!(
                "{}://{}/{}",
                self.scheme(),
                self.path_style_host(),
                self.name()
            )
        } else {
            format!("{}://{}", self.scheme(), self.subdomain_style_host())
        }
    }
2898
    /// Host used for path-style requests: the bare region host.
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }
2903
    /// Host used for subdomain-style requests: `bucket.region-host`.
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }
2907
    /// URL scheme of the region endpoint (e.g. `https`).
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }
2915
    /// A clone of the bucket's [`Region`].
    pub fn region(&self) -> Region {
        self.region.clone()
    }
2920
    /// The access key of the current credentials, if any.
    #[maybe_async::maybe_async]
    pub async fn access_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.access_key)
    }
2926
    /// The secret key of the current credentials, if any.
    #[maybe_async::maybe_async]
    pub async fn secret_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.secret_key)
    }
2932
    /// The security token of the current credentials, if any.
    #[maybe_async::maybe_async]
    pub async fn security_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.security_token)
    }
2938
    /// The session token of the current credentials, if any.
    #[maybe_async::maybe_async]
    pub async fn session_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.session_token)
    }
2944
    /// A clone of the current credentials, taken under the async read lock.
    #[maybe_async::async_impl]
    pub async fn credentials(&self) -> Result<Credentials, S3Error> {
        Ok(self.credentials.read().await.clone())
    }
2951
    /// A clone of the current credentials (blocking variant).
    ///
    /// A poisoned `RwLock` is reported as [`S3Error::CredentialsReadLock`].
    #[maybe_async::sync_impl]
    pub fn credentials(&self) -> Result<Credentials, S3Error> {
        match self.credentials.read() {
            Ok(credentials) => Ok(credentials.clone()),
            Err(_) => Err(S3Error::CredentialsReadLock),
        }
    }
2959
    /// Replace the bucket's credentials.
    ///
    /// Installs a fresh `Arc<RwLock<..>>`; clones of this bucket made
    /// earlier keep the previous credentials.
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }
2964
    /// Add a header to be sent with every request made by this bucket.
    ///
    /// # Panics
    /// Panics if `key` is not a valid header name or `value` is not a
    /// valid header value.
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.extra_headers
            .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap())
    }
2981
    /// The extra headers sent with every request, immutably borrowed.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }
2986
    /// The extra headers sent with every request, mutably borrowed.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }
2992
    /// Add a query parameter to be appended to every request URL.
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }
2997
    /// The extra query parameters sent with every request, immutably borrowed.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }
3002
    /// The extra query parameters sent with every request, mutably borrowed.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }
3008
    /// The configured per-request timeout, if any.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
3012}
3013
3014#[cfg(test)]
3015mod test {
3016
3017 use crate::BucketConfiguration;
3018 use crate::Tag;
3019 use crate::creds::Credentials;
3020 use crate::post_policy::{PostPolicyField, PostPolicyValue};
3021 use crate::region::Region;
3022 use crate::serde_types::{
3023 BucketLifecycleConfiguration, CorsConfiguration, CorsRule, Expiration, LifecycleFilter,
3024 LifecycleRule,
3025 };
3026 use crate::{Bucket, PostPolicy};
3027 use http::header::{CACHE_CONTROL, HeaderMap, HeaderName, HeaderValue};
3028 use std::env;
3029
    /// Initialise `env_logger` for tests; a second call (already set) is ignored.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
3033
    /// AWS test credentials from `EU_AWS_*` env vars; panics when unset.
    fn test_aws_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3044
    /// Google Cloud test credentials from `GC_*` env vars; panics when unset.
    fn test_gc_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3055
    /// Wasabi test credentials from `WASABI_*` env vars; panics when unset.
    fn test_wasabi_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3066
    /// MinIO test credentials from `MINIO_*` env vars; panics when unset.
    fn test_minio_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("MINIO_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("MINIO_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3077
    /// DigitalOcean test credentials from `DIGITAL_OCEAN_*` env vars; panics when unset.
    fn test_digital_ocean_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3088
    /// Cloudflare R2 test credentials from `R2_*` env vars; panics when unset.
    fn test_r2_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3099
    /// Test bucket on AWS eu-central-1.
    fn test_aws_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3-test",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
    }
3108
    /// Test bucket on Wasabi eu-central-1.
    fn test_wasabi_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            "wa-eu-central-1".parse().unwrap(),
            test_wasabi_credentials(),
        )
        .unwrap()
    }
3117
    /// Test bucket on Google Cloud Storage; GCS needs the legacy
    /// ListObjects (v1) API, so v1 is enabled here.
    fn test_gc_bucket() -> Box<Bucket> {
        let mut bucket = Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east1".to_owned(),
                endpoint: "https://storage.googleapis.com".to_owned(),
            },
            test_gc_credentials(),
        )
        .unwrap();
        bucket.set_listobjects_v1();
        bucket
    }
3131
    /// Test bucket on a local MinIO instance (path-style addressing).
    fn test_minio_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            test_minio_credentials(),
        )
        .unwrap()
        .with_path_style()
    }
3144
    /// Test bucket on DigitalOcean Spaces (FRA1).
    #[allow(dead_code)]
    fn test_digital_ocean_bucket() -> Box<Bucket> {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }
3149
    /// Test bucket on Cloudflare R2 (account id hard-coded for CI).
    fn test_r2_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::R2 {
                account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
            },
            test_r2_credentials(),
        )
        .unwrap()
    }
3160
3161 fn object(size: u32) -> Vec<u8> {
3162 (0..size).map(|_| 33).collect()
3163 }
3164
    /// End-to-end smoke test: put an object, read it back (whole and
    /// ranged), check existence, optionally HEAD it, then delete it.
    #[maybe_async::maybe_async]
    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);

        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
        assert_eq!(response_data.status_code(), 200);

        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        // Ranged GET bytes 100..=1000 -> 206 Partial Content.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // Single-byte range: start == end is inclusive.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(100))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(vec![test[100]], response_data.as_slice());

        if head {
            let (_head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
            assert_eq!(code, 200);
        }

        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3214
    /// Like `put_head_get_delete_object` but uploads with custom headers
    /// and asserts the `Cache-Control` header round-trips via HEAD.
    #[maybe_async::maybe_async]
    async fn put_head_delete_object_with_headers(bucket: Bucket) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);
        let header_value = "max-age=42";

        let mut custom_headers = HeaderMap::new();
        custom_headers.insert(CACHE_CONTROL, HeaderValue::from_static(header_value));
        custom_headers.insert(
            HeaderName::from_static("test-key"),
            "value".parse().unwrap(),
        );

        let response_data = bucket
            .put_object_with_headers(s3_path, &test, Some(custom_headers.clone()))
            .await
            .expect("Put object with custom headers failed");
        assert_eq!(response_data.status_code(), 200);

        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // The custom Cache-Control header must be echoed back on HEAD.
        let (head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
        assert_eq!(code, 200);
        assert_eq!(
            head_object_result.cache_control,
            Some(header_value.to_string())
        );

        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3263
    /// Tagging round-trip against AWS: empty tag set, set two tags,
    /// re-read, then clean up. Ignored by default (needs live credentials).
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3304
    /// Tagging round-trip against a local MinIO instance; mirrors
    /// `test_tagging_aws`. Ignored by default (needs a running MinIO).
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3345
    /// Big-object streaming round-trip against AWS; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
    }
3358
    /// Big-object streaming round-trip against Google Cloud Storage;
    /// ignored by default and skipped under `tokio-rustls-tls`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
    }
3378
    /// Big-object streaming round-trip against MinIO; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
    }
3391
    /// Big-object streaming round-trip against Cloudflare R2; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_r2_bucket()).await;
    }
3404
    /// Stream a 20 MB object to the bucket from a local file, read it back
    /// to a writer (and, where streaming is available, as a byte stream),
    /// then delete both the remote object and the local scratch file.
    #[maybe_async::maybe_async]
    async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
        // Pick the File/Write/Stream types matching the enabled runtime.
        #[cfg(feature = "with-async-std")]
        use async_std::fs::File;
        #[cfg(feature = "with-async-std")]
        use async_std::io::WriteExt;
        #[cfg(feature = "with-async-std")]
        use async_std::stream::StreamExt;
        #[cfg(feature = "with-tokio")]
        use futures_util::StreamExt;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::fs::File;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::io::Write;
        #[cfg(feature = "with-tokio")]
        use tokio::fs::File;
        #[cfg(feature = "with-tokio")]
        use tokio::io::AsyncWriteExt;

        init();
        let remote_path = "+stream_test_big";
        let local_path = "+stream_test_big";
        // Remove any scratch file left over from a previous run.
        std::fs::remove_file(remote_path).unwrap_or(());
        let content: Vec<u8> = object(20_000_000);

        let mut file = File::create(local_path).await.unwrap();
        file.write_all(&content).await.unwrap();
        file.flush().await.unwrap();
        let mut reader = File::open(local_path).await.unwrap();

        let response = bucket
            .put_object_stream(&mut reader, remote_path)
            .await
            .unwrap();
        // The sync impl returns a bare status code; async returns a response.
        #[cfg(not(feature = "sync"))]
        assert_eq!(response.status_code(), 200);
        #[cfg(feature = "sync")]
        assert_eq!(response, 200);
        let mut writer = Vec::new();
        let code = bucket
            .get_object_to_writer(remote_path, &mut writer)
            .await
            .unwrap();
        assert_eq!(code, 200);
        assert_eq!(content.len(), writer.len());
        assert_eq!(content.len(), 20_000_000);

        #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
        {
            let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();

            let mut bytes = vec![];

            while let Some(chunk) = response_data_stream.bytes().next().await {
                bytes.push(chunk)
            }
            assert_ne!(bytes.len(), 0);
        }

        let response_data = bucket.delete_object(remote_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
        std::fs::remove_file(local_path).unwrap_or(());
    }
3470
    /// Small-object streaming round-trip against AWS; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
    }
3483
    /// Small-object streaming round-trip against Google Cloud Storage;
    /// ignored by default and skipped under `tokio-rustls-tls`.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
    }
3503
    /// Small-object streaming round-trip against Cloudflare R2; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
    }
3516
    /// Small-object streaming round-trip against MinIO; ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
    }
3529
3530 #[maybe_async::maybe_async]
3531 async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
3532 init();
3533 let remote_path = "+stream_test_small";
3534 let content: Vec<u8> = object(1000);
3535 #[cfg(feature = "with-tokio")]
3536 let mut reader = std::io::Cursor::new(&content);
3537 #[cfg(feature = "with-async-std")]
3538 let mut reader = async_std::io::Cursor::new(&content);
3539 #[cfg(feature = "sync")]
3540 let mut reader = std::io::Cursor::new(&content);
3541
3542 let response = bucket
3543 .put_object_stream(&mut reader, remote_path)
3544 .await
3545 .unwrap();
3546 #[cfg(not(feature = "sync"))]
3547 assert_eq!(response.status_code(), 200);
3548 #[cfg(feature = "sync")]
3549 assert_eq!(response, 200);
3550 let mut writer = Vec::new();
3551 let code = bucket
3552 .get_object_to_writer(remote_path, &mut writer)
3553 .await
3554 .unwrap();
3555 assert_eq!(code, 200);
3556 assert_eq!(content, writer);
3557
3558 let response_data = bucket.delete_object(remote_path).await.unwrap();
3559 assert_eq!(response_data.status_code(), 204);
3560 }
3561
3562 #[cfg(feature = "blocking")]
3563 fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
3564 let s3_path = "/test_blocking.file";
3565 let s3_path_2 = "/test_blocking.file2";
3566 let s3_path_3 = "/test_blocking.file3";
3567 let test: Vec<u8> = object(3072);
3568
3569 let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
3571 assert_eq!(response_data.status_code(), 200);
3572
3573 let response_data = bucket.get_object_blocking(s3_path).unwrap();
3575 assert_eq!(response_data.status_code(), 200);
3576 assert_eq!(test, response_data.as_slice());
3577
3578 let response_data = bucket
3580 .get_object_range_blocking(s3_path, 100, Some(1000))
3581 .unwrap();
3582 assert_eq!(response_data.status_code(), 206);
3583 assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3584
3585 let response_data = bucket
3587 .get_object_range_blocking(s3_path, 100, Some(100))
3588 .unwrap();
3589 assert_eq!(response_data.status_code(), 206);
3590 assert_eq!(vec![test[100]], response_data.as_slice());
3591
3592 let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
3594 assert_eq!(code, 200);
3595 assert_eq!(
3596 head_object_result.content_type.unwrap(),
3597 "application/octet-stream".to_owned()
3598 );
3599 let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
3603 assert_eq!(response_data.status_code(), 200);
3604 let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
3605 assert_eq!(response_data.status_code(), 200);
3606
3607 let (result, code) = bucket
3609 .list_page_blocking(
3610 "test_blocking.".to_string(),
3611 Some("/".to_string()),
3612 None,
3613 None,
3614 Some(2),
3615 )
3616 .unwrap();
3617 assert_eq!(code, 200);
3618 assert_eq!(result.contents.len(), 2);
3619 assert_eq!(result.contents[0].key, s3_path[1..]);
3620 assert_eq!(result.contents[1].key, s3_path_2[1..]);
3621
3622 let cont_token = result.next_continuation_token.unwrap();
3623
3624 let (result, code) = bucket
3625 .list_page_blocking(
3626 "test_blocking.".to_string(),
3627 Some("/".to_string()),
3628 Some(cont_token),
3629 None,
3630 Some(2),
3631 )
3632 .unwrap();
3633 assert_eq!(code, 200);
3634 assert_eq!(result.contents.len(), 1);
3635 assert_eq!(result.contents[0].key, s3_path_3[1..]);
3636 assert!(result.next_continuation_token.is_none());
3637
3638 let response_data = bucket.delete_object_blocking(s3_path).unwrap();
3640 assert_eq!(code, 200);
3641 let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
3642 assert_eq!(code, 200);
3643 let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
3644 assert_eq!(code, 200);
3645 }
3646
    /// Runs the blocking put/head/get/list/delete round-trip against AWS.
    /// Ignored by default: requires live AWS credentials.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn aws_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_aws_bucket())
    }
3656
    /// Runs the blocking put/head/get/list/delete round-trip against GCS.
    /// Ignored by default: requires live credentials.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn gc_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_gc_bucket())
    }
3666
    /// Runs the blocking put/head/get/list/delete round-trip against Wasabi.
    /// Ignored by default: requires live credentials.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn wasabi_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
    }
3676
    /// Runs the blocking put/head/get/list/delete round-trip against MinIO.
    /// Ignored by default: requires a running MinIO instance.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn minio_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_minio_bucket())
    }
3686
    /// Runs the blocking put/head/get/list/delete round-trip against DigitalOcean Spaces.
    /// Ignored by default: requires live credentials.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn digital_ocean_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
    }
3696
    /// Async put/head/get/delete round-trip against AWS, with and without
    /// custom headers. Ignored by default: requires live AWS credentials.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn aws_put_head_get_delete_object() {
        put_head_get_delete_object(*test_aws_bucket(), true).await;
        put_head_delete_object_with_headers(*test_aws_bucket()).await;
    }
3710
    /// Async put/head/get/delete round-trip against GCS.
    /// NOTE(review): the tokio variant is disabled under `tokio-rustls-tls` —
    /// presumably a TLS incompatibility with the GCS endpoint; confirm.
    /// Ignored by default: requires live credentials.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(any(feature = "sync", feature = "tokio-rustls-tls")),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn gc_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_gc_bucket(), true).await;
        put_head_delete_object_with_headers(*test_gc_bucket()).await;
    }
3730
    /// Async put/head/get/delete round-trip against Wasabi, with and without
    /// custom headers. Ignored by default: requires live credentials.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn wasabi_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_wasabi_bucket(), true).await;
        put_head_delete_object_with_headers(*test_wasabi_bucket()).await;
    }
3744
    /// Async put/head/get/delete round-trip against MinIO, with and without
    /// custom headers. Ignored by default: requires a running MinIO instance.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn minio_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_minio_bucket(), true).await;
        put_head_delete_object_with_headers(*test_minio_bucket()).await;
    }
3758
    /// Async put/head/get/delete round-trip against Cloudflare R2.
    /// NOTE(review): passes `false` where the other providers pass `true` —
    /// presumably to skip an assertion R2 does not satisfy; confirm against
    /// `put_head_get_delete_object`. Ignored by default: requires live credentials.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn r2_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_r2_bucket(), false).await;
        put_head_delete_object_with_headers(*test_r2_bucket()).await;
    }
3786
3787 #[maybe_async::test(
3788 feature = "sync",
3789 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3790 async(
3791 all(not(feature = "sync"), feature = "with-async-std"),
3792 async_std::test
3793 )
3794 )]
3795 async fn test_presign_put() {
3796 let s3_path = "/test/test.file";
3797 let bucket = test_minio_bucket();
3798
3799 let mut custom_headers = HeaderMap::new();
3800 custom_headers.insert(
3801 HeaderName::from_static("custom_header"),
3802 "custom_value".parse().unwrap(),
3803 );
3804
3805 let url = bucket
3806 .presign_put(s3_path, 86400, Some(custom_headers), None)
3807 .await
3808 .unwrap();
3809
3810 assert!(url.contains("custom_header%3Bhost"));
3811 assert!(url.contains("/test/test.file"))
3812 }
3813
3814 #[maybe_async::test(
3815 feature = "sync",
3816 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3817 async(
3818 all(not(feature = "sync"), feature = "with-async-std"),
3819 async_std::test
3820 )
3821 )]
3822 async fn test_presign_post() {
3823 use std::borrow::Cow;
3824
3825 let bucket = test_minio_bucket();
3826
3827 let policy = PostPolicy::new(86400)
3829 .condition(
3830 PostPolicyField::Key,
3831 PostPolicyValue::StartsWith(Cow::from("user/user1/")),
3832 )
3833 .unwrap();
3834
3835 let data = bucket.presign_post(policy).await.unwrap();
3836
3837 assert_eq!(data.url, "http://localhost:9000/rust-s3");
3838 assert_eq!(data.fields.len(), 6);
3839 assert_eq!(data.dynamic_fields.len(), 1);
3840 }
3841
3842 #[maybe_async::test(
3843 feature = "sync",
3844 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3845 async(
3846 all(not(feature = "sync"), feature = "with-async-std"),
3847 async_std::test
3848 )
3849 )]
3850 async fn test_presign_get() {
3851 let s3_path = "/test/test.file";
3852 let bucket = test_minio_bucket();
3853
3854 let url = bucket.presign_get(s3_path, 86400, None).await.unwrap();
3855 assert!(url.contains("/test/test.file?"))
3856 }
3857
3858 #[maybe_async::test(
3859 feature = "sync",
3860 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3861 async(
3862 all(not(feature = "sync"), feature = "with-async-std"),
3863 async_std::test
3864 )
3865 )]
3866 async fn test_presign_delete() {
3867 let s3_path = "/test/test.file";
3868 let bucket = test_minio_bucket();
3869
3870 let url = bucket.presign_delete(s3_path, 86400).await.unwrap();
3871 assert!(url.contains("/test/test.file?"))
3872 }
3873
3874 #[maybe_async::test(
3875 feature = "sync",
3876 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3877 async(
3878 all(not(feature = "sync"), feature = "with-async-std"),
3879 async_std::test
3880 )
3881 )]
3882 async fn test_presign_url_standard_ports() {
3883 let region_http_80 = Region::Custom {
3888 region: "eu-central-1".to_owned(),
3889 endpoint: "http://minio:80".to_owned(),
3890 };
3891 let credentials = Credentials::new(
3892 Some("test_access_key"),
3893 Some("test_secret_key"),
3894 None,
3895 None,
3896 None,
3897 )
3898 .unwrap();
3899 let bucket_http_80 = Bucket::new("test-bucket", region_http_80, credentials.clone())
3900 .unwrap()
3901 .with_path_style();
3902
3903 let presigned_url_80 = bucket_http_80
3904 .presign_get("/test.file", 3600, None)
3905 .await
3906 .unwrap();
3907 println!("Presigned URL with port 80: {}", presigned_url_80);
3908
3909 assert!(
3911 presigned_url_80.starts_with("http://minio:80/"),
3912 "URL must preserve port 80, got: {}",
3913 presigned_url_80
3914 );
3915
3916 let region_https_443 = Region::Custom {
3918 region: "eu-central-1".to_owned(),
3919 endpoint: "https://minio:443".to_owned(),
3920 };
3921 let bucket_https_443 = Bucket::new("test-bucket", region_https_443, credentials.clone())
3922 .unwrap()
3923 .with_path_style();
3924
3925 let presigned_url_443 = bucket_https_443
3926 .presign_get("/test.file", 3600, None)
3927 .await
3928 .unwrap();
3929 println!("Presigned URL with port 443: {}", presigned_url_443);
3930
3931 assert!(
3933 presigned_url_443.starts_with("https://minio:443/"),
3934 "URL must preserve port 443, got: {}",
3935 presigned_url_443
3936 );
3937
3938 let region_http_9000 = Region::Custom {
3940 region: "eu-central-1".to_owned(),
3941 endpoint: "http://minio:9000".to_owned(),
3942 };
3943 let bucket_http_9000 = Bucket::new("test-bucket", region_http_9000, credentials)
3944 .unwrap()
3945 .with_path_style();
3946
3947 let presigned_url_9000 = bucket_http_9000
3948 .presign_get("/test.file", 3600, None)
3949 .await
3950 .unwrap();
3951 assert!(
3952 presigned_url_9000.contains("minio:9000"),
3953 "Non-standard port should be preserved in URL"
3954 );
3955 }
3956
3957 #[maybe_async::test(
3958 feature = "sync",
3959 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3960 async(
3961 all(not(feature = "sync"), feature = "with-async-std"),
3962 async_std::test
3963 )
3964 )]
3965 #[ignore]
3966 async fn test_bucket_create_delete_default_region() {
3967 let config = BucketConfiguration::default();
3968 let response = Bucket::create(
3969 &uuid::Uuid::new_v4().to_string(),
3970 "us-east-1".parse().unwrap(),
3971 test_aws_credentials(),
3972 config,
3973 )
3974 .await
3975 .unwrap();
3976
3977 assert_eq!(&response.response_text, "");
3978
3979 assert_eq!(response.response_code, 200);
3980
3981 let response_code = response.bucket.delete().await.unwrap();
3982 assert!(response_code < 300);
3983 }
3984
3985 #[ignore]
3986 #[maybe_async::test(
3987 feature = "sync",
3988 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3989 async(
3990 all(not(feature = "sync"), feature = "with-async-std"),
3991 async_std::test
3992 )
3993 )]
3994 async fn test_bucket_create_delete_non_default_region() {
3995 let config = BucketConfiguration::default();
3996 let response = Bucket::create(
3997 &uuid::Uuid::new_v4().to_string(),
3998 "eu-central-1".parse().unwrap(),
3999 test_aws_credentials(),
4000 config,
4001 )
4002 .await
4003 .unwrap();
4004
4005 assert_eq!(&response.response_text, "");
4006
4007 assert_eq!(response.response_code, 200);
4008
4009 let response_code = response.bucket.delete().await.unwrap();
4010 assert!(response_code < 300);
4011 }
4012
4013 #[ignore]
4014 #[maybe_async::test(
4015 feature = "sync",
4016 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4017 async(
4018 all(not(feature = "sync"), feature = "with-async-std"),
4019 async_std::test
4020 )
4021 )]
4022 async fn test_bucket_create_delete_non_default_region_public() {
4023 let config = BucketConfiguration::public();
4024 let response = Bucket::create(
4025 &uuid::Uuid::new_v4().to_string(),
4026 "eu-central-1".parse().unwrap(),
4027 test_aws_credentials(),
4028 config,
4029 )
4030 .await
4031 .unwrap();
4032
4033 assert_eq!(&response.response_text, "");
4034
4035 assert_eq!(response.response_code, 200);
4036
4037 let response_code = response.bucket.delete().await.unwrap();
4038 assert!(response_code < 300);
4039 }
4040
4041 #[test]
4042 fn test_tag_has_key_and_value_functions() {
4043 let key = "key".to_owned();
4044 let value = "value".to_owned();
4045 let tag = Tag { key, value };
4046 assert_eq!["key", tag.key()];
4047 assert_eq!["value", tag.value()];
4048 }
4049
4050 #[test]
4051 #[ignore]
4052 fn test_builder_composition() {
4053 use std::time::Duration;
4054
4055 let bucket = Bucket::new(
4056 "test-bucket",
4057 "eu-central-1".parse().unwrap(),
4058 test_aws_credentials(),
4059 )
4060 .unwrap()
4061 .with_request_timeout(Duration::from_secs(10))
4062 .unwrap();
4063
4064 assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
4065 }
4066
4067 #[maybe_async::test(
4068 feature = "sync",
4069 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4070 async(
4071 all(not(feature = "sync"), feature = "with-async-std"),
4072 async_std::test
4073 )
4074 )]
4075 #[ignore]
4076 async fn test_bucket_cors() {
4077 let bucket = test_aws_bucket();
4078 let rule = CorsRule::new(
4079 None,
4080 vec!["GET".to_string()],
4081 vec!["*".to_string()],
4082 None,
4083 None,
4084 None,
4085 );
4086 let expected_bucket_owner = "904662384344";
4087 let cors_config = CorsConfiguration::new(vec![rule]);
4088 let response = bucket
4089 .put_bucket_cors(expected_bucket_owner, &cors_config)
4090 .await
4091 .unwrap();
4092 assert_eq!(response.status_code(), 200);
4093
4094 let cors_response = bucket.get_bucket_cors(expected_bucket_owner).await.unwrap();
4095 assert_eq!(cors_response, cors_config);
4096
4097 let response = bucket
4098 .delete_bucket_cors(expected_bucket_owner)
4099 .await
4100 .unwrap();
4101 assert_eq!(response.status_code(), 204);
4102 }
4103
4104 #[maybe_async::test(
4105 feature = "sync",
4106 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4107 async(
4108 all(not(feature = "sync"), feature = "with-async-std"),
4109 async_std::test
4110 )
4111 )]
4112 #[ignore]
4113 async fn test_bucket_lifecycle() {
4114 let bucket = test_aws_bucket();
4115
4116 let rule = LifecycleRule::builder("Enabled")
4118 .id("test-rule")
4119 .filter(LifecycleFilter {
4120 prefix: Some("test/".to_string()),
4121 ..Default::default()
4122 })
4123 .expiration(Expiration {
4124 days: Some(1),
4125 ..Default::default()
4126 })
4127 .build();
4128
4129 let lifecycle_config = BucketLifecycleConfiguration::new(vec![rule]);
4130
4131 let response = bucket
4133 .put_bucket_lifecycle(lifecycle_config.clone())
4134 .await
4135 .unwrap();
4136 assert_eq!(response.status_code(), 200);
4137
4138 let retrieved_config = bucket.get_bucket_lifecycle().await.unwrap();
4140 assert_eq!(retrieved_config.rules.len(), 1);
4141 assert_eq!(retrieved_config.rules[0].id, Some("test-rule".to_string()));
4142 assert_eq!(retrieved_config.rules[0].status, "Enabled");
4143
4144 let response = bucket.delete_bucket_lifecycle().await.unwrap();
4146 assert_eq!(response.status_code(), 204);
4147 }
4148
4149 #[ignore]
4150 #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
4151 #[maybe_async::test(
4152 feature = "sync",
4153 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4154 async(
4155 all(not(feature = "sync"), feature = "with-async-std"),
4156 async_std::test
4157 )
4158 )]
4159 async fn test_bucket_exists_with_dangerous_config() {
4160 init();
4161
4162 let credentials = test_aws_credentials();
4170 let region = "eu-central-1".parse().unwrap();
4171 let bucket_name = "rust-s3-test";
4172
4173 let bucket = Bucket::new(bucket_name, region, credentials)
4175 .unwrap()
4176 .with_path_style();
4177
4178 let bucket = bucket.set_dangereous_config(true, true).unwrap();
4180
4181 let exists_result = bucket.exists().await;
4184
4185 assert!(
4187 exists_result.is_ok(),
4188 "Bucket::exists() failed with dangerous config"
4189 );
4190 let exists = exists_result.unwrap();
4191 assert!(exists, "Test bucket should exist");
4192
4193 let list_result = bucket.list("".to_string(), Some("/".to_string())).await;
4196 assert!(
4197 list_result.is_ok(),
4198 "List operation should work with dangerous config"
4199 );
4200 }
4201
4202 #[ignore]
4203 #[maybe_async::test(
4204 feature = "sync",
4205 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4206 async(
4207 all(not(feature = "sync"), feature = "with-async-std"),
4208 async_std::test
4209 )
4210 )]
4211 async fn test_bucket_exists_without_dangerous_config() {
4212 init();
4213
4214 let credentials = test_aws_credentials();
4216 let region = "eu-central-1".parse().unwrap();
4217 let bucket_name = "rust-s3-test";
4218
4219 let bucket = Bucket::new(bucket_name, region, credentials)
4221 .unwrap()
4222 .with_path_style();
4223
4224 let exists_result = bucket.exists().await;
4226 assert!(
4227 exists_result.is_ok(),
4228 "Bucket::exists() should work without dangerous config"
4229 );
4230 let exists = exists_result.unwrap();
4231 assert!(exists, "Test bucket should exist");
4232 }
4233}