s3_simple/bucket.rs

use crate::command::{Command, CompleteMultipartUploadData, Part};
use crate::constants::LONG_DATE_TIME;
use crate::credentials::Credentials;
use crate::error::S3Error;
use crate::types::Multipart;
use crate::types::{
    HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult, PutStreamResponse,
};
use crate::{md5_url_encode, signature, Region, S3Response, S3StatusCode};
use hmac::Hmac;
use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, RANGE};
use http::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
use sha2::digest::Mac;
use sha2::Sha256;
use std::fmt::Write;
use std::sync::OnceLock;
use std::time::Duration;
use std::{env, mem};
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{debug, error};
use url::Url;

static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8 MiB; the S3 minimum part size is 5 MiB

#[derive(Debug)]
pub struct BucketOptions {
    pub path_style: bool,
    pub list_objects_v2: bool,
}

impl Default for BucketOptions {
    fn default() -> Self {
        Self {
            path_style: env::var("S3_PATH_STYLE")
                .unwrap_or_else(|_| "false".to_string())
                .parse::<bool>()
                .expect("S3_PATH_STYLE cannot be parsed as bool"),
            list_objects_v2: true,
        }
    }
}

#[derive(Debug, Clone)]
pub struct Bucket {
    pub host: Url,
    pub name: String,
    pub region: Region,
    pub credentials: Credentials,
    path_style: bool,
    list_objects_v2: bool,
}

#[allow(dead_code)]
#[allow(clippy::assigning_clones)] // false-positive warnings
impl Bucket {
    fn host_domain(&self) -> String {
        match self.host.domain() {
            None => {
                // in this case, the host is an IP address instead of a domain
                let host_str = self
                    .host
                    .host_str()
                    .expect("host_str to exist when domain does not");
                if let Some(port) = self.host.port() {
                    format!("{}:{}", host_str, port)
                } else {
                    host_str.to_string()
                }
            }
            Some(domain) => {
                if let Some(port) = self.host.port() {
                    format!("{}:{}", domain, port)
                } else {
                    domain.to_string()
                }
            }
        }
    }

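    /// Create a new `Bucket` handle. A minimal, hedged sketch of manual construction
    /// (endpoint, bucket name, and option values are placeholders; how `Region` and
    /// `Credentials` are built by hand depends on their own constructors, defined in
    /// other modules of this crate):
    /// ```ignore
    /// let host: Url = "https://s3.example.com".parse()?;
    /// let options = BucketOptions { path_style: true, list_objects_v2: true };
    /// let bucket = Bucket::new(host, "my-bucket".to_string(), region, credentials, Some(options))?;
    /// ```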
    pub fn new(
        host: Url,
        name: String,
        region: Region,
        credentials: Credentials,
        options: Option<BucketOptions>,
    ) -> Result<Self, S3Error> {
        let options = options.unwrap_or_default();
        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

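    /// Build a `Bucket` from environment variables. This function reads `S3_URL` and
    /// `S3_BUCKET` directly; `Region::try_from_env` and `Credentials::try_from_env` read
    /// their own variables (defined elsewhere in this crate), and `BucketOptions::default`
    /// reads `S3_PATH_STYLE`. A hedged sketch with placeholder values:
    /// ```ignore
    /// // usually provided via a `.env` file or the process environment
    /// std::env::set_var("S3_URL", "https://s3.example.com");
    /// std::env::set_var("S3_BUCKET", "my-bucket");
    /// let bucket = Bucket::try_from_env()?;
    /// ```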
    pub fn try_from_env() -> Result<Self, S3Error> {
        let host_env = env::var("S3_URL")?;
        let host = host_env.parse::<Url>()?;

        let name = env::var("S3_BUCKET")?;
        let region = Region::try_from_env()?;
        let credentials = Credentials::try_from_env()?;
        let options = BucketOptions::default();

        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

    /// HEAD information for an object
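    /// A hedged usage sketch (the object key is a placeholder; call from an async context):
    /// ```ignore
    /// let head = bucket.head("path/to/object").await?;
    /// println!("content length: {:?}", head.content_length);
    /// ```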
    pub async fn head<S: AsRef<str>>(&self, path: S) -> Result<HeadObjectResult, S3Error> {
        let res = self
            .send_request(Command::HeadObject, path.as_ref())
            .await?;
        Ok(HeadObjectResult::from(res.headers()))
    }

    /// GET an object
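    /// A hedged usage sketch (the key is a placeholder; `S3Response` exposes the usual
    /// response accessors used in the tests below, e.g. `status()` and `bytes()`):
    /// ```ignore
    /// let res = bucket.get("path/to/object").await?;
    /// assert!(res.status().is_success());
    /// let body = res.bytes().await?;
    /// ```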
    pub async fn get<P>(&self, path: P) -> Result<S3Response, S3Error>
    where
        P: AsRef<str>,
    {
        self.send_request(Command::GetObject, path.as_ref()).await
    }

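    /// GET a byte range of an object. The range is inclusive on both ends, so
    /// `start = 0, end = Some(1023)` requests the first 1024 bytes. A hedged sketch:
    /// ```ignore
    /// let res = bucket.get_range("path/to/object", 0, Some(1023)).await?;
    /// let first_kib = res.bytes().await?;
    /// ```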
    pub async fn get_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<S3Response, S3Error> {
        if let Some(end) = end {
            if start >= end {
                return Err(S3Error::Range("start must be < end"));
            }
        }
        self.send_request(Command::GetObjectRange { start, end }, path.as_ref())
            .await
    }

    /// DELETE an object
    pub async fn delete<S: AsRef<str>>(&self, path: S) -> Result<S3Response, S3Error> {
        self.send_request(Command::DeleteObject, path.as_ref())
            .await
    }

    /// PUT an object
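    /// Uploads with the content type `application/octet-stream`. A hedged sketch
    /// (key and payload are placeholders):
    /// ```ignore
    /// let res = bucket.put("path/to/object", b"hello world").await?;
    /// assert!(res.status().is_success());
    /// ```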
    pub async fn put<S: AsRef<str>>(&self, path: S, content: &[u8]) -> Result<S3Response, S3Error> {
        self.put_with_content_type(path, content, "application/octet-stream")
            .await
    }

    /// PUT an object with a specific content type
    pub async fn put_with_content_type<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        content_type: &str,
    ) -> Result<S3Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content,
                content_type,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

    /// Streaming object upload from any reader that implements `AsyncRead`
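    /// Inputs smaller than `CHUNK_SIZE` are uploaded with a single PUT; larger inputs
    /// use a multipart upload. A hedged sketch (the local file path is a placeholder):
    /// ```ignore
    /// let mut file = tokio::fs::File::open("local/file.bin").await?;
    /// let res = bucket.put_stream(&mut file, "path/to/object".to_string()).await?;
    /// println!("uploaded {} bytes", res.uploaded_bytes);
    /// ```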
    pub async fn put_stream<R>(
        &self,
        reader: &mut R,
        path: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        self.put_stream_with_content_type(reader, path, "application/octet-stream".to_string())
            .await
    }

    async fn initiate_multipart_upload(
        &self,
        path: &str,
        content_type: &str,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let res = self
            .send_request(Command::InitiateMultipartUpload { content_type }, path)
            .await?;
        Ok(quick_xml::de::from_str(&res.text().await?)?)
    }

    async fn multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Response, S3Error> {
        self.send_request(
            Command::PutObject {
                // TODO: switching to owned data would probably make sense here
                content: &chunk,
                multipart: Some(Multipart::new(part_number, upload_id)),
                content_type,
            },
            path,
        )
        .await
    }

    async fn complete_multipart_upload(
        &self,
        path: &str,
        upload_id: &str,
        parts: Vec<Part>,
    ) -> Result<Response, S3Error> {
        let data = CompleteMultipartUploadData { parts };
        self.send_request(Command::CompleteMultipartUpload { upload_id, data }, path)
            .await
    }

    /// Streaming object upload from any reader that implements `AsyncRead`
    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with_content_type<R>(
        &self,
        reader: &mut R,
        path: String,
        content_type: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        // If the file is smaller than CHUNK_SIZE, just do a regular upload.
        // Otherwise, perform a multipart upload.
        let mut first_chunk = Vec::with_capacity(CHUNK_SIZE);
        let first_chunk_size = reader
            .take(CHUNK_SIZE as u64)
            .read_to_end(&mut first_chunk)
            .await?;

        debug!("first_chunk size: {}", first_chunk.len());
        if first_chunk_size < CHUNK_SIZE {
            debug!("first_chunk_size < CHUNK_SIZE -> doing normal PUT without stream");
            let res = self
                .put_with_content_type(&path, first_chunk.as_slice(), &content_type)
                .await;

            return match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: first_chunk_size,
                }),
                Err(err) => Err(err),
            };
        }

        debug!("first_chunk_size >= CHUNK_SIZE -> initiate streaming upload");

        // At this point, the file is at least CHUNK_SIZE bytes, so a multipart
        // upload is performed. To optimize throughput, the writer is spawned as
        // a dedicated top-level tokio task to make use of multiple cores; the
        // small amount of cloned data is worth the better throughput. A channel
        // with a 2-chunk buffer connects reader and writer, so the slower of the
        // two pipelines does not stall the other.
        let (tx, rx) = flume::bounded(2);

        // Writer task
        let slf = self.clone();
        let handle_writer = tokio::spawn(async move {
            debug!("writer task has been started");

            let msg = slf.initiate_multipart_upload(&path, &content_type).await?;
            debug!("{:?}", msg);
            let path = msg.key;
            let upload_id = &msg.upload_id;

            let mut part_number: u32 = 0;
            let mut etags = Vec::new();

            let mut total_size = 0;
            loop {
                let chunk = if part_number == 0 {
                    // this memory swap avoids a clone of the first chunk
                    let mut bytes = Vec::default();
                    mem::swap(&mut first_chunk, &mut bytes);
                    bytes
                } else {
                    match rx.recv_async().await {
                        Ok(Some(chunk)) => chunk,
                        Ok(None) => {
                            debug!("no more parts available in reader - finishing upload");
                            break;
                        }
                        Err(err) => {
                            debug!("chunk reader channel has been closed: {}", err);
                            break;
                        }
                    }
                };
                debug!("chunk size in loop {}: {}", part_number + 1, chunk.len());

                total_size += chunk.len();

                // chunk upload
                part_number += 1;
                let res = slf
                    .multipart_request(&path, chunk, part_number, upload_id, &content_type)
                    .await;

                match res {
                    Ok(res) => {
                        let etag = res
                            .headers()
                            .get("etag")
                            .expect("ETag in multipart response headers")
                            .to_str()
                            .expect("ETag to convert to str successfully");
                        etags.push(etag.to_string());
                    }
                    Err(err) => {
                        // if a chunk upload failed, abort the whole upload
                        slf.abort_upload(&path, upload_id).await?;
                        return Err(err);
                    }
                }
            }
            debug!(
                "multipart uploading finished after {} parts with total size of {} bytes",
                part_number, total_size
            );

            // Finish the upload
            let inner_data = etags
                .into_iter()
                .enumerate()
                .map(|(i, etag)| Part {
                    etag,
                    part_number: i as u32 + 1,
                })
                .collect::<Vec<Part>>();
            debug!("data for multipart finishing: {:?}", inner_data);
            let res = slf
                .complete_multipart_upload(&path, &msg.upload_id, inner_data)
                .await;

            match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: total_size,
                }),
                Err(err) => Err(err),
            }
        });

        // The reader runs in the current task to keep lifetimes simple
        loop {
            let mut buf = Vec::with_capacity(CHUNK_SIZE);
            match reader.take(CHUNK_SIZE as u64).read_to_end(&mut buf).await {
                Ok(size) => {
                    if size == 0 {
                        debug!("stream reader finished reading");
                        if let Err(err) = tx.send_async(None).await {
                            error!("sending the 'no more data' message in reader: {}", err);
                        }
                        break;
                    }

                    debug!("stream reader read {} bytes", size);
                    if let Err(err) = tx.send_async(Some(buf)).await {
                        error!(
                            "Stream Writer has been closed before reader finished: {}",
                            err
                        );
                        break;
                    }
                }
                Err(err) => {
                    error!("stream reader error: {}", err);
                    break;
                }
            }
        }

        handle_writer.await?
    }

    async fn list_page(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<ListBucketResult, S3Error> {
        let command = if self.list_objects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // In the v1 ListObjects request, there is only one "marker" field,
            // which serves as both the initial starting position and the
            // continuation token.
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };

        let resp = self.send_request(command, "/").await?;
        let bytes = resp.bytes().await?;
        let list_bucket_result = quick_xml::de::from_reader(bytes.as_ref())?;
        Ok(list_bucket_result)
    }

    /// List bucket contents
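    /// Follows continuation tokens until the listing is exhausted and returns one
    /// `ListBucketResult` per page. A hedged sketch (the prefix is a placeholder):
    /// ```ignore
    /// let pages = bucket.list("some/prefix/", None).await?;
    /// for page in &pages {
    ///     for object in &page.contents {
    ///         println!("{} ({} bytes)", object.key, object.size);
    ///     }
    /// }
    /// ```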
    pub async fn list(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
    ) -> Result<Vec<ListBucketResult>, S3Error> {
        let mut results = Vec::new();
        let mut continuation_token = None;

        loop {
            let list_bucket_result = self
                .list_page(prefix, delimiter, continuation_token, None, None)
                .await?;
            continuation_token = list_bucket_result.next_continuation_token.clone();
            results.push(list_bucket_result);
            if continuation_token.is_none() {
                break;
            }
        }

        Ok(results)
    }

    /// S3-internal copy of an object from one place to another inside the same bucket
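    /// A hedged sketch (both keys are placeholders within the same bucket):
    /// ```ignore
    /// let status = bucket.copy_internal("path/original", "path/copy").await?;
    /// assert!(status.is_success());
    /// ```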
    pub async fn copy_internal<F, T>(&self, from: F, to: T) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from = from.as_ref();
            let from = from.strip_prefix('/').unwrap_or(from);
            format!("{}/{}", self.name, from)
        };
        Ok(self
            .send_request(Command::CopyObject { from: &fq_from }, to.as_ref())
            .await?
            .status())
    }

    /// S3-internal copy of an object from another bucket into "this" bucket
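    /// A hedged sketch (all names are placeholders; the configured credentials are
    /// assumed to have read access to the source bucket):
    /// ```ignore
    /// let status = bucket
    ///     .copy_internal_from("other-bucket", "path/original", "path/copy")
    ///     .await?;
    /// assert!(status.is_success());
    /// ```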
    pub async fn copy_internal_from<B, F, T>(
        &self,
        from_bucket: B,
        from_object: F,
        to: T,
    ) -> Result<S3StatusCode, S3Error>
    where
        B: AsRef<str>,
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from_object = from_object.as_ref();
            let from_object = from_object.strip_prefix('/').unwrap_or(from_object);
            format!("{}/{}", from_bucket.as_ref(), from_object)
        };
        Ok(self
            .send_request(Command::CopyObject { from: &fq_from }, to.as_ref())
            .await?
            .status())
    }

    async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
        let resp = self
            .send_request(Command::AbortMultipartUpload { upload_id }, key)
            .await?;

        let status = resp.status();
        if status.is_success() {
            Ok(())
        } else {
            let utf8_content = String::from_utf8(resp.bytes().await?.to_vec())?;
            Err(S3Error::HttpFailWithBody(status.as_u16(), utf8_content))
        }
    }

    async fn send_request(
        &self,
        command: Command<'_>,
        path: &str,
    ) -> Result<reqwest::Response, S3Error> {
        let url = self.build_url(&command, path)?;
        let headers = self.build_headers(&command, &url).await?;

        let builder = Self::get_client()
            .request(command.http_method(), url)
            .headers(headers);

        let res = match command {
            Command::PutObject { content, .. } => builder.body(content.to_vec()),
            Command::PutObjectTagging { tags } => builder.body(tags.to_string()),
            Command::UploadPart { content, .. } => builder.body(content.to_vec()),
            Command::CompleteMultipartUpload { ref data, .. } => {
                let body = data.to_string();
                builder.body(body)
            }
            _ => builder.body(Vec::default()),
        }
        .send()
        .await?;

        if res.status().is_success() {
            Ok(res)
        } else {
            Err(S3Error::HttpFailWithBody(
                res.status().as_u16(),
                res.text().await?,
            ))
        }
    }

    fn get_client<'a>() -> &'a reqwest::Client {
        CLIENT.get_or_init(|| {
            let mut builder = reqwest::Client::builder()
                .brotli(true)
                .connect_timeout(Duration::from_secs(10))
                .tcp_keepalive(Duration::from_secs(30))
                .pool_idle_timeout(Duration::from_secs(600))
                .use_rustls_tls();
            if env::var("S3_DANGER_ALLOW_INSECURE").as_deref() == Ok("true") {
                builder = builder.danger_accept_invalid_certs(true);
            }
            builder.build().unwrap()
        })
    }

    async fn build_headers(&self, command: &Command<'_>, url: &Url) -> Result<HeaderMap, S3Error> {
        let cmd_hash = command.sha256();
        let now = OffsetDateTime::now_utc();

        let mut headers = HeaderMap::with_capacity(4);

        // host header
        let domain = self.host_domain();
        if self.path_style {
            headers.insert(HOST, HeaderValue::from_str(domain.as_str())?);
        } else {
            headers.insert(
                HOST,
                HeaderValue::try_from(format!("{}.{}", self.name, domain))?,
            );
        }

        // add command-specific headers
        match command {
            Command::CopyObject { from } => {
                headers.insert(
                    HeaderName::from_static("x-amz-copy-source"),
                    HeaderValue::from_str(from)?,
                );
            }
            Command::ListObjects { .. } => {}
            Command::ListObjectsV2 { .. } => {}
            Command::GetObject => {}
            Command::GetObjectTagging => {}
            Command::GetBucketLocation => {}

            // Skipping `content-length: 0` for these is needed to make Garage
            // work, while Minio seems to ignore the header for them anyway.
            Command::DeleteObject => {}
            Command::GetObjectRange { .. } => {}
            Command::HeadObject { .. } => {}

            _ => {
                headers.insert(
                    CONTENT_LENGTH,
                    HeaderValue::try_from(command.content_length().to_string())?,
                );
                headers.insert(CONTENT_TYPE, HeaderValue::from_str(command.content_type())?);
            }
        }

        // hash and date
        headers.insert(
            HeaderName::from_static("x-amz-content-sha256"),
            HeaderValue::from_str(&cmd_hash)?,
        );
        headers.insert(
            HeaderName::from_static("x-amz-date"),
            HeaderValue::try_from(now.format(LONG_DATE_TIME)?)?,
        );

        match command {
            Command::PutObjectTagging { tags } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(tags.as_bytes()))?,
                );
            }
            Command::PutObject { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::UploadPart { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::GetObject => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));
            }
            Command::GetObjectRange { start, end } => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));

                let range = if let Some(end) = end {
                    format!("bytes={}-{}", start, end)
                } else {
                    format!("bytes={}-", start)
                };
                headers.insert(RANGE, HeaderValue::try_from(range)?);
            }
            _ => {}
        }

        // sign all of the above headers with the secret
        let canonical_request =
            signature::canonical_request(&command.http_method(), url, &headers, &cmd_hash)?;
        let string_to_sign =
            signature::string_to_sign(&now, &self.region, canonical_request.as_bytes())?;
        let signing_key =
            signature::signing_key(&now, &self.credentials.access_key_secret, &self.region)?;
        let mut hmac = Hmac::<Sha256>::new_from_slice(&signing_key)?;
        hmac.update(string_to_sign.as_bytes());
        let signature = hex::encode(hmac.finalize().into_bytes());
        let signed_header = signature::signed_header_string(&headers);
        let authorization = signature::authorization_header(
            &self.credentials.access_key_id,
            &now,
            &self.region,
            &signed_header,
            &signature,
        )?;
        headers.insert(AUTHORIZATION, HeaderValue::try_from(authorization)?);

        // The RFC 2822 format is somewhat malleable, so including the Date header
        // in the signed headers can cause signature mismatches. The X-Amz-Date
        // header is included instead, so requests are still properly limited to a
        // date range and cannot be reused in e.g. replay attacks. Adding the Date
        // header after the Authorization header has been generated leaves it out
        // of the signed headers.
        headers.insert(DATE, HeaderValue::try_from(now.format(&Rfc2822)?)?);

        Ok(headers)
    }

    fn build_url(&self, command: &Command, path: &str) -> Result<Url, S3Error> {
        let mut url = if self.path_style {
            format!(
                "{}://{}/{}",
                self.host.scheme(),
                self.host_domain(),
                self.name,
            )
        } else {
            format!(
                "{}://{}.{}",
                self.host.scheme(),
                self.name,
                self.host_domain(),
            )
        };

        let path = if let Some(stripped) = path.strip_prefix('/') {
            stripped
        } else {
            path
        };

        url.push('/');
        url.push_str(&signature::uri_encode(path, false));

        match command {
            Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
                url.push_str("?uploads")
            }
            Command::AbortMultipartUpload { upload_id } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::CompleteMultipartUpload { upload_id, .. } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::PutObject {
                multipart: Some(multipart),
                ..
            } => url.push_str(&multipart.query_string()),
            _ => {}
        }

        let mut url = Url::parse(&url)?;

        match command {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                query_pairs.append_pair("list-type", "2");
                if let Some(token) = continuation_token {
                    query_pairs.append_pair("continuation-token", token);
                }
                if let Some(start_after) = start_after {
                    query_pairs.append_pair("start-after", start_after);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListObjects {
                prefix,
                delimiter,
                marker,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                if let Some(marker) = marker {
                    query_pairs.append_pair("marker", marker);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListMultipartUploads {
                prefix,
                delimiter,
                key_marker,
                max_uploads,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }
                if let Some(prefix) = prefix {
                    query_pairs.append_pair("prefix", prefix);
                }
                if let Some(key_marker) = key_marker {
                    query_pairs.append_pair("key-marker", key_marker);
                }
                if let Some(max_uploads) = max_uploads {
                    query_pairs.append_pair("max-uploads", max_uploads.to_string().as_str());
                }
            }

            Command::PutObjectTagging { .. }
            | Command::GetObjectTagging
            | Command::DeleteObjectTagging => {
                url.query_pairs_mut().append_pair("tagging", "");
            }

            _ => {}
        }

        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use tokio::fs;
    use tracing_test::traced_test;

    #[traced_test]
    #[tokio::test]
    async fn test_object_flow() -> Result<(), S3Error> {
        dotenvy::dotenv().ok().unwrap();

        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        // we do not use rstest here since the tests start multiple conflicting runtimes
        let file_sizes = vec![
            0,
            1,
            CHUNK_SIZE / 2,
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
        ];

        for file_size in file_sizes {
            println!("test_object_flow with {} bytes", file_size);

            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            // create and write some test data
            let bytes = (0..file_size).map(|_| 0u8).collect::<Vec<u8>>();
            fs::write(&input_path, &bytes).await?;

            // upload the file
            let res = bucket.put(&file_name_input, &bytes).await?;
            let status = res.status();
            let body = res.text().await?;
            println!("response body:\n{}", body);
            assert!(status.is_success());

            // give the S3 replication under the hood a second to catch up
            tokio::time::sleep(Duration::from_secs(1)).await;

            // GET the file back
            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);
            fs::write(&output_path, body.as_ref()).await?;

            // make sure input and output are the same
            let input_bytes = fs::read(input_path).await?;
            let output_bytes = fs::read(output_path).await?;
            assert_eq!(input_bytes.len(), file_size);
            assert_eq!(input_bytes.len(), output_bytes.len());
            assert_eq!(input_bytes, output_bytes);

            // list bucket content and make sure it shows up
            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            // we found our dummy object, check the size
                            assert_eq!(object.size, file_size as u64);
                            break;
                        }
                    }
                }
            }

            // validate that HEAD is working too
            let res = bucket.head(&file_name_input).await?;
            assert_eq!(res.content_length, Some(file_size as u64));

            if file_size > CHUNK_SIZE / 2 {
                // get only a part of the object back
                let end = CHUNK_SIZE / 2 + 1;
                let res = bucket
                    .get_range(&file_name_input, 0, Some(end as u64))
                    .await?;
                assert!(res.status().is_success());
                let body = res.bytes().await?;
                // the GET range includes the end -> 1 additional byte
                assert_eq!(body.len(), end as usize + 1);
            }

            // test internal object copy
            let res = bucket
                .copy_internal(&file_name_input, &file_name_output)
                .await?;
            assert!(res.is_success());

            // GET the newly copied object
            let res = bucket.get(&file_name_output).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);

            // clean up and delete the test files
            let res = bucket.delete(&file_name_input).await?;
            assert!(res.status().is_success());
            let res = bucket.delete(&file_name_output).await?;
            assert!(res.status().is_success());

            // list bucket content again and make sure it's gone
            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            panic!("test file has not been deleted");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    #[traced_test]
    #[tokio::test]
    async fn test_multipart() -> Result<(), S3Error> {
        use futures_util::stream::StreamExt;
        use std::os::unix::fs::MetadataExt;
        use tokio::io::AsyncWriteExt;

        dotenvy::dotenv().ok().unwrap();
        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        // we do not use rstest here since the tests seem to interfere with each other on the IO layer
        let file_sizes = vec![
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
            CHUNK_SIZE * 2,
            CHUNK_SIZE * 3,
            CHUNK_SIZE * 3 + 1,
        ];

        for file_size in file_sizes {
            // create and write some test data
            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_mp_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_mp_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = (0..file_size).map(|_| 0u8).collect::<Vec<u8>>();
            fs::write(&input_path, &bytes).await?;

            // streaming upload
            let mut reader_file = fs::File::open(&input_path).await?;
            let res = bucket
                .put_stream(&mut reader_file, file_name_input.clone())
                .await?;
            assert!(res.status_code < 300);
            assert_eq!(res.uploaded_bytes, file_size);

            // streaming download
            let mut file = fs::File::create(&output_path).await?;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());

            let stream = res.bytes_stream();
            tokio::pin!(stream);
            while let Some(Ok(item)) = stream.next().await {
                file.write_all(item.as_ref()).await?;
            }
            // flush / sync any remaining buffered data
            file.sync_all().await?;

            // make sure the files match
            let f_in = fs::File::open(&input_path).await?;
            let f_out = fs::File::open(&output_path).await?;
            let meta_in = f_in.metadata().await.unwrap();
            let meta_out = f_out.metadata().await.unwrap();
            assert_eq!(meta_in.size(), meta_out.size());
        }

        Ok(())
    }
}