1#[cfg(feature = "blocking")]
2use block_on_proc::block_on;
3#[cfg(feature = "tags")]
4use minidom::Element;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
9use crate::command::{Command, Multipart};
10use crate::creds::Credentials;
11use crate::region::Region;
12use crate::request::ResponseData;
13#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
14use crate::request::ResponseDataStream;
15use std::str::FromStr;
16use std::sync::{Arc, RwLock};
17
18pub type Query = HashMap<String, String>;
19
20#[cfg(feature = "with-async-std")]
21use crate::request::async_std_backend::SurfRequest as RequestImpl;
22#[cfg(feature = "with-tokio")]
23use crate::request::tokio_backend::Reqwest as RequestImpl;
24
25#[cfg(feature = "with-async-std")]
26use futures_io::AsyncWrite;
27#[cfg(feature = "with-tokio")]
28use tokio::io::AsyncWrite;
29
30#[cfg(feature = "sync")]
31use crate::request::blocking::AttoRequest as RequestImpl;
32use std::io::Read;
33
34#[cfg(feature = "with-tokio")]
35use tokio::io::AsyncRead;
36
37#[cfg(feature = "with-async-std")]
38use futures::io::AsyncRead;
39
40use crate::error::S3Error;
41use crate::request::Request;
42use crate::serde_types::{
43 BucketLocationResult, CompleteMultipartUploadData, HeadObjectResult,
44 InitiateMultipartUploadResponse, ListBucketResult, ListMultipartUploadsResult, Part,
45};
46use crate::utils::error_from_response_data;
47use http::header::HeaderName;
48use http::HeaderMap;
49
/// Size of a single multipart-upload chunk: 8 MiB.
pub const CHUNK_SIZE: usize = 8_388_608;

