1#![deny(rustdoc::broken_intra_doc_links)]
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::ops::Deref;
22use std::path::Path;
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26
27use http_cache_stream_reqwest::Cache;
28use http_cache_stream_reqwest::storage::DefaultCacheStorage;
29use reqwest::Client;
30use reqwest::StatusCode;
31use reqwest_middleware::ClientBuilder;
32use reqwest_middleware::ClientWithMiddleware;
33use tokio::fs::OpenOptions;
34use tokio::io::BufReader;
35use tokio::io::BufWriter;
36use tokio::sync::broadcast;
37use tokio_retry2::RetryError;
38use tokio_util::io::ReaderStream;
39use tokio_util::io::StreamReader;
40use tokio_util::sync::CancellationToken;
41use tracing::info;
42use tracing::warn;
43use url::Url;
44
45use crate::backend::StorageBackend;
46use crate::backend::azure::AzureBlobStorageBackend;
47use crate::backend::generic::GenericStorageBackend;
48use crate::backend::google::GoogleStorageBackend;
49use crate::backend::s3::S3StorageBackend;
50use crate::streams::TransferStream;
51use crate::transfer::FileTransfer;
52
53mod backend;
54#[cfg(feature = "cli")]
55#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
56pub mod cli;
57mod config;
58mod generator;
59mod pool;
60mod streams;
61mod transfer;
62
63pub use backend::azure::AzureError;
64pub use backend::google::GoogleError;
65pub use backend::s3::S3Error;
66pub use config::*;
67pub use generator::*;
68
69const USER_AGENT: &str = concat!(
71 "cloud-copy/",
72 env!("CARGO_PKG_VERSION"),
73 " (https://github.com/stjude-rust-labs/cloud-copy)"
74);
75
/// The number of bytes in one mebibyte (2^20).
const ONE_MEBIBYTE: u64 = 1 << 20;

