cloud_copy/
lib.rs

1//! Cloud storage copy utility.
2//!
3//! The `cloud-copy` crate offers a simple API for transferring files to and
4//! from Azure Blob Storage, Amazon S3, and Google Cloud Storage.
5//!
6//! It exports a function named [`copy`] which is responsible for copying a
7//! source to a destination.
8//!
9//! An optional transfer event stream provided to the [`copy`] function can be
10//! used to display transfer progress.
11//!
12//! Additionally, when this crate is built with the `cli` feature enabled, a
13//! [`handle_events`][cli::handle_events] function is exported that will display
14//! progress bars using the `tracing_indicatif` crate.
15
16#![deny(rustdoc::broken_intra_doc_links)]
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::fmt;
20use std::ops::Deref;
21use std::path::Path;
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25
26use http_cache_stream_reqwest::Cache;
27use http_cache_stream_reqwest::storage::DefaultCacheStorage;
28use reqwest::Client;
29use reqwest::StatusCode;
30use reqwest_middleware::ClientBuilder;
31use reqwest_middleware::ClientWithMiddleware;
32use tokio::fs::OpenOptions;
33use tokio::io::BufReader;
34use tokio::io::BufWriter;
35use tokio::sync::broadcast;
36use tokio_retry2::RetryError;
37use tokio_util::io::ReaderStream;
38use tokio_util::io::StreamReader;
39use tokio_util::sync::CancellationToken;
40use tracing::info;
41use tracing::warn;
42use url::Url;
43
44use crate::backend::StorageBackend;
45use crate::backend::azure::AzureBlobStorageBackend;
46use crate::backend::generic::GenericStorageBackend;
47use crate::backend::google::GoogleStorageBackend;
48use crate::backend::s3::S3StorageBackend;
49use crate::streams::TransferStream;
50use crate::transfer::FileTransfer;
51
52mod backend;
53#[cfg(feature = "cli")]
54#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
55pub mod cli;
56mod config;
57mod generator;
58mod os;
59mod pool;
60mod streams;
61mod transfer;
62
63pub use backend::azure::AzureError;
64pub use backend::google::GoogleError;
65pub use backend::s3::S3Error;
66pub use config::*;
67pub use generator::*;
68
/// The utility user agent.
///
/// Assembled at compile time from the crate version (`CARGO_PKG_VERSION`) and
/// the project's repository URL.
const USER_AGENT: &str = concat!(
    "cloud-copy/",
    env!("CARGO_PKG_VERSION"),
    " (https://github.com/stjude-rust-labs/cloud-copy)"
);

/// Represents one mebibyte in bytes.
const ONE_MEBIBYTE: u64 = 1024 * 1024;

