#![deny(rustdoc::broken_intra_doc_links)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::borrow::Cow;
use std::fmt;
use std::ops::Deref;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use http_cache_stream_reqwest::Cache;
use http_cache_stream_reqwest::storage::DefaultCacheStorage;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest_middleware::ClientBuilder;
use reqwest_middleware::ClientWithMiddleware;
use tokio::fs::OpenOptions;
use tokio::io::BufReader;
use tokio::io::BufWriter;
use tokio::sync::broadcast;
use tokio_retry2::RetryError;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use url::Url;
use crate::backend::StorageBackend;
use crate::backend::azure::AzureBlobStorageBackend;
use crate::backend::generic::GenericStorageBackend;
use crate::backend::google::GoogleStorageBackend;
use crate::backend::s3::S3StorageBackend;
use crate::streams::TransferStream;
use crate::transfer::FileTransfer;
// Storage backends; this file uses its `azure`, `generic`, `google`, and `s3` submodules
// and the `StorageBackend` item.
mod backend;
// Only compiled when the `cli` feature is enabled.
#[cfg(feature = "cli")]
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub mod cli;
mod config;
mod generator;
mod pool;
// Provides `TransferStream`, used by `copy_local` to wrap the source reader.
mod streams;
// Provides `FileTransfer`, used by `copy` for uploads and downloads.
mod transfer;
// Backend-specific error types, re-exported because they appear in the public `Error` enum.
pub use backend::azure::AzureError;
pub use backend::google::GoogleError;
pub use backend::s3::S3Error;
// Everything public in `config` and `generator` is re-exported from the crate root.
pub use config::*;
pub use generator::*;
/// Identifies this crate as `cloud-copy/<version> (<repository URL>)`.
///
/// NOTE(review): presumably sent as the HTTP `User-Agent` header by code outside this file;
/// it is not referenced in this file — confirm.
const USER_AGENT: &str = concat!(
"cloud-copy/",
env!("CARGO_PKG_VERSION"),
" (https://github.com/stjude-rust-labs/cloud-copy)"
);
/// The number of bytes in one mebibyte (1024 * 1024).
const ONE_MEBIBYTE: u64 = 1024 * 1024;
/// A size threshold of 256 MiB.
///
/// NOTE(review): presumably used when choosing transfer block sizes; it is not referenced in
/// this file — confirm against its users.
const BLOCK_SIZE_THRESHOLD: u64 = 256 * ONE_MEBIBYTE;
/// Logs a warning describing a failed network operation and the wait before its retry.
///
/// Nothing is logged when `duration` is zero. The wait is reported in whole seconds, as
/// returned by `Duration::as_secs`.
fn notify_retry(e: &Error, duration: Duration) {
    if duration.is_zero() {
        return;
    }

    let secs = duration.as_secs();
    // Only exactly one second is singular; everything else (including zero) is plural.
    let plural = if secs == 1 { "" } else { "s" };
    warn!(
        "network operation failed (retried after waiting {secs} second{s}): {e}",
        s = plural
    );
}
/// A source or destination of a copy: either a local path or a URL.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Location<'a> {
/// A borrowed path; `copy` treats it as a local file.
Path(&'a Path),
/// A URL, either borrowed from the caller or owned by the location.
Url(Cow<'a, Url>),
}
impl<'a> Location<'a> {
    /// Creates a location from a string.
    ///
    /// The string becomes a `Location::Url` when it parses as a URL; otherwise it is
    /// borrowed as a `Location::Path`.
    ///
    /// On Windows, a string whose URL scheme is a single letter is treated as a path
    /// instead: a drive-letter path such as `C:\data\file` or `C:/data/file` is accepted by
    /// the URL parser with `c` as the scheme and would otherwise be mistaken for a URL.
    pub fn new(s: &'a str) -> Self {
        match s.parse::<Url>() {
            // A one-letter "scheme" on Windows is a drive letter, not a URL scheme.
            Ok(url) if cfg!(windows) && url.scheme().len() == 1 => Self::Path(Path::new(s)),
            Ok(url) => Self::Url(Cow::Owned(url)),
            Err(_) => Self::Path(Path::new(s)),
        }
    }
}
impl fmt::Display for Location<'_> {
    /// Writes the path via `Path::display`, or the URL via `UrlExt::display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Url(url) => write!(f, "{}", url.display()),
            Self::Path(path) => write!(f, "{}", path.display()),
        }
    }
}
impl<'a> From<&'a str> for Location<'a> {
fn from(value: &'a str) -> Self {
Self::new(value)
}
}
impl<'a> From<&'a String> for Location<'a> {
fn from(value: &'a String) -> Self {
Self::new(value)
}
}
impl<'a> From<&'a Path> for Location<'a> {
fn from(value: &'a Path) -> Self {
Self::Path(value)
}
}
impl<'a> From<&'a PathBuf> for Location<'a> {
fn from(value: &'a PathBuf) -> Self {
Self::Path(value.as_path())
}
}
impl<'a> From<&'a Url> for Location<'a> {
fn from(value: &'a Url) -> Self {
Self::Url(Cow::Borrowed(value))
}
}
impl From<Url> for Location<'_> {
fn from(value: Url) -> Self {
Self::Url(Cow::Owned(value))
}
}
/// Extension methods for URLs; implemented in this file for `Url`.
pub trait UrlExt {
/// Converts the URL to a local path.
///
/// In the `Url` implementation, `Ok(None)` is returned when the scheme is not `file`, and
/// `Error::InvalidFileUrl` is returned when a `file` URL cannot be turned into a path.
fn to_local_path(&self) -> Result<Option<PathBuf>>;
/// Returns a value that formats the URL for display.
///
/// The `Url` implementation writes only the scheme, host, and path.
fn display(&self) -> impl fmt::Display;
}
impl UrlExt for Url {
fn to_local_path(&self) -> Result<Option<PathBuf>> {
if self.scheme() != "file" {
return Ok(None);
}
self.to_file_path()
.map(Some)
.map_err(|_| Error::InvalidFileUrl(self.clone()))
}
fn display(&self) -> impl fmt::Display {
struct Display<'a>(&'a Url);
impl fmt::Display for Display<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{scheme}://{host}{path}",
scheme = self.0.scheme(),
host = self.0.host_str().unwrap_or_default(),
path = self.0.path()
)
}
}
Display(self)
}
}
/// An HTTP client that wraps a `reqwest` client with middleware and an optional download
/// cache.
///
/// The client dereferences to the underlying `ClientWithMiddleware`.
#[derive(Clone)]
pub struct HttpClient {
/// The underlying client, with the cache installed as middleware when one is configured.
client: ClientWithMiddleware,
/// The shared cache; `None` when the client was created without a cache directory.
cache: Option<Arc<Cache<DefaultCacheStorage>>>,
}
impl HttpClient {
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
pub fn new() -> Self {
let client = Client::builder()
.connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
.read_timeout(Self::DEFAULT_READ_TIMEOUT)
.build()
.expect("failed to build HTTP client");
Self::from_existing(client)
}
pub fn new_with_cache(cache_dir: impl AsRef<Path>) -> Self {
let client = Client::builder()
.connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
.read_timeout(Self::DEFAULT_READ_TIMEOUT)
.build()
.expect("failed to build HTTP client");
Self::from_existing_with_cache(client, cache_dir)
}
pub fn from_existing(client: reqwest::Client) -> Self {
Self {
client: ClientWithMiddleware::new(client, Vec::new()),
cache: None,
}
}
pub fn from_existing_with_cache(client: reqwest::Client, cache_dir: impl AsRef<Path>) -> Self {
let cache_dir = cache_dir.as_ref();
info!(
"using HTTP download cache directory `{dir}`",
dir = cache_dir.display()
);
let cache = Arc::new(Cache::new(DefaultCacheStorage::new(cache_dir)));
Self {
client: ClientBuilder::new(client).with_arc(cache.clone()).build(),
cache: Some(cache),
}
}
pub fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
self.cache.as_deref()
}
}
impl Default for HttpClient {
fn default() -> Self {
Self::new()
}
}
impl Deref for HttpClient {
    type Target = ClientWithMiddleware;