/// A 256 MiB size threshold.
///
/// NOTE(review): used outside this section of the file; confirm exactly which block-size
/// decision it controls.
const BLOCK_SIZE_THRESHOLD: u64 = ONE_MEBIBYTE * 256;
82
83fn notify_retry(e: &Error, duration: Duration) {
85 if !duration.is_zero() {
87 let secs = duration.as_secs();
88 warn!(
89 "network operation failed (retried after waiting {secs} second{s}): {e}",
90 s = if secs == 1 { "" } else { "s" }
91 );
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub enum Location<'a> {
98 Path(&'a Path),
100 Url(Cow<'a, Url>),
102}
103
104impl<'a> Location<'a> {
105 pub fn new(s: &'a str) -> Self {
107 match s.parse::<Url>() {
108 Ok(url) => Self::Url(Cow::Owned(url)),
109 Err(_) => Self::Path(Path::new(s)),
110 }
111 }
112}
113
114impl fmt::Display for Location<'_> {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 Self::Path(path) => write!(f, "{path}", path = path.display()),
118 Self::Url(url) => write!(f, "{url}", url = url.display()),
119 }
120 }
121}
122
123impl<'a> From<&'a str> for Location<'a> {
124 fn from(value: &'a str) -> Self {
125 Self::new(value)
126 }
127}
128
129impl<'a> From<&'a String> for Location<'a> {
130 fn from(value: &'a String) -> Self {
131 Self::new(value)
132 }
133}
134
135impl<'a> From<&'a Path> for Location<'a> {
136 fn from(value: &'a Path) -> Self {
137 Self::Path(value)
138 }
139}
140
141impl<'a> From<&'a PathBuf> for Location<'a> {
142 fn from(value: &'a PathBuf) -> Self {
143 Self::Path(value.as_path())
144 }
145}
146
147impl<'a> From<&'a Url> for Location<'a> {
148 fn from(value: &'a Url) -> Self {
149 Self::Url(Cow::Borrowed(value))
150 }
151}
152
153impl From<Url> for Location<'_> {
154 fn from(value: Url) -> Self {
155 Self::Url(Cow::Owned(value))
156 }
157}
158
159pub trait UrlExt {
161 fn to_local_path(&self) -> Result<Option<PathBuf>>;
168
169 fn display(&self) -> impl fmt::Display;
174}
175
176impl UrlExt for Url {
177 fn to_local_path(&self) -> Result<Option<PathBuf>> {
178 if self.scheme() != "file" {
179 return Ok(None);
180 }
181
182 self.to_file_path()
183 .map(Some)
184 .map_err(|_| Error::InvalidFileUrl(self.clone()))
185 }
186
187 fn display(&self) -> impl fmt::Display {
188 struct Display<'a>(&'a Url);
190
191 impl fmt::Display for Display<'_> {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 write!(
194 f,
195 "{scheme}://{host}{path}",
196 scheme = self.0.scheme(),
197 host = self.0.host_str().unwrap_or_default(),
198 path = self.0.path()
199 )
200 }
201 }
202
203 Display(self)
204 }
205}
206
207#[derive(Clone)]
209pub struct HttpClient {
210 client: ClientWithMiddleware,
212 cache: Option<Arc<Cache<DefaultCacheStorage>>>,
216}
217
218impl HttpClient {
219 const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
220 const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
221
222 pub fn new() -> Self {
224 let client = Client::builder()
225 .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
226 .read_timeout(Self::DEFAULT_READ_TIMEOUT)
227 .build()
228 .expect("failed to build HTTP client");
229
230 Self::from_existing(client)
231 }
232
233 pub fn new_with_cache(cache_dir: impl AsRef<Path>) -> Self {
235 let client = Client::builder()
236 .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
237 .read_timeout(Self::DEFAULT_READ_TIMEOUT)
238 .build()
239 .expect("failed to build HTTP client");
240
241 Self::from_existing_with_cache(client, cache_dir)
242 }
243
244 pub fn from_existing(client: reqwest::Client) -> Self {
246 Self {
247 client: ClientWithMiddleware::new(client, Vec::new()),
248 cache: None,
249 }
250 }
251
252 pub fn from_existing_with_cache(client: reqwest::Client, cache_dir: impl AsRef<Path>) -> Self {
255 let cache_dir = cache_dir.as_ref();
256 info!(
257 "using HTTP download cache directory `{dir}`",
258 dir = cache_dir.display()
259 );
260
261 let cache = Arc::new(Cache::new(DefaultCacheStorage::new(cache_dir)));
262
263 Self {
264 client: ClientBuilder::new(client).with_arc(cache.clone()).build(),
265 cache: Some(cache),
266 }
267 }
268
269 pub fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
273 self.cache.as_deref()
274 }
275}
276
277impl Default for HttpClient {
278 fn default() -> Self {
279 Self::new()
280 }
281}
282
283impl Deref for HttpClient {
284 type Target = ClientWithMiddleware;
285
286 fn deref(&self) -> &Self::Target {
287 &self.client
288 }
289}
290
291struct DisplayMessage<'a> {
293 status: StatusCode,
295 message: &'a str,
299}
300
301impl fmt::Display for DisplayMessage<'_> {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 if self.message.is_empty() {
304 write!(
305 f,
306 " ({reason})",
307 reason = self
308 .status
309 .canonical_reason()
310 .unwrap_or("<unknown status code>")
311 .to_lowercase()
312 )
313 } else {
314 write!(f, ": {message}", message = self.message)
315 }
316 }
317}
318
319#[derive(Debug, thiserror::Error)]
321pub enum Error {
322 #[error("the operation was canceled")]
324 Canceled,
325 #[error("copying between remote locations is not supported")]
327 RemoteCopyNotSupported,
328 #[error("remote URL has an unsupported URL scheme `{0}`")]
330 UnsupportedUrlScheme(String),
331 #[error("URL `{url}` is not for a supported cloud service", url = .0.display())]
333 UnsupportedUrl(Url),
334 #[error("file URL `{url}` cannot be represented as a local path", url = .0.display())]
336 InvalidFileUrl(Url),
337 #[error("the specified path cannot be a root directory or empty")]
339 InvalidPath,
340 #[error("the remote content was modified during the download")]
342 RemoteContentModified,
343 #[error("failed to create directory `{path}`: {error}", path = .path.display())]
345 DirectoryCreationFailed {
346 path: PathBuf,
348 error: std::io::Error,
350 },
351 #[error("failed to create temporary file: {error}")]
353 CreateTempFile {
354 error: std::io::Error,
356 },
357 #[error("failed to persist temporary file: {error}")]
359 PersistTempFile {
360 error: std::io::Error,
362 },
363 #[error("the destination path `{path}` already exists", path = .0.display())]
365 LocalDestinationExists(PathBuf),
366 #[error("the destination URL `{url}` already exists", url = .0.display())]
368 RemoteDestinationExists(Url),
369 #[error(
371 "server returned status {status}{message}",
372 status = .status.as_u16(),
373 message = DisplayMessage { status: *.status, message }
374 )]
375 Server {
376 status: reqwest::StatusCode,
378 message: String,
382 },
383 #[error(
385 "server responded with a `content-range` header that does not start at the requested \
386 offset"
387 )]
388 UnexpectedContentRangeStart,
389 #[error(transparent)]
391 Azure(#[from] AzureError),
392 #[error(transparent)]
394 S3(#[from] S3Error),
395 #[error(transparent)]
397 Google(#[from] GoogleError),
398 #[error(transparent)]
400 Io(#[from] std::io::Error),
401 #[error(transparent)]
403 Walk(#[from] walkdir::Error),
404 #[error(transparent)]
406 Reqwest(#[from] reqwest::Error),
407 #[error(transparent)]
409 Middleware(#[from] reqwest_middleware::Error),
410 #[error(transparent)]
412 Temp(#[from] tempfile::PersistError),
413}
414
415impl Error {
416 fn into_retry_error(self) -> RetryError<Self> {
418 match &self {
419 Error::Server { status, .. }
420 | Error::Azure(AzureError::UnexpectedResponse { status, .. })
421 if status.is_server_error() =>
422 {
423 RetryError::transient(self)
424 }
425 Error::Io(_) | Error::Reqwest(_) | Error::Middleware(_) => RetryError::transient(self),
426 _ => RetryError::permanent(self),
427 }
428 }
429}
430
431pub type Result<T> = std::result::Result<T, Error>;
433
/// A progress event emitted during a transfer.
#[derive(Debug, Clone)]
pub enum TransferEvent {
    /// A transfer has started.
    TransferStarted {
        /// The identifier of the transfer.
        id: u64,
        /// The local path involved in the transfer (for local copies, the destination).
        path: PathBuf,
        /// The number of blocks that make up the transfer.
        blocks: u64,
        /// The total size of the transfer in bytes, when known.
        size: Option<u64>,
    },
    /// A block of a transfer has started.
    BlockStarted {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// The size of the block in bytes, when known.
        size: Option<u64>,
    },
    /// Progress was made on a block.
    BlockProgress {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// A count of bytes transferred for the block.
        ///
        /// NOTE(review): confirm with the emitting stream whether this is cumulative or a
        /// per-update delta.
        transferred: u64,
    },
    /// A block of a transfer has finished.
    BlockCompleted {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// Whether the block failed.
        failed: bool,
    },
    /// A transfer has finished.
    TransferCompleted {
        /// The identifier of the transfer.
        id: u64,
        /// Whether the transfer failed.
        failed: bool,
    },
}
489
490async fn copy_local(
494 source: &Path,
495 destination: &Path,
496 cancel: CancellationToken,
497 events: Option<broadcast::Sender<TransferEvent>>,
498) -> Result<()> {
499 const ID: u64 = 0;
501 const BLOCK: u64 = 0;
503
504 let mut reader = StreamReader::new(TransferStream::new(
506 ReaderStream::new(BufReader::new(
507 OpenOptions::new().read(true).open(source).await?,
508 )),
509 ID,
510 BLOCK,
511 0,
512 events.clone(),
513 ));
514
515 let mut writer = BufWriter::new(
516 OpenOptions::new()
517 .create_new(true)
518 .write(true)
519 .open(destination)
520 .await?,
521 );
522
523 if let Some(events) = &events {
525 let size = std::fs::metadata(source)?.len();
526
527 events
528 .send(TransferEvent::TransferStarted {
529 id: ID,
530 path: destination.to_path_buf(),
531 blocks: 1,
532 size: Some(size),
533 })
534 .ok();
535
536 events
537 .send(TransferEvent::BlockStarted {
538 id: ID,
539 block: BLOCK,
540 size: Some(size),
541 })
542 .ok();
543 }
544
545 let result = tokio::select! {
547 _ = cancel.cancelled() => {
548 drop(writer);
549 std::fs::remove_file(destination).ok();
550 Err(Error::Canceled)
551 },
552 r = tokio::io::copy(&mut reader, &mut writer) => {
553 r?;
554 Ok(())
555 }
556 };
557
558 if let Some(events) = &events {
560 events
561 .send(TransferEvent::BlockCompleted {
562 id: ID,
563 block: BLOCK,
564 failed: result.is_err(),
565 })
566 .ok();
567
568 events
569 .send(TransferEvent::TransferCompleted {
570 id: ID,
571 failed: result.is_err(),
572 })
573 .ok();
574 }
575
576 result
577}
578
579pub async fn copy(
623 config: Config,
624 client: HttpClient,
625 source: impl Into<Location<'_>>,
626 destination: impl Into<Location<'_>>,
627 cancel: CancellationToken,
628 events: Option<broadcast::Sender<TransferEvent>>,
629) -> Result<()> {
630 let source = source.into();
631 let destination = destination.into();
632
633 match (source, destination) {
634 (Location::Path(source), Location::Path(destination)) => {
635 if !config.overwrite && destination.exists() {
636 return Err(Error::LocalDestinationExists(destination.to_path_buf()));
637 }
638
639 Ok(copy_local(source, destination, cancel, events).await?)
641 }
642 (Location::Path(source), Location::Url(destination)) => {
643 if let Some(destination) = destination.to_local_path()? {
645 return copy_local(source, &destination, cancel, events).await;
646 }
647
648 if AzureBlobStorageBackend::is_supported_url(&config, &destination) {
649 let destination = AzureBlobStorageBackend::rewrite_url(&config, &destination)?;
650 let transfer =
651 FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
652 transfer.upload(source, destination.into_owned()).await
653 } else if S3StorageBackend::is_supported_url(&config, &destination) {
654 let destination = S3StorageBackend::rewrite_url(&config, &destination)?;
655 let transfer =
656 FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
657 transfer.upload(source, destination.into_owned()).await
658 } else if GoogleStorageBackend::is_supported_url(&config, &destination) {
659 let destination = GoogleStorageBackend::rewrite_url(&config, &destination)?;
660 let transfer =
661 FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
662 transfer.upload(source, destination.into_owned()).await
663 } else {
664 Err(Error::UnsupportedUrl(destination.into_owned()))
665 }
666 }
667 (Location::Url(source), Location::Path(destination)) => {
668 if !config.overwrite && destination.exists() {
669 return Err(Error::LocalDestinationExists(destination.to_path_buf()));
670 }
671
672 if let Some(source) = source.to_local_path()? {
674 return copy_local(&source, destination, cancel, events).await;
675 }
676
677 if AzureBlobStorageBackend::is_supported_url(&config, &source) {
678 let source = AzureBlobStorageBackend::rewrite_url(&config, &source)?;
679 let transfer =
680 FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
681 transfer.download(source.into_owned(), destination).await
682 } else if S3StorageBackend::is_supported_url(&config, &source) {
683 let source = S3StorageBackend::rewrite_url(&config, &source)?;
684 let transfer =
685 FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
686 transfer.download(source.into_owned(), destination).await
687 } else if GoogleStorageBackend::is_supported_url(&config, &source) {
688 let source = GoogleStorageBackend::rewrite_url(&config, &source)?;
689 let transfer =
690 FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
691 transfer.download(source.into_owned(), destination).await
692 } else {
693 let transfer =
694 FileTransfer::new(GenericStorageBackend::new(config, client, events), cancel);
695 transfer.download(source.into_owned(), destination).await
696 }
697 }
698 (Location::Url(source), Location::Url(destination)) => {
699 if let (Some(source), Some(destination)) =
700 (source.to_local_path()?, destination.to_local_path()?)
701 {
702 return copy_local(&source, &destination, cancel, events).await;
704 }
705
706 Err(Error::RemoteCopyNotSupported)
707 }
708 }
709}
710
711pub fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
717 if AzureBlobStorageBackend::is_supported_url(config, url) {
718 AzureBlobStorageBackend::rewrite_url(config, url)
719 } else if S3StorageBackend::is_supported_url(config, url) {
720 S3StorageBackend::rewrite_url(config, url)
721 } else if GoogleStorageBackend::is_supported_url(config, url) {
722 GoogleStorageBackend::rewrite_url(config, url)
723 } else {
724 Ok(Cow::Borrowed(url))
725 }
726}
727
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `url`, rewrites it with `config`, and returns the result as a string.
    fn rewritten(config: &Config, url: &str) -> String {
        let url: Url = url.parse().expect("test URL should parse");
        let result = rewrite_url(config, &url).expect("URL should be rewritable");
        result.as_str().to_owned()
    }

    #[test]
    fn rewrite_urls() {
        // Default configuration: non-cloud URLs pass through; cloud URLs map to public endpoints.
        let config = Config::default();
        for (input, expected) in [
            ("http://example.com", "http://example.com/"),
            ("az://foo/bar/baz", "https://foo.blob.core.windows.net/bar/baz"),
            ("s3://foo/bar/baz", "https://foo.s3.us-east-1.amazonaws.com/bar/baz"),
            ("gs://foo/bar/baz", "https://foo.storage.googleapis.com/bar/baz"),
        ] {
            assert_eq!(rewritten(&config, input), expected, "rewriting `{input}`");
        }

        // An explicit S3 region replaces the default region in the endpoint.
        let config = Config {
            s3: S3Config {
                region: Some("my-region".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(
            rewritten(&config, "s3://foo/bar/baz"),
            "https://foo.s3.my-region.amazonaws.com/bar/baz"
        );

        // Local emulators (Azurite and LocalStack) use their own hosts and ports.
        let config = Config {
            azure: AzureConfig { use_azurite: true },
            s3: S3Config {
                use_localstack: true,
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(
            rewritten(&config, "az://foo/bar/baz"),
            "http://foo.blob.core.windows.net.localhost:10000/bar/baz"
        );
        assert_eq!(
            rewritten(&config, "s3://foo/bar/baz"),
            "http://foo.s3.us-east-1.localhost.localstack.cloud:4566/bar/baz"
        );
    }
}