cloud_copy/backend/
s3.rs

1//! Implementation of the S3 storage backend.
2
3use std::borrow::Cow;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use chrono::DateTime;
8use chrono::Utc;
9use http_cache_stream_reqwest::Cache;
10use http_cache_stream_reqwest::storage::DefaultCacheStorage;
11use reqwest::Body;
12use reqwest::Request;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use reqwest::header::HeaderValue;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::broadcast;
21use tracing::debug;
22use url::Url;
23
24use crate::BLOCK_SIZE_THRESHOLD;
25use crate::Config;
26use crate::Error;
27use crate::HttpClient;
28use crate::ONE_MEBIBYTE;
29use crate::Result;
30use crate::S3AuthConfig;
31use crate::TransferEvent;
32use crate::USER_AGENT;
33use crate::UrlExt as _;
34use crate::backend::StorageBackend;
35use crate::backend::Upload;
36use crate::backend::auth::RequestSigner;
37use crate::backend::auth::SignatureProvider;
38use crate::backend::auth::sha256_hex_string;
39use crate::streams::ByteStream;
40use crate::streams::TransferStream;
41
/// The root domain for AWS.
const AWS_ROOT_DOMAIN: &str = "amazonaws.com";

/// The root domain for localstack.
const LOCALSTACK_ROOT_DOMAIN: &str = "localhost.localstack.cloud";

/// The default S3 URL region.
///
/// Used when rewriting `s3://` URLs and no region was configured.
const DEFAULT_REGION: &str = "us-east-1";

/// The maximum number of parts in an upload.
///
/// This is an S3 service limit for multipart uploads.
const MAX_PARTS: u64 = 10000;

/// The minimum size of a part in bytes (5 MiB); applies to every part except
/// the last.
const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;

/// The maximum size in bytes (5 GiB) for an upload part.
const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;

/// The maximum size of a file on S3 in bytes (5 TiB).
const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;

/// The AWS date header name.
///
/// Carries the request timestamp used by AWS Signature Version 4 signing.
const AWS_DATE_HEADER: &str = "x-amz-date";

/// The AWS content SHA256 header name.
///
/// Carries the hex-encoded SHA-256 digest of the request payload.
const AWS_CONTENT_SHA256_HEADER: &str = "x-amz-content-sha256";
69
/// Represents a S3-specific copy operation error.
#[derive(Debug, thiserror::Error)]
pub enum S3Error {
    /// The specified S3 block size exceeds the maximum.
    #[error("S3 block size cannot exceed {MAX_PART_SIZE} bytes")]
    InvalidBlockSize,
    /// The source size exceeds the supported maximum size.
    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
    MaximumSizeExceeded,
    /// Invalid URL with an `s3` scheme.
    #[error("invalid URL with `s3` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// A "path-style" URL is missing the bucket.
    ///
    /// NOTE(review): not constructed in this file; presumably used by callers
    /// elsewhere in the crate — confirm before removing.
    #[error("URL is missing the bucket in the path")]
    MissingBucket,
    /// The S3 secret access key is invalid.
    #[error("invalid S3 secret access key")]
    InvalidSecretAccessKey,
    /// The response was missing an ETag header.
    ///
    /// Part ETags are required to complete a multipart upload.
    #[error("response from server was missing an ETag header")]
    ResponseMissingETag,
    /// The bucket name in the URL was invalid.
    #[error("the bucket name specified in the URL is invalid")]
    InvalidBucketName,
    /// Unexpected response from server.
    ///
    /// Produced when an S3 XML response body fails to deserialize.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The response status code.
        status: reqwest::StatusCode,
        /// The deserialization error.
        error: serde_xml_rs::Error,
    },
}
103
/// Represents content in a list operation results.
///
/// Deserialized from a `<Contents>` element of a `ListObjectsV2` response.
#[derive(Debug, Deserialize)]
pub struct Content {
    /// The key of the S3 object.
    #[serde(rename = "Key")]
    pub key: String,
}