/// The threshold (256 MiB) that the block size calculation uses to minimize
/// the block size.
const BLOCK_SIZE_THRESHOLD: u64 = 256 * ONE_MEBIBYTE;
82
83/// Helper for notifying that a network operation failed and will be retried.
84fn notify_retry(e: &Error, duration: Duration) {
85    // Duration of 0 indicates the first attempt; only print the message for a retry
86    if !duration.is_zero() {
87        let secs = duration.as_secs();
88        warn!(
89            "network operation failed (retried after waiting {secs} second{s}): {e}",
90            s = if secs == 1 { "" } else { "s" }
91        );
92    }
93}
94
95/// Represents either a local or remote location.
96#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub enum Location<'a> {
98    /// The location is a local path.
99    Path(&'a Path),
100    /// The location is a URL.
101    Url(Url),
102}
103
104impl<'a> Location<'a> {
105    /// Constructs a new location from a string.
106    pub fn new(s: &'a str) -> Self {
107        match s.parse::<Url>() {
108            Ok(url) => Self::Url(url),
109            Err(_) => Self::Path(Path::new(s)),
110        }
111    }
112}
113
114impl fmt::Display for Location<'_> {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            Self::Path(path) => write!(f, "{path}", path = path.display()),
118            Self::Url(url) => write!(f, "{url}", url = url.display()),
119        }
120    }
121}
122
123impl<'a> From<&'a str> for Location<'a> {
124    fn from(value: &'a str) -> Self {
125        Self::new(value)
126    }
127}
128
129impl<'a> From<&'a String> for Location<'a> {
130    fn from(value: &'a String) -> Self {
131        Self::new(value)
132    }
133}
134
135impl<'a> From<&'a Path> for Location<'a> {
136    fn from(value: &'a Path) -> Self {
137        Self::Path(value)
138    }
139}
140
141impl<'a> From<&'a PathBuf> for Location<'a> {
142    fn from(value: &'a PathBuf) -> Self {
143        Self::Path(value.as_path())
144    }
145}
146
147impl From<Url> for Location<'_> {
148    fn from(value: Url) -> Self {
149        Self::Url(value)
150    }
151}
152
153/// Extension trait for `Url`.
154pub trait UrlExt {
155    /// Converts the URL to a local path if it uses the `file` scheme.
156    ///
157    /// Returns `Ok(None)` if the URL is not a `file` scheme.
158    ///
159    /// Returns an error if the URL uses a `file` scheme but cannot be
160    /// represented as a local path.
161    fn to_local_path(&self) -> Result<Option<PathBuf>>;
162
163    /// Displays a URL without its query parameters.
164    ///
165    /// This is used to prevent authentication information from being displayed
166    /// to users.
167    fn display(&self) -> impl fmt::Display;
168}
169
170impl UrlExt for Url {
171    fn to_local_path(&self) -> Result<Option<PathBuf>> {
172        if self.scheme() != "file" {
173            return Ok(None);
174        }
175
176        self.to_file_path()
177            .map(Some)
178            .map_err(|_| Error::InvalidFileUrl(self.clone()))
179    }
180
181    fn display(&self) -> impl fmt::Display {
182        /// Utility for displaying URLs without query parameters.
183        struct Display<'a>(&'a Url);
184
185        impl fmt::Display for Display<'_> {
186            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187                write!(
188                    f,
189                    "{scheme}://{host}{path}",
190                    scheme = self.0.scheme(),
191                    host = self.0.host_str().unwrap_or_default(),
192                    path = self.0.path()
193                )
194            }
195        }
196
197        Display(self)
198    }
199}
200
201/// Represents a client to use for making HTTP requests.
202#[derive(Clone)]
203pub struct HttpClient {
204    /// The underlying HTTP client.
205    client: ClientWithMiddleware,
206    /// The cache to use for storing previous requests.
207    ///
208    /// If `None`, the client is not using a cache.
209    cache: Option<Arc<Cache<DefaultCacheStorage>>>,
210}
211
212impl HttpClient {
213    const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
214    const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
215
216    /// Constructs a new HTTP client.
217    pub fn new() -> Self {
218        let client = Client::builder()
219            .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
220            .read_timeout(Self::DEFAULT_READ_TIMEOUT)
221            .build()
222            .expect("failed to build HTTP client");
223
224        Self::from_existing(client)
225    }
226
227    /// Constructs a new HTTP client using the given cache directory.
228    pub fn new_with_cache(cache_dir: impl AsRef<Path>) -> Self {
229        let client = Client::builder()
230            .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
231            .read_timeout(Self::DEFAULT_READ_TIMEOUT)
232            .build()
233            .expect("failed to build HTTP client");
234
235        Self::from_existing_with_cache(client, cache_dir)
236    }
237
238    /// Constructs a new HTTP client using an existing client.
239    pub fn from_existing(client: reqwest::Client) -> Self {
240        Self {
241            client: ClientWithMiddleware::new(client, Vec::new()),
242            cache: None,
243        }
244    }
245
246    /// Constructs a new HTTP client using an existing client and the given
247    /// cache directory.
248    pub fn from_existing_with_cache(client: reqwest::Client, cache_dir: impl AsRef<Path>) -> Self {
249        let cache_dir = cache_dir.as_ref();
250        info!(
251            "using HTTP download cache directory `{dir}`",
252            dir = cache_dir.display()
253        );
254
255        let cache = Arc::new(Cache::new(DefaultCacheStorage::new(cache_dir)));
256
257        Self {
258            client: ClientBuilder::new(client).with_arc(cache.clone()).build(),
259            cache: Some(cache),
260        }
261    }
262
263    /// Gets the associated cache.
264    ///
265    /// If `None`, the client is not configured for caching.
266    pub fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
267        self.cache.as_deref()
268    }
269}
270
271impl Default for HttpClient {
272    fn default() -> Self {
273        Self::new()
274    }
275}
276
277impl Deref for HttpClient {
278    type Target = ClientWithMiddleware;
279
280    fn deref(&self) -> &Self::Target {
281        &self.client
282    }
283}
284
285/// Helper for displaying a message in `Error`.
286struct DisplayMessage<'a> {
287    /// The status code of the error.
288    status: StatusCode,
289    /// The message to display.
290    ///
291    /// If empty, the status code's canonical reason will be used.
292    message: &'a str,
293}
294
295impl fmt::Display for DisplayMessage<'_> {
296    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297        if self.message.is_empty() {
298            write!(
299                f,
300                " ({reason})",
301                reason = self
302                    .status
303                    .canonical_reason()
304                    .unwrap_or("<unknown status code>")
305                    .to_lowercase()
306            )
307        } else {
308            write!(f, ": {message}", message = self.message)
309        }
310    }
311}
312
/// Represents a copy operation error.
///
/// URLs embedded in error messages are rendered with `UrlExt::display` rather
/// than the URL's own `Display` implementation.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The operation was canceled.
    #[error("the operation was canceled")]
    Canceled,
    /// Copying between remote locations is not supported.
    #[error("copying between remote locations is not supported")]
    RemoteCopyNotSupported,
    /// A remote URL has an unsupported URL scheme.
    #[error("remote URL has an unsupported URL scheme `{0}`")]
    UnsupportedUrlScheme(String),
    /// Unsupported remote URL.
    #[error("URL `{url}` is not for a supported cloud service", url = .0.display())]
    UnsupportedUrl(Url),
    /// Invalid URL with a `file` scheme.
    #[error("file URL `{url}` cannot be represented as a local path", url = .0.display())]
    InvalidFileUrl(Url),
    /// The specified path is invalid.
    #[error("the specified path cannot be a root directory or empty")]
    InvalidPath,
    /// The remote content was modified during a download.
    #[error("the remote content was modified during the download")]
    RemoteContentModified,
    /// Failed to create a directory.
    #[error("failed to create directory `{path}`: {error}", path = .path.display())]
    DirectoryCreationFailed {
        /// The path to the directory that failed to be created.
        path: PathBuf,
        /// The error that occurred creating the directory.
        error: std::io::Error,
    },
    /// Failed to create a temporary file.
    #[error("failed to create temporary file: {error}")]
    CreateTempFile {
        /// The error that occurred creating the temporary file.
        error: std::io::Error,
    },
    /// Failed to persist a temporary file.
    #[error("failed to persist temporary file: {error}")]
    PersistTempFile {
        /// The error that occurred persisting the temporary file.
        error: std::io::Error,
    },
    /// The destination path already exists.
    #[error("the destination path `{path}` already exists", path = .0.display())]
    DestinationExists(PathBuf),
    /// The server returned an error.
    ///
    /// The display message appends the response message when it is non-empty;
    /// otherwise the status code's canonical reason is appended instead.
    #[error(
        "server returned status {status}{message}",
        status = .status.as_u16(),
        message = DisplayMessage { status: *.status, message }
    )]
    Server {
        /// The response status code.
        status: reqwest::StatusCode,
        /// The response error message.
        ///
        /// This may be the contents of the entire response body.
        message: String,
    },
    /// Server responded with an unexpected `content-range` header.
    #[error(
        "server responded with a `content-range` header that does not start at the requested \
         offset"
    )]
    UnexpectedContentRangeStart,
    /// An Azure error occurred.
    #[error(transparent)]
    Azure(#[from] AzureError),
    /// An S3 error occurred.
    #[error(transparent)]
    S3(#[from] S3Error),
    /// A Google Cloud Storage error occurred.
    #[error(transparent)]
    Google(#[from] GoogleError),
    /// An I/O error occurred.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// A directory walking error occurred.
    #[error(transparent)]
    Walk(#[from] walkdir::Error),
    /// A reqwest error occurred.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// A reqwest middleware error occurred.
    #[error(transparent)]
    Middleware(#[from] reqwest_middleware::Error),
    /// A temp file persist error occurred.
    #[error(transparent)]
    Temp(#[from] tempfile::PersistError),
}
405
406impl Error {
407    /// Converts the error into a retry error.
408    fn into_retry_error(self) -> RetryError<Self> {
409        match &self {
410            Error::Server { status, .. }
411            | Error::Azure(AzureError::UnexpectedResponse { status, .. })
412                if status.is_server_error() =>
413            {
414                RetryError::transient(self)
415            }
416            Error::Io(_) | Error::Reqwest(_) | Error::Middleware(_) => RetryError::transient(self),
417            _ => RetryError::permanent(self),
418        }
419    }
420}
421
/// Represents a result for copy operations.
///
/// This is [`std::result::Result`] with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
424
/// Represents an event that may occur during a file transfer.
///
/// Events are delivered through the optional `broadcast::Sender` passed to
/// [`copy`]. Block-level events carry the `id` of the transfer they belong
/// to.
#[derive(Debug, Clone)]
pub enum TransferEvent {
    /// A transfer of a file has been started.
    TransferStarted {
        /// The id of the file transfer.
        ///
        /// This is a monotonic counter that is increased every transfer.
        id: u64,
        /// The path of the file being transferred.
        path: PathBuf,
        /// The number of blocks in the file.
        blocks: u64,
        /// The size of the file being transferred.
        ///
        /// This is `None` when downloading a file of unknown size.
        size: Option<u64>,
    },
    /// A transfer of a block has started.
    BlockStarted {
        /// The id of the file transfer.
        id: u64,
        /// The block number being transferred.
        block: u64,
        /// The size of the block being transferred.
        ///
        /// This is `None` when downloading a file of unknown size.
        size: Option<u64>,
    },
    /// A transfer of a block has made progress.
    BlockProgress {
        /// The id of the transfer.
        id: u64,
        /// The block number being transferred.
        block: u64,
        /// The total number of bytes transferred in the block so far.
        transferred: u64,
    },
    /// A transfer of a block has completed.
    ///
    /// Completion is reported for both successful and failed blocks; check
    /// `failed` to tell them apart.
    BlockCompleted {
        /// The id of the transfer.
        id: u64,
        /// The block number being transferred.
        block: u64,
        /// Whether or not the transfer failed.
        failed: bool,
    },
    /// A file transfer has completed.
    ///
    /// Completion is reported for both successful and failed transfers; check
    /// `failed` to tell them apart.
    TransferCompleted {
        /// The id of the transfer.
        id: u64,
        /// Whether or not the transfer failed.
        failed: bool,
    },
}
480
481/// Copies a local file to another path.
482///
483/// This differs from `tokio::fs::copy` in that progress events will be sent.
484async fn copy_local(
485    source: &Path,
486    destination: &Path,
487    cancel: CancellationToken,
488    events: Option<broadcast::Sender<TransferEvent>>,
489) -> Result<()> {
490    // The transfer id for the copy.
491    const ID: u64 = 0;
492    /// The block index for the copy.
493    const BLOCK: u64 = 0;
494
495    // Wrap the source stream with a transfer stream to emit events
496    let mut reader = StreamReader::new(TransferStream::new(
497        ReaderStream::new(BufReader::new(
498            OpenOptions::new().read(true).open(source).await?,
499        )),
500        ID,
501        BLOCK,
502        0,
503        events.clone(),
504    ));
505
506    let mut writer = BufWriter::new(
507        OpenOptions::new()
508            .create_new(true)
509            .write(true)
510            .open(destination)
511            .await?,
512    );
513
514    // Send the transfer and block started event
515    if let Some(events) = &events {
516        let size = std::fs::metadata(source)?.len();
517
518        events
519            .send(TransferEvent::TransferStarted {
520                id: ID,
521                path: destination.to_path_buf(),
522                blocks: 1,
523                size: Some(size),
524            })
525            .ok();
526
527        events
528            .send(TransferEvent::BlockStarted {
529                id: ID,
530                block: BLOCK,
531                size: Some(size),
532            })
533            .ok();
534    }
535
536    // Copy the reader stream to the writer stream
537    let result = tokio::select! {
538        _ = cancel.cancelled() => {
539            drop(writer);
540            std::fs::remove_file(destination).ok();
541            Err(Error::Canceled)
542        },
543        r = tokio::io::copy(&mut reader, &mut writer) => {
544            r?;
545            Ok(())
546        }
547    };
548
549    // Send the block and transfer end event
550    if let Some(events) = &events {
551        events
552            .send(TransferEvent::BlockCompleted {
553                id: ID,
554                block: BLOCK,
555                failed: result.is_err(),
556            })
557            .ok();
558
559        events
560            .send(TransferEvent::TransferCompleted {
561                id: ID,
562                failed: result.is_err(),
563            })
564            .ok();
565    }
566
567    result
568}
569
570/// Copies a source location to a destination location.
571///
572/// A location may either be a local path or a remote URL.
573///
574/// If provided, the `events` sender will be used to send transfer events.
575///
576/// _Note: copying between two remote locations is not supported._
577///
578/// # Azure Blob Storage
579///
580/// Supported remote URLs for Azure Blob Storage:
581///
582/// * `az` schemed URLs in the format `az://<account>/<container>/<blob>`.
583/// * `https` schemed URLs in the format `https://<account>.blob.core.windows.net/<container>/<blob>`.
584///
585/// If authentication is required, the URL is expected to contain a SAS token in
586/// its query parameters.
587///
588/// # Amazon S3
589///
590/// Supported remote URLs for S3 Storage:
591///
592/// * `s3` schemed URLs in the format: `s3://<bucket>/<object>` (note: uses the
593///   default region).
594/// * `https` schemed URLs in the format `https://<bucket>.s3.<region>.amazonaws.com/<object>`.
595/// * `https` schemed URLs in the format `https://<region>.s3.amazonaws.com/<bucket>/<object>`.
596///
597/// If authentication is required, the provided `Config` must have S3
598/// authentication information.
599///
600/// # Google Cloud Storage
601///
602/// Supported remote URLs for Google Cloud Storage:
603///
604/// * `gs` schemed URLs in the format: `gs://<bucket>/<object>`.
605/// * `https` schemed URLs in the format `https://<bucket>.storage.googleapis.com/<object>`.
606/// * `https` schemed URLs in the format `https://storage.googleapis.com/<bucket>/<object>`.
607///
608/// If authentication is required, the provided `Config` must have Google
609/// authentication information.
610///
611/// Note that [HMAC authentication](https://cloud.google.com/storage/docs/authentication/hmackeys)
612/// is used for Google Cloud Storage access.
613pub async fn copy(
614    config: Config,
615    client: HttpClient,
616    source: impl Into<Location<'_>>,
617    destination: impl Into<Location<'_>>,
618    cancel: CancellationToken,
619    events: Option<broadcast::Sender<TransferEvent>>,
620) -> Result<()> {
621    let source = source.into();
622    let destination = destination.into();
623
624    match (source, destination) {
625        (Location::Path(source), Location::Path(destination)) => {
626            if destination.exists() {
627                return Err(Error::DestinationExists(destination.to_path_buf()));
628            }
629
630            // Two local locations, just perform a copy
631            Ok(copy_local(source, destination, cancel, events).await?)
632        }
633        (Location::Path(source), Location::Url(destination)) => {
634            // Perform a copy if the the destination is a local path
635            if let Some(destination) = destination.to_local_path()? {
636                return copy_local(source, &destination, cancel, events).await;
637            }
638
639            if AzureBlobStorageBackend::is_supported_url(&config, &destination) {
640                let transfer =
641                    FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
642                transfer.upload(source, destination).await
643            } else if S3StorageBackend::is_supported_url(&config, &destination) {
644                let transfer =
645                    FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
646                transfer.upload(source, destination).await
647            } else if GoogleStorageBackend::is_supported_url(&config, &destination) {
648                let transfer =
649                    FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
650                transfer.upload(source, destination).await
651            } else {
652                Err(Error::UnsupportedUrl(destination))
653            }
654        }
655        (Location::Url(source), Location::Path(destination)) => {
656            if destination.exists() {
657                return Err(Error::DestinationExists(destination.to_path_buf()));
658            }
659
660            // Perform a copy if the the source is a local path
661            if let Some(source) = source.to_local_path()? {
662                return copy_local(&source, destination, cancel, events).await;
663            }
664
665            if AzureBlobStorageBackend::is_supported_url(&config, &source) {
666                let transfer =
667                    FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
668                transfer.download(source, destination).await
669            } else if S3StorageBackend::is_supported_url(&config, &source) {
670                let transfer =
671                    FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
672                transfer.download(source, destination).await
673            } else if GoogleStorageBackend::is_supported_url(&config, &source) {
674                let transfer =
675                    FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
676                transfer.download(source, destination).await
677            } else {
678                let transfer =
679                    FileTransfer::new(GenericStorageBackend::new(config, client, events), cancel);
680                transfer.download(source, destination).await
681            }
682        }
683        (Location::Url(source), Location::Url(destination)) => {
684            if let (Some(source), Some(destination)) =
685                (source.to_local_path()?, destination.to_local_path()?)
686            {
687                // Two local locations, just perform a copy
688                return copy_local(&source, &destination, cancel, events).await;
689            }
690
691            Err(Error::RemoteCopyNotSupported)
692        }
693    }
694}