/// Request timeout applied to newly constructed `Bucket`s.
const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
/// A single S3 object tag (key/value pair) as parsed from the tagging API.
#[derive(Debug, PartialEq, Eq)]
pub struct Tag {
    // Tag key as returned by the GetObjectTagging response.
    key: String,
    // Tag value associated with `key`.
    value: String,
}
59
60impl Tag {
61 pub fn key(&self) -> String {
62 self.key.to_owned()
63 }
64
65 pub fn value(&self) -> String {
66 self.value.to_owned()
67 }
68}
69
/// Handle to a single S3 bucket: holds the bucket name, region, shared
/// credentials, and per-request configuration used to build requests.
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Name of the bucket.
    pub name: String,
    /// Region (or custom endpoint) the bucket lives in.
    pub region: Region,
    /// Shared, refreshable credentials used to sign requests.
    pub credentials: Arc<RwLock<Credentials>>,
    /// Extra headers appended to every request.
    pub extra_headers: HeaderMap,
    /// Extra query parameters appended to every request.
    pub extra_query: Query,
    /// Per-request timeout; `None` disables the timeout.
    pub request_timeout: Option<Duration>,
    // When true, use path-style URLs instead of subdomain (virtual-host) style.
    path_style: bool,
    // When true, listings use ListObjectsV2; otherwise the legacy v1 API.
    listobjects_v2: bool,
}
95
96impl Bucket {
97 pub fn credentials_refresh(&self) -> Result<(), S3Error> {
98 Ok(self
99 .credentials
100 .try_write()
101 .map_err(|_| S3Error::WLCredentials)?
102 .refresh()?)
103 }
104}
105
106fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
107 if 604800 < expiry_secs {
108 return Err(S3Error::MaxExpiry(expiry_secs));
109 }
110 Ok(())
111}
112
113#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
114#[cfg_attr(
115 all(feature = "with-async-std", feature = "blocking"),
116 block_on("async-std")
117)]
118impl Bucket {
119 pub fn presign_get<S: AsRef<str>>(
144 &self,
145 path: S,
146 expiry_secs: u32,
147 custom_queries: Option<HashMap<String, String>>,
148 ) -> Result<String, S3Error> {
149 validate_expiry(expiry_secs)?;
150 let request = RequestImpl::new(
151 self,
152 path.as_ref(),
153 Command::PresignGet {
154 expiry_secs,
155 custom_queries,
156 },
157 )?;
158 request.presigned()
159 }
160
161 pub fn presign_post<S: AsRef<str>>(
182 &self,
183 path: S,
184 expiry_secs: u32,
185 post_policy: String,
187 ) -> Result<String, S3Error> {
188 validate_expiry(expiry_secs)?;
189 let request = RequestImpl::new(
190 self,
191 path.as_ref(),
192 Command::PresignPost {
193 expiry_secs,
194 post_policy,
195 },
196 )?;
197 request.presigned()
198 }
199
200 pub fn presign_put<S: AsRef<str>>(
226 &self,
227 path: S,
228 expiry_secs: u32,
229 custom_headers: Option<HeaderMap>,
230 ) -> Result<String, S3Error> {
231 validate_expiry(expiry_secs)?;
232 let request = RequestImpl::new(
233 self,
234 path.as_ref(),
235 Command::PresignPut {
236 expiry_secs,
237 custom_headers,
238 },
239 )?;
240 request.presigned()
241 }
242
243 pub fn presign_delete<S: AsRef<str>>(
260 &self,
261 path: S,
262 expiry_secs: u32,
263 ) -> Result<String, S3Error> {
264 validate_expiry(expiry_secs)?;
265 let request =
266 RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs })?;
267 request.presigned()
268 }
269
270 #[maybe_async::maybe_async]
303 pub async fn create(
304 name: &str,
305 region: Region,
306 credentials: Credentials,
307 config: BucketConfiguration,
308 ) -> Result<CreateBucketResponse, S3Error> {
309 let mut config = config;
310 config.set_region(region.clone());
311 let command = Command::CreateBucket { config };
312 let bucket = Bucket::new(name, region, credentials)?;
313 let request = RequestImpl::new(&bucket, "", command)?;
314 let response_data = request.response_data(false).await?;
315 let response_text = response_data.as_str()?;
316 Ok(CreateBucketResponse {
317 bucket,
318 response_text: response_text.to_string(),
319 response_code: response_data.status_code(),
320 })
321 }
322
323 #[maybe_async::maybe_async]
356 pub async fn create_with_path_style(
357 name: &str,
358 region: Region,
359 credentials: Credentials,
360 config: BucketConfiguration,
361 ) -> Result<CreateBucketResponse, S3Error> {
362 let mut config = config;
363 config.set_region(region.clone());
364 let command = Command::CreateBucket { config };
365 let bucket = Bucket::new(name, region, credentials)?.with_path_style();
366 let request = RequestImpl::new(&bucket, "", command)?;
367 let response_data = request.response_data(false).await?;
368 let response_text = response_data.to_string()?;
369 Ok(CreateBucketResponse {
370 bucket,
371 response_text,
372 response_code: response_data.status_code(),
373 })
374 }
375
376 #[maybe_async::maybe_async]
407 pub async fn delete(&self) -> Result<u16, S3Error> {
408 let command = Command::DeleteBucket;
409 let request = RequestImpl::new(self, "", command)?;
410 let response_data = request.response_data(false).await?;
411 Ok(response_data.status_code())
412 }
413
414 pub fn new(name: &str, region: Region, credentials: Credentials) -> Result<Bucket, S3Error> {
429 Ok(Bucket {
430 name: name.into(),
431 region,
432 credentials: Arc::new(RwLock::new(credentials)),
433 extra_headers: HeaderMap::new(),
434 extra_query: HashMap::new(),
435 request_timeout: DEFAULT_REQUEST_TIMEOUT,
436 path_style: false,
437 listobjects_v2: true,
438 })
439 }
440
441 pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
453 Ok(Bucket {
454 name: name.into(),
455 region,
456 credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
457 extra_headers: HeaderMap::new(),
458 extra_query: HashMap::new(),
459 request_timeout: DEFAULT_REQUEST_TIMEOUT,
460 path_style: false,
461 listobjects_v2: true,
462 })
463 }
464
465 pub fn with_path_style(&self) -> Bucket {
466 Bucket {
467 name: self.name.clone(),
468 region: self.region.clone(),
469 credentials: self.credentials.clone(),
470 extra_headers: self.extra_headers.clone(),
471 extra_query: self.extra_query.clone(),
472 request_timeout: self.request_timeout,
473 path_style: true,
474 listobjects_v2: self.listobjects_v2,
475 }
476 }
477
478 pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Bucket {
479 Bucket {
480 name: self.name.clone(),
481 region: self.region.clone(),
482 credentials: self.credentials.clone(),
483 extra_headers,
484 extra_query: self.extra_query.clone(),
485 request_timeout: self.request_timeout,
486 path_style: self.path_style,
487 listobjects_v2: self.listobjects_v2,
488 }
489 }
490
491 pub fn with_extra_query(&self, extra_query: HashMap<String, String>) -> Bucket {
492 Bucket {
493 name: self.name.clone(),
494 region: self.region.clone(),
495 credentials: self.credentials.clone(),
496 extra_headers: self.extra_headers.clone(),
497 extra_query,
498 request_timeout: self.request_timeout,
499 path_style: self.path_style,
500 listobjects_v2: self.listobjects_v2,
501 }
502 }
503
504 pub fn with_request_timeout(&self, request_timeout: Duration) -> Bucket {
505 Bucket {
506 name: self.name.clone(),
507 region: self.region.clone(),
508 credentials: self.credentials.clone(),
509 extra_headers: self.extra_headers.clone(),
510 extra_query: self.extra_query.clone(),
511 request_timeout: Some(request_timeout),
512 path_style: self.path_style,
513 listobjects_v2: self.listobjects_v2,
514 }
515 }
516
517 pub fn with_listobjects_v1(&self) -> Bucket {
518 Bucket {
519 name: self.name.clone(),
520 region: self.region.clone(),
521 credentials: self.credentials.clone(),
522 extra_headers: self.extra_headers.clone(),
523 extra_query: self.extra_query.clone(),
524 request_timeout: self.request_timeout,
525 path_style: self.path_style,
526 listobjects_v2: false,
527 }
528 }
529
530 #[maybe_async::maybe_async]
558 pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
559 &self,
560 from: F,
561 to: T,
562 ) -> Result<u16, S3Error> {
563 let fq_from = {
564 let from = from.as_ref();
565 let from = from.strip_prefix('/').unwrap_or(from);
566 format!("{bucket}/{path}", bucket = self.name(), path = from)
567 };
568 self.copy_object(fq_from, to).await
569 }
570
571 #[maybe_async::maybe_async]
572 async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
573 &self,
574 from: F,
575 to: T,
576 ) -> Result<u16, S3Error> {
577 let command = Command::CopyObject {
578 from: from.as_ref(),
579 };
580 let request = RequestImpl::new(self, to.as_ref(), command)?;
581 let response_data = request.response_data(false).await?;
582 Ok(response_data.status_code())
583 }
584
585 #[maybe_async::maybe_async]
617 pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
618 let command = Command::GetObject;
619 let request = RequestImpl::new(self, path.as_ref(), command)?;
620 request.response_data(false).await
621 }
622
623 #[maybe_async::maybe_async]
655 pub async fn get_object_torrent<S: AsRef<str>>(
656 &self,
657 path: S,
658 ) -> Result<ResponseData, S3Error> {
659 let command = Command::GetObjectTorrent;
660 let request = RequestImpl::new(self, path.as_ref(), command)?;
661 request.response_data(false).await
662 }
663
    /// Fetch a byte range of the object at `path` (range semantics are
    /// delegated to `Command::GetObjectRange`).
    ///
    /// # Panics
    /// Panics if `end` is `Some` and not strictly greater than `start`.
    #[maybe_async::maybe_async]
    pub async fn get_object_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<ResponseData, S3Error> {
        if let Some(end) = end {
            assert!(start < end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data(false).await
    }
711
    /// Stream a byte range of the object at `path` into `writer` (async
    /// backends), returning the HTTP status code.
    ///
    /// # Panics
    /// Panics if `end` is `Some` and not strictly greater than `start`.
    #[maybe_async::async_impl]
    pub async fn get_object_range_to_writer<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        if let Some(end) = end {
            assert!(start < end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer).await
    }
768
    /// Stream a byte range of the object at `path` into `writer` (blocking
    /// backend), returning the HTTP status code.
    // NOTE(review): declared `async fn` but compiled under `sync_impl`, which
    // is expected to strip the `async` — confirm against maybe_async's docs.
    ///
    /// # Panics
    /// Panics if `end` is `Some` and not strictly greater than `start`.
    #[maybe_async::sync_impl]
    pub async fn get_object_range_to_writer<T: std::io::Write + Send, S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        if let Some(end) = end {
            assert!(start < end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
785
    /// Stream the whole object at `path` into `writer` (async backends),
    /// returning the HTTP status code.
    #[maybe_async::async_impl]
    pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
        &self,
        path: S,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer).await
    }
833
    /// Stream the whole object at `path` into `writer` (blocking backend),
    /// returning the HTTP status code.
    #[maybe_async::sync_impl]
    pub fn get_object_to_writer<T: std::io::Write + Send, S: AsRef<str>>(
        &self,
        path: S,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
844
    /// Open the object at `path` as a byte stream (async backends only),
    /// allowing the caller to consume it incrementally.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    pub async fn get_object_stream<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<ResponseDataStream, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_stream().await
    }
895
896 #[maybe_async::async_impl]
943 pub async fn put_object_stream<R: AsyncRead + Unpin>(
944 &self,
945 reader: &mut R,
946 s3_path: impl AsRef<str>,
947 ) -> Result<u16, S3Error> {
948 self._put_object_stream_with_content_type(
949 reader,
950 s3_path.as_ref(),
951 "application/octet-stream",
952 )
953 .await
954 }
955
956 #[maybe_async::async_impl]
957 pub async fn mc_put_object_stream<R: AsyncRead + Unpin>(
958 &self,
959 reader: &mut R,
960 s3_path: impl AsRef<str>,
961 ) -> Result<u16, S3Error> {
962 self._put_object_stream_with_content_type_in_sequence(
963 reader,
964 s3_path.as_ref(),
965 "application/octet-stream",
966 )
967 .await
968 }
969
970
971 #[maybe_async::sync_impl]
972 pub fn put_object_stream<R: Read>(
973 &self,
974 reader: &mut R,
975 s3_path: impl AsRef<str>,
976 ) -> Result<u16, S3Error> {
977 self._put_object_stream_with_content_type(
978 reader,
979 s3_path.as_ref(),
980 "application/octet-stream",
981 )
982 }
983
984 #[maybe_async::async_impl]
1035 pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1036 &self,
1037 reader: &mut R,
1038 s3_path: impl AsRef<str>,
1039 content_type: impl AsRef<str>,
1040 ) -> Result<u16, S3Error> {
1041 self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1042 .await
1043 }
1044
1045 #[maybe_async::sync_impl]
1046 pub fn put_object_stream_with_content_type<R: Read>(
1047 &self,
1048 reader: &mut R,
1049 s3_path: impl AsRef<str>,
1050 content_type: impl AsRef<str>,
1051 ) -> Result<u16, S3Error> {
1052 self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1053 }
1054
    /// Upload `chunk` as part `part_number` of an in-progress multipart
    /// upload and return the raw response (the caller inspects the status
    /// and reads the ETag from the body).
    #[maybe_async::async_impl]
    async fn make_multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<ResponseData, S3Error> {
        let command = Command::PutObject {
            content: &chunk,
            multipart: Some(Multipart::new(part_number, upload_id)), content_type,
        };
        let request = RequestImpl::new(self, path, command)?;
        request.response_data(true).await
    }
1072
    /// Stream `reader` to `s3_path`: a payload smaller than `CHUNK_SIZE` is
    /// sent as a single PUT; anything larger becomes a multipart upload whose
    /// parts are uploaded concurrently via `join_all`.
    ///
    /// # Errors
    /// Any non-2xx part response aborts the multipart upload and surfaces the
    /// failing response (or the abort error, if aborting itself fails).
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<u16, S3Error> {
        // A first chunk shorter than CHUNK_SIZE means the whole payload fits
        // in one request: skip multipart entirely.
        let first_chunk = crate::utils::read_chunk_async(reader).await?;
        if first_chunk.len() < CHUNK_SIZE {
            let response_data = self
                .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
                .await?;
            if response_data.status_code() >= 300 {
                return Err(error_from_response_data(response_data)?);
            }
            return Ok(response_data.status_code());
        }

        let msg = self
            .initiate_multipart_upload(s3_path, content_type)
            .await?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let mut part_number: u32 = 0;
        let mut etags = Vec::new();

        // Collect the part-upload futures; they run concurrently in join_all.
        let mut handles = vec![];
        loop {
            // The first chunk was already read above; reuse it for part 1.
            let chunk = if part_number == 0 {
                first_chunk.clone()
            } else {
                crate::utils::read_chunk_async(reader).await?
            };

            // A short chunk marks the end of the stream.
            let done = chunk.len() < CHUNK_SIZE;

            part_number += 1;
            handles.push(self.make_multipart_request(
                &path,
                chunk,
                part_number,
                upload_id,
                content_type,
            ));

            if done {
                break;
            }
        }

        // join_all yields results in push order, so `enumerate` below
        // reproduces part numbers 1..=n in the order they were created.
        let responses = futures::future::join_all(handles).await;

        for response in responses {
            let response_data = response?;
            if !(200..300).contains(&response_data.status_code()) {
                // A failed part invalidates the upload: abort it, then report
                // the failing response (or the abort error if that fails too).
                match self.abort_upload(&path, upload_id).await {
                    Ok(_) => {
                        return Err(error_from_response_data(response_data)?);
                    }
                    Err(error) => {
                        return Err(error);
                    }
                }
            }

            let etag = response_data.as_str()?;
            etags.push(etag.to_string());
        }

        let inner_data = etags
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, x)| Part {
                etag: x,
                part_number: i as u32 + 1,
            })
            .collect::<Vec<Part>>();
        let response_data = self
            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
            .await?;

        Ok(response_data.status_code())
    }