/// Represents results of a list operation.
///
/// Deserialized from the `<ListBucketResult>` root element of a
/// `ListObjectsV2` response.
#[derive(Debug, Deserialize)]
#[serde(rename = "ListBucketResult")]
pub struct ListBucketResult {
    /// The contents of the results.
    #[serde(default, rename = "Contents")]
    pub contents: Vec<Content>,
    /// The next continuation token.
    ///
    /// `None` when there are no further pages of results.
    #[serde(rename = "NextContinuationToken", default)]
    pub token: Option<String>,
}

/// Represents the result of initiating an upload.
///
/// Deserialized from the response of a `CreateMultipartUpload` request.
#[derive(Default, Deserialize)]
#[serde(rename = "InitiateMultipartUploadResult")]
pub struct InitiateMultipartUploadResult {
    /// The upload identifier.
    #[serde(rename = "UploadId")]
    pub upload_id: String,
}
132
/// Represents a S3 signature provider.
///
/// Supplies the AWS Signature Version 4 constants and credentials consumed by
/// [`RequestSigner`] when signing S3 requests.
pub struct S3SignatureProvider<'a> {
    /// The region for the request.
    region: &'a str,
    /// The S3 authentication configuration.
    auth: &'a S3AuthConfig,
}

impl SignatureProvider for S3SignatureProvider<'_> {
    // The SigV4 signing algorithm identifier.
    fn algorithm(&self) -> &str {
        "AWS4-HMAC-SHA256"
    }

    // Prefix applied to the secret key when deriving the signing key.
    fn secret_key_prefix(&self) -> &str {
        "AWS4"
    }

    // Terminal component of the SigV4 credential scope.
    fn request_type(&self) -> &str {
        "aws4_request"
    }

    fn region(&self) -> &str {
        self.region
    }

    // The service name used in the credential scope.
    fn service(&self) -> &str {
        "s3"
    }

    fn date_header_name(&self) -> &str {
        AWS_DATE_HEADER
    }

    fn content_hash_header_name(&self) -> &str {
        AWS_CONTENT_SHA256_HEADER
    }

    fn access_key_id(&self) -> &str {
        &self.auth.access_key_id
    }

    fn secret_access_key(&self) -> &str {
        self.auth.secret_access_key.expose_secret()
    }
}
178
179/// Appends the authentication header to the request.
180fn append_authentication_header(
181    auth: &S3AuthConfig,
182    date: DateTime<Utc>,
183    request: &mut Request,
184) -> Result<()> {
185    let signer = RequestSigner::new(S3SignatureProvider {
186        region: request.url().region(),
187        auth,
188    });
189    let auth = signer
190        .sign(date, request)
191        .ok_or(S3Error::InvalidSecretAccessKey)?;
192    request.headers_mut().append(
193        header::AUTHORIZATION,
194        HeaderValue::try_from(auth).expect("value should be valid"),
195    );
196    Ok(())
197}
198
/// URL extensions for S3.
trait UrlExt {
    /// Extracts the region from the URL.
    ///
    /// Supports both path-style and virtual-host style S3 URLs.
    ///
    /// # Panics
    ///
    /// Panics if the URL is not a valid S3 URL.
    fn region(&self) -> &str;

