cloud_copy/
lib.rs

1//! Cloud storage copy utility.
2//!
3//! The `cloud-copy` crate offers a simple API for transferring files to and
4//! from Azure Blob Storage, Amazon S3, and Google Cloud Storage.
5//!
6//! It exports a function named [`copy`] which is responsible for copying a
7//! source to a destination.
8//!
9//! An optional transfer event stream provided to the [`copy`] function can be
10//! used to display transfer progress.
11//!
12//! Additionally, when this crate is built with the `cli` feature enabled, a
13//! [`handle_events`][cli::handle_events] function is exported that will display
14//! progress bars using the `tracing_indicatif` crate.
15
16#![deny(rustdoc::broken_intra_doc_links)]
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::ops::Deref;
22use std::path::Path;
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26
27use http_cache_stream_reqwest::Cache;
28use http_cache_stream_reqwest::storage::DefaultCacheStorage;
29use reqwest::Client;
30use reqwest::StatusCode;
31use reqwest_middleware::ClientBuilder;
32use reqwest_middleware::ClientWithMiddleware;
33use tokio::fs::OpenOptions;
34use tokio::io::BufReader;
35use tokio::io::BufWriter;
36use tokio::sync::broadcast;
37use tokio_retry2::RetryError;
38use tokio_util::io::ReaderStream;
39use tokio_util::io::StreamReader;
40use tokio_util::sync::CancellationToken;
41use tracing::info;
42use tracing::warn;
43use url::Url;
44
45use crate::backend::StorageBackend;
46use crate::backend::azure::AzureBlobStorageBackend;
47use crate::backend::generic::GenericStorageBackend;
48use crate::backend::google::GoogleStorageBackend;
49use crate::backend::s3::S3StorageBackend;
50use crate::streams::TransferStream;
51use crate::transfer::FileTransfer;
52
53mod backend;
54#[cfg(feature = "cli")]
55#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
56pub mod cli;
57mod config;
58mod generator;
59mod pool;
60mod streams;
61mod transfer;
62
63pub use backend::azure::AzureError;
64pub use backend::google::GoogleError;
65pub use backend::s3::S3Error;
66pub use config::*;
67pub use generator::*;
68
69/// The utility user agent.
70const USER_AGENT: &str = concat!(
71    "cloud-copy/",
72    env!("CARGO_PKG_VERSION"),
73    " (https://github.com/stjude-rust-labs/cloud-copy)"
74);
75
76/// Represents one mebibyte in bytes.
77const ONE_MEBIBYTE: u64 = 1024 * 1024;
78
79/// The threshold for which block size calculation uses to minimize the block
80/// size (256 MiB).
81const BLOCK_SIZE_THRESHOLD: u64 = 256 * ONE_MEBIBYTE;
82
83/// Helper for notifying that a network operation failed and will be retried.
84fn notify_retry(e: &Error, duration: Duration) {
85    // Duration of 0 indicates the first attempt; only print the message for a retry
86    if !duration.is_zero() {
87        let secs = duration.as_secs();
88        warn!(
89            "network operation failed (retried after waiting {secs} second{s}): {e}",
90            s = if secs == 1 { "" } else { "s" }
91        );
92    }
93}
94
95/// Represents either a local or remote location.
96#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub enum Location<'a> {
98    /// The location is a local path.
99    Path(&'a Path),
100    /// The location is a URL.
101    Url(Cow<'a, Url>),
102}
103
104impl<'a> Location<'a> {
105    /// Constructs a new location from a string.
106    pub fn new(s: &'a str) -> Self {
107        match s.parse::<Url>() {
108            Ok(url) => Self::Url(Cow::Owned(url)),
109            Err(_) => Self::Path(Path::new(s)),
110        }
111    }
112}
113
114impl fmt::Display for Location<'_> {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            Self::Path(path) => write!(f, "{path}", path = path.display()),
118            Self::Url(url) => write!(f, "{url}", url = url.display()),
119        }
120    }
121}
122
123impl<'a> From<&'a str> for Location<'a> {
124    fn from(value: &'a str) -> Self {
125        Self::new(value)
126    }
127}
128
129impl<'a> From<&'a String> for Location<'a> {
130    fn from(value: &'a String) -> Self {
131        Self::new(value)
132    }
133}
134
135impl<'a> From<&'a Path> for Location<'a> {
136    fn from(value: &'a Path) -> Self {
137        Self::Path(value)
138    }
139}
140
141impl<'a> From<&'a PathBuf> for Location<'a> {
142    fn from(value: &'a PathBuf) -> Self {
143        Self::Path(value.as_path())
144    }
145}
146
147impl<'a> From<&'a Url> for Location<'a> {
148    fn from(value: &'a Url) -> Self {
149        Self::Url(Cow::Borrowed(value))
150    }
151}
152
153impl From<Url> for Location<'_> {
154    fn from(value: Url) -> Self {
155        Self::Url(Cow::Owned(value))
156    }
157}
158
159/// Extension trait for `Url`.
160pub trait UrlExt {
161    /// Converts the URL to a local path if it uses the `file` scheme.
162    ///
163    /// Returns `Ok(None)` if the URL is not a `file` scheme.
164    ///
165    /// Returns an error if the URL uses a `file` scheme but cannot be
166    /// represented as a local path.
167    fn to_local_path(&self) -> Result<Option<PathBuf>>;
168
169    /// Displays a URL without its query parameters.
170    ///
171    /// This is used to prevent authentication information from being displayed
172    /// to users.
173    fn display(&self) -> impl fmt::Display;
174}
175
176impl UrlExt for Url {
177    fn to_local_path(&self) -> Result<Option<PathBuf>> {
178        if self.scheme() != "file" {
179            return Ok(None);
180        }
181
182        self.to_file_path()
183            .map(Some)
184            .map_err(|_| Error::InvalidFileUrl(self.clone()))
185    }
186
187    fn display(&self) -> impl fmt::Display {
188        /// Utility for displaying URLs without query parameters.
189        struct Display<'a>(&'a Url);
190
191        impl fmt::Display for Display<'_> {
192            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193                write!(
194                    f,
195                    "{scheme}://{host}{path}",
196                    scheme = self.0.scheme(),
197                    host = self.0.host_str().unwrap_or_default(),
198                    path = self.0.path()
199                )
200            }
201        }
202
203        Display(self)
204    }
205}
206
207/// Represents a client to use for making HTTP requests.
208#[derive(Clone)]
209pub struct HttpClient {
210    /// The underlying HTTP client.
211    client: ClientWithMiddleware,
212    /// The cache to use for storing previous requests.
213    ///
214    /// If `None`, the client is not using a cache.
215    cache: Option<Arc<Cache<DefaultCacheStorage>>>,
216}
217
218impl HttpClient {
219    const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
220    const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
221
222    /// Constructs a new HTTP client.
223    pub fn new() -> Self {
224        let client = Client::builder()
225            .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
226            .read_timeout(Self::DEFAULT_READ_TIMEOUT)
227            .build()
228            .expect("failed to build HTTP client");
229
230        Self::from_existing(client)
231    }
232
233    /// Constructs a new HTTP client using the given cache directory.
234    pub fn new_with_cache(cache_dir: impl AsRef<Path>) -> Self {
235        let client = Client::builder()
236            .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
237            .read_timeout(Self::DEFAULT_READ_TIMEOUT)
238            .build()
239            .expect("failed to build HTTP client");
240
241        Self::from_existing_with_cache(client, cache_dir)
242    }
243
244    /// Constructs a new HTTP client using an existing client.
245    pub fn from_existing(client: reqwest::Client) -> Self {
246        Self {
247            client: ClientWithMiddleware::new(client, Vec::new()),
248            cache: None,
249        }
250    }
251
252    /// Constructs a new HTTP client using an existing client and the given
253    /// cache directory.
254    pub fn from_existing_with_cache(client: reqwest::Client, cache_dir: impl AsRef<Path>) -> Self {
255        let cache_dir = cache_dir.as_ref();
256        info!(
257            "using HTTP download cache directory `{dir}`",
258            dir = cache_dir.display()
259        );
260
261        let cache = Arc::new(Cache::new(DefaultCacheStorage::new(cache_dir)));
262
263        Self {
264            client: ClientBuilder::new(client).with_arc(cache.clone()).build(),
265            cache: Some(cache),
266        }
267    }
268
269    /// Gets the associated cache.
270    ///
271    /// If `None`, the client is not configured for caching.
272    pub fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
273        self.cache.as_deref()
274    }
275}
276
277impl Default for HttpClient {
278    fn default() -> Self {
279        Self::new()
280    }
281}
282
283impl Deref for HttpClient {
284    type Target = ClientWithMiddleware;
285
286    fn deref(&self) -> &Self::Target {
287        &self.client
288    }
289}
290
291/// Helper for displaying a message in `Error`.
292struct DisplayMessage<'a> {
293    /// The status code of the error.
294    status: StatusCode,
295    /// The message to display.
296    ///
297    /// If empty, the status code's canonical reason will be used.
298    message: &'a str,
299}
300
301impl fmt::Display for DisplayMessage<'_> {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        if self.message.is_empty() {
304            write!(
305                f,
306                " ({reason})",
307                reason = self
308                    .status
309                    .canonical_reason()
310                    .unwrap_or("<unknown status code>")
311                    .to_lowercase()
312            )
313        } else {
314            write!(f, ": {message}", message = self.message)
315        }
316    }
317}
318
319/// Represents a copy operation error.
320#[derive(Debug, thiserror::Error)]
321pub enum Error {
322    /// The operation was canceled.
323    #[error("the operation was canceled")]
324    Canceled,
325    /// Copying between remote locations is not supported.
326    #[error("copying between remote locations is not supported")]
327    RemoteCopyNotSupported,
328    /// A remote URL has an unsupported URL scheme.
329    #[error("remote URL has an unsupported URL scheme `{0}`")]
330    UnsupportedUrlScheme(String),
331    /// Unsupported remote URL.
332    #[error("URL `{url}` is not for a supported cloud service", url = .0.display())]
333    UnsupportedUrl(Url),
334    /// Invalid URl with a `file` scheme.
335    #[error("file URL `{url}` cannot be represented as a local path", url = .0.display())]
336    InvalidFileUrl(Url),
337    /// The specified path is invalid.
338    #[error("the specified path cannot be a root directory or empty")]
339    InvalidPath,
340    /// The remote content was modified during a download.
341    #[error("the remote content was modified during the download")]
342    RemoteContentModified,
343    /// Failed to create a directory.
344    #[error("failed to create directory `{path}`: {error}", path = .path.display())]
345    DirectoryCreationFailed {
346        /// The path to the directory that failed to be created.
347        path: PathBuf,
348        /// The error that occurred creating the directory.
349        error: std::io::Error,
350    },
351    /// Failed to create a temporary file.
352    #[error("failed to create temporary file: {error}")]
353    CreateTempFile {
354        /// The error that occurred creating the temporary file.
355        error: std::io::Error,
356    },
357    /// Failed to persist a temporary file.
358    #[error("failed to persist temporary file: {error}")]
359    PersistTempFile {
360        /// The error that occurred creating the temporary file.
361        error: std::io::Error,
362    },
363    /// The local destination path already exists.
364    #[error("the destination path `{path}` already exists", path = .0.display())]
365    LocalDestinationExists(PathBuf),
366    /// The remote destination URL already exists.
367    #[error("the destination URL `{url}` already exists", url = .0.display())]
368    RemoteDestinationExists(Url),
369    /// The server returned an error.
370    #[error(
371        "server returned status {status}{message}",
372        status = .status.as_u16(),
373        message = DisplayMessage { status: *.status, message }
374    )]
375    Server {
376        /// The response status code.
377        status: reqwest::StatusCode,
378        /// The response error message.
379        ///
380        /// This may be the contents of the entire response body.
381        message: String,
382    },
383    /// Server responded with an unexpected `content-range` header.
384    #[error(
385        "server responded with a `content-range` header that does not start at the requested \
386         offset"
387    )]
388    UnexpectedContentRangeStart,
389    /// An Azure error occurred.
390    #[error(transparent)]
391    Azure(#[from] AzureError),
392    /// An S3 error occurred.
393    #[error(transparent)]
394    S3(#[from] S3Error),
395    /// A Google Cloud Storage error occurred.
396    #[error(transparent)]
397    Google(#[from] GoogleError),
398    /// An I/O error occurred.
399    #[error(transparent)]
400    Io(#[from] std::io::Error),
401    /// A directory walking error occurred.
402    #[error(transparent)]
403    Walk(#[from] walkdir::Error),
404    /// A reqwest error occurred.
405    #[error(transparent)]
406    Reqwest(#[from] reqwest::Error),
407    /// A reqwest middleware error occurred.
408    #[error(transparent)]
409    Middleware(#[from] reqwest_middleware::Error),
410    /// A temp file persist error occurred.
411    #[error(transparent)]
412    Temp(#[from] tempfile::PersistError),
413}
414
415impl Error {
416    /// Converts the error into a retry error.
417    fn into_retry_error(self) -> RetryError<Self> {
418        match &self {
419            Error::Server { status, .. }
420            | Error::Azure(AzureError::UnexpectedResponse { status, .. })
421                if status.is_server_error() =>
422            {
423                RetryError::transient(self)
424            }
425            Error::Io(_) | Error::Reqwest(_) | Error::Middleware(_) => RetryError::transient(self),
426            _ => RetryError::permanent(self),
427        }
428    }
429}
430
431/// Represents a result for copy operations.
432pub type Result<T> = std::result::Result<T, Error>;
433
434/// Represents an event that may occur during a file transfer.
435#[derive(Debug, Clone)]
436pub enum TransferEvent {
437    /// A transfer of a file has been started.
438    TransferStarted {
439        /// The id of the file transfer.
440        ///
441        /// This is a monotonic counter that is increased every transfer.
442        id: u64,
443        /// The path of the file being transferred.
444        path: PathBuf,
445        /// The number of blocks in the file.
446        blocks: u64,
447        /// The size of the file being transferred.
448        ///
449        /// This is `None` when downloading a file of unknown size.
450        size: Option<u64>,
451    },
452    /// A transfer of a block has started.
453    BlockStarted {
454        /// The id of the file transfer.
455        id: u64,
456        /// The block number being transferred.
457        block: u64,
458        /// The size of the block being transferred.
459        ///
460        /// This is `None` when downloading a file of unknown size.
461        size: Option<u64>,
462    },
463    /// A transfer of a block has made progress.
464    BlockProgress {
465        /// The id of the transfer.
466        id: u64,
467        /// The block number being transferred.
468        block: u64,
469        /// The total number of bytes transferred in the block so far.
470        transferred: u64,
471    },
472    /// A transfer of a block has completed.
473    BlockCompleted {
474        /// The id of the transfer.
475        id: u64,
476        /// The block number being transferred.
477        block: u64,
478        /// Whether or not the transfer failed.
479        failed: bool,
480    },
481    /// A file transfer has completed.
482    TransferCompleted {
483        /// The id of the transfer.
484        id: u64,
485        /// Whether or not the transfer failed.
486        failed: bool,
487    },
488}
489
490/// Copies a local file to another path.
491///
492/// This differs from `tokio::fs::copy` in that progress events will be sent.
493async fn copy_local(
494    source: &Path,
495    destination: &Path,
496    cancel: CancellationToken,
497    events: Option<broadcast::Sender<TransferEvent>>,
498) -> Result<()> {
499    // The transfer id for the copy.
500    const ID: u64 = 0;
501    /// The block index for the copy.
502    const BLOCK: u64 = 0;
503
504    // Wrap the source stream with a transfer stream to emit events
505    let mut reader = StreamReader::new(TransferStream::new(
506        ReaderStream::new(BufReader::new(
507            OpenOptions::new().read(true).open(source).await?,
508        )),
509        ID,
510        BLOCK,
511        0,
512        events.clone(),
513    ));
514
515    let mut writer = BufWriter::new(
516        OpenOptions::new()
517            .create_new(true)
518            .write(true)
519            .open(destination)
520            .await?,
521    );
522
523    // Send the transfer and block started event
524    if let Some(events) = &events {
525        let size = std::fs::metadata(source)?.len();
526
527        events
528            .send(TransferEvent::TransferStarted {
529                id: ID,
530                path: destination.to_path_buf(),
531                blocks: 1,
532                size: Some(size),
533            })
534            .ok();
535
536        events
537            .send(TransferEvent::BlockStarted {
538                id: ID,
539                block: BLOCK,
540                size: Some(size),
541            })
542            .ok();
543    }
544
545    // Copy the reader stream to the writer stream
546    let result = tokio::select! {
547        _ = cancel.cancelled() => {
548            drop(writer);
549            std::fs::remove_file(destination).ok();
550            Err(Error::Canceled)
551        },
552        r = tokio::io::copy(&mut reader, &mut writer) => {
553            r?;
554            Ok(())
555        }
556    };
557
558    // Send the block and transfer end event
559    if let Some(events) = &events {
560        events
561            .send(TransferEvent::BlockCompleted {
562                id: ID,
563                block: BLOCK,
564                failed: result.is_err(),
565            })
566            .ok();
567
568        events
569            .send(TransferEvent::TransferCompleted {
570                id: ID,
571                failed: result.is_err(),
572            })
573            .ok();
574    }
575
576    result
577}
578
579/// Copies a source location to a destination location.
580///
581/// A location may either be a local path or a remote URL.
582///
583/// If provided, the `events` sender will be used to send transfer events.
584///
585/// _Note: copying between two remote locations is not supported._
586///
587/// # Azure Blob Storage
588///
589/// Supported remote URLs for Azure Blob Storage:
590///
591/// * `az` schemed URLs in the format `az://<account>/<container>/<blob>`.
592/// * `https` schemed URLs in the format `https://<account>.blob.core.windows.net/<container>/<blob>`.
593///
594/// If authentication is required, the URL is expected to contain a SAS token in
595/// its query parameters.
596///
597/// # Amazon S3
598///
599/// Supported remote URLs for S3 Storage:
600///
601/// * `s3` schemed URLs in the format: `s3://<bucket>/<object>` (note: uses the
602///   default region).
603/// * `https` schemed URLs in the format `https://<bucket>.s3.<region>.amazonaws.com/<object>`.
604/// * `https` schemed URLs in the format `https://<region>.s3.amazonaws.com/<bucket>/<object>`.
605///
606/// If authentication is required, the provided `Config` must have S3
607/// authentication information.
608///
609/// # Google Cloud Storage
610///
611/// Supported remote URLs for Google Cloud Storage:
612///
613/// * `gs` schemed URLs in the format: `gs://<bucket>/<object>`.
614/// * `https` schemed URLs in the format `https://<bucket>.storage.googleapis.com/<object>`.
615/// * `https` schemed URLs in the format `https://storage.googleapis.com/<bucket>/<object>`.
616///
617/// If authentication is required, the provided `Config` must have Google
618/// authentication information.
619///
620/// Note that [HMAC authentication](https://cloud.google.com/storage/docs/authentication/hmackeys)
621/// is used for Google Cloud Storage access.
622pub async fn copy(
623    config: Config,
624    client: HttpClient,
625    source: impl Into<Location<'_>>,
626    destination: impl Into<Location<'_>>,
627    cancel: CancellationToken,
628    events: Option<broadcast::Sender<TransferEvent>>,
629) -> Result<()> {
630    let source = source.into();
631    let destination = destination.into();
632
633    match (source, destination) {
634        (Location::Path(source), Location::Path(destination)) => {
635            if !config.overwrite && destination.exists() {
636                return Err(Error::LocalDestinationExists(destination.to_path_buf()));
637            }
638
639            // Two local locations, just perform a copy
640            Ok(copy_local(source, destination, cancel, events).await?)
641        }
642        (Location::Path(source), Location::Url(destination)) => {
643            // Perform a copy if the the destination is a local path
644            if let Some(destination) = destination.to_local_path()? {
645                return copy_local(source, &destination, cancel, events).await;
646            }
647
648            if AzureBlobStorageBackend::is_supported_url(&config, &destination) {
649                let destination = AzureBlobStorageBackend::rewrite_url(&config, &destination)?;
650                let transfer =
651                    FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
652                transfer.upload(source, destination.into_owned()).await
653            } else if S3StorageBackend::is_supported_url(&config, &destination) {
654                let destination = S3StorageBackend::rewrite_url(&config, &destination)?;
655                let transfer =
656                    FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
657                transfer.upload(source, destination.into_owned()).await
658            } else if GoogleStorageBackend::is_supported_url(&config, &destination) {
659                let destination = GoogleStorageBackend::rewrite_url(&config, &destination)?;
660                let transfer =
661                    FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
662                transfer.upload(source, destination.into_owned()).await
663            } else {
664                Err(Error::UnsupportedUrl(destination.into_owned()))
665            }
666        }
667        (Location::Url(source), Location::Path(destination)) => {
668            if !config.overwrite && destination.exists() {
669                return Err(Error::LocalDestinationExists(destination.to_path_buf()));
670            }
671
672            // Perform a copy if the the source is a local path
673            if let Some(source) = source.to_local_path()? {
674                return copy_local(&source, destination, cancel, events).await;
675            }
676
677            if AzureBlobStorageBackend::is_supported_url(&config, &source) {
678                let source = AzureBlobStorageBackend::rewrite_url(&config, &source)?;
679                let transfer =
680                    FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
681                transfer.download(source.into_owned(), destination).await
682            } else if S3StorageBackend::is_supported_url(&config, &source) {
683                let source = S3StorageBackend::rewrite_url(&config, &source)?;
684                let transfer =
685                    FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
686                transfer.download(source.into_owned(), destination).await
687            } else if GoogleStorageBackend::is_supported_url(&config, &source) {
688                let source = GoogleStorageBackend::rewrite_url(&config, &source)?;
689                let transfer =
690                    FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
691                transfer.download(source.into_owned(), destination).await
692            } else {
693                let transfer =
694                    FileTransfer::new(GenericStorageBackend::new(config, client, events), cancel);
695                transfer.download(source.into_owned(), destination).await
696            }
697        }
698        (Location::Url(source), Location::Url(destination)) => {
699            if let (Some(source), Some(destination)) =
700                (source.to_local_path()?, destination.to_local_path()?)
701            {
702                // Two local locations, just perform a copy
703                return copy_local(&source, &destination, cancel, events).await;
704            }
705
706            Err(Error::RemoteCopyNotSupported)
707        }
708    }
709}
710
711/// Re-writes a cloud storage schemed URL (e.g. `az://`, `s3://`, `gs://`) to a corresponding `https://` URL.
712///
713/// If the URL is not a cloud storage schemed URL, the given URL is returned.
714///
715/// Returns an error if the given URL is not a valid cloud storage URL.
716pub fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
717    if AzureBlobStorageBackend::is_supported_url(config, url) {
718        AzureBlobStorageBackend::rewrite_url(config, url)
719    } else if S3StorageBackend::is_supported_url(config, url) {
720        S3StorageBackend::rewrite_url(config, url)
721    } else if GoogleStorageBackend::is_supported_url(config, url) {
722        GoogleStorageBackend::rewrite_url(config, url)
723    } else {
724        Ok(Cow::Borrowed(url))
725    }
726}
727
728#[cfg(test)]
729mod test {
730    use super::*;
731
732    #[test]
733    fn rewrite_urls() {
734        let config = Config::default();
735
736        assert_eq!(
737            rewrite_url(&config, &"http://example.com".parse().unwrap())
738                .unwrap()
739                .as_str(),
740            "http://example.com/"
741        );
742
743        assert_eq!(
744            rewrite_url(&config, &"az://foo/bar/baz".parse().unwrap())
745                .unwrap()
746                .as_str(),
747            "https://foo.blob.core.windows.net/bar/baz"
748        );
749
750        assert_eq!(
751            rewrite_url(&config, &"s3://foo/bar/baz".parse().unwrap())
752                .unwrap()
753                .as_str(),
754            "https://foo.s3.us-east-1.amazonaws.com/bar/baz"
755        );
756
757        assert_eq!(
758            rewrite_url(&config, &"gs://foo/bar/baz".parse().unwrap())
759                .unwrap()
760                .as_str(),
761            "https://foo.storage.googleapis.com/bar/baz"
762        );
763
764        let config = Config {
765            s3: S3Config {
766                region: Some("my-region".into()),
767                ..Default::default()
768            },
769            ..Default::default()
770        };
771
772        assert_eq!(
773            rewrite_url(&config, &"s3://foo/bar/baz".parse().unwrap())
774                .unwrap()
775                .as_str(),
776            "https://foo.s3.my-region.amazonaws.com/bar/baz"
777        );
778
779        let config = Config {
780            azure: AzureConfig { use_azurite: true },
781            s3: S3Config {
782                use_localstack: true,
783                ..Default::default()
784            },
785            ..Default::default()
786        };
787
788        assert_eq!(
789            rewrite_url(&config, &"az://foo/bar/baz".parse().unwrap())
790                .unwrap()
791                .as_str(),
792            "http://foo.blob.core.windows.net.localhost:10000/bar/baz"
793        );
794
795        assert_eq!(
796            rewrite_url(&config, &"s3://foo/bar/baz".parse().unwrap())
797                .unwrap()
798                .as_str(),
799            "http://foo.s3.us-east-1.localhost.localstack.cloud:4566/bar/baz"
800        );
801    }
802}