use crate::command::{Command, CompleteMultipartUploadData, Part};
use crate::constants::LONG_DATE_TIME;
use crate::credentials::Credentials;
use crate::error::S3Error;
use crate::types::Multipart;
use crate::types::{
    HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult, PutStreamResponse,
};
use crate::{md5_url_encode, signature, Region, S3Response, S3StatusCode};
use hmac::Hmac;
use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, RANGE};
use http::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
use sha2::digest::Mac;
use sha2::Sha256;
use std::fmt::Write;
use std::sync::OnceLock;
use std::time::Duration;
use std::{env, mem};
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{debug, error, warn};
use url::Url;

static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

const CHUNK_SIZE: usize = 8 * 1024 * 1024;

#[derive(Debug)]
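/// Behavior options for a [`Bucket`]: path-style vs. virtual-hosted
/// addressing, and which ListObjects API version to use.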
pub struct BucketOptions {
    pub path_style: bool,
    pub list_objects_v2: bool,
}

impl Default for BucketOptions {
    fn default() -> Self {
        Self {
            path_style: env::var("S3_PATH_STYLE")
                .unwrap_or_else(|_| "false".to_string())
                .parse::<bool>()
                .expect("S3_PATH_STYLE cannot be parsed as bool"),
            list_objects_v2: true,
        }
    }
}

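/// A client handle to a single S3 bucket, holding the endpoint, bucket name,
/// region and credentials used to sign requests. It is `Clone` so that
/// background tasks (e.g. the streaming upload writer) can own a copy.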
#[derive(Debug, Clone)]
pub struct Bucket {
    pub host: Url,
    pub name: String,
    pub region: Region,
    pub credentials: Credentials,
    path_style: bool,
    list_objects_v2: bool,
}

#[allow(dead_code)]
#[allow(clippy::assigning_clones)]
impl Bucket {
    fn host_domain(&self) -> String {
        match self.host.domain() {
            None => {
                let host_str = self
                    .host
                    .host_str()
                    .expect("host_str to exist when domain does not");
                if let Some(port) = self.host.port() {
                    format!("{}:{}", host_str, port)
                } else {
                    host_str.to_string()
                }
            }
            Some(domain) => {
                if let Some(port) = self.host.port() {
                    format!("{}:{}", domain, port)
                } else {
                    domain.to_string()
                }
            }
        }
    }

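    /// Creates a new `Bucket` handle; `None` for `options` selects the
    /// defaults from [`BucketOptions::default`].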
    pub fn new(
        host: Url,
        name: String,
        region: Region,
        credentials: Credentials,
        options: Option<BucketOptions>,
    ) -> Result<Self, S3Error> {
        let options = options.unwrap_or_default();
        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

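    /// Builds a `Bucket` from the environment: `S3_URL` and `S3_BUCKET` are
    /// read here, while region and credentials come from their own
    /// `try_from_env` helpers.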
    pub fn try_from_env() -> Result<Self, S3Error> {
        let host_env = env::var("S3_URL")?;
        let host = host_env.parse::<Url>()?;

        let name = env::var("S3_BUCKET")?;
        let region = Region::try_from_env()?;
        let credentials = Credentials::try_from_env()?;
        let options = BucketOptions::default();

        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

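    /// Issues a `HeadObject` request and returns the object's metadata.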
    pub async fn head<S: AsRef<str>>(&self, path: S) -> Result<HeadObjectResult, S3Error> {
        let res = self
            .send_request(Command::HeadObject, path.as_ref())
            .await?;
        Ok(HeadObjectResult::from(res.headers()))
    }

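    /// Fetches an object.
    ///
    /// A minimal usage sketch (object key and error handling are illustrative):
    ///
    /// ```ignore
    /// let res = bucket.get("some/object").await?;
    /// assert!(res.status().is_success());
    /// let bytes = res.bytes().await?;
    /// ```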
    pub async fn get<P>(&self, path: P) -> Result<S3Response, S3Error>
    where
        P: AsRef<str>,
    {
        self.send_request(Command::GetObject, path.as_ref()).await
    }

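    /// Fetches a byte range of an object. Following HTTP `Range` semantics,
    /// `end` is inclusive; `None` reads to the end of the object.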
    pub async fn get_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<S3Response, S3Error> {
        if let Some(end) = end {
            if start >= end {
                return Err(S3Error::Range("start must be less than end"));
            }
        }
        self.send_request(Command::GetObjectRange { start, end }, path.as_ref())
            .await
    }

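    /// Deletes an object.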
    pub async fn delete<S: AsRef<str>>(&self, path: S) -> Result<S3Response, S3Error> {
        self.send_request(Command::DeleteObject, path.as_ref())
            .await
    }

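    /// Uploads `content` in a single request with the default
    /// `application/octet-stream` content type.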
    pub async fn put<S: AsRef<str>>(&self, path: S, content: &[u8]) -> Result<S3Response, S3Error> {
        self.put_with_content_type(path, content, "application/octet-stream")
            .await
    }

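    /// Uploads `content` in a single request with an explicit content type.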
    pub async fn put_with_content_type<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        content_type: &str,
    ) -> Result<S3Response, S3Error> {
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_str(content_type)?);

        self.send_request(
            Command::PutObject {
                content,
                headers,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

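    /// Uploads `content` in a single request with caller-provided extra
    /// headers, e.g. a custom content type or `x-amz-*` metadata.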
    pub async fn put_with<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        extra_headers: HeaderMap,
    ) -> Result<S3Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content,
                headers: extra_headers,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

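    /// Streams an object from any `AsyncRead` source using the default
    /// `application/octet-stream` content type. Sources that fit into a
    /// single `CHUNK_SIZE` buffer are uploaded with a plain PUT; larger
    /// ones go through a multipart upload.
    ///
    /// A usage sketch (paths are illustrative):
    ///
    /// ```ignore
    /// let mut file = tokio::fs::File::open("local/big_file").await?;
    /// let res = bucket
    ///     .put_stream(&mut file, "remote/big_file".to_string())
    ///     .await?;
    /// assert!(res.status_code < 300);
    /// ```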
    pub async fn put_stream<R>(
        &self,
        reader: &mut R,
        path: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        self.put_stream_with_content_type(reader, path, "application/octet-stream".to_string())
            .await
    }

    async fn initiate_multipart_upload(
        &self,
        path: &str,
        extra_headers: HeaderMap,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let res = self
            .send_request(
                Command::InitiateMultipartUpload {
                    headers: extra_headers,
                },
                path,
            )
            .await?;
        Ok(quick_xml::de::from_str(&res.text().await?)?)
    }

    async fn multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
    ) -> Result<Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content: &chunk,
                multipart: Some(Multipart::new(part_number, upload_id)),
                headers: HeaderMap::new(),
            },
            path,
        )
        .await
    }

    async fn complete_multipart_upload(
        &self,
        path: &str,
        upload_id: &str,
        parts: Vec<Part>,
    ) -> Result<Response, S3Error> {
        let data = CompleteMultipartUploadData { parts };
        self.send_request(Command::CompleteMultipartUpload { upload_id, data }, path)
            .await
    }

    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with_content_type<R>(
        &self,
        reader: &mut R,
        path: String,
        content_type: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_str(&content_type)?);

        self.put_stream_with(reader, path, headers).await
    }

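    /// Streams an object from any `AsyncRead` source with caller-provided
    /// extra headers, choosing between a single PUT and a multipart upload
    /// based on the source size.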
    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with<R>(
        &self,
        reader: &mut R,
        path: String,
        extra_headers: HeaderMap,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        let mut first_chunk = Vec::with_capacity(CHUNK_SIZE);
        let first_chunk_size = reader
            .take(CHUNK_SIZE as u64)
            .read_to_end(&mut first_chunk)
            .await?;

        debug!("first_chunk size: {}", first_chunk.len());
        if first_chunk_size < CHUNK_SIZE {
            debug!("first_chunk_size < CHUNK_SIZE -> doing normal PUT without stream");
            let res = self
                .put_with(&path, first_chunk.as_slice(), extra_headers)
                .await;

            return match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: first_chunk_size,
                }),
                Err(err) => Err(err),
            };
        }

        debug!("first_chunk_size >= CHUNK_SIZE -> initiate streaming upload");

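        // a bounded channel applies backpressure: the reader loop below stays
        // at most two chunks (~16 MiB) ahead of the uploading writer task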
        let (tx, rx) = flume::bounded(2);

        let slf = self.clone();
        let handle_writer = tokio::spawn(async move {
            debug!("writer task has been started");

            let msg = slf.initiate_multipart_upload(&path, extra_headers).await?;
            debug!("{:?}", msg);
            let path = msg.key;
            let upload_id = &msg.upload_id;

            let mut part_number: u32 = 0;
            let mut etags = Vec::new();

            let mut total_size = 0;
            loop {
                let chunk = if part_number == 0 {
                    // the first chunk was already read before the upload was initiated
                    mem::take(&mut first_chunk)
                } else {
                    match rx.recv_async().await {
                        Ok(Some(chunk)) => chunk,
                        Ok(None) => {
                            debug!("no more parts available in reader - finishing upload");
                            break;
                        }
                        Err(err) => {
                            debug!("chunk reader channel has been closed: {}", err);
                            break;
                        }
                    }
                };
                debug!("chunk size in loop {}: {}", part_number + 1, chunk.len());

                total_size += chunk.len();

                // S3 part numbers start at 1
                part_number += 1;
                let res = slf
                    .multipart_request(&path, chunk, part_number, upload_id)
                    .await;

                match res {
                    Ok(res) => {
                        let etag = res
                            .headers()
                            .get("etag")
                            .ok_or_else(|| {
                                S3Error::UnexpectedResponse(
                                    "missing ETag in multipart response headers",
                                )
                            })?
                            .to_str()
                            .map_err(S3Error::HeaderToStr)?;
                        etags.push(etag.to_string());
                    }
                    Err(err) => {
                        // let the server drop any parts uploaded so far
                        slf.abort_upload(&path, upload_id).await?;
                        return Err(err);
                    }
                }
            }
            debug!(
                "multipart uploading finished after {} parts with total size of {} bytes",
                part_number, total_size
            );

            let inner_data = etags
                .into_iter()
                .enumerate()
                .map(|(i, etag)| Part {
                    etag,
                    part_number: i as u32 + 1,
                })
                .collect::<Vec<Part>>();
            debug!("data for multipart finishing: {:?}", inner_data);
            let res = slf
                .complete_multipart_upload(&path, &msg.upload_id, inner_data)
                .await;

            match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: total_size,
                }),
                Err(err) => Err(err),
            }
        });

        // feed the writer task chunk by chunk; `None` signals a clean EOF
        loop {
            let mut buf = Vec::with_capacity(CHUNK_SIZE);
            match reader.take(CHUNK_SIZE as u64).read_to_end(&mut buf).await {
                Ok(size) => {
                    if size == 0 {
                        debug!("stream reader finished reading");
                        if let Err(err) = tx.send_async(None).await {
                            error!("failed to send the 'no more data' message from reader: {}", err);
                        }
                        break;
                    }

                    debug!("stream reader read {} bytes", size);
                    if let Err(err) = tx.send_async(Some(buf)).await {
                        warn!(
                            "Stream Writer has been closed before reader finished: {}",
                            err
                        );
                        break;
                    }
                }
                Err(err) => {
                    error!("stream reader error: {}", err);
                    break;
                }
            }
        }

        handle_writer.await?
    }

    async fn list_page(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<ListBucketResult, S3Error> {
        let command = if self.list_objects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
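            // ListObjects (v1) has no continuation token; fall back to `marker`,
            // using whichever input is set (`None` orders before any `Some`)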
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };

        let resp = self.send_request(command, "/").await?;
        let bytes = resp.bytes().await?;
        let list_bucket_result = quick_xml::de::from_reader(bytes.as_ref())?;
        Ok(list_bucket_result)
    }

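    /// Lists all objects under `prefix`, following pagination until the
    /// server stops returning a continuation token. Each page is returned
    /// as its own `ListBucketResult`.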
    pub async fn list(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
    ) -> Result<Vec<ListBucketResult>, S3Error> {
        let mut results = Vec::new();
        let mut continuation_token = None;

        loop {
            let list_bucket_result = self
                .list_page(prefix, delimiter, continuation_token, None, None)
                .await?;
            continuation_token = list_bucket_result.next_continuation_token.clone();
            results.push(list_bucket_result);
            if continuation_token.is_none() {
                break;
            }
        }

        Ok(results)
    }

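    /// Server-side copy within this bucket.
    ///
    /// A usage sketch (object keys are illustrative):
    ///
    /// ```ignore
    /// let status = bucket.copy_internal("a/source.txt", "b/target.txt").await?;
    /// assert!(status.is_success());
    /// ```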
    pub async fn copy_internal<F, T>(&self, from: F, to: T) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        self.copy_internal_with(from, to, HeaderMap::new()).await
    }

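    /// Server-side copy within this bucket with extra headers attached to
    /// the `CopyObject` request, e.g. `x-amz-metadata-directive`.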
    pub async fn copy_internal_with<F, T>(
        &self,
        from: F,
        to: T,
        extra_headers: HeaderMap,
    ) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from = from.as_ref();
            let from = from.strip_prefix('/').unwrap_or(from);
            format!("{}/{}", self.name, from)
        };
        Ok(self
            .send_request(
                Command::CopyObject {
                    from: &fq_from,
                    headers: extra_headers,
                },
                to.as_ref(),
            )
            .await?
            .status())
    }

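    /// Server-side copy from another bucket on the same host into this one.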
    pub async fn copy_internal_from<B, F, T>(
        &self,
        from_bucket: B,
        from_object: F,
        to: T,
    ) -> Result<S3StatusCode, S3Error>
    where
        B: AsRef<str>,
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from_object = from_object.as_ref();
            let from_object = from_object.strip_prefix('/').unwrap_or(from_object);
            format!("{}/{}", from_bucket.as_ref(), from_object)
        };
        Ok(self
            .send_request(
                Command::CopyObject {
                    from: &fq_from,
                    headers: HeaderMap::new(),
                },
                to.as_ref(),
            )
            .await?
            .status())
    }

    async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
        let resp = self
            .send_request(Command::AbortMultipartUpload { upload_id }, key)
            .await?;

        let status = resp.status();
        if status.is_success() {
            Ok(())
        } else {
            let utf8_content = String::from_utf8(resp.bytes().await?.to_vec())?;
            Err(S3Error::HttpFailWithBody(status.as_u16(), utf8_content))
        }
    }

    async fn send_request(
        &self,
        mut command: Command<'_>,
        path: &str,
    ) -> Result<reqwest::Response, S3Error> {
        let url = self.build_url(&command, path)?;
        let headers = self.build_headers(&mut command, &url).await?;

        let builder = Self::get_client()
            .request(command.http_method(), url)
            .headers(headers);

        let res = match command {
            Command::PutObject { content, .. } => builder.body(content.to_vec()),
            Command::PutObjectTagging { tags } => builder.body(tags.to_string()),
            Command::UploadPart { content, .. } => builder.body(content.to_vec()),
            Command::CompleteMultipartUpload { ref data, .. } => {
                let body = data.to_string();
                builder.body(body)
            }
            _ => builder.body(Vec::default()),
        }
        .send()
        .await?;

        if res.status().is_success() {
            Ok(res)
        } else {
            Err(S3Error::HttpFailWithBody(
                res.status().as_u16(),
                res.text().await?,
            ))
        }
    }

    fn get_client() -> &'static reqwest::Client {
        CLIENT.get_or_init(|| {
            let mut builder = reqwest::Client::builder()
                .brotli(true)
                .connect_timeout(Duration::from_secs(10))
                .tcp_keepalive(Duration::from_secs(30))
                .pool_idle_timeout(Duration::from_secs(600));

            #[cfg(any(
                feature = "rustls-tls",
                feature = "rustls-tls-no-provider",
                feature = "rustls-tls-manual-roots",
                feature = "rustls-tls-webpki-roots",
                feature = "rustls-tls-native-roots"
            ))]
            {
                builder = builder.use_rustls_tls();

                if env::var("S3_DANGER_ALLOW_INSECURE").as_deref() == Ok("true") {
                    builder = builder.danger_accept_invalid_certs(true);
                }
            }

            builder.build().unwrap()
        })
    }

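    /// Builds all request headers, including the AWS Signature V4
    /// `Authorization` header computed over the canonical request.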
    async fn build_headers(
        &self,
        command: &mut Command<'_>,
        url: &Url,
    ) -> Result<HeaderMap, S3Error> {
        let cmd_hash = command.sha256();
        let now = OffsetDateTime::now_utc();

        let mut headers = match command {
            Command::PutObject { headers, .. }
            | Command::InitiateMultipartUpload { headers, .. }
            | Command::CopyObject { headers, .. } => std::mem::take(headers),
            _ => HeaderMap::with_capacity(4),
        };

        let domain = self.host_domain();
        if self.path_style {
            headers.insert(HOST, HeaderValue::from_str(domain.as_str())?);
        } else {
            headers.insert(
                HOST,
                HeaderValue::try_from(format!("{}.{}", self.name, domain))?,
            );
        }

        match command {
            Command::CopyObject { from, .. } => {
                headers.insert(
                    HeaderName::from_static("x-amz-copy-source"),
                    HeaderValue::from_str(from)?,
                );
            }
            Command::ListObjects { .. } => {}
            Command::ListObjectsV2 { .. } => {}
            Command::GetObject => {}
            Command::GetObjectTagging => {}
            Command::GetBucketLocation => {}

            Command::InitiateMultipartUpload { .. } => {
                if !headers.contains_key(CONTENT_TYPE) {
                    headers.insert(
                        CONTENT_TYPE,
                        HeaderValue::from_str("application/octet-stream")?,
                    );
                }
            }
            Command::CompleteMultipartUpload { .. } => {
                headers.insert(CONTENT_TYPE, HeaderValue::from_str("application/xml")?);
            }
            Command::PutObject { multipart, .. } => {
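                // multipart parts must not set their own content type; it is
                // fixed for the whole object when the upload is initiated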
                if multipart.is_none() && !headers.contains_key(CONTENT_TYPE) {
                    headers.insert(
                        CONTENT_TYPE,
                        HeaderValue::from_str("application/octet-stream")?,
                    );
                }
            }

            Command::DeleteObject => {}
            Command::GetObjectRange { .. } => {}
            Command::HeadObject { .. } => {}

            _ => {
                headers.insert(
                    CONTENT_LENGTH,
                    HeaderValue::try_from(command.content_length().to_string())?,
                );
                headers.insert(CONTENT_TYPE, HeaderValue::from_str("text/plain")?);
            }
        }

        headers.insert(
            HeaderName::from_static("x-amz-content-sha256"),
            HeaderValue::from_str(&cmd_hash)?,
        );
        headers.insert(
            HeaderName::from_static("x-amz-date"),
            HeaderValue::try_from(now.format(LONG_DATE_TIME)?)?,
        );

        match command {
            Command::PutObjectTagging { tags } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(tags.as_bytes()))?,
                );
            }
            Command::PutObject { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::UploadPart { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::GetObject => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));
            }
            Command::GetObjectRange { start, end } => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));

                let range = if let Some(end) = end {
                    format!("bytes={}-{}", start, end)
                } else {
                    format!("bytes={}-", start)
                };
                headers.insert(RANGE, HeaderValue::try_from(range)?);
            }
            _ => {}
        }

        let canonical_request =
            signature::canonical_request(&command.http_method(), url, &headers, &cmd_hash)?;
        let string_to_sign =
            signature::string_to_sign(&now, &self.region, canonical_request.as_bytes())?;
        let signing_key =
            signature::signing_key(&now, &self.credentials.access_key_secret, &self.region)?;
        let mut hmac = Hmac::<Sha256>::new_from_slice(&signing_key)?;
        hmac.update(string_to_sign.as_bytes());
        let signature = hex::encode(hmac.finalize().into_bytes());
        let signed_header = signature::signed_header_string(&headers);
        let authorization = signature::authorization_header(
            &self.credentials.access_key_id,
            &now,
            &self.region,
            &signed_header,
            &signature,
        )?;
        headers.insert(AUTHORIZATION, HeaderValue::try_from(authorization)?);

        headers.insert(DATE, HeaderValue::try_from(now.format(&Rfc2822)?)?);

        Ok(headers)
    }

    fn build_url(&self, command: &Command, path: &str) -> Result<Url, S3Error> {
        let mut url = if self.path_style {
            format!(
                "{}://{}/{}",
                self.host.scheme(),
                self.host_domain(),
                self.name,
            )
        } else {
            format!(
                "{}://{}.{}",
                self.host.scheme(),
                self.name,
                self.host_domain(),
            )
        };

        let path = path.strip_prefix('/').unwrap_or(path);

        url.push('/');
        url.push_str(&signature::uri_encode(path, false));

        match command {
            Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
                url.push_str("?uploads")
            }
            Command::AbortMultipartUpload { upload_id }
            | Command::CompleteMultipartUpload { upload_id, .. } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::PutObject {
                multipart: Some(multipart),
                ..
            } => url.push_str(&multipart.query_string()),
            _ => {}
        }

        let mut url = Url::parse(&url)?;

        match command {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                query_pairs.append_pair("list-type", "2");
                if let Some(token) = continuation_token {
                    query_pairs.append_pair("continuation-token", token);
                }
                if let Some(start_after) = start_after {
                    query_pairs.append_pair("start-after", start_after);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListObjects {
                prefix,
                delimiter,
                marker,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                if let Some(marker) = marker {
                    query_pairs.append_pair("marker", marker);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListMultipartUploads {
                prefix,
                delimiter,
                key_marker,
                max_uploads,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }
                if let Some(prefix) = prefix {
                    query_pairs.append_pair("prefix", prefix);
                }
                if let Some(key_marker) = key_marker {
                    query_pairs.append_pair("key-marker", key_marker);
                }
                if let Some(max_uploads) = max_uploads {
                    query_pairs.append_pair("max-uploads", max_uploads.to_string().as_str());
                }
            }

            Command::PutObjectTagging { .. }
            | Command::GetObjectTagging
            | Command::DeleteObjectTagging => {
                url.query_pairs_mut().append_pair("tagging", "");
            }

            _ => {}
        }

        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use tokio::fs;
    use tracing_test::traced_test;

    #[traced_test]
    #[tokio::test]
    async fn test_object_flow() -> Result<(), S3Error> {
        dotenvy::dotenv().expect(".env file to be present");

        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        let file_sizes = vec![
            0,
            1,
            CHUNK_SIZE / 2,
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
        ];

        for file_size in file_sizes {
            println!("test_object_flow with {} bytes", file_size);

            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            let res = bucket.put(&file_name_input, &bytes).await?;
            let status = res.status();
            let body = res.text().await?;
            println!("response body:\n{}", body);
            assert!(status.is_success());

            tokio::time::sleep(Duration::from_secs(1)).await;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);
            fs::write(&output_path, body.as_ref()).await?;

            let input_bytes = fs::read(input_path).await?;
            let output_bytes = fs::read(output_path).await?;
            assert_eq!(input_bytes.len(), file_size);
            assert_eq!(input_bytes.len(), output_bytes.len());
            assert_eq!(input_bytes, output_bytes);

            // the prefix must match the keys written above for the checks to run
            let list = bucket.list("test_data", None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            assert_eq!(object.size, file_size as u64);
                            break;
                        }
                    }
                }
            }

            let res = bucket.head(&file_name_input).await?;
            assert_eq!(res.content_length, Some(file_size as u64));

            if file_size > CHUNK_SIZE / 2 {
                let end = CHUNK_SIZE / 2 + 1;
                let res = bucket
                    .get_range(&file_name_input, 0, Some(end as u64))
                    .await?;
                assert!(res.status().is_success());
                let body = res.bytes().await?;
                // the Range header is inclusive, so `end + 1` bytes come back
                assert_eq!(body.len(), end + 1);
            }

            let res = bucket
                .copy_internal(&file_name_input, &file_name_output)
                .await?;
            assert!(res.is_success());

            let res = bucket.get(&file_name_output).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);

            let res = bucket.delete(&file_name_input).await?;
            assert!(res.status().is_success());
            let res = bucket.delete(&file_name_output).await?;
            assert!(res.status().is_success());

            let list = bucket.list("test_data", None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            panic!("test file has not been deleted");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    #[traced_test]
    #[tokio::test]
    async fn test_multipart() -> Result<(), S3Error> {
        use futures_util::stream::StreamExt;
        use std::os::unix::fs::MetadataExt;
        use tokio::io::AsyncWriteExt;

        dotenvy::dotenv().expect(".env file to be present");
        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        let file_sizes = vec![
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
            CHUNK_SIZE * 2,
            CHUNK_SIZE * 3,
            CHUNK_SIZE * 3 + 1,
        ];

        for file_size in file_sizes {
            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_mp_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_mp_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            let mut reader_file = fs::File::open(&input_path).await?;
            let res = bucket
                .put_stream(&mut reader_file, file_name_input.clone())
                .await?;
            assert!(res.status_code < 300);
            assert_eq!(res.uploaded_bytes, file_size);

            let mut file = fs::File::create(&output_path).await?;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());

            let stream = res.bytes_stream();
            tokio::pin!(stream);
            while let Some(Ok(item)) = stream.next().await {
                file.write_all(item.as_ref()).await?;
            }
            file.sync_all().await?;

            let f_in = fs::File::open(&input_path).await?;
            let f_out = fs::File::open(&output_path).await?;
            let meta_in = f_in.metadata().await.unwrap();
            let meta_out = f_out.metadata().await.unwrap();
            assert_eq!(meta_in.size(), meta_out.size());
        }

        Ok(())
    }
}