    /// Extracts the bucket name and object path from the URL.
    ///
    /// The returned path retains its leading `/` separator.
    ///
    /// # Panics
    ///
    /// Panics if the URL is not a valid S3 URL.
    fn bucket_and_path(&self) -> (&str, &str);
}
215
216impl UrlExt for Url {
217    fn region(&self) -> &str {
218        let domain = self.domain().expect("URL should have domain");
219
220        if domain.starts_with("s3.") || domain.starts_with("S3.") {
221            // Path-style URL of the form https://s3.<region>.amazonaws.com/<bucket>/<path>
222            let mut parts = domain.splitn(3, '.');
223            match (parts.next(), parts.next()) {
224                (_, Some(region)) => region,
225                _ => panic!("invalid S3 URL"),
226            }
227        } else {
228            // Virtual host style URL of the form https://<bucket>.s3.<region>.amazonaws.com/<path>
229            let mut parts = domain.splitn(4, '.');
230
231            match (parts.next(), parts.next(), parts.next()) {
232                (_, _, Some(region)) => region,
233                _ => panic!("invalid S3 URL"),
234            }
235        }
236    }
237
238    fn bucket_and_path(&self) -> (&str, &str) {
239        let domain = self.domain().expect("URL should have domain");
240
241        if domain.starts_with("s3.") || domain.starts_with("S3.") {
242            // Path-style URL of the form https://s3.<region>.amazonaws.com/<bucket>/<path>
243            let bucket = self
244                .path_segments()
245                .expect("URL should have path")
246                .next()
247                .expect("URL should have at least one path segment");
248
249            (
250                bucket,
251                self.path()
252                    .strip_prefix('/')
253                    .unwrap()
254                    .strip_prefix(bucket)
255                    .unwrap(),
256            )
257        } else {
258            // Virtual host style URL of the form https://<bucket>.s3.<region>.amazonaws.com/<path>
259            let Some((bucket, _)) = domain.split_once('.') else {
260                panic!("URL domain does not contain a bucket");
261            };
262
263            (bucket, self.path())
264        }
265    }
266}
267
/// Extension trait for response.
trait ResponseExt {
    /// Converts an error response from S3 into an `Error`.
    ///
    /// Consumes the response in order to read its body.
    async fn into_error(self) -> Error;
}

impl ResponseExt for Response {
    async fn into_error(self) -> Error {
        /// Represents an error response.
        #[derive(Default, Deserialize)]
        #[serde(rename = "Error")]
        struct ErrorResponse {
            /// The error message.
            #[serde(rename = "Message")]
            message: String,
        }

        let status = self.status();

        // Improve a 301 response which is likely due to using the wrong region
        if status == StatusCode::MOVED_PERMANENTLY {
            return Error::Server {
                status,
                message: "the AWS region being used may not be the correct region for the storage \
                          bucket"
                    .into(),
            };
        }

        // Reading the body may itself fail (e.g. the connection dropped).
        let text: String = match self.text().await {
            Ok(text) => text,
            Err(e) => return e.into(),
        };

        // An empty body carries no XML error document; report the bare status.
        if text.is_empty() {
            return Error::Server {
                status,
                message: text,
            };
        }

        // Otherwise extract the human-readable message from the XML error
        // document that S3 returns.
        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
            Ok(response) => response.message,
            Err(e) => {
                return S3Error::UnexpectedResponse { status, error: e }.into();
            }
        };

        Error::Server { status, message }
    }
}
319
/// Represents a completed part of an upload.
///
/// Serialized into the `CompleteMultipartUpload` request body.
#[derive(Default, Clone, Serialize)]
#[serde(rename = "Part")]
pub struct S3UploadPart {
    /// The part number of the upload.
    ///
    /// S3 part numbers are 1-based.
    #[serde(rename = "PartNumber")]
    number: u64,
    /// The ETag of the part.
    ///
    /// Returned by S3 from the corresponding `UploadPart` request.
    #[serde(rename = "ETag")]
    etag: String,
}
331
/// Represents an S3 file upload.
///
/// Tracks the state of a single in-progress multipart upload created by
/// `S3StorageBackend::new_upload`.
pub struct S3Upload {
    /// The configuration to use for the upload.
    config: Arc<Config>,
    /// The HTTP client to use for uploading.
    client: HttpClient,
    /// The URL of the object being uploaded.
    url: Url,
    /// The identifier of this upload.
    ///
    /// Assigned by S3 when the multipart upload is created.
    id: String,
    /// The channel for sending progress updates.
    events: Option<broadcast::Sender<TransferEvent>>,
}
345
impl Upload for S3Upload {
    type Part = S3UploadPart;