1165
    /// Sequential variant of `_put_object_stream_with_content_type`: each
    /// multipart part is awaited before the next chunk is read, bounding
    /// memory use at roughly one chunk.
    ///
    /// # Errors
    /// Any non-2xx part response aborts the multipart upload and surfaces the
    /// failing response (or the abort error, if aborting itself fails).
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type_in_sequence<R: AsyncRead + Unpin>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<u16, S3Error> {
        // A first chunk shorter than CHUNK_SIZE means the whole payload fits
        // in one request: skip multipart entirely.
        let first_chunk = crate::utils::read_chunk_async(reader).await?;
        if first_chunk.len() < CHUNK_SIZE {
            let response_data = self
                .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
                .await?;
            if response_data.status_code() >= 300 {
                return Err(error_from_response_data(response_data)?);
            }
            return Ok(response_data.status_code());
        }

        let msg = self
            .initiate_multipart_upload(s3_path, content_type)
            .await?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let mut part_number: u32 = 0;
        let mut etags = Vec::new();

        let mut responses = vec![];
        loop {
            // The first chunk was already read above; reuse it for part 1.
            let chunk = if part_number == 0 {
                first_chunk.clone()
            } else {
                crate::utils::read_chunk_async(reader).await?
            };

            // A short chunk marks the end of the stream.
            let done = chunk.len() < CHUNK_SIZE;

            part_number += 1;
            // Awaited immediately: parts are uploaded one at a time, in order.
            responses.push(self.make_multipart_request(
                &path,
                chunk,
                part_number,
                upload_id,
                content_type,
            ).await);

            if done {
                break;
            }
        }

        for response in responses {
            let response_data = response?;
            if !(200..300).contains(&response_data.status_code()) {
                // A failed part invalidates the upload: abort it, then report
                // the failing response (or the abort error if that fails too).
                match self.abort_upload(&path, upload_id).await {
                    Ok(_) => {
                        return Err(error_from_response_data(response_data)?);
                    }
                    Err(error) => {
                        return Err(error);
                    }
                }
            }

            let etag = response_data.as_str()?;
            etags.push(etag.to_string());
        }

        let inner_data = etags
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, x)| Part {
                etag: x,
                part_number: i as u32 + 1,
            })
            .collect::<Vec<Part>>();
        let response_data = self
            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
            .await?;

        Ok(response_data.status_code())
    }
