use crate::command::{Command, CompleteMultipartUploadData, Part};
use crate::constants::LONG_DATE_TIME;
use crate::credentials::Credentials;
use crate::error::S3Error;
use crate::types::Multipart;
use crate::types::{
    HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult, PutStreamResponse,
};
use crate::{md5_url_encode, signature, Region, S3Response, S3StatusCode};
use hmac::Hmac;
use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, RANGE};
use http::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
use sha2::digest::Mac;
use sha2::Sha256;
use std::fmt::Write;
use std::sync::OnceLock;
use std::time::Duration;
use std::{env, mem};
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{debug, error};
use url::Url;

static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

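/// Size of each part when streaming uploads. S3 requires every part except
/// the last to be at least 5 MiB, so 8 MiB leaves comfortable headroom.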
const CHUNK_SIZE: usize = 8 * 1024 * 1024;

#[derive(Debug)]
pub struct BucketOptions {
    pub path_style: bool,
    pub list_objects_v2: bool,
}

impl Default for BucketOptions {
    fn default() -> Self {
        Self {
            path_style: env::var("S3_PATH_STYLE")
                .unwrap_or_else(|_| "false".to_string())
                .parse::<bool>()
                .expect("S3_PATH_STYLE to parse as bool"),
            list_objects_v2: true,
        }
    }
}

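/// A handle to a single S3 bucket.
///
/// All requests are signed with AWS Signature V4 and sent through one
/// process-wide, lazily initialized HTTP client, so a `Bucket` is cheap to
/// clone and share across tasks.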
#[derive(Debug, Clone)]
pub struct Bucket {
    pub host: Url,
    pub name: String,
    pub region: Region,
    pub credentials: Credentials,
    path_style: bool,
    list_objects_v2: bool,
}

#[allow(dead_code)]
#[allow(clippy::assigning_clones)]
impl Bucket {
    fn host_domain(&self) -> String {
        match self.host.domain() {
            None => {
                let host_str = self
                    .host
                    .host_str()
                    .expect("host_str to exist when domain does not");
                if let Some(port) = self.host.port() {
                    format!("{}:{}", host_str, port)
                } else {
                    host_str.to_string()
                }
            }
            Some(domain) => {
                if let Some(port) = self.host.port() {
                    format!("{}:{}", domain, port)
                } else {
                    domain.to_string()
                }
            }
        }
    }

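    /// Creates a new `Bucket` from explicit values.
    ///
    /// When `options` is `None`, [`BucketOptions::default()`] is used, which
    /// reads `S3_PATH_STYLE` from the environment and enables ListObjectsV2.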
    pub fn new(
        host: Url,
        name: String,
        region: Region,
        credentials: Credentials,
        options: Option<BucketOptions>,
    ) -> Result<Self, S3Error> {
        let options = options.unwrap_or_default();
        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

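    /// Builds a `Bucket` entirely from the environment.
    ///
    /// `S3_URL` and `S3_BUCKET` are read directly; the region and the
    /// credentials are resolved through `Region::try_from_env()` and
    /// `Credentials::try_from_env()`, and `S3_PATH_STYLE` is picked up via
    /// [`BucketOptions::default()`].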
    pub fn try_from_env() -> Result<Self, S3Error> {
        let host_env = env::var("S3_URL")?;
        let host = host_env.parse::<Url>()?;

        let name = env::var("S3_BUCKET")?;
        let region = Region::try_from_env()?;
        let credentials = Credentials::try_from_env()?;
        let options = BucketOptions::default();

        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

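    /// Executes a `HeadObject` request and returns the parsed metadata
    /// headers without downloading the body.
    ///
    /// A minimal usage sketch (env-based setup and key assumed):
    ///
    /// ```ignore
    /// let bucket = Bucket::try_from_env()?;
    /// let meta = bucket.head("some/key").await?;
    /// println!("content length: {:?}", meta.content_length);
    /// ```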
    pub async fn head<S: AsRef<str>>(&self, path: S) -> Result<HeadObjectResult, S3Error> {
        let res = self
            .send_request(Command::HeadObject, path.as_ref())
            .await?;
        Ok(HeadObjectResult::from(res.headers()))
    }

    pub async fn get<P>(&self, path: P) -> Result<S3Response, S3Error>
    where
        P: AsRef<str>,
    {
        self.send_request(Command::GetObject, path.as_ref()).await
    }

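    /// Fetches a byte range of an object via the HTTP `Range` header.
    ///
    /// Both `start` and `end` are inclusive offsets, matching HTTP range
    /// semantics; `end: None` requests everything from `start` to the end of
    /// the object.
    ///
    /// A minimal usage sketch (key assumed to exist):
    ///
    /// ```ignore
    /// // Fetch the first KiB, i.e. offsets 0..=1023.
    /// let res = bucket.get_range("some/key", 0, Some(1023)).await?;
    /// ```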
    pub async fn get_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<S3Response, S3Error> {
        if let Some(end) = end {
            if start >= end {
                return Err(S3Error::Range("start must be less than end"));
            }
        }
        self.send_request(Command::GetObjectRange { start, end }, path.as_ref())
            .await
    }

    pub async fn delete<S: AsRef<str>>(&self, path: S) -> Result<S3Response, S3Error> {
        self.send_request(Command::DeleteObject, path.as_ref())
            .await
    }

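    /// Uploads `content` as a single `PutObject` request with the content
    /// type `application/octet-stream`. For payloads that may exceed
    /// `CHUNK_SIZE`, [`Bucket::put_stream`] avoids buffering everything in
    /// memory.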
    pub async fn put<S: AsRef<str>>(&self, path: S, content: &[u8]) -> Result<S3Response, S3Error> {
        self.put_with_content_type(path, content, "application/octet-stream")
            .await
    }

    pub async fn put_with_content_type<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        content_type: &str,
    ) -> Result<S3Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content,
                content_type,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

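    /// Streams `reader` to the bucket with the content type
    /// `application/octet-stream`; see
    /// [`Bucket::put_stream_with_content_type`] for the upload strategy.
    ///
    /// A minimal usage sketch (file name assumed):
    ///
    /// ```ignore
    /// let mut file = tokio::fs::File::open("large.bin").await?;
    /// let res = bucket.put_stream(&mut file, "large.bin".to_string()).await?;
    /// assert!(res.status_code < 300);
    /// ```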
    pub async fn put_stream<R>(
        &self,
        reader: &mut R,
        path: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        self.put_stream_with_content_type(reader, path, "application/octet-stream".to_string())
            .await
    }

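    /// Starts a multipart upload and returns the upload id assigned by the
    /// server. Parts are then sent with [`Bucket::multipart_request`] and the
    /// upload is finalized with [`Bucket::complete_multipart_upload`] or
    /// rolled back with [`Bucket::abort_upload`].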
    async fn initiate_multipart_upload(
        &self,
        path: &str,
        content_type: &str,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let res = self
            .send_request(Command::InitiateMultipartUpload { content_type }, path)
            .await?;
        Ok(quick_xml::de::from_str(&res.text().await?)?)
    }

    async fn multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content: &chunk,
                multipart: Some(Multipart::new(part_number, upload_id)),
                content_type,
            },
            path,
        )
        .await
    }

    async fn complete_multipart_upload(
        &self,
        path: &str,
        upload_id: &str,
        parts: Vec<Part>,
    ) -> Result<Response, S3Error> {
        let data = CompleteMultipartUploadData { parts };
        self.send_request(Command::CompleteMultipartUpload { upload_id, data }, path)
            .await
    }

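    /// Streams `reader` to the bucket.
    ///
    /// The first `CHUNK_SIZE` bytes are read eagerly: if the stream ends
    /// before that, the data is sent as one plain `PutObject` request.
    /// Otherwise a multipart upload is started, a spawned writer task uploads
    /// one `CHUNK_SIZE` part at a time, and this function keeps reading ahead
    /// through a bounded channel (capacity 2) that provides backpressure
    /// between reader and writer.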
    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with_content_type<R>(
        &self,
        reader: &mut R,
        path: String,
        content_type: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        let mut first_chunk = Vec::with_capacity(CHUNK_SIZE);
        let first_chunk_size = reader
            .take(CHUNK_SIZE as u64)
            .read_to_end(&mut first_chunk)
            .await?;

        debug!("first_chunk size: {}", first_chunk.len());
        if first_chunk_size < CHUNK_SIZE {
            debug!("first_chunk_size < CHUNK_SIZE -> doing normal PUT without stream");
            let res = self
                .put_with_content_type(&path, first_chunk.as_slice(), &content_type)
                .await?;

            return Ok(PutStreamResponse {
                status_code: res.status().as_u16(),
                uploaded_bytes: first_chunk_size,
            });
        }

        debug!("first_chunk_size >= CHUNK_SIZE -> initiate streaming upload");

        let (tx, rx) = flume::bounded(2);

        let slf = self.clone();
        let handle_writer = tokio::spawn(async move {
            debug!("writer task has been started");

            let msg = slf.initiate_multipart_upload(&path, &content_type).await?;
            debug!("{:?}", msg);
            let path = msg.key;
            let upload_id = &msg.upload_id;

            let mut part_number: u32 = 0;
            let mut etags = Vec::new();

            let mut total_size = 0;
            loop {
                let chunk = if part_number == 0 {
                    mem::take(&mut first_chunk)
                } else {
                    match rx.recv_async().await {
                        Ok(Some(chunk)) => chunk,
                        Ok(None) => {
                            debug!("no more parts available in reader - finishing upload");
                            break;
                        }
                        Err(err) => {
                            debug!("chunk reader channel has been closed: {}", err);
                            break;
                        }
                    }
                };
                debug!("chunk size in loop {}: {}", part_number + 1, chunk.len());

                total_size += chunk.len();

                part_number += 1;
                let res = slf
                    .multipart_request(&path, chunk, part_number, upload_id, &content_type)
                    .await;

                match res {
                    Ok(res) => {
                        let etag = res
                            .headers()
                            .get("etag")
                            .expect("ETag in multipart response headers")
                            .to_str()
                            .expect("ETag to convert to str successfully");
                        etags.push(etag.to_string());
                    }
                    Err(err) => {
                        slf.abort_upload(&path, upload_id).await?;
                        return Err(err);
                    }
                }
            }
            debug!(
                "multipart uploading finished after {} parts with total size of {} bytes",
                part_number, total_size
            );

            let inner_data = etags
                .into_iter()
                .enumerate()
                .map(|(i, etag)| Part {
                    etag,
                    part_number: i as u32 + 1,
                })
                .collect::<Vec<Part>>();
            debug!("data for multipart finishing: {:?}", inner_data);
            let res = slf
                .complete_multipart_upload(&path, &msg.upload_id, inner_data)
                .await?;

            Ok(PutStreamResponse {
                status_code: res.status().as_u16(),
                uploaded_bytes: total_size,
            })
        });

        loop {
            let mut buf = Vec::with_capacity(CHUNK_SIZE);
            match reader.take(CHUNK_SIZE as u64).read_to_end(&mut buf).await {
                Ok(size) => {
                    if size == 0 {
                        debug!("stream reader finished reading");
                        if let Err(err) = tx.send_async(None).await {
                            error!("sending the 'no more data' message in reader: {}", err);
                        }
                        break;
                    }

                    debug!("stream reader read {} bytes", size);
                    if let Err(err) = tx.send_async(Some(buf)).await {
                        error!("stream writer has been closed before reader finished: {}", err);
                        break;
                    }
                }
                Err(err) => {
                    error!("stream reader error: {}", err);
                    break;
                }
            }
        }

        handle_writer.await?
    }

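    /// Fetches a single page of listing results, using `ListObjectsV2` or the
    /// legacy `ListObjects` API depending on the bucket options.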
    async fn list_page(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<ListBucketResult, S3Error> {
        let command = if self.list_objects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            Command::ListObjects {
                prefix,
                delimiter,
                // `None < Some(_)`, so this picks whichever of the two is
                // set (or the lexicographically greater key if both are).
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };

        let resp = self.send_request(command, "/").await?;
        let bytes = resp.bytes().await?;
        let list_bucket_result = quick_xml::de::from_reader(bytes.as_ref())?;
        Ok(list_bucket_result)
    }

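    /// Lists all objects under `prefix`, following continuation tokens until
    /// the listing is exhausted, and returns one `ListBucketResult` per page.
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// for page in bucket.list("some/prefix/", None).await? {
    ///     for object in page.contents {
    ///         println!("{} ({} bytes)", object.key, object.size);
    ///     }
    /// }
    /// ```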
    pub async fn list(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
    ) -> Result<Vec<ListBucketResult>, S3Error> {
        let mut results = Vec::new();
        let mut continuation_token = None;

        loop {
            let list_bucket_result = self
                .list_page(prefix, delimiter, continuation_token, None, None)
                .await?;
            continuation_token = list_bucket_result.next_continuation_token.clone();
            results.push(list_bucket_result);
            if continuation_token.is_none() {
                break;
            }
        }

        Ok(results)
    }

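    /// Copies `from` to `to` within this bucket server-side; no object data
    /// passes through the client.
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// let status = bucket.copy_internal("src/key", "dst/key").await?;
    /// assert!(status.is_success());
    /// ```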
    pub async fn copy_internal<F, T>(&self, from: F, to: T) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from = from.as_ref();
            let from = from.strip_prefix('/').unwrap_or(from);
            format!("{}/{}", self.name, from)
        };
        Ok(self
            .send_request(Command::CopyObject { from: &fq_from }, to.as_ref())
            .await?
            .status())
    }

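    /// Copies `from_object` in `from_bucket` to `to` in this bucket
    /// server-side. The configured credentials must be authorized for both
    /// buckets.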
    pub async fn copy_internal_from<B, F, T>(
        &self,
        from_bucket: B,
        from_object: F,
        to: T,
    ) -> Result<S3StatusCode, S3Error>
    where
        B: AsRef<str>,
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from_object = from_object.as_ref();
            let from_object = from_object.strip_prefix('/').unwrap_or(from_object);
            format!("{}/{}", from_bucket.as_ref(), from_object)
        };
        Ok(self
            .send_request(Command::CopyObject { from: &fq_from }, to.as_ref())
            .await?
            .status())
    }

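    /// Aborts a multipart upload so the backend can discard any parts that
    /// were already uploaded.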
    async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
        let resp = self
            .send_request(Command::AbortMultipartUpload { upload_id }, key)
            .await?;

        let status = resp.status();
        if status.is_success() {
            Ok(())
        } else {
            let utf8_content = String::from_utf8(resp.bytes().await?.to_vec())?;
            Err(S3Error::HttpFailWithBody(status.as_u16(), utf8_content))
        }
    }

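    /// Central request dispatcher: builds the URL and the SigV4-signed
    /// headers for `command`, attaches a body where the command carries one,
    /// and maps every non-2xx response to [`S3Error::HttpFailWithBody`].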
    async fn send_request(
        &self,
        command: Command<'_>,
        path: &str,
    ) -> Result<reqwest::Response, S3Error> {
        let url = self.build_url(&command, path)?;
        let headers = self.build_headers(&command, &url).await?;

        let builder = Self::get_client()
            .request(command.http_method(), url)
            .headers(headers);

        let res = match command {
            Command::PutObject { content, .. } => builder.body(content.to_vec()),
            Command::PutObjectTagging { tags } => builder.body(tags.to_string()),
            Command::UploadPart { content, .. } => builder.body(content.to_vec()),
            Command::CompleteMultipartUpload { ref data, .. } => {
                let body = data.to_string();
                builder.body(body)
            }
            _ => builder.body(Vec::default()),
        }
        .send()
        .await?;

        if res.status().is_success() {
            Ok(res)
        } else {
            Err(S3Error::HttpFailWithBody(
                res.status().as_u16(),
                res.text().await?,
            ))
        }
    }

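    /// Returns the process-wide HTTP client, initializing it on first use.
    /// Setting `S3_DANGER_ALLOW_INSECURE=true` disables TLS certificate
    /// verification and should only be used against local test setups.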
    fn get_client() -> &'static reqwest::Client {
        CLIENT.get_or_init(|| {
            let mut builder = reqwest::Client::builder()
                .brotli(true)
                .connect_timeout(Duration::from_secs(10))
                .tcp_keepalive(Duration::from_secs(30))
                .pool_idle_timeout(Duration::from_secs(600))
                .use_rustls_tls();
            if env::var("S3_DANGER_ALLOW_INSECURE").as_deref() == Ok("true") {
                builder = builder.danger_accept_invalid_certs(true);
            }
            builder.build().unwrap()
        })
    }

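    /// Assembles the signed header set for `command` following the AWS
    /// Signature Version 4 scheme: `Host` and command-specific headers first,
    /// then `x-amz-content-sha256` and `x-amz-date`, and finally the
    /// `Authorization` header derived from the canonical request.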
    async fn build_headers(&self, command: &Command<'_>, url: &Url) -> Result<HeaderMap, S3Error> {
        let cmd_hash = command.sha256();
        let now = OffsetDateTime::now_utc();

        let mut headers = HeaderMap::with_capacity(4);

        let domain = self.host_domain();
        if self.path_style {
            headers.insert(HOST, HeaderValue::from_str(domain.as_str())?);
        } else {
            headers.insert(
                HOST,
                HeaderValue::try_from(format!("{}.{}", self.name, domain))?,
            );
        }

        match command {
            Command::CopyObject { from } => {
                headers.insert(
                    HeaderName::from_static("x-amz-copy-source"),
                    HeaderValue::from_str(from)?,
                );
            }
            Command::ListObjects { .. } => {}
            Command::ListObjectsV2 { .. } => {}
            Command::GetObject => {}
            Command::GetObjectTagging => {}
            Command::GetBucketLocation => {}
            Command::DeleteObject => {}
            Command::GetObjectRange { .. } => {}
            Command::HeadObject { .. } => {}
            _ => {
                headers.insert(
                    CONTENT_LENGTH,
                    HeaderValue::try_from(command.content_length().to_string())?,
                );
                headers.insert(CONTENT_TYPE, HeaderValue::from_str(command.content_type())?);
            }
        }

        headers.insert(
            HeaderName::from_static("x-amz-content-sha256"),
            HeaderValue::from_str(&cmd_hash)?,
        );
        headers.insert(
            HeaderName::from_static("x-amz-date"),
            HeaderValue::try_from(now.format(LONG_DATE_TIME)?)?,
        );

        match command {
            Command::PutObjectTagging { tags } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(tags.as_bytes()))?,
                );
            }
            Command::PutObject { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::UploadPart { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::GetObject => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));
            }
            Command::GetObjectRange { start, end } => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));

                let range = if let Some(end) = end {
                    format!("bytes={}-{}", start, end)
                } else {
                    format!("bytes={}-", start)
                };
                headers.insert(RANGE, HeaderValue::try_from(range)?);
            }
            _ => {}
        }

        let canonical_request =
            signature::canonical_request(&command.http_method(), url, &headers, &cmd_hash)?;
        let string_to_sign =
            signature::string_to_sign(&now, &self.region, canonical_request.as_bytes())?;
        let signing_key =
            signature::signing_key(&now, &self.credentials.access_key_secret, &self.region)?;
        let mut hmac = Hmac::<Sha256>::new_from_slice(&signing_key)?;
        hmac.update(string_to_sign.as_bytes());
        let signature = hex::encode(hmac.finalize().into_bytes());
        let signed_header = signature::signed_header_string(&headers);
        let authorization = signature::authorization_header(
            &self.credentials.access_key_id,
            &now,
            &self.region,
            &signed_header,
            &signature,
        )?;
        headers.insert(AUTHORIZATION, HeaderValue::try_from(authorization)?);

        // Inserted after signing, so `Date` is not part of the signed
        // header set.
        headers.insert(DATE, HeaderValue::try_from(now.format(&Rfc2822)?)?);

        Ok(headers)
    }

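    /// Builds the request URL for `command`, using either path-style
    /// addressing (`https://host/bucket/key`) or virtual-hosted-style
    /// addressing (`https://bucket.host/key`), and appends the query
    /// parameters the individual command requires.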
    fn build_url(&self, command: &Command, path: &str) -> Result<Url, S3Error> {
        let mut url = if self.path_style {
            format!(
                "{}://{}/{}",
                self.host.scheme(),
                self.host_domain(),
                self.name,
            )
        } else {
            format!(
                "{}://{}.{}",
                self.host.scheme(),
                self.name,
                self.host_domain(),
            )
        };

        let path = if let Some(stripped) = path.strip_prefix('/') {
            stripped
        } else {
            path
        };

        url.push('/');
        url.push_str(&signature::uri_encode(path, false));

        match command {
            Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
                url.push_str("?uploads")
            }
            Command::AbortMultipartUpload { upload_id } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::CompleteMultipartUpload { upload_id, .. } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::PutObject {
                multipart: Some(multipart),
                ..
            } => url.push_str(&multipart.query_string()),
            _ => {}
        }

        let mut url = Url::parse(&url)?;

        match command {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                query_pairs.append_pair("list-type", "2");
                if let Some(token) = continuation_token {
                    query_pairs.append_pair("continuation-token", token);
                }
                if let Some(start_after) = start_after {
                    query_pairs.append_pair("start-after", start_after);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListObjects {
                prefix,
                delimiter,
                marker,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                if let Some(marker) = marker {
                    query_pairs.append_pair("marker", marker);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListMultipartUploads {
                prefix,
                delimiter,
                key_marker,
                max_uploads,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }
                if let Some(prefix) = prefix {
                    query_pairs.append_pair("prefix", prefix);
                }
                if let Some(key_marker) = key_marker {
                    query_pairs.append_pair("key-marker", key_marker);
                }
                if let Some(max_uploads) = max_uploads {
                    query_pairs.append_pair("max-uploads", max_uploads.to_string().as_str());
                }
            }

            Command::PutObjectTagging { .. }
            | Command::GetObjectTagging
            | Command::DeleteObjectTagging => {
                url.query_pairs_mut().append_pair("tagging", "");
            }

            _ => {}
        }

        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use tokio::fs;
    use tracing_test::traced_test;

    #[traced_test]
    #[tokio::test]
    async fn test_object_flow() -> Result<(), S3Error> {
        dotenvy::dotenv().expect(".env file to exist for the tests");

        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        let file_sizes = vec![
            0,
            1,
            CHUNK_SIZE / 2,
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
        ];

        for file_size in file_sizes {
            println!("test_object_flow with {} bytes", file_size);

            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            let res = bucket.put(&file_name_input, &bytes).await?;
            let status = res.status();
            let body = res.text().await?;
            println!("response body:\n{}", body);
            assert!(status.is_success());

            // brief pause before reading the object back
            tokio::time::sleep(Duration::from_secs(1)).await;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);
            fs::write(&output_path, body.as_ref()).await?;

            let input_bytes = fs::read(input_path).await?;
            let output_bytes = fs::read(output_path).await?;
            assert_eq!(input_bytes.len(), file_size);
            assert_eq!(input_bytes.len(), output_bytes.len());
            assert_eq!(input_bytes, output_bytes);

            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            assert_eq!(object.size, file_size as u64);
                            break;
                        }
                    }
                }
            }

            let res = bucket.head(&file_name_input).await?;
            assert_eq!(res.content_length, Some(file_size as u64));

            if file_size > CHUNK_SIZE / 2 {
                let end = CHUNK_SIZE / 2 + 1;
                let res = bucket
                    .get_range(&file_name_input, 0, Some(end as u64))
                    .await?;
                assert!(res.status().is_success());
                let body = res.bytes().await?;
                // `bytes=0-end` is inclusive on both sides, so `end + 1`
                // bytes come back
                assert_eq!(body.len(), end + 1);
            }

            let res = bucket
                .copy_internal(&file_name_input, &file_name_output)
                .await?;
            assert!(res.is_success());

            let res = bucket.get(&file_name_output).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);

            let res = bucket.delete(&file_name_input).await?;
            assert!(res.status().is_success());
            let res = bucket.delete(&file_name_output).await?;
            assert!(res.status().is_success());

            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            panic!("test file has not been deleted");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    #[traced_test]
    #[tokio::test]
    async fn test_multipart() -> Result<(), S3Error> {
        use futures_util::stream::StreamExt;
        use std::os::unix::fs::MetadataExt;
        use tokio::io::AsyncWriteExt;

        dotenvy::dotenv().expect(".env file to exist for the tests");
        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        let file_sizes = vec![
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
            CHUNK_SIZE * 2,
            CHUNK_SIZE * 3,
            CHUNK_SIZE * 3 + 1,
        ];

        for file_size in file_sizes {
            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_mp_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_mp_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            let mut reader_file = fs::File::open(&input_path).await?;
            let res = bucket
                .put_stream(&mut reader_file, file_name_input.clone())
                .await?;
            assert!(res.status_code < 300);
            assert_eq!(res.uploaded_bytes, file_size);

            let mut file = fs::File::create(&output_path).await?;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());

            let stream = res.bytes_stream();
            tokio::pin!(stream);
            while let Some(Ok(item)) = stream.next().await {
                file.write_all(item.as_ref()).await?;
            }
            file.sync_all().await?;

            let f_in = fs::File::open(&input_path).await?;
            let f_out = fs::File::open(&output_path).await?;
            let meta_in = f_in.metadata().await.unwrap();
            let meta_out = f_out.metadata().await.unwrap();
            assert_eq!(meta_in.size(), meta_out.size());
        }

        Ok(())
    }
}