    async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Option<Self::Part>> {
        // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html

        debug!(
            "sending PUT request for block {block} of `{url}`",
            url = self.url.display()
        );

        let mut url = self.url.clone();

        // S3 part numbers are 1-based while `block` is 0-based.
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
            pairs.append_pair("uploadId", &self.id);
        }

        // The payload hash must be computed up front for SigV4 signing; the
        // body itself is streamed so that transfer progress events fire.
        let digest = sha256_hex_string(&bytes);
        let length = bytes.len();
        let body = Body::wrap_stream(TransferStream::new(
            ByteStream::new(bytes),
            id,
            block,
            0,
            self.events.clone(),
        ));

        let date = Utc::now();
        let mut request = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, length)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
            .header(AWS_CONTENT_SHA256_HEADER, &digest)
            .body(body)
            .build()?;

        // Sign the request only when credentials were configured.
        if let Some(auth) = &self.config.s3.auth {
            append_authentication_header(auth, date, &mut request)?;
        }

        let response = self.client.execute(request).await?;
        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        // The part's ETag is required later to complete the multipart upload.
        let etag = response
            .headers()
            .get(header::ETAG)
            .and_then(|v| v.to_str().ok())
            .ok_or(S3Error::ResponseMissingETag)?;

        Ok(Some(S3UploadPart {
            number: block + 1,
            etag: etag.to_string(),
        }))
    }

    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
        // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html

        /// Represents the request body for completing a multipart upload.
        #[derive(Serialize)]
        #[serde(rename = "CompleteMultipartUpload")]
        struct CompleteUpload<'a> {
            /// The parts of the upload.
            #[serde(rename = "Part")]
            parts: &'a [S3UploadPart],
        }

        debug!(
            "sending POST request to finalize upload of `{url}`",
            url = self.url.display()
        );

        let mut url = self.url.clone();

        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("uploadId", &self.id);
        }

        // Serialize the completed part list into the XML document S3 expects.
        let body = serde_xml_rs::SerdeXml::new()
            .default_namespace("http://s3.amazonaws.com/doc/2006-03-01/")
            .to_string(&CompleteUpload { parts })
            .expect("should serialize");

        let date = Utc::now();
        let mut request = self
            .client
            .post(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, body.len())
            .header(header::CONTENT_TYPE, "application/xml")
            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
            .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
            .body(body)
            .build()?;

        // Sign the request only when credentials were configured.
        if let Some(auth) = &self.config.s3.auth {
            append_authentication_header(auth, date, &mut request)?;
        }

        let response = self.client.execute(request).await?;
        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        Ok(())
    }
}
461
/// Represents the S3 storage backend.
///
/// Implements [`StorageBackend`] for S3 and S3-compatible (localstack)
/// endpoints.
pub struct S3StorageBackend {
    /// The config to use for transferring files.
    config: Arc<Config>,
    /// The HTTP client to use for transferring files.
    client: HttpClient,
    /// The channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
471
472impl S3StorageBackend {
473    /// Constructs a new S3 storage backend.
474    pub fn new(
475        config: Config,
476        client: HttpClient,
477        events: Option<broadcast::Sender<TransferEvent>>,
478    ) -> Self {
479        Self {
480            config: Arc::new(config),
481            client,
482            events,
483        }
484    }
485}
486
487impl StorageBackend for S3StorageBackend {
488    type Upload = S3Upload;
489
    // Returns the transfer configuration for this backend.
    fn config(&self) -> &Config {
        &self.config
    }

    // Returns the HTTP response cache, if the client has one.
    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }

    // Returns the transfer event channel, if one was provided.
    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }
501
502    fn block_size(&self, file_size: u64) -> Result<u64> {
503        /// The number of blocks to increment by in search of a block size
504        const BLOCK_COUNT_INCREMENT: u64 = 50;
505
506        // Return the block size if one was specified
507        if let Some(size) = self.config.block_size {
508            if size > MAX_PART_SIZE {
509                return Err(S3Error::InvalidBlockSize.into());
510            }
511
512            return Ok(size);
513        }
514
515        // Try to balance the number of blocks with the size of the blocks
516        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
517        while num_blocks < MAX_PARTS {
518            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
519            if block_size <= BLOCK_SIZE_THRESHOLD {
520                return Ok(block_size.max(MIN_PART_SIZE));
521            }
522
523            num_blocks += BLOCK_COUNT_INCREMENT;
524        }
525
526        // Couldn't fit the number of blocks within the size threshold; fallback to
527        // whatever will fit
528        let block_size: u64 = file_size.div_ceil(MAX_PARTS);
529        if block_size > MAX_PART_SIZE {
530            return Err(S3Error::MaximumSizeExceeded.into());
531        }
532
533        Ok(block_size)
534    }
535
    // Determines whether this backend can service the given URL.
    fn is_supported_url(config: &Config, url: &Url) -> bool {
        match url.scheme() {
            // `s3://` URLs are always supported; they are rewritten to
            // virtual-host style HTTPS URLs by `rewrite_url`.
            "s3" => true,
            "http" | "https" => {
                let Some(domain) = url.domain() else {
                    return false;
                };

                if domain.starts_with("s3.") || domain.starts_with("S3.") {
                    // Path-style URL of the form https://s3.<region>.amazonaws.com/<bucket>/<path>
                    let domain = &domain[3..];
                    let Some((region, domain)) = domain.split_once('.') else {
                        return false;
                    };

                    // There must be at least two path segments
                    //
                    // NOTE(review): a path-style localstack URL
                    // (`s3.localhost.localstack.cloud`) cannot match here,
                    // since `localhost` would occupy the region position;
                    // presumably only virtual-host localstack URLs are
                    // produced (see `rewrite_url`) — confirm.
                    !region.is_empty()
                        && (domain.eq_ignore_ascii_case(AWS_ROOT_DOMAIN)
                            || (config.s3.use_localstack
                                && domain.eq_ignore_ascii_case(LOCALSTACK_ROOT_DOMAIN)))
                        && url
                            .path_segments()
                            .map(|mut s| s.nth(1).is_some())
                            .unwrap_or(false)
                } else {
                    // Virtual host style URL of the form https://<bucket>.s3.<region>.amazonaws.com/<path>
                    let mut parts = domain.splitn(4, '.');
                    match (parts.next(), parts.next(), parts.next(), parts.next()) {
                        (Some(bucket), Some(service), Some(region), Some(domain)) => {
                            // There must be at least one path segment
                            !bucket.is_empty()
                                && !region.is_empty()
                                && service.eq_ignore_ascii_case("s3")
                                && (domain.eq_ignore_ascii_case(AWS_ROOT_DOMAIN)
                                    || (config.s3.use_localstack
                                        && domain.eq_ignore_ascii_case(LOCALSTACK_ROOT_DOMAIN)))
                                && url
                                    .path_segments()
                                    .map(|mut s| s.next().is_some())
                                    .unwrap_or(false)
                        }
                        _ => false,
                    }
                }
            }
            _ => false,
        }
    }