    /// Exposes the wrapped middleware client so its methods can be called directly.
    fn deref(&self) -> &Self::Target {
        let Self { client, .. } = self;
        client
    }
}
/// Formats the trailing part of the `Error::Server` message.
struct DisplayMessage<'a> {
/// The response status; its canonical reason is shown when `message` is empty.
status: StatusCode,
/// The message to show; may be empty.
message: &'a str,
}
impl fmt::Display for DisplayMessage<'_> {
    /// Writes `: <message>` when a message is present; otherwise writes the status code's
    /// lowercased canonical reason in parentheses.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if !self.message.is_empty() {
            return write!(f, ": {}", self.message);
        }

        // Status codes without a canonical reason get a fixed placeholder.
        let reason = self
            .status
            .canonical_reason()
            .unwrap_or("<unknown status code>")
            .to_lowercase();
        write!(f, " ({reason})")
    }
}
/// The errors returned by this crate.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The operation was canceled.
#[error("the operation was canceled")]
Canceled,
/// Both the source and the destination are remote locations.
#[error("copying between remote locations is not supported")]
RemoteCopyNotSupported,
/// The remote URL uses a scheme that is not supported; holds the scheme.
#[error("remote URL has an unsupported URL scheme `{0}`")]
UnsupportedUrlScheme(String),
/// The URL does not belong to a supported cloud service.
#[error("URL `{url}` is not for a supported cloud service", url = .0.display())]
UnsupportedUrl(Url),
/// A `file` URL could not be converted to a local path.
#[error("file URL `{url}` cannot be represented as a local path", url = .0.display())]
InvalidFileUrl(Url),
/// The specified path is a root directory or is empty.
#[error("the specified path cannot be a root directory or empty")]
InvalidPath,
/// The remote content changed while it was being downloaded.
#[error("the remote content was modified during the download")]
RemoteContentModified,
/// A directory could not be created.
#[error("failed to create directory `{path}`: {error}", path = .path.display())]
DirectoryCreationFailed {
/// The directory that could not be created.
path: PathBuf,
/// The underlying I/O error.
error: std::io::Error,
},
/// A temporary file could not be created.
#[error("failed to create temporary file: {error}")]
CreateTempFile {
/// The underlying I/O error.
error: std::io::Error,
},
/// A temporary file could not be persisted.
#[error("failed to persist temporary file: {error}")]
PersistTempFile {
/// The underlying I/O error.
error: std::io::Error,
},
/// The local destination path already exists.
#[error("the destination path `{path}` already exists", path = .0.display())]
LocalDestinationExists(PathBuf),
/// The remote destination URL already exists.
#[error("the destination URL `{url}` already exists", url = .0.display())]
RemoteDestinationExists(Url),
/// The server returned an error status.
///
/// The message is followed by `message` when it is non-empty, or by the status code's
/// canonical reason otherwise.
#[error(
"server returned status {status}{message}",
status = .status.as_u16(),
message = DisplayMessage { status: *.status, message }
)]
Server {
/// The response status code.
status: reqwest::StatusCode,
/// The error message; may be empty.
message: String,
},
/// The `content-range` header of a response did not start at the requested offset.
#[error(
"server responded with a `content-range` header that does not start at the requested \
offset"
)]
UnexpectedContentRangeStart,
/// An Azure error.
#[error(transparent)]
Azure(#[from] AzureError),
/// An S3 error.
#[error(transparent)]
S3(#[from] S3Error),
/// A Google error.
#[error(transparent)]
Google(#[from] GoogleError),
/// An I/O error.
#[error(transparent)]
Io(#[from] std::io::Error),
/// A directory walking error.
#[error(transparent)]
Walk(#[from] walkdir::Error),
/// A `reqwest` error.
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// A `reqwest_middleware` error.
#[error(transparent)]
Middleware(#[from] reqwest_middleware::Error),
/// A `tempfile` persist error.
#[error(transparent)]
Temp(#[from] tempfile::PersistError),
}
impl Error {
fn into_retry_error(self) -> RetryError<Self> {
match &self {
Error::Server { status, .. }
| Error::Azure(AzureError::UnexpectedResponse { status, .. })
if status.is_server_error() =>
{
RetryError::transient(self)
}
Error::Io(_) | Error::Reqwest(_) | Error::Middleware(_) => RetryError::transient(self),
_ => RetryError::permanent(self),
}
}
}
/// A `Result` whose error type is this crate's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// An event describing the progress of a transfer.
///
/// Events are sent over the optional `broadcast::Sender` given to `copy`. A local copy
/// reports a single transfer with id 0 made of a single block 0.
#[derive(Debug, Clone)]
pub enum TransferEvent {
/// A transfer has started.
TransferStarted {
/// The id of the transfer.
id: u64,
/// The path of the transfer; a local copy reports the destination path.
path: PathBuf,
/// The number of blocks in the transfer.
blocks: u64,
/// The size of the transfer, if known; a local copy reports the source file's length in
/// bytes.
size: Option<u64>,
},
/// A block of a transfer has started.
BlockStarted {
/// The id of the transfer.
id: u64,
/// The block number within the transfer.
block: u64,
/// The size of the block, if known; a local copy reports the source file's length in
/// bytes.
size: Option<u64>,
},
/// A block of a transfer has made progress.
BlockProgress {
/// The id of the transfer.
id: u64,
/// The block number within the transfer.
block: u64,
/// NOTE(review): presumably a count of transferred bytes; this event is produced outside
/// this file — confirm whether it is a running total or a delta.
transferred: u64,
},
/// A block of a transfer has completed.
BlockCompleted {
/// The id of the transfer.
id: u64,
/// The block number within the transfer.
block: u64,
/// Whether the block failed to transfer.
failed: bool,
},
/// A transfer has completed.
TransferCompleted {
/// The id of the transfer.
id: u64,
/// Whether the transfer failed.
failed: bool,
},
}
/// Copies a local file to a local destination.
///
/// The destination is opened with `create_new`, so the copy fails with an I/O error if the
/// destination already exists.
///
/// When `events` is provided, the copy is reported as a single transfer (id 0) made of a
/// single block (block 0): `TransferStarted` and `BlockStarted` are sent before copying, and
/// `BlockCompleted` and `TransferCompleted` are sent afterwards whether the copy succeeded,
/// failed, or was canceled. Send failures are ignored.
///
/// # Errors
///
/// * `Error::Canceled` if `cancel` fires before the copy completes; the destination file is
///   then removed on a best-effort basis.
/// * `Error::Io` if a file cannot be opened, the source size cannot be read, or the copy
///   itself fails. The destination file is not removed when the copy itself fails.
async fn copy_local(
    source: &Path,
    destination: &Path,
    cancel: CancellationToken,
    events: Option<broadcast::Sender<TransferEvent>>,
) -> Result<()> {
    const ID: u64 = 0;
    const BLOCK: u64 = 0;

    let file = OpenOptions::new().read(true).open(source).await?;

    // The size is only needed for events. Reading it from the open handle avoids a blocking
    // call, and doing so before the destination is created means a failure here does not
    // leave an empty destination file behind.
    let size = if events.is_some() {
        Some(file.metadata().await?.len())
    } else {
        None
    };

    let mut reader = StreamReader::new(TransferStream::new(
        ReaderStream::new(BufReader::new(file)),
        ID,
        BLOCK,
        0,
        events.clone(),
    ));

    let mut writer = BufWriter::new(
        OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(destination)
            .await?,
    );

    if let Some(events) = &events {
        events
            .send(TransferEvent::TransferStarted {
                id: ID,
                path: destination.to_path_buf(),
                blocks: 1,
                size,
            })
            .ok();
        events
            .send(TransferEvent::BlockStarted {
                id: ID,
                block: BLOCK,
                size,
            })
            .ok();
    }

    let result: Result<()> = tokio::select! {
        _ = cancel.cancelled() => {
            // Close the destination before removing the partially written file.
            drop(writer);
            tokio::fs::remove_file(destination).await.ok();
            Err(Error::Canceled)
        },
        // Convert rather than propagate with `?` so that the completion events below are
        // also sent when the copy fails.
        r = tokio::io::copy(&mut reader, &mut writer) => r.map(|_| ()).map_err(Error::from),
    };

    if let Some(events) = &events {
        let failed = result.is_err();
        events
            .send(TransferEvent::BlockCompleted {
                id: ID,
                block: BLOCK,
                failed,
            })
            .ok();
        events
            .send(TransferEvent::TransferCompleted { id: ID, failed })
            .ok();
    }

    result
}
pub async fn copy(
config: Config,
client: HttpClient,
source: impl Into<Location<'_>>,
destination: impl Into<Location<'_>>,
cancel: CancellationToken,
events: Option<broadcast::Sender<TransferEvent>>,
) -> Result<()> {
let source = source.into();
let destination = destination.into();
match (source, destination) {
(Location::Path(source), Location::Path(destination)) => {
if !config.overwrite && destination.exists() {
return Err(Error::LocalDestinationExists(destination.to_path_buf()));
}
Ok(copy_local(source, destination, cancel, events).await?)
}
(Location::Path(source), Location::Url(destination)) => {
if let Some(destination) = destination.to_local_path()? {
return copy_local(source, &destination, cancel, events).await;
}
if AzureBlobStorageBackend::is_supported_url(&config, &destination) {
let destination = AzureBlobStorageBackend::rewrite_url(&config, &destination)?;
let transfer =
FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
transfer.upload(source, destination.into_owned()).await
} else if S3StorageBackend::is_supported_url(&config, &destination) {
let destination = S3StorageBackend::rewrite_url(&config, &destination)?;
let transfer =
FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
transfer.upload(source, destination.into_owned()).await
} else if GoogleStorageBackend::is_supported_url(&config, &destination) {
let destination = GoogleStorageBackend::rewrite_url(&config, &destination)?;
let transfer =
FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
transfer.upload(source, destination.into_owned()).await
} else {
Err(Error::UnsupportedUrl(destination.into_owned()))
}
}
(Location::Url(source), Location::Path(destination)) => {
if !config.overwrite && destination.exists() {
return Err(Error::LocalDestinationExists(destination.to_path_buf()));
}
if let Some(source) = source.to_local_path()? {
return copy_local(&source, destination, cancel, events).await;
}
if AzureBlobStorageBackend::is_supported_url(&config, &source) {
let source = AzureBlobStorageBackend::rewrite_url(&config, &source)?;
let transfer =
FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
transfer.download(source.into_owned(), destination).await
} else if S3StorageBackend::is_supported_url(&config, &source) {
let source = S3StorageBackend::rewrite_url(&config, &source)?;
let transfer =
FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
transfer.download(source.into_owned(), destination).await
} else if GoogleStorageBackend::is_supported_url(&config, &source) {
let source = GoogleStorageBackend::rewrite_url(&config, &source)?;
let transfer =
FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
transfer.download(source.into_owned(), destination).await
} else {
let transfer =
FileTransfer::new(GenericStorageBackend::new(config, client, events), cancel);
transfer.download(source.into_owned(), destination).await
}
}
(Location::Url(source), Location::Url(destination)) => {
if let (Some(source), Some(destination)) =
(source.to_local_path()?, destination.to_local_path()?)
{
return copy_local(&source, &destination, cancel, events).await;
}
Err(Error::RemoteCopyNotSupported)
}
}
}
/// Rewrites a URL using the first cloud backend that supports it.
///
/// The Azure, S3, and Google backends are consulted in that order. A URL that none of them
/// supports is returned borrowed and unchanged.
///
/// # Errors
///
/// Returns whatever error the selected backend's rewrite produces.
pub fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
    if AzureBlobStorageBackend::is_supported_url(config, url) {
        return AzureBlobStorageBackend::rewrite_url(config, url);
    }

    if S3StorageBackend::is_supported_url(config, url) {
        return S3StorageBackend::rewrite_url(config, url);
    }

    if GoogleStorageBackend::is_supported_url(config, url) {
        return GoogleStorageBackend::rewrite_url(config, url);
    }

    Ok(Cow::Borrowed(url))
}
#[cfg(test)]
mod test {
    use super::*;

    /// Asserts that rewriting `input` under `config` produces exactly `expected`.
    fn assert_rewrites_to(config: &Config, input: &str, expected: &str) {
        let url: Url = input.parse().unwrap();
        assert_eq!(rewrite_url(config, &url).unwrap().as_str(), expected);
    }

    #[test]
    fn rewrite_urls() {
        // Default configuration: an unsupported URL is returned as-is and cloud schemes map
        // to their service endpoints.
        let config = Config::default();
        assert_rewrites_to(&config, "http://example.com", "http://example.com/");
        assert_rewrites_to(
            &config,
            "az://foo/bar/baz",
            "https://foo.blob.core.windows.net/bar/baz",
        );
        assert_rewrites_to(
            &config,
            "s3://foo/bar/baz",
            "https://foo.s3.us-east-1.amazonaws.com/bar/baz",
        );
        assert_rewrites_to(
            &config,
            "gs://foo/bar/baz",
            "https://foo.storage.googleapis.com/bar/baz",
        );

        // A configured S3 region replaces the default region.
        let config = Config {
            s3: S3Config {
                region: Some("my-region".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        assert_rewrites_to(
            &config,
            "s3://foo/bar/baz",
            "https://foo.s3.my-region.amazonaws.com/bar/baz",
        );

        // Local emulators (Azurite and LocalStack).
        let config = Config {
            azure: AzureConfig { use_azurite: true },
            s3: S3Config {
                use_localstack: true,
                ..Default::default()
            },
            ..Default::default()
        };
        assert_rewrites_to(
            &config,
            "az://foo/bar/baz",
            "http://foo.blob.core.windows.net.localhost:10000/bar/baz",
        );
        assert_rewrites_to(
            &config,
            "s3://foo/bar/baz",
            "http://foo.s3.us-east-1.localhost.localstack.cloud:4566/bar/baz",
        );
    }
}