1255
1256 #[maybe_async::sync_impl]
1257 fn _put_object_stream_with_content_type<R: Read>(
1258 &self,
1259 reader: &mut R,
1260 s3_path: &str,
1261 content_type: &str,
1262 ) -> Result<u16, S3Error> {
1263 let msg = self.initiate_multipart_upload(s3_path, content_type)?;
1264 let path = msg.key;
1265 let upload_id = &msg.upload_id;
1266
1267 let mut part_number: u32 = 0;
1268 let mut etags = Vec::new();
1269 loop {
1270 let chunk = crate::utils::read_chunk(reader)?;
1271
1272 if chunk.len() < CHUNK_SIZE {
1273 if part_number == 0 {
1274 self.abort_upload(&path, upload_id)?;
1276
1277 self.put_object(s3_path, chunk.as_slice())?;
1278 } else {
1279 part_number += 1;
1280 let part = self.put_multipart_chunk(
1281 chunk,
1282 &path,
1283 part_number,
1284 upload_id,
1285 content_type,
1286 )?;
1287 etags.push(part.etag);
1288 let inner_data = etags
1289 .into_iter()
1290 .enumerate()
1291 .map(|(i, x)| Part {
1292 etag: x,
1293 part_number: i as u32 + 1,
1294 })
1295 .collect::<Vec<Part>>();
1296 return Ok(self
1297 .complete_multipart_upload(&path, upload_id, inner_data)?
1298 .status_code());
1299 }
1301 } else {
1302 part_number += 1;
1303 let part =
1304 self.put_multipart_chunk(chunk, &path, part_number, upload_id, content_type)?;
1305 etags.push(part.etag.to_string());
1306 }
1307 }
1308 }
1309
1310 #[maybe_async::async_impl]
1312 pub async fn initiate_multipart_upload(
1313 &self,
1314 s3_path: &str,
1315 content_type: &str,
1316 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1317 let command = Command::InitiateMultipartUpload { content_type };
1318 let request = RequestImpl::new(self, s3_path, command)?;
1319 let response_data = request.response_data(false).await?;
1320 if response_data.status_code() >= 300 {
1321 return Err(error_from_response_data(response_data)?);
1322 }
1323
1324 let msg: InitiateMultipartUploadResponse =
1325 quick_xml::de::from_str(response_data.as_str()?)?;
1326 Ok(msg)
1327 }
1328
1329 #[maybe_async::sync_impl]
1330 pub fn initiate_multipart_upload(
1331 &self,
1332 s3_path: &str,
1333 content_type: &str,
1334 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1335 let command = Command::InitiateMultipartUpload { content_type };
1336 let request = RequestImpl::new(self, s3_path, command)?;
1337 let response_data = request.response_data(false)?;
1338 if response_data.status_code() >= 300 {
1339 return Err(error_from_response_data(response_data)?);
1340 }
1341
1342 let msg: InitiateMultipartUploadResponse =
1343 quick_xml::de::from_str(response_data.as_str()?)?;
1344 Ok(msg)
1345 }
1346
    /// Read one chunk from `reader` and upload it as part `part_number` of the
    /// multipart upload `upload_id`.
    // NOTE(review): takes a blocking `Read` even in the async impl and uses
    // the synchronous `read_chunk` helper — confirm this is intentional.
    #[maybe_async::async_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
            .await
    }
1361
    /// Read one chunk from `reader` and upload it as part `part_number` of the
    /// multipart upload `upload_id` (blocking backend).
    // NOTE(review): declared `async fn` but compiled under `sync_impl`, which
    // is expected to strip the `async` — confirm against maybe_async's docs.
    #[maybe_async::sync_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
    }
1374
    /// Upload `chunk` as part `part_number` of multipart upload `upload_id`
    /// and return the resulting `Part` (ETag + part number).
    ///
    /// On a non-2xx response the whole upload is aborted before the error is
    /// returned; a failed abort takes precedence over the upload error.
    #[maybe_async::async_impl]
    pub async fn put_multipart_chunk(
        &self,
        chunk: Vec<u8>,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let command = Command::PutObject {
            content: &chunk,
            multipart: Some(Multipart::new(part_number, upload_id)), content_type,
        };
        let request = RequestImpl::new(self, path, command)?;
        let response_data = request.response_data(true).await?;
        if !(200..300).contains(&response_data.status_code()) {
            // A failed part invalidates the upload: abort it, then report the
            // failing response (or the abort error if aborting also fails).
            match self.abort_upload(path, upload_id).await {
                Ok(_) => {
                    return Err(error_from_response_data(response_data)?);
                }
                Err(error) => {
                    return Err(error);
                }
            }
        }
        // The part's ETag is read from the response body here — presumably the
        // request backend surfaces the ETag header there; confirm if in doubt.
        let etag = response_data.as_str()?;
        Ok(Part {
            etag: etag.to_string(),
            part_number,
        })
    }
1410
    /// Upload `chunk` as part `part_number` of multipart upload `upload_id`
    /// (blocking backend) and return the resulting `Part`.
    ///
    /// On a non-2xx response the whole upload is aborted before the error is
    /// returned; a failed abort takes precedence over the upload error.
    #[maybe_async::sync_impl]
    pub fn put_multipart_chunk(
        &self,
        chunk: Vec<u8>,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let command = Command::PutObject {
            content: &chunk,
            multipart: Some(Multipart::new(part_number, upload_id)), content_type,
        };
        let request = RequestImpl::new(self, path, command)?;
        let response_data = request.response_data(true)?;
        if !(200..300).contains(&response_data.status_code()) {
            // A failed part invalidates the upload: abort it, then report the
            // failing response (or the abort error if aborting also fails).
            match self.abort_upload(path, upload_id) {
                Ok(_) => {
                    return Err(error_from_response_data(response_data)?);
                }
                Err(error) => {
                    return Err(error);
                }
            }
        }
        let etag = response_data.as_str()?;
        Ok(Part {
            etag: etag.to_string(),
            part_number,
        })
    }
1445
1446 #[maybe_async::async_impl]
1448 pub async fn complete_multipart_upload(
1449 &self,
1450 path: &str,
1451 upload_id: &str,
1452 parts: Vec<Part>,
1453 ) -> Result<ResponseData, S3Error> {
1454 let data = CompleteMultipartUploadData { parts };
1455 let complete = Command::CompleteMultipartUpload { upload_id, data };
1456 let complete_request = RequestImpl::new(self, path, complete)?;
1457 complete_request.response_data(false).await
1458 }
1459
1460 #[maybe_async::sync_impl]
1461 pub fn complete_multipart_upload(
1462 &self,
1463 path: &str,
1464 upload_id: &str,
1465 parts: Vec<Part>,
1466 ) -> Result<ResponseData, S3Error> {
1467 let data = CompleteMultipartUploadData { parts };
1468 let complete = Command::CompleteMultipartUpload { upload_id, data };
1469 let complete_request = RequestImpl::new(self, path, complete)?;
1470 complete_request.response_data(false)
1471 }
1472
    /// GET the bucket's location (region) via the `?location` subresource,
    /// returning the parsed region and the HTTP status code.
    ///
    /// If the response body cannot be parsed, a `Region::Custom` placeholder
    /// is returned instead of an error: an empty "Custom" region on HTTP 200
    /// (some providers return an unparsable body), or one embedding the parse
    /// error text otherwise.
    #[maybe_async::maybe_async]
    pub async fn location(&self) -> Result<(Region, u16), S3Error> {
        let request = RequestImpl::new(self, "?location", Command::GetBucketLocation)?;
        let response_data = request.response_data(false).await?;
        let region_string = String::from_utf8_lossy(response_data.as_slice());
        let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
            Ok(r) => {
                let location_result: BucketLocationResult = r;
                location_result.region.parse()?
            }
            Err(e) => {
                if response_data.status_code() == 200 {
                    Region::Custom {
                        region: "Custom".to_string(),
                        endpoint: "".to_string(),
                    }
                } else {
                    Region::Custom {
                        region: format!("Error encountered : {}", e),
                        endpoint: "".to_string(),
                    }
                }
            }
        };
        Ok((region, response_data.status_code()))
    }