584
585    fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
586        match url.scheme() {
587            "s3" => {
588                let region = config.s3.region.as_deref().unwrap_or(DEFAULT_REGION);
589                let bucket = url.host_str().ok_or(S3Error::InvalidScheme)?;
590                let path = url.path();
591
592                if url.path() == "/" {
593                    return Err(S3Error::InvalidScheme.into());
594                }
595
596                let (scheme, root, port) = if config.s3.use_localstack {
597                    ("http", LOCALSTACK_ROOT_DOMAIN, ":4566")
598                } else {
599                    ("https", AWS_ROOT_DOMAIN, "")
600                };
601
602                match (url.query(), url.fragment()) {
603                    (None, None) => format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}"),
604                    (None, Some(fragment)) => {
605                        format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}#{fragment}")
606                    }
607                    (Some(query), None) => {
608                        format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}?{query}")
609                    }
610                    (Some(query), Some(fragment)) => {
611                        format!(
612                            "{scheme}://{bucket}.s3.{region}.{root}{port}{path}?{query}#{fragment}"
613                        )
614                    }
615                }
616                .parse()
617                .map(Cow::Owned)
618                .map_err(|_| S3Error::InvalidScheme.into())
619            }
620            _ => Ok(Cow::Borrowed(url)),
621        }
622    }
623
624    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
625        // Append on the segments
626        {
627            let mut existing = url.path_segments_mut().expect("url should have path");
628            existing.pop_if_empty();
629            existing.extend(segments);
630        }
631
632        Ok(url)
633    }
634
    // Issues a signed HEAD request for the object; when `must_exist` is
    // false, a 404 response is returned to the caller instead of an error.
    async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported S3 URL",
            url = url.as_str()
        );

        debug!("sending HEAD request for `{url}`", url = url.display());

        // A HEAD request has no payload, so sign the hash of the empty body.
        let date = Utc::now();
        let mut request = self
            .client
            .head(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
            .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
            .build()?;

        // Sign the request only when credentials were configured.
        if let Some(auth) = &self.config.s3.auth {
            append_authentication_header(auth, date, &mut request)?;
        }

        let response = self.client.execute(request).await?;
        if !response.status().is_success() {
            // If the resource isn't required to exist and it's a 404, return the response.
            if !must_exist && response.status() == StatusCode::NOT_FOUND {
                return Ok(response);
            }

            return Err(response.into_error().await);
        }

        Ok(response)
    }
669
670    async fn get(&self, url: Url) -> Result<Response> {
671        debug_assert!(
672            Self::is_supported_url(&self.config, &url),
673            "{url} is not a supported S3 URL",
674            url = url.as_str()
675        );
676
677        debug!("sending GET request for `{url}`", url = url.display());
678
679        let date = Utc::now();
680        let mut request = self
681            .client
682            .get(url)
683            .header(header::USER_AGENT, USER_AGENT)
684            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
685            .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
686            .build()?;
687
688        if let Some(auth) = &self.config.s3.auth {
689            append_authentication_header(auth, date, &mut request)?;
690        }
691
692        let response = self.client.execute(request).await?;
693        if !response.status().is_success() {
694            return Err(response.into_error().await);
695        }
696
697        Ok(response)
698    }
699
    // Issues a signed ranged GET starting at `offset`, conditional on the
    // object still matching `etag`; used to resume interrupted downloads.
    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported S3 URL",
            url = url.as_str()
        );

        debug!(
            "sending GET request at offset {offset} for `{url}`",
            url = url.display(),
        );

        let date = Utc::now();

        // `Range` requests the remainder of the object from `offset`;
        // `If-Match` fails the request if the object changed since `etag`.
        let mut request = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
            .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
            .header(header::RANGE, format!("bytes={offset}-"))
            .header(header::IF_MATCH, etag)
            .build()?;

        // Sign the request only when credentials were configured.
        if let Some(auth) = &self.config.s3.auth {
            append_authentication_header(auth, date, &mut request)?;
        }

        let response = self.client.execute(request).await?;
        let status = response.status();

        // Handle precondition failed as remote content modified
        if status == StatusCode::PRECONDITION_FAILED {
            return Err(Error::RemoteContentModified);
        }

        // Handle error response
        if !status.is_success() {
            return Err(response.into_error().await);
        }

        // We expect partial content, otherwise treat as remote content modified
        if status != StatusCode::PARTIAL_CONTENT {
            return Err(Error::RemoteContentModified);
        }

        Ok(response)
    }
