s3_simple/bucket.rs

use crate::command::{Command, CompleteMultipartUploadData, Part};
use crate::constants::LONG_DATE_TIME;
use crate::credentials::Credentials;
use crate::error::S3Error;
use crate::types::Multipart;
use crate::types::{
    HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult, PutStreamResponse,
};
use crate::{md5_url_encode, signature, Region, S3Response, S3StatusCode};
use hmac::Hmac;
use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, RANGE};
use http::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
use sha2::digest::Mac;
use sha2::Sha256;
use std::fmt::Write;
use std::sync::OnceLock;
use std::time::Duration;
use std::{env, mem};
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{debug, error, warn};
use url::Url;

static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8 MiB; the S3 minimum part size is 5 MiB

#[derive(Debug)]
pub struct BucketOptions {
    pub path_style: bool,
    pub list_objects_v2: bool,
}

impl Default for BucketOptions {
    fn default() -> Self {
        Self {
            path_style: env::var("S3_PATH_STYLE")
                .unwrap_or_else(|_| "false".to_string())
                .parse::<bool>()
                .expect("S3_PATH_STYLE cannot be parsed as bool"),
            list_objects_v2: true,
        }
    }
}

#[derive(Debug, Clone)]
pub struct Bucket {
    pub host: Url,
    pub name: String,
    pub region: Region,
    pub credentials: Credentials,
    path_style: bool,
    list_objects_v2: bool,
}

#[allow(dead_code)]
#[allow(clippy::assigning_clones)] // false-positive warnings
impl Bucket {
    fn host_domain(&self) -> String {
        match self.host.domain() {
            None => {
                // in this case, the host is an IP address instead of a domain
                let host_str = self
                    .host
                    .host_str()
                    .expect("host_str to exist when domain does not");
                if let Some(port) = self.host.port() {
                    format!("{}:{}", host_str, port)
                } else {
                    host_str.to_string()
                }
            }
            Some(domain) => {
                if let Some(port) = self.host.port() {
                    format!("{}:{}", domain, port)
                } else {
                    domain.to_string()
                }
            }
        }
    }

    pub fn new(
        host: Url,
        name: String,
        region: Region,
        credentials: Credentials,
        options: Option<BucketOptions>,
    ) -> Result<Self, S3Error> {
        let options = options.unwrap_or_default();
        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

    pub fn try_from_env() -> Result<Self, S3Error> {
        let host_env = env::var("S3_URL")?;
        let host = host_env.parse::<Url>()?;

        let name = env::var("S3_BUCKET")?;
        let region = Region::try_from_env()?;
        let credentials = Credentials::try_from_env()?;
        let options = BucketOptions::default();

        Ok(Self {
            host,
            name,
            region,
            credentials,
            path_style: options.path_style,
            list_objects_v2: options.list_objects_v2,
        })
    }

    /// HEAD information for an object
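    ///
    /// # Examples
    ///
    /// A minimal, hypothetical sketch; the bucket config comes from the environment and
    /// `hello.txt` is assumed to exist:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let head = bucket.head("hello.txt").await?;
    /// println!("content length: {:?}", head.content_length);
    /// ```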
    pub async fn head<S: AsRef<str>>(&self, path: S) -> Result<HeadObjectResult, S3Error> {
        let res = self
            .send_request(Command::HeadObject, path.as_ref())
            .await?;
        Ok(HeadObjectResult::from(res.headers()))
    }

    /// GET an object
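    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the object `hello.txt` exists:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let res = bucket.get("hello.txt").await?;
    /// let body = res.bytes().await?;
    /// ```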
    pub async fn get<P>(&self, path: P) -> Result<S3Response, S3Error>
    where
        P: AsRef<str>,
    {
        self.send_request(Command::GetObject, path.as_ref()).await
    }

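    /// GET a byte range of an object.
    ///
    /// The `start` and `end` offsets are inclusive; passing `None` for `end` requests
    /// everything from `start` to the end of the object.
    ///
    /// # Examples
    ///
    /// A minimal sketch fetching the first 1024 bytes of a hypothetical object:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let res = bucket.get_range("hello.txt", 0, Some(1023)).await?;
    /// let partial = res.bytes().await?;
    /// ```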
    pub async fn get_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<S3Response, S3Error> {
        if let Some(end) = end {
            if start >= end {
                return Err(S3Error::Range("start must be less than end"));
            }
        }
        self.send_request(Command::GetObjectRange { start, end }, path.as_ref())
            .await
    }

    /// DELETE an object
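    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the object `hello.txt` exists:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// bucket.delete("hello.txt").await?;
    /// ```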
    pub async fn delete<S: AsRef<str>>(&self, path: S) -> Result<S3Response, S3Error> {
        self.send_request(Command::DeleteObject, path.as_ref())
            .await
    }

    /// PUT an object
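    ///
    /// The content type defaults to `application/octet-stream`.
    ///
    /// # Examples
    ///
    /// A minimal sketch with a hypothetical key and payload:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// bucket.put("hello.txt", b"world").await?;
    /// ```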
    pub async fn put<S: AsRef<str>>(&self, path: S, content: &[u8]) -> Result<S3Response, S3Error> {
        self.put_with_content_type(path, content, "application/octet-stream")
            .await
    }

    /// PUT an object with a specific content type
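    ///
    /// # Examples
    ///
    /// A minimal sketch uploading a hypothetical text file:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// bucket
    ///     .put_with_content_type("hello.txt", b"world", "text/plain")
    ///     .await?;
    /// ```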
    pub async fn put_with_content_type<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        content_type: &str,
    ) -> Result<S3Response, S3Error> {
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_str(content_type)?);

        self.send_request(
            Command::PutObject {
                content,
                headers,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

    /// PUT an object with specific headers.
    ///
    /// `headers` accepts additional headers to include in the request. Required headers for the
    /// request (e.g. `Authorization`, `Content-Length`) don't need to be included, as they are
    /// still handled automatically.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let mut headers = HeaderMap::new();
    /// let content = b"world";
    ///
    /// // Denote that this is a text file.
    /// headers.insert("Content-Type", HeaderValue::from_static("text/plain"));
    /// // Tell S3 which caching behavior clients should apply to this object.
    /// headers.insert("Cache-Control", HeaderValue::from_static("public, max-age=3600"));
    ///
    /// bucket.put_with("hello.txt", content, headers).await?;
    /// ```
    pub async fn put_with<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
        extra_headers: HeaderMap,
    ) -> Result<S3Response, S3Error> {
        self.send_request(
            Command::PutObject {
                content,
                headers: extra_headers,
                multipart: None,
            },
            path.as_ref(),
        )
        .await
    }

    /// Streaming object upload from any reader that implements `AsyncRead`
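    ///
    /// Uses a single PUT for content below `CHUNK_SIZE` and a multipart upload otherwise.
    ///
    /// # Examples
    ///
    /// A minimal sketch streaming a hypothetical local file:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let mut file = tokio::fs::File::open("local_data.bin").await?;
    /// let res = bucket.put_stream(&mut file, "remote_data.bin".to_string()).await?;
    /// println!("uploaded {} bytes", res.uploaded_bytes);
    /// ```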
    pub async fn put_stream<R>(
        &self,
        reader: &mut R,
        path: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        self.put_stream_with_content_type(reader, path, "application/octet-stream".to_string())
            .await
    }

    async fn initiate_multipart_upload(
        &self,
        path: &str,
        extra_headers: HeaderMap,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let res = self
            .send_request(
                Command::InitiateMultipartUpload {
                    headers: extra_headers,
                },
                path,
            )
            .await?;
        Ok(quick_xml::de::from_str(&res.text().await?)?)
    }

    async fn multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
    ) -> Result<Response, S3Error> {
        self.send_request(
            Command::PutObject {
                // TODO: switching to owned data would probably make sense here
                content: &chunk,
                multipart: Some(Multipart::new(part_number, upload_id)),
                headers: HeaderMap::new(),
            },
            path,
        )
        .await
    }

    async fn complete_multipart_upload(
        &self,
        path: &str,
        upload_id: &str,
        parts: Vec<Part>,
    ) -> Result<Response, S3Error> {
        let data = CompleteMultipartUploadData { parts };
        self.send_request(Command::CompleteMultipartUpload { upload_id, data }, path)
            .await
    }

    /// Streaming object upload from any reader that implements `AsyncRead`, with a specific content type
    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with_content_type<R>(
        &self,
        reader: &mut R,
        path: String,
        content_type: String,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_str(&content_type)?);

        self.put_stream_with(reader, path, headers).await
    }

    /// Streaming object upload from any reader that implements [`AsyncRead`].
    ///
    /// `headers` accepts additional headers to include in the request. Required headers for the
    /// request (e.g. `Authorization`, `Content-Length`) don't need to be included, as they are
    /// still handled automatically.
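    ///
    /// # Examples
    ///
    /// A minimal sketch streaming a hypothetical local file with extra headers:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let mut headers = HeaderMap::new();
    /// headers.insert("Cache-Control", HeaderValue::from_static("public, max-age=3600"));
    ///
    /// let mut file = tokio::fs::File::open("local_data.bin").await?;
    /// bucket
    ///     .put_stream_with(&mut file, "remote_data.bin".to_string(), headers)
    ///     .await?;
    /// ```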
    #[tracing::instrument(level = "debug", skip_all, fields(path = path))]
    pub async fn put_stream_with<R>(
        &self,
        reader: &mut R,
        path: String,
        extra_headers: HeaderMap,
    ) -> Result<PutStreamResponse, S3Error>
    where
        R: AsyncRead + Unpin,
    {
        // If the content is smaller than CHUNK_SIZE, just do a regular upload.
        // Otherwise, perform a multipart upload.
        let mut first_chunk = Vec::with_capacity(CHUNK_SIZE);
        let first_chunk_size = reader
            .take(CHUNK_SIZE as u64)
            .read_to_end(&mut first_chunk)
            .await?;

        debug!("first_chunk size: {}", first_chunk.len());
        if first_chunk_size < CHUNK_SIZE {
            debug!("first_chunk_size < CHUNK_SIZE -> doing normal PUT without stream");
            let res = self
                .put_with(&path, first_chunk.as_slice(), extra_headers)
                .await;

            return match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: first_chunk_size,
                }),
                Err(err) => Err(err),
            };
        }

        debug!("first_chunk_size >= CHUNK_SIZE -> initiate streaming upload");

        // At this point, the data is at least CHUNK_SIZE bytes, which means a
        // multipart upload will be performed.
        // To optimize performance, the writer is spawned as a dedicated top-level
        // tokio task to make optimal use of multiple cores.
        // The very small amount of cloned data is worth it for better throughput.
        // A channel with a 2-chunk buffer is used for the communication to get
        // optimal performance out of the slower of the in / out pipelines.
        let (tx, rx) = flume::bounded(2);

        // Writer task
        let slf = self.clone();
        let handle_writer = tokio::spawn(async move {
            debug!("writer task has been started");

            let msg = slf.initiate_multipart_upload(&path, extra_headers).await?;
            debug!("{:?}", msg);
            let path = msg.key;
            let upload_id = &msg.upload_id;

            let mut part_number: u32 = 0;
            let mut etags = Vec::new();

            let mut total_size = 0;
            loop {
                let chunk = if part_number == 0 {
                    // this memory swap avoids a clone of the first chunk
                    let mut bytes = Vec::default();
                    mem::swap(&mut first_chunk, &mut bytes);
                    bytes
                } else {
                    match rx.recv_async().await {
                        Ok(Some(chunk)) => chunk,
                        Ok(None) => {
                            debug!("no more parts available in reader - finishing upload");
                            break;
                        }
                        Err(err) => {
                            debug!("chunk reader channel has been closed: {}", err);
                            break;
                        }
                    }
                };
                debug!("chunk size in loop {}: {}", part_number + 1, chunk.len());

                total_size += chunk.len();

                // chunk upload
                part_number += 1;
                let res = slf
                    .multipart_request(&path, chunk, part_number, upload_id)
                    .await;

                match res {
                    Ok(res) => {
                        let etag = res
                            .headers()
                            .get("etag")
                            .ok_or_else(|| {
                                S3Error::UnexpectedResponse(
                                    "missing ETag in multipart response headers",
                                )
                            })?
                            .to_str()
                            .map_err(S3Error::HeaderToStr)?;
                        etags.push(etag.to_string());
                    }
                    Err(err) => {
                        // if the chunk upload failed, abort the whole upload
                        slf.abort_upload(&path, upload_id).await?;
                        return Err(err);
                    }
                }
            }
            debug!(
                "multipart uploading finished after {} parts with total size of {} bytes",
                part_number, total_size
            );

            // Finish the upload
            let inner_data = etags
                .into_iter()
                .enumerate()
                .map(|(i, etag)| Part {
                    etag,
                    part_number: i as u32 + 1,
                })
                .collect::<Vec<Part>>();
            debug!("data for multipart finishing: {:?}", inner_data);
            let res = slf
                .complete_multipart_upload(&path, &msg.upload_id, inner_data)
                .await;

            match res {
                Ok(res) => Ok(PutStreamResponse {
                    status_code: res.status().as_u16(),
                    uploaded_bytes: total_size,
                }),
                Err(err) => Err(err),
            }
        });

        // The reader runs in this task to keep lifetimes simple
        loop {
            let mut buf = Vec::with_capacity(CHUNK_SIZE);
            match reader.take(CHUNK_SIZE as u64).read_to_end(&mut buf).await {
                Ok(size) => {
                    if size == 0 {
                        debug!("stream reader finished reading");
                        if let Err(err) = tx.send_async(None).await {
                            error!("sending the 'no more data' message in reader: {}", err);
                        }
                        break;
                    }

                    debug!("stream reader read {} bytes", size);
                    if let Err(err) = tx.send_async(Some(buf)).await {
                        warn!(
                            "Stream Writer has been closed before reader finished: {}",
                            err
                        );
                        break;
                    }
                }
                Err(err) => {
                    error!("stream reader error: {}", err);
                    break;
                }
            }
        }

        handle_writer.await?
    }

    async fn list_page(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<ListBucketResult, S3Error> {
        let command = if self.list_objects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // In the v1 ListObjects request, there is only one "marker"
            // field that serves as both the initial starting position
            // and the continuation token.
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };

        let resp = self.send_request(command, "/").await?;
        let bytes = resp.bytes().await?;
        let list_bucket_result = quick_xml::de::from_reader(bytes.as_ref())?;
        Ok(list_bucket_result)
    }

    /// List bucket contents
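    ///
    /// Follows continuation tokens internally and returns one `ListBucketResult` per page.
    ///
    /// # Examples
    ///
    /// A minimal sketch listing everything under a hypothetical prefix:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let pages = bucket.list("images/", None).await?;
    /// for page in pages {
    ///     for object in page.contents {
    ///         println!("{} ({} bytes)", object.key, object.size);
    ///     }
    /// }
    /// ```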
    pub async fn list(
        &self,
        prefix: &str,
        delimiter: Option<&str>,
    ) -> Result<Vec<ListBucketResult>, S3Error> {
        let mut results = Vec::new();
        let mut continuation_token = None;

        loop {
            let list_bucket_result = self
                .list_page(prefix, delimiter, continuation_token, None, None)
                .await?;
            continuation_token = list_bucket_result.next_continuation_token.clone();
            results.push(list_bucket_result);
            if continuation_token.is_none() {
                break;
            }
        }

        Ok(results)
    }

    /// S3-internal copy of an object from one place to another inside the same bucket
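    ///
    /// # Examples
    ///
    /// A minimal sketch copying a hypothetical object inside the same bucket:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let status = bucket.copy_internal("hello.txt", "backup/hello.txt").await?;
    /// assert!(status.is_success());
    /// ```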
    pub async fn copy_internal<F, T>(&self, from: F, to: T) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        self.copy_internal_with(from, to, HeaderMap::new()).await
    }

    /// S3-internal copy of an object from one place to another inside the same bucket.
    ///
    /// `headers` accepts additional headers to include in the request. Required headers for the
    /// request (e.g. `Authorization`, `Content-Length`) don't need to be included, as they are
    /// still handled automatically.
    ///
    /// # Examples
    ///
    /// This example shows how to modify the metadata of an existing object in S3.
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let mut headers = HeaderMap::new();
    ///
    /// // `x-amz-metadata-directive` tells S3 what to do with the existing object metadata.
    /// headers.insert("x-amz-metadata-directive", HeaderValue::from_static("REPLACE"));
    /// headers.insert("Content-Type", HeaderValue::from_static("image/jpeg"));
    /// headers.insert("Cache-Control", HeaderValue::from_static("public, max-age=86400"));
    ///
    /// bucket.copy_internal_with("cat.jpg", "cat.jpg", headers).await?;
    /// ```
    pub async fn copy_internal_with<F, T>(
        &self,
        from: F,
        to: T,
        extra_headers: HeaderMap,
    ) -> Result<S3StatusCode, S3Error>
    where
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from = from.as_ref();
            let from = from.strip_prefix('/').unwrap_or(from);
            format!("{}/{}", self.name, from)
        };
        Ok(self
            .send_request(
                Command::CopyObject {
                    from: &fq_from,
                    headers: extra_headers,
                },
                to.as_ref(),
            )
            .await?
            .status())
    }

    /// S3-internal copy of an object from another bucket into "this" bucket
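    ///
    /// # Examples
    ///
    /// A minimal sketch copying a hypothetical object from another bucket:
    ///
    /// ```no_run
    /// let bucket = Bucket::try_from_env()?;
    /// let status = bucket
    ///     .copy_internal_from("other-bucket", "hello.txt", "imported/hello.txt")
    ///     .await?;
    /// assert!(status.is_success());
    /// ```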
    pub async fn copy_internal_from<B, F, T>(
        &self,
        from_bucket: B,
        from_object: F,
        to: T,
    ) -> Result<S3StatusCode, S3Error>
    where
        B: AsRef<str>,
        F: AsRef<str>,
        T: AsRef<str>,
    {
        let fq_from = {
            let from_object = from_object.as_ref();
            let from_object = from_object.strip_prefix('/').unwrap_or(from_object);
            format!("{}/{}", from_bucket.as_ref(), from_object)
        };
        Ok(self
            .send_request(
                Command::CopyObject {
                    from: &fq_from,
                    headers: HeaderMap::new(),
                },
                to.as_ref(),
            )
            .await?
            .status())
    }

    async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
        let resp = self
            .send_request(Command::AbortMultipartUpload { upload_id }, key)
            .await?;

        let status = resp.status();
        if status.is_success() {
            Ok(())
        } else {
            let utf8_content = String::from_utf8(resp.bytes().await?.to_vec())?;
            Err(S3Error::HttpFailWithBody(status.as_u16(), utf8_content))
        }
    }

    async fn send_request(
        &self,
        mut command: Command<'_>,
        path: &str,
    ) -> Result<reqwest::Response, S3Error> {
        let url = self.build_url(&command, path)?;
        let headers = self.build_headers(&mut command, &url).await?;

        let builder = Self::get_client()
            .request(command.http_method(), url)
            .headers(headers);

        let res = match command {
            Command::PutObject { content, .. } => builder.body(content.to_vec()),
            Command::PutObjectTagging { tags } => builder.body(tags.to_string()),
            Command::UploadPart { content, .. } => builder.body(content.to_vec()),
            Command::CompleteMultipartUpload { ref data, .. } => {
                let body = data.to_string();
                builder.body(body)
            }
            _ => builder.body(Vec::default()),
        }
        .send()
        .await?;

        if res.status().is_success() {
            Ok(res)
        } else {
            Err(S3Error::HttpFailWithBody(
                res.status().as_u16(),
                res.text().await?,
            ))
        }
    }

    fn get_client<'a>() -> &'a reqwest::Client {
        CLIENT.get_or_init(|| {
            let mut builder = reqwest::Client::builder()
                .brotli(true)
                .connect_timeout(Duration::from_secs(10))
                .tcp_keepalive(Duration::from_secs(30))
                .pool_idle_timeout(Duration::from_secs(600));

            #[cfg(any(
                feature = "rustls-tls",
                feature = "rustls-tls-no-provider",
                feature = "rustls-tls-manual-roots",
                feature = "rustls-tls-webpki-roots",
                feature = "rustls-tls-native-roots"
            ))]
            {
                builder = builder.use_rustls_tls();

                if env::var("S3_DANGER_ALLOW_INSECURE").as_deref() == Ok("true") {
                    builder = builder.danger_accept_invalid_certs(true);
                }
            }

            builder.build().unwrap()
        })
    }

    /// Builds headers for the request.
    ///
    /// `command` is `&mut` since this function will consume any `headers` that were passed in from
    /// the client.
    async fn build_headers(
        &self,
        command: &mut Command<'_>,
        url: &Url,
    ) -> Result<HeaderMap, S3Error> {
        let cmd_hash = command.sha256();
        let now = OffsetDateTime::now_utc();

        // For commands that accept the `HeaderMap` as part of the command, re-use the map.
        let mut headers = match command {
            Command::PutObject { headers, .. }
            | Command::InitiateMultipartUpload { headers, .. }
            | Command::CopyObject { headers, .. } => std::mem::take(headers),
            _ => HeaderMap::with_capacity(4),
        };

        // host header
        let domain = self.host_domain();
        if self.path_style {
            headers.insert(HOST, HeaderValue::from_str(domain.as_str())?);
        } else {
            headers.insert(
                HOST,
                HeaderValue::try_from(format!("{}.{}", self.name, domain))?,
            );
        }

        // add command-specific headers
        match command {
            Command::CopyObject { from, .. } => {
                headers.insert(
                    HeaderName::from_static("x-amz-copy-source"),
                    HeaderValue::from_str(from)?,
                );
            }
            Command::ListObjects { .. } => {}
            Command::ListObjectsV2 { .. } => {}
            Command::GetObject => {}
            Command::GetObjectTagging => {}
            Command::GetBucketLocation => {}

            Command::InitiateMultipartUpload { .. } => {
                if !headers.contains_key(CONTENT_TYPE) {
                    headers.insert(
                        CONTENT_TYPE,
                        HeaderValue::from_str("application/octet-stream")?,
                    );
                }
            }
            Command::CompleteMultipartUpload { .. } => {
                headers.insert(CONTENT_TYPE, HeaderValue::from_str("application/xml")?);
            }
            Command::PutObject { multipart, .. } => {
                // If this is not a multipart upload, default to `application/octet-stream` in case
                // the content type was never set.
                //
                // N.B.: For multipart uploads, the content type is set during initiation.
                if multipart.is_none() && !headers.contains_key(CONTENT_TYPE) {
                    headers.insert(
                        CONTENT_TYPE,
                        HeaderValue::from_str("application/octet-stream")?,
                    );
                }
            }

            // Skipping `Content-Length: 0` here is needed to make Garage work,
            // while Minio seems to ignore it for these commands anyway.
            Command::DeleteObject => {}
            Command::GetObjectRange { .. } => {}
            Command::HeadObject { .. } => {}

            _ => {
                headers.insert(
                    CONTENT_LENGTH,
                    HeaderValue::try_from(command.content_length().to_string())?,
                );
                headers.insert(CONTENT_TYPE, HeaderValue::from_str("text/plain")?);
            }
        }

        // hash and date
        headers.insert(
            HeaderName::from_static("x-amz-content-sha256"),
            HeaderValue::from_str(&cmd_hash)?,
        );
        headers.insert(
            HeaderName::from_static("x-amz-date"),
            HeaderValue::try_from(now.format(LONG_DATE_TIME)?)?,
        );

        match command {
            Command::PutObjectTagging { tags } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(tags.as_bytes()))?,
                );
            }
            Command::PutObject { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::UploadPart { content, .. } => {
                headers.insert(
                    HeaderName::from_static("content-md5"),
                    HeaderValue::try_from(md5_url_encode(content))?,
                );
            }
            Command::GetObject => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));
            }
            Command::GetObjectRange { start, end } => {
                headers.insert(ACCEPT, HeaderValue::from_static("application/octet-stream"));

                let range = if let Some(end) = end {
                    format!("bytes={}-{}", start, end)
                } else {
                    format!("bytes={}-", start)
                };
                headers.insert(RANGE, HeaderValue::try_from(range)?);
            }
            _ => {}
        }

        // sign all the above headers with the secret
        let canonical_request =
            signature::canonical_request(&command.http_method(), url, &headers, &cmd_hash)?;
        let string_to_sign =
            signature::string_to_sign(&now, &self.region, canonical_request.as_bytes())?;
        let signing_key =
            signature::signing_key(&now, &self.credentials.access_key_secret, &self.region)?;
        let mut hmac = Hmac::<Sha256>::new_from_slice(&signing_key)?;
        hmac.update(string_to_sign.as_bytes());
        let signature = hex::encode(hmac.finalize().into_bytes());
        let signed_header = signature::signed_header_string(&headers);
        let authorization = signature::authorization_header(
            &self.credentials.access_key_id,
            &now,
            &self.region,
            &signed_header,
            &signature,
        )?;
        headers.insert(AUTHORIZATION, HeaderValue::try_from(authorization)?);

        // The format of RFC 2822 is somewhat malleable, so including it in
        // the signed headers can cause signature mismatches. We do include the
        // X-Amz-Date header, so requests are still properly limited to a date
        // range and can't be reused, e.g. in replay attacks. Adding this header
        // after the generation of the Authorization header leaves it out of
        // the signed headers.
        headers.insert(DATE, HeaderValue::try_from(now.format(&Rfc2822)?)?);

        Ok(headers)
    }

    fn build_url(&self, command: &Command, path: &str) -> Result<Url, S3Error> {
        let mut url = if self.path_style {
            format!(
                "{}://{}/{}",
                self.host.scheme(),
                self.host_domain(),
                self.name,
            )
        } else {
            format!(
                "{}://{}.{}",
                self.host.scheme(),
                self.name,
                self.host_domain(),
            )
        };

        let path = if let Some(stripped) = path.strip_prefix('/') {
            stripped
        } else {
            path
        };

        url.push('/');
        url.push_str(&signature::uri_encode(path, false));

        match command {
            Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
                url.push_str("?uploads")
            }
            Command::AbortMultipartUpload { upload_id } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::CompleteMultipartUpload { upload_id, .. } => {
                write!(url, "?uploadId={}", upload_id).expect("write! to succeed");
            }
            Command::PutObject {
                multipart: Some(multipart),
                ..
            } => url.push_str(&multipart.query_string()),
            _ => {}
        }

        let mut url = Url::parse(&url)?;

        match command {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                query_pairs.append_pair("list-type", "2");
                if let Some(token) = continuation_token {
                    query_pairs.append_pair("continuation-token", token);
                }
                if let Some(start_after) = start_after {
                    query_pairs.append_pair("start-after", start_after);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListObjects {
                prefix,
                delimiter,
                marker,
                max_keys,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }

                query_pairs.append_pair("prefix", prefix);
                if let Some(marker) = marker {
                    query_pairs.append_pair("marker", marker);
                }
                if let Some(max_keys) = max_keys {
                    query_pairs.append_pair("max-keys", &max_keys.to_string());
                }
            }

            Command::ListMultipartUploads {
                prefix,
                delimiter,
                key_marker,
                max_uploads,
            } => {
                let mut query_pairs = url.query_pairs_mut();
                if let Some(d) = delimiter {
                    query_pairs.append_pair("delimiter", d);
                }
                if let Some(prefix) = prefix {
                    query_pairs.append_pair("prefix", prefix);
                }
                if let Some(key_marker) = key_marker {
                    query_pairs.append_pair("key-marker", key_marker);
                }
                if let Some(max_uploads) = max_uploads {
                    query_pairs.append_pair("max-uploads", max_uploads.to_string().as_str());
                }
            }

            Command::PutObjectTagging { .. }
            | Command::GetObjectTagging
            | Command::DeleteObjectTagging => {
                url.query_pairs_mut().append_pair("tagging", "");
            }

            _ => {}
        }

        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use tokio::fs;
    use tracing_test::traced_test;

    #[traced_test]
    #[tokio::test]
    async fn test_object_flow() -> Result<(), S3Error> {
        dotenvy::dotenv().ok().unwrap();

        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        // we do not use rstest here since the tests start multiple conflicting runtimes
        let file_sizes = vec![
            0,
            1,
            CHUNK_SIZE / 2,
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
        ];

        for file_size in file_sizes {
            println!("test_object_flow with {} bytes", file_size);

            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            // create and write some test data
            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            // upload the file
            let res = bucket.put(&file_name_input, &bytes).await?;
            let status = res.status();
            let body = res.text().await?;
            println!("response body:\n{}", body);
            assert!(status.is_success());

            // give the S3 replication under the hood a second to catch up
            tokio::time::sleep(Duration::from_secs(1)).await;

            // GET the file back
            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);
            fs::write(&output_path, body.as_ref()).await?;

            // make sure input and output are the same
            let input_bytes = fs::read(input_path).await?;
            let output_bytes = fs::read(output_path).await?;
            assert_eq!(input_bytes.len(), file_size);
            assert_eq!(input_bytes.len(), output_bytes.len());
            assert_eq!(input_bytes, output_bytes);

            // list the bucket content and make sure the object shows up
            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            // we found our dummy object, check the size
                            assert_eq!(object.size, file_size as u64);
                            break;
                        }
                    }
                }
            }

            // validate that HEAD is working too
            let res = bucket.head(&file_name_input).await?;
            assert_eq!(res.content_length, Some(file_size as u64));

            if file_size > CHUNK_SIZE / 2 {
                // get only a part of the object back
                let end = CHUNK_SIZE / 2 + 1;
                let res = bucket
                    .get_range(&file_name_input, 0, Some(end as u64))
                    .await?;
                assert!(res.status().is_success());
                let body = res.bytes().await?;
                // the GET range includes the end byte -> 1 additional byte
                assert_eq!(body.len(), end as usize + 1);
            }

            // test internal object copy
            let res = bucket
                .copy_internal(&file_name_input, &file_name_output)
                .await?;
            assert!(res.is_success());

            // GET the newly copied object
            let res = bucket.get(&file_name_output).await?;
            assert!(res.status().is_success());
            let body = res.bytes().await?;
            assert_eq!(body.len(), file_size);

            // clean up and delete the test files
            let res = bucket.delete(&file_name_input).await?;
            assert!(res.status().is_success());
            let res = bucket.delete(&file_name_output).await?;
            assert!(res.status().is_success());

            // list the bucket content again and make sure it's gone
            let list = bucket.list(&bucket.name, None).await?;
            for entry in list.iter() {
                if entry.name == bucket.name {
                    for object in entry.contents.iter() {
                        if object.key == file_name_input {
                            panic!("test file has not been deleted");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    #[traced_test]
    #[tokio::test]
    async fn test_multipart() -> Result<(), S3Error> {
        use futures_util::stream::StreamExt;
        use std::os::unix::fs::MetadataExt;
        use tokio::io::AsyncWriteExt;

        dotenvy::dotenv().ok().unwrap();
        let bucket = Bucket::try_from_env().expect("env vars to be set in .env");

        // we do not use rstest here since the tests seem to interfere with each other on the IO layer
        let file_sizes = vec![
            CHUNK_SIZE - 1,
            CHUNK_SIZE,
            CHUNK_SIZE + 1,
            CHUNK_SIZE * 2,
            CHUNK_SIZE * 3,
            CHUNK_SIZE * 3 + 1,
        ];

        for file_size in file_sizes {
            // create and write some test data
            let _ = fs::create_dir_all("test_files").await;
            let file_name_input = format!("test_data_mp_{}", file_size);
            let input_path = format!("test_files/{}", file_name_input);
            let file_name_output = format!("test_data_mp_{}.out", file_size);
            let output_path = format!("test_files/{}", file_name_output);

            let bytes = vec![0u8; file_size];
            fs::write(&input_path, &bytes).await?;

            // streaming upload
            let mut reader_file = fs::File::open(&input_path).await?;
            let res = bucket
                .put_stream(&mut reader_file, file_name_input.clone())
                .await?;
            assert!(res.status_code < 300);
            assert_eq!(res.uploaded_bytes, file_size);

            // streaming download
            let mut file = fs::File::create(&output_path).await?;

            let res = bucket.get(&file_name_input).await?;
            assert!(res.status().is_success());

            let stream = res.bytes_stream();
            tokio::pin!(stream);
            while let Some(Ok(item)) = stream.next().await {
                file.write_all(item.as_ref()).await?;
            }
            // flush / sync any possibly remaining buffered data
            file.sync_all().await?;

            // make sure the files match
            let f_in = fs::File::open(&input_path).await?;
            let f_out = fs::File::open(&output_path).await?;
            let meta_in = f_in.metadata().await.unwrap();
            let meta_out = f_out.metadata().await.unwrap();
            assert_eq!(meta_in.size(), meta_out.size());
        }

        Ok(())
    }
}