1531
1532 #[maybe_async::maybe_async]
1565 pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
1566 let command = Command::DeleteObject;
1567 let request = RequestImpl::new(self, path.as_ref(), command)?;
1568 request.response_data(false).await
1569 }
1570
1571 #[maybe_async::maybe_async]
1604 pub async fn head_object<S: AsRef<str>>(
1605 &self,
1606 path: S,
1607 ) -> Result<(HeadObjectResult, u16), S3Error> {
1608 let command = Command::HeadObject;
1609 let request = RequestImpl::new(self, path.as_ref(), command)?;
1610 let (headers, status) = request.response_header().await?;
1611 let header_object = HeadObjectResult::from(&headers);
1612 Ok((header_object, status))
1613 }
1614
1615 #[maybe_async::maybe_async]
1649 pub async fn put_object_with_content_type<S: AsRef<str>>(
1650 &self,
1651 path: S,
1652 content: &[u8],
1653 content_type: &str,
1654 ) -> Result<ResponseData, S3Error> {
1655 let command = Command::PutObject {
1656 content,
1657 content_type,
1658 multipart: None,
1659 };
1660 let request = RequestImpl::new(self, path.as_ref(), command)?;
1661 request.response_data(true).await
1662 }
1663
1664 #[maybe_async::maybe_async]
1698 pub async fn put_object<S: AsRef<str>>(
1699 &self,
1700 path: S,
1701 content: &[u8],
1702 ) -> Result<ResponseData, S3Error> {
1703 self.put_object_with_content_type(path, content, "application/octet-stream")
1704 .await
1705 }
1706
1707 fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
1708 let mut s = String::new();
1709 let content = tags
1710 .iter()
1711 .map(|(name, value)| {
1712 format!(
1713 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
1714 name.as_ref(),
1715 value.as_ref()
1716 )
1717 })
1718 .fold(String::new(), |mut a, b| {
1719 a.push_str(b.as_str());
1720 a
1721 });
1722 s.push_str("<Tagging><TagSet>");
1723 s.push_str(&content);
1724 s.push_str("</TagSet></Tagging>");
1725 s
1726 }
1727
1728 #[maybe_async::maybe_async]
1761 pub async fn put_object_tagging<S: AsRef<str>>(
1762 &self,
1763 path: &str,
1764 tags: &[(S, S)],
1765 ) -> Result<ResponseData, S3Error> {
1766 let content = self._tags_xml(tags);
1767 let command = Command::PutObjectTagging { tags: &content };
1768 let request = RequestImpl::new(self, path, command)?;
1769 request.response_data(false).await
1770 }
1771
1772 #[maybe_async::maybe_async]
1805 pub async fn delete_object_tagging<S: AsRef<str>>(
1806 &self,
1807 path: S,
1808 ) -> Result<ResponseData, S3Error> {
1809 let command = Command::DeleteObjectTagging;
1810 let request = RequestImpl::new(self, path.as_ref(), command)?;
1811 request.response_data(false).await
1812 }
1813
    /// Fetch and parse the tag set of the object at `path`, returning the
    /// tags together with the HTTP status code. On non-200 responses or
    /// unparsable XML an empty tag list is returned rather than an error.
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        let result = request.response_data(false).await?;

        let mut tags = Vec::new();

        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // minidom requires a namespace; some providers omit it on the
            // <Tagging> root, so inject the standard S3 namespace and retry.
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                // Missing Key/Value children yield placeholder
                                // text instead of failing the whole parse.
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        Ok((tags, result.status_code()))
    }
1897
    /// Fetch one page of an object listing, using ListObjectsV2 or the legacy
    /// v1 API depending on `self.listobjects_v2`. Returns the parsed page and
    /// the HTTP status code.
    #[maybe_async::maybe_async]
    pub async fn list_page(
        &self,
        prefix: String,
        delimiter: Option<String>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<(ListBucketResult, u16), S3Error> {
        let command = if self.listobjects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // The v1 API has a single `marker` parameter; the larger of the
            // two optional cursors is used.
            // NOTE(review): `max` on Option<String> treats None < Some and
            // compares strings lexicographically — confirm this is the
            // intended cursor-merging rule.
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };
        let request = RequestImpl::new(self, "/", command)?;
        let response_data = request.response_data(false).await?;
        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;

        Ok((list_bucket_result, response_data.status_code()))
    }
1932
    /// List all objects under `prefix`, following continuation tokens until
    /// the listing is exhausted. Returns one `ListBucketResult` per page.
    #[maybe_async::maybe_async]
    pub async fn list(
        &self,
        prefix: String,
        delimiter: Option<String>,
    ) -> Result<Vec<ListBucketResult>, S3Error> {
        let the_bucket = self.to_owned();
        let mut results = Vec::new();
        let mut continuation_token = None;

        loop {
            let (list_bucket_result, _) = the_bucket
                .list_page(
                    prefix.clone(),
                    delimiter.clone(),
                    continuation_token,
                    None,
                    None,
                )
                .await?;
            continuation_token = list_bucket_result.next_continuation_token.clone();
            results.push(list_bucket_result);
            // No continuation token means this was the final page.
            if continuation_token.is_none() {
                break;
            }
        }

        Ok(results)
    }