748
749    async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
750        // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
751
752        debug_assert!(
753            Self::is_supported_url(&self.config, &url),
754            "{url} is not a supported S3 URL",
755            url = url.as_str()
756        );
757
758        debug!("walking `{url}` as a directory", url = url.display());
759
760        let (bucket, path) = url.bucket_and_path();
761
762        // The prefix should end with `/` to signify a directory.
763        let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
764        prefix.push('/');
765
766        // Format the request to always use the virtual-host style URL
767        let domain = url.domain().expect("URL should have domain");
768        if domain.starts_with("s3") || domain.starts_with("S3") {
769            // Set the host to a virtual host
770            url.set_host(Some(&format!("{bucket}.{domain}")))
771                .map_err(|_| S3Error::InvalidBucketName)?;
772        }
773
774        url.set_path("/");
775
776        {
777            let mut pairs = url.query_pairs_mut();
778            // Use version 2.0 of the API
779            pairs.append_pair("list-type", "2");
780            // Only return objects with this prefix
781            pairs.append_pair("prefix", &prefix);
782        }
783
784        let date = Utc::now();
785        let mut token = String::new();
786        let mut paths = Vec::new();
787        loop {
788            let mut url = url.clone();
789            if !token.is_empty() {
790                url.query_pairs_mut()
791                    .append_pair("continuation-token", &token);
792            }
793
794            // List the objects with the prefix
795            let mut request = self
796                .client
797                .get(url)
798                .header(header::USER_AGENT, USER_AGENT)
799                .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
800                .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
801                .build()?;
802
803            if let Some(auth) = &self.config.s3.auth {
804                append_authentication_header(auth, date, &mut request)?;
805            }
806
807            let response = self.client.execute(request).await?;
808
809            let status = response.status();
810            if !status.is_success() {
811                return Err(response.into_error().await);
812            }
813
814            let text = response.text().await?;
815            let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
816                Ok(response) => response,
817                Err(e) => {
818                    return Err(S3Error::UnexpectedResponse { status, error: e }.into());
819                }
820            };
821
822            // If there is only one result and the result is an empty path, then the given
823            // URL was to a file and not a "directory"
824            if paths.is_empty()
825                && results.contents.len() == 1
826                && results.token.is_none()
827                && let Some("") = results.contents[0].key.strip_prefix(&prefix)
828            {
829                return Ok(paths);
830            }
831
832            paths.extend(
833                results
834                    .contents
835                    .into_iter()
836                    .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
837            );
838
839            token = results.token.unwrap_or_default();
840            if token.is_empty() {
841                break;
842            }
843        }
844
845        Ok(paths)
846    }
847
    // Creates a new multipart upload for the object at `url`.
    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
        // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html

        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported S3 URL",
            url = url.as_str()
        );

        // S3 doesn't support conditional requests for `CreateMultipartUpload`.
        // Therefore, we must issue a HEAD request for the object if not overwriting.
        // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-write-key-names
        //
        // Note: this check is inherently racy (TOCTOU) — another writer may
        // create the object between the HEAD and the upload completing.
        if !self.config.overwrite {
            let response = self.head(url.clone(), false).await?;
            if response.status() != StatusCode::NOT_FOUND {
                return Err(Error::RemoteDestinationExists(url));
            }
        }

        debug!("sending POST request for `{url}`", url = url.display());

        // The `uploads` query parameter (with no value) selects the
        // `CreateMultipartUpload` operation.
        let mut create = url.clone();
        create.query_pairs_mut().append_key_only("uploads");

        let date = Utc::now();
        let mut request = self
            .client
            .post(create)
            .header(header::USER_AGENT, USER_AGENT)
            .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
            .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
            .build()?;

        // Sign the request only when credentials were configured.
        if let Some(auth) = &self.config.s3.auth {
            append_authentication_header(auth, date, &mut request)?;
        }

        let response = self.client.execute(request).await?;
        let status = response.status();
        if !status.is_success() {
            return Err(response.into_error().await);
        }

        let text: String = match response.text().await {
            Ok(text) => text,
            Err(e) => return Err(e.into()),
        };

        // Extract the server-assigned upload identifier from the XML response.
        let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
            Ok(response) => response.upload_id,
            Err(e) => {
                return Err(S3Error::UnexpectedResponse { status, error: e }.into());
            }
        };

        Ok(S3Upload {
            config: self.config.clone(),
            client: self.client.clone(),
            url,
            id,
            events: self.events.clone(),
        })
    }
911}