1994
1995 #[maybe_async::maybe_async]
1996 pub async fn list_multiparts_uploads_page(
1997 &self,
1998 prefix: Option<&str>,
1999 delimiter: Option<&str>,
2000 key_marker: Option<String>,
2001 max_uploads: Option<usize>,
2002 ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2003 let command = Command::ListMultipartUploads {
2004 prefix,
2005 delimiter,
2006 key_marker,
2007 max_uploads,
2008 };
2009 let request = RequestImpl::new(self, "/", command)?;
2010 let response_data = request.response_data(false).await?;
2011 let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2012
2013 Ok((list_bucket_result, response_data.status_code()))
2014 }
2015
2016 #[maybe_async::maybe_async]
2050 pub async fn list_multiparts_uploads(
2051 &self,
2052 prefix: Option<&str>,
2053 delimiter: Option<&str>,
2054 ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2055 let the_bucket = self.to_owned();
2056 let mut results = Vec::new();
2057 let mut next_marker: Option<String> = None;
2058
2059 loop {
2060 let (list_multiparts_uploads_result, _) = the_bucket
2061 .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2062 .await?;
2063
2064 let is_truncated = list_multiparts_uploads_result.is_truncated;
2065 next_marker = list_multiparts_uploads_result.next_marker.clone();
2066 results.push(list_multiparts_uploads_result);
2067
2068 if !is_truncated {
2069 break;
2070 }
2071 }
2072
2073 Ok(results)
2074 }
2075
2076 #[maybe_async::maybe_async]
2109 pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2110 let abort = Command::AbortMultipartUpload { upload_id };
2111 let abort_request = RequestImpl::new(self, key, abort)?;
2112 let response_data = abort_request.response_data(false).await?;
2113
2114 if (200..300).contains(&response_data.status_code()) {
2115 Ok(())
2116 } else {
2117 let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2118 Err(S3Error::Http(response_data.status_code(), utf8_content))
2119 }
2120 }
2121
    /// Returns `true` when the bucket addresses requests path-style
    /// (`https://host/bucket`) rather than subdomain-style.
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }
2126
    /// Returns `true` when the bucket addresses requests subdomain-style
    /// (`https://bucket.host`); the inverse of [`Bucket::is_path_style`].
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }
2131
    /// Switch this bucket to path-style addressing (`https://host/bucket`).
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }
2136
    /// Switch this bucket to subdomain-style addressing (`https://bucket.host`).
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }
2141
    /// Set the per-request timeout; `None` disables the timeout entirely.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }
2151
    /// Use the legacy ListObjects (v1) API for listings; required by backends
    /// that do not implement ListObjectsV2.
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }
2160
    /// Use the ListObjectsV2 API for listings.
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }
2165
2166 pub fn name(&self) -> String {
2168 self.name.to_string()
2169 }
2170
2171 pub fn host(&self) -> String {
2173 if self.path_style {
2174 self.path_style_host()
2175 } else {
2176 self.subdomain_style_host()
2177 }
2178 }
2179
2180 pub fn url(&self) -> String {
2181 if self.path_style {
2182 format!(
2183 "{}://{}/{}",
2184 self.scheme(),
2185 self.path_style_host(),
2186 self.name()
2187 )
2188 } else {
2189 format!("{}://{}", self.scheme(), self.subdomain_style_host())
2190 }
2191 }
2192
    /// Host for path-style addressing: the bare region endpoint.
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }
2197
    /// Host for subdomain-style addressing: `<bucket>.<region-endpoint>`.
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }
2201
    /// URL scheme for requests, as determined by the region/endpoint.
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }
2209
    /// Returns a clone of the bucket's region.
    pub fn region(&self) -> Region {
        self.region.clone()
    }
2214
2215 pub fn access_key(&self) -> Result<Option<String>, S3Error> {
2217 Ok(self
2218 .credentials()
2219 .try_read()
2220 .map_err(|_| S3Error::RLCredentials)?
2221 .access_key
2222 .clone()
2223 .map(|key| key.replace('\n', "")))
2224 }
2225
2226 pub fn secret_key(&self) -> Result<Option<String>, S3Error> {
2228 Ok(self
2229 .credentials()
2230 .try_read()
2231 .map_err(|_| S3Error::RLCredentials)?
2232 .secret_key
2233 .clone()
2234 .map(|key| key.replace('\n', "")))
2235 }
2236
2237 pub fn security_token(&self) -> Result<Option<String>, S3Error> {
2239 Ok(self
2240 .credentials()
2241 .try_read()
2242 .map_err(|_| S3Error::RLCredentials)?
2243 .security_token
2244 .clone())
2245 }
2246
2247 pub fn session_token(&self) -> Result<Option<String>, S3Error> {
2249 Ok(self
2250 .credentials()
2251 .try_read()
2252 .map_err(|_| S3Error::RLCredentials)?
2253 .session_token
2254 .clone())
2255 }
2256
    /// Returns a cloned handle (`Arc`) to the shared credentials lock.
    pub fn credentials(&self) -> Arc<RwLock<Credentials>> {
        self.credentials.clone()
    }
2262
    /// Replace the bucket's credentials, dropping this bucket's handle to the
    /// previous ones (other clones keep their old handle).
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }
2267
2268 pub fn add_header(&mut self, key: &str, value: &str) {
2281 self.extra_headers
2282 .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap());
2283 }
2284
    /// Headers attached to every request, in addition to those the backend
    /// generates itself.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }
2289
    /// Mutable access to the extra headers map.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }
2295
    /// Register an extra query parameter to be sent with every request.
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }
2300
    /// Query parameters attached to every request.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }
2305
    /// Mutable access to the extra query-parameter map.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }
2311
    /// The configured per-request timeout, or `None` when unlimited.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
2315}
2316
2317#[cfg(test)]
2318mod test {
2319
2320 use crate::creds::Credentials;
2321 use crate::region::Region;
2322 use crate::Bucket;
2323 use crate::BucketConfiguration;
2324 use crate::Tag;
2325 use http::header::HeaderName;
2326 use http::HeaderMap;
2327 use std::env;
2328
2329 fn init() {
2330 let _ = env_logger::builder().is_test(true).try_init();
2331 }
2332
2333 fn test_aws_credentials() -> Credentials {
2334 Credentials::new(
2335 Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
2336 Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
2337 None,
2338 None,
2339 None,
2340 )
2341 .unwrap()
2342 }
2343
2344 fn test_gc_credentials() -> Credentials {
2345 Credentials::new(
2346 Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
2347 Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
2348 None,
2349 None,
2350 None,
2351 )
2352 .unwrap()
2353 }
2354
2355 fn test_wasabi_credentials() -> Credentials {
2356 Credentials::new(
2357 Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
2358 Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
2359 None,
2360 None,
2361 None,
2362 )
2363 .unwrap()
2364 }
2365
    /// Static credentials matching the local MinIO test server.
    fn test_minio_credentials() -> Credentials {
        Credentials::new(Some("test"), Some("test1234"), None, None, None).unwrap()
    }
2369
2370 fn test_digital_ocean_credentials() -> Credentials {
2371 Credentials::new(
2372 Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
2373 Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
2374 None,
2375 None,
2376 None,
2377 )
2378 .unwrap()
2379 }
2380
2381 fn test_r2_credentials() -> Credentials {
2382 Credentials::new(
2383 Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
2384 Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
2385 None,
2386 None,
2387 None,
2388 )
2389 .unwrap()
2390 }
2391
2392 fn test_aws_bucket() -> Bucket {
2393 Bucket::new(
2394 "rust-s3-test",
2395 "eu-central-1".parse().unwrap(),
2396 test_aws_credentials(),
2397 )
2398 .unwrap()
2399 }
2400
2401 fn test_wasabi_bucket() -> Bucket {
2402 Bucket::new(
2403 "rust-s3",
2404 "wa-eu-central-1".parse().unwrap(),
2405 test_wasabi_credentials(),
2406 )
2407 .unwrap()
2408 }
2409
2410 fn test_gc_bucket() -> Bucket {
2411 let mut bucket = Bucket::new(
2412 "rust-s3",
2413 Region::Custom {
2414 region: "us-east1".to_owned(),
2415 endpoint: "https://storage.googleapis.com".to_owned(),
2416 },
2417 test_gc_credentials(),
2418 )
2419 .unwrap();
2420 bucket.set_listobjects_v1();
2421 bucket
2422 }
2423
2424 fn test_minio_bucket() -> Bucket {
2425 Bucket::new(
2426 "rust-s3",
2427 Region::Custom {
2428 region: "eu-central-1".to_owned(),
2429 endpoint: "http://localhost:9000".to_owned(),
2430 },
2431 test_minio_credentials(),
2432 )
2433 .unwrap()
2434 .with_path_style()
2435 }
2436
    /// Bucket handle for DigitalOcean Spaces (FRA1 region).
    fn test_digital_ocean_bucket() -> Bucket {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }
2440
2441 fn test_r2_bucket() -> Bucket {
2442 Bucket::new(
2443 "rust-s3",
2444 Region::R2 {
2445 account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
2446 },
2447 test_r2_credentials(),
2448 )
2449 .unwrap()
2450 }
2451
2452 fn object(size: u32) -> Vec<u8> {
2453 (0..size).map(|_| 33).collect()
2454 }
2455
    /// End-to-end smoke test: PUT an object, GET it back, GET a byte range,
    /// optionally HEAD it, then DELETE it. `head` is false for backends whose
    /// HEAD behavior differs (e.g. R2).
    #[maybe_async::maybe_async]
    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
        let s3_path = "/+test.file";
        let test: Vec<u8> = object(3072);

        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        // Range GET is end-inclusive: bytes 100..=1000 -> 901 bytes, hence
        // the `100..1001` slice below.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
        if head {
            let (head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
            assert_eq!(code, 200);
            assert_eq!(
                head_object_result.content_type.unwrap(),
                "application/octet-stream".to_owned()
            );
        }

        // DELETE returns 204 No Content on success.
        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
2486
    // Ignored live test: round-trips object tagging against AWS — a fresh
    // object has no tags, then two tags are set and read back.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly uploaded object must carry no tags.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
2527
    // Ignored live test: same tagging round-trip as `test_tagging_aws`, but
    // against the local MinIO server.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly uploaded object must carry no tags.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
2568
    // Ignored live test: big-object streaming round-trip against AWS.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_aws_bucket()).await;
    }
2581
    // Ignored live test: big-object streaming round-trip against GCS
    // (skipped under tokio-rustls-tls, per the cfg below).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_gc_bucket()).await;
    }
2601
    // Ignored live test: big-object streaming round-trip against local MinIO.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(test_minio_bucket()).await;
    }
2614
2615 #[maybe_async::maybe_async]
2617 async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
2618 #[cfg(feature = "with-async-std")]
2619 use async_std::fs::File;
2620 #[cfg(feature = "with-async-std")]
2621 use async_std::io::WriteExt;
2622 #[cfg(feature = "with-async-std")]
2623 use async_std::stream::StreamExt;
2624 #[cfg(feature = "with-tokio")]
2625 use futures::StreamExt;
2626 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
2627 use std::fs::File;
2628 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
2629 use std::io::Write;
2630 #[cfg(feature = "with-tokio")]
2631 use tokio::fs::File;
2632 #[cfg(feature = "with-tokio")]
2633 use tokio::io::AsyncWriteExt;
2634
2635 init();
2636 let remote_path = "+stream_test_big";
2637 let local_path = "+stream_test_big";
2638 std::fs::remove_file(remote_path).unwrap_or_else(|_| {});
2639 let content: Vec<u8> = object(20_000_000);
2640
2641 let mut file = File::create(local_path).await.unwrap();
2642 file.write_all(&content).await.unwrap();
2643 let mut reader = File::open(local_path).await.unwrap();
2644
2645 let code = bucket
2646 .put_object_stream(&mut reader, remote_path)
2647 .await
2648 .unwrap();
2649 assert_eq!(code, 200);
2650 let mut writer = Vec::new();
2651 let code = bucket
2652 .get_object_to_writer(remote_path, &mut writer)
2653 .await
2654 .unwrap();
2655 assert_eq!(code, 200);
2656 assert_eq!(content.len(), writer.len());
2658 assert_eq!(content.len(), 20_000_000);
2659
2660 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
2661 {
2662 let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();
2663
2664 let mut bytes = vec![];
2665
2666 while let Some(chunk) = response_data_stream.bytes().next().await {
2667 bytes.push(chunk)
2668 }
2669 assert_ne!(bytes.len(), 0);
2670 }
2671
2672 let response_data = bucket.delete_object(remote_path).await.unwrap();
2673 assert_eq!(response_data.status_code(), 204);
2674 std::fs::remove_file(local_path).unwrap_or_else(|_| {});
2675 }
2676
    // Ignored live test: small-object streaming round-trip against AWS.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
    }
2689
    // Ignored live test: small-object streaming round-trip against GCS
    // (skipped under tokio-rustls-tls, per the cfg below).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
    }
2709
    // Ignored live test: small-object streaming round-trip against Cloudflare R2.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
    }
2722
    // Ignored live test: small-object streaming round-trip against local MinIO.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
    }
2735
    /// Round-trip a 1 KB object through streaming upload/download from an
    /// in-memory cursor, verify the exact bytes, then delete the object.
    #[maybe_async::maybe_async]
    async fn streaming_test_put_get_delete_small_object(bucket: Bucket) {
        init();
        let remote_path = "+stream_test_small";
        let content: Vec<u8> = object(1000);
        // Backend-specific cursor type; both wrap the same in-memory buffer.
        #[cfg(feature = "with-tokio")]
        let mut reader = std::io::Cursor::new(&content);
        #[cfg(feature = "with-async-std")]
        let mut reader = async_std::io::Cursor::new(&content);

        let code = bucket
            .put_object_stream(&mut reader, remote_path)
            .await
            .unwrap();
        assert_eq!(code, 200);
        let mut writer = Vec::new();
        let code = bucket
            .get_object_to_writer(remote_path, &mut writer)
            .await
            .unwrap();
        assert_eq!(code, 200);
        assert_eq!(content, writer);

        // DELETE returns 204 No Content on success.
        let response_data = bucket.delete_object(remote_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
2762
2763 #[cfg(feature = "blocking")]
2764 fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
2765 let s3_path = "/test_blocking.file";
2766 let s3_path_2 = "/test_blocking.file2";
2767 let s3_path_3 = "/test_blocking.file3";
2768 let test: Vec<u8> = object(3072);
2769
2770 let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
2772 assert_eq!(response_data.status_code(), 200);
2773
2774 let response_data = bucket.get_object_blocking(s3_path).unwrap();
2776 assert_eq!(response_data.status_code(), 200);
2777 assert_eq!(test, response_data.as_slice());
2778
2779 let response_data = bucket
2781 .get_object_range_blocking(s3_path, 100, Some(1000))
2782 .unwrap();
2783 assert_eq!(response_data.status_code(), 206);
2784 assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
2785
2786 let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
2788 assert_eq!(code, 200);
2789 assert_eq!(
2790 head_object_result.content_type.unwrap(),
2791 "application/octet-stream".to_owned()
2792 );
2793 let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
2797 assert_eq!(response_data.status_code(), 200);
2798 let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
2799 assert_eq!(response_data.status_code(), 200);
2800
2801 let (result, code) = bucket
2803 .list_page_blocking(
2804 "test_blocking.".to_string(),
2805 Some("/".to_string()),
2806 None,
2807 None,
2808 Some(2),
2809 )
2810 .unwrap();
2811 assert_eq!(code, 200);
2812 assert_eq!(result.contents.len(), 2);
2813 assert_eq!(result.contents[0].key, s3_path[1..]);
2814 assert_eq!(result.contents[1].key, s3_path_2[1..]);
2815
2816 let cont_token = result.next_continuation_token.unwrap();
2817
2818 let (result, code) = bucket
2819 .list_page_blocking(
2820 "test_blocking.".to_string(),
2821 Some("/".to_string()),
2822 Some(cont_token),
2823 None,
2824 Some(2),
2825 )
2826 .unwrap();
2827 assert_eq!(code, 200);
2828 assert_eq!(result.contents.len(), 1);
2829 assert_eq!(result.contents[0].key, s3_path_3[1..]);
2830 assert!(result.next_continuation_token.is_none());
2831
2832 let response_data = bucket.delete_object_blocking(s3_path).unwrap();
2834 assert_eq!(code, 200);
2835 let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
2836 assert_eq!(code, 200);
2837 let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
2838 assert_eq!(code, 200);
2839 }
2840
    // Ignored live test: blocking round-trip against AWS.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn aws_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_aws_bucket())
    }
2850
    // Ignored live test: blocking round-trip against GCS.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn gc_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_gc_bucket())
    }
2860
    // Ignored live test: blocking round-trip against Wasabi.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn wasabi_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_wasabi_bucket())
    }
2870
    // Ignored live test: blocking round-trip against local MinIO.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn minio_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_minio_bucket())
    }
2880
    // Ignored live test: blocking round-trip against DigitalOcean Spaces.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn digital_ocean_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(test_digital_ocean_bucket())
    }
2890
    // Ignored live test: PUT/HEAD/GET/DELETE round-trip against AWS.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn aws_put_head_get_delete_object() {
        put_head_get_delete_object(test_aws_bucket(), true).await;
    }
2903
    // Ignored live test: PUT/HEAD/GET/DELETE round-trip against GCS
    // (skipped under tokio-rustls-tls, per the cfg below).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(any(feature = "sync", feature = "tokio-rustls-tls")),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn gc_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_gc_bucket(), true).await;
    }
2922
    // Ignored live test: PUT/HEAD/GET/DELETE round-trip against Wasabi.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn wasabi_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_wasabi_bucket(), true).await;
    }
2935
    // Ignored live test: PUT/HEAD/GET/DELETE round-trip against local MinIO.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn minio_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_minio_bucket(), true).await;
    }
2948
    // Ignored live test: round-trip against Cloudflare R2; the HEAD step is
    // skipped (`head = false`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn r2_test_put_head_get_delete_object() {
        put_head_get_delete_object(test_r2_bucket(), false).await;
    }
2975
    // Ignored: builds a presigned PUT URL (reads AWS creds from the env) and
    // checks the custom header appears in the signed-headers list.
    #[test]
    #[ignore]
    fn test_presign_put() {
        let s3_path = "/test/test.file";
        let bucket = test_aws_bucket();

        let mut custom_headers = HeaderMap::new();
        custom_headers.insert(
            HeaderName::from_static("custom_header"),
            "custom_value".parse().unwrap(),
        );

        let url = bucket
            .presign_put(s3_path, 86400, Some(custom_headers))
            .unwrap();

        // `host%3Bcustom_header` is the URL-encoded `host;custom_header`
        // SignedHeaders value.
        assert!(url.contains("host%3Bcustom_header"));
        assert!(url.contains("/test/test.file"))
    }
2995
    // Ignored: builds a presigned GET URL (reads AWS creds from the env).
    #[test]
    #[ignore]
    fn test_presign_get() {
        let s3_path = "/test/test.file";
        let bucket = test_aws_bucket();

        let url = bucket.presign_get(s3_path, 86400, None).unwrap();
        assert!(url.contains("/test/test.file?"))
    }
3005
    // Ignored: builds a presigned DELETE URL (reads AWS creds from the env).
    #[test]
    #[ignore]
    fn test_presign_delete() {
        let s3_path = "/test/test.file";
        let bucket = test_aws_bucket();

        let url = bucket.presign_delete(s3_path, 86400).unwrap();
        assert!(url.contains("/test/test.file?"))
    }
3015
    // Ignored live test: create a uniquely named bucket in us-east-1 (the
    // default region needs no LocationConstraint), then delete it.
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    #[ignore]
    async fn test_bucket_create_delete_default_region() {
        let config = BucketConfiguration::default();
        let response = Bucket::create(
            &uuid::Uuid::new_v4().to_string(),
            "us-east-1".parse().unwrap(),
            test_aws_credentials(),
            config,
        )
        .await
        .unwrap();

        assert_eq!(&response.response_text, "");

        assert_eq!(response.response_code, 200);

        let response_code = response.bucket.delete().await.unwrap();
        assert!(response_code < 300);
    }
3043
    // Ignored live test: create a uniquely named bucket in eu-central-1
    // (non-default region, so a LocationConstraint is sent), then delete it.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_bucket_create_delete_non_default_region() {
        let config = BucketConfiguration::default();
        let response = Bucket::create(
            &uuid::Uuid::new_v4().to_string(),
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
            config,
        )
        .await
        .unwrap();

        assert_eq!(&response.response_text, "");

        assert_eq!(response.response_code, 200);

        let response_code = response.bucket.delete().await.unwrap();
        assert!(response_code < 300);
    }
3071
    // Ignored live test: same as the non-default-region test but with a
    // public-read bucket configuration.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_bucket_create_delete_non_default_region_public() {
        let config = BucketConfiguration::public();
        let response = Bucket::create(
            &uuid::Uuid::new_v4().to_string(),
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
            config,
        )
        .await
        .unwrap();

        assert_eq!(&response.response_text, "");

        assert_eq!(response.response_code, 200);

        let response_code = response.bucket.delete().await.unwrap();
        assert!(response_code < 300);
    }
3099
3100 #[test]
3101 fn test_tag_has_key_and_value_functions() {
3102 let key = "key".to_owned();
3103 let value = "value".to_owned();
3104 let tag = Tag { key, value };
3105 assert_eq!["key", tag.key()];
3106 assert_eq!["value", tag.value()];
3107 }
3108
    // Ignored because building the credentials reads EU_AWS_* env vars; the
    // assertion itself only checks `with_request_timeout` wiring.
    #[test]
    #[ignore]
    fn test_builder_composition() {
        use std::time::Duration;

        let bucket = Bucket::new(
            "test-bucket",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
        .with_request_timeout(Duration::from_secs(10));

        assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
    }
3124}