1#![deny(rustdoc::broken_intra_doc_links)]
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::fmt;
20use std::ops::Deref;
21use std::path::Path;
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25
26use http_cache_stream_reqwest::Cache;
27use http_cache_stream_reqwest::storage::DefaultCacheStorage;
28use reqwest::Client;
29use reqwest::StatusCode;
30use reqwest_middleware::ClientBuilder;
31use reqwest_middleware::ClientWithMiddleware;
32use tokio::fs::OpenOptions;
33use tokio::io::BufReader;
34use tokio::io::BufWriter;
35use tokio::sync::broadcast;
36use tokio_retry2::RetryError;
37use tokio_util::io::ReaderStream;
38use tokio_util::io::StreamReader;
39use tokio_util::sync::CancellationToken;
40use tracing::info;
41use tracing::warn;
42use url::Url;
43
44use crate::backend::StorageBackend;
45use crate::backend::azure::AzureBlobStorageBackend;
46use crate::backend::generic::GenericStorageBackend;
47use crate::backend::google::GoogleStorageBackend;
48use crate::backend::s3::S3StorageBackend;
49use crate::streams::TransferStream;
50use crate::transfer::FileTransfer;
51
52mod backend;
53#[cfg(feature = "cli")]
54#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
55pub mod cli;
56mod config;
57mod generator;
58mod os;
59mod pool;
60mod streams;
61mod transfer;
62
63pub use backend::azure::AzureError;
64pub use backend::google::GoogleError;
65pub use backend::s3::S3Error;
66pub use config::*;
67pub use generator::*;
68
69const USER_AGENT: &str = concat!(
71 "cloud-copy/",
72 env!("CARGO_PKG_VERSION"),
73 " (https://github.com/stjude-rust-labs/cloud-copy)"
74);
75
/// The number of bytes in one mebibyte (2^20).
const ONE_MEBIBYTE: u64 = 1 << 20;

/// A 256 MiB size threshold.
///
/// NOTE(review): not referenced in this section; its exact role (presumably the size above which
/// transfers switch block sizing) is defined by the code that uses it — confirm there.
const BLOCK_SIZE_THRESHOLD: u64 = ONE_MEBIBYTE * 256;
82
83fn notify_retry(e: &Error, duration: Duration) {
85 if !duration.is_zero() {
87 let secs = duration.as_secs();
88 warn!(
89 "network operation failed (retried after waiting {secs} second{s}): {e}",
90 s = if secs == 1 { "" } else { "s" }
91 );
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub enum Location<'a> {
98 Path(&'a Path),
100 Url(Url),
102}
103
104impl<'a> Location<'a> {
105 pub fn new(s: &'a str) -> Self {
107 match s.parse::<Url>() {
108 Ok(url) => Self::Url(url),
109 Err(_) => Self::Path(Path::new(s)),
110 }
111 }
112}
113
114impl fmt::Display for Location<'_> {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 Self::Path(path) => write!(f, "{path}", path = path.display()),
118 Self::Url(url) => write!(f, "{url}", url = url.display()),
119 }
120 }
121}
122
123impl<'a> From<&'a str> for Location<'a> {
124 fn from(value: &'a str) -> Self {
125 Self::new(value)
126 }
127}
128
129impl<'a> From<&'a String> for Location<'a> {
130 fn from(value: &'a String) -> Self {
131 Self::new(value)
132 }
133}
134
135impl<'a> From<&'a Path> for Location<'a> {
136 fn from(value: &'a Path) -> Self {
137 Self::Path(value)
138 }
139}
140
141impl<'a> From<&'a PathBuf> for Location<'a> {
142 fn from(value: &'a PathBuf) -> Self {
143 Self::Path(value.as_path())
144 }
145}
146
147impl From<Url> for Location<'_> {
148 fn from(value: Url) -> Self {
149 Self::Url(value)
150 }
151}
152
/// Extension methods for [`Url`].
pub trait UrlExt {
    /// Converts a `file` URL into a local path.
    ///
    /// Returns `Ok(None)` when the URL's scheme is not `file`.
    ///
    /// # Errors
    ///
    /// Returns an error when a `file` URL cannot be represented as a local path.
    fn to_local_path(&self) -> Result<Option<PathBuf>>;

    /// Returns a displayable form of the URL intended for log and error messages.
    ///
    /// The implementation for [`Url`] in this crate does not print user info, the query string, or
    /// the fragment.
    fn display(&self) -> impl fmt::Display;
}
169
170impl UrlExt for Url {
171 fn to_local_path(&self) -> Result<Option<PathBuf>> {
172 if self.scheme() != "file" {
173 return Ok(None);
174 }
175
176 self.to_file_path()
177 .map(Some)
178 .map_err(|_| Error::InvalidFileUrl(self.clone()))
179 }
180
181 fn display(&self) -> impl fmt::Display {
182 struct Display<'a>(&'a Url);
184
185 impl fmt::Display for Display<'_> {
186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 write!(
188 f,
189 "{scheme}://{host}{path}",
190 scheme = self.0.scheme(),
191 host = self.0.host_str().unwrap_or_default(),
192 path = self.0.path()
193 )
194 }
195 }
196
197 Display(self)
198 }
199}
200
/// The HTTP client handed to the storage backends, optionally with an HTTP response cache.
///
/// Cloning is cheap for the cache, which is shared through an `Arc`.
#[derive(Clone)]
pub struct HttpClient {
    /// The client used for requests; it includes the cache middleware when a cache is configured.
    client: ClientWithMiddleware,
    /// The HTTP cache, when the client was created with a cache directory.
    cache: Option<Arc<Cache<DefaultCacheStorage>>>,
}
211
212impl HttpClient {
213 const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
214 const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
215
216 pub fn new() -> Self {
218 let client = Client::builder()
219 .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
220 .read_timeout(Self::DEFAULT_READ_TIMEOUT)
221 .build()
222 .expect("failed to build HTTP client");
223
224 Self::from_existing(client)
225 }
226
227 pub fn new_with_cache(cache_dir: impl AsRef<Path>) -> Self {
229 let client = Client::builder()
230 .connect_timeout(Self::DEFAULT_CONNECT_TIMEOUT)
231 .read_timeout(Self::DEFAULT_READ_TIMEOUT)
232 .build()
233 .expect("failed to build HTTP client");
234
235 Self::from_existing_with_cache(client, cache_dir)
236 }
237
238 pub fn from_existing(client: reqwest::Client) -> Self {
240 Self {
241 client: ClientWithMiddleware::new(client, Vec::new()),
242 cache: None,
243 }
244 }
245
246 pub fn from_existing_with_cache(client: reqwest::Client, cache_dir: impl AsRef<Path>) -> Self {
249 let cache_dir = cache_dir.as_ref();
250 info!(
251 "using HTTP download cache directory `{dir}`",
252 dir = cache_dir.display()
253 );
254
255 let cache = Arc::new(Cache::new(DefaultCacheStorage::new(cache_dir)));
256
257 Self {
258 client: ClientBuilder::new(client).with_arc(cache.clone()).build(),
259 cache: Some(cache),
260 }
261 }
262
263 pub fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
267 self.cache.as_deref()
268 }
269}
270
271impl Default for HttpClient {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277impl Deref for HttpClient {
278 type Target = ClientWithMiddleware;
279
280 fn deref(&self) -> &Self::Target {
281 &self.client
282 }
283}
284
285struct DisplayMessage<'a> {
287 status: StatusCode,
289 message: &'a str,
293}
294
295impl fmt::Display for DisplayMessage<'_> {
296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297 if self.message.is_empty() {
298 write!(
299 f,
300 " ({reason})",
301 reason = self
302 .status
303 .canonical_reason()
304 .unwrap_or("<unknown status code>")
305 .to_lowercase()
306 )
307 } else {
308 write!(f, ": {message}", message = self.message)
309 }
310 }
311}
312
/// The error type for copy operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The operation was canceled through its cancellation token.
    #[error("the operation was canceled")]
    Canceled,
    /// Both the source and the destination are remote locations.
    #[error("copying between remote locations is not supported")]
    RemoteCopyNotSupported,
    /// A remote URL uses a scheme that is not supported; holds the scheme.
    #[error("remote URL has an unsupported URL scheme `{0}`")]
    UnsupportedUrlScheme(String),
    /// The URL is not recognized as belonging to a supported cloud storage service.
    #[error("URL `{url}` is not for a supported cloud service", url = .0.display())]
    UnsupportedUrl(Url),
    /// A `file` URL could not be converted into a local path.
    #[error("file URL `{url}` cannot be represented as a local path", url = .0.display())]
    InvalidFileUrl(Url),
    /// The specified path is a root directory or is empty.
    #[error("the specified path cannot be a root directory or empty")]
    InvalidPath,
    /// The remote content changed while it was being downloaded.
    #[error("the remote content was modified during the download")]
    RemoteContentModified,
    /// A directory could not be created.
    #[error("failed to create directory `{path}`: {error}", path = .path.display())]
    DirectoryCreationFailed {
        /// The directory that could not be created.
        path: PathBuf,
        /// The underlying I/O error.
        error: std::io::Error,
    },
    /// A temporary file could not be created.
    #[error("failed to create temporary file: {error}")]
    CreateTempFile {
        /// The underlying I/O error.
        error: std::io::Error,
    },
    /// A temporary file could not be persisted.
    #[error("failed to persist temporary file: {error}")]
    PersistTempFile {
        /// The underlying I/O error.
        error: std::io::Error,
    },
    /// The local destination path already exists.
    #[error("the destination path `{path}` already exists", path = .0.display())]
    DestinationExists(PathBuf),
    /// A server responded with an error status.
    #[error(
        "server returned status {status}{message}",
        status = .status.as_u16(),
        message = DisplayMessage { status: *.status, message }
    )]
    Server {
        /// The response status code.
        status: reqwest::StatusCode,
        /// The error message, presumably taken from the response; when empty, the status code's
        /// canonical reason phrase is displayed instead.
        message: String,
    },
    /// The server's `content-range` header does not start at the requested offset.
    #[error(
        "server responded with a `content-range` header that does not start at the requested \
         offset"
    )]
    UnexpectedContentRangeStart,
    /// An Azure Blob Storage error.
    #[error(transparent)]
    Azure(#[from] AzureError),
    /// An S3 error.
    #[error(transparent)]
    S3(#[from] S3Error),
    /// A Google Cloud Storage error.
    #[error(transparent)]
    Google(#[from] GoogleError),
    /// An I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// An error while walking a directory tree.
    #[error(transparent)]
    Walk(#[from] walkdir::Error),
    /// An HTTP client error.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// An HTTP middleware error.
    #[error(transparent)]
    Middleware(#[from] reqwest_middleware::Error),
    /// A temporary file could not be persisted.
    #[error(transparent)]
    Temp(#[from] tempfile::PersistError),
}
405
406impl Error {
407 fn into_retry_error(self) -> RetryError<Self> {
409 match &self {
410 Error::Server { status, .. }
411 | Error::Azure(AzureError::UnexpectedResponse { status, .. })
412 if status.is_server_error() =>
413 {
414 RetryError::transient(self)
415 }
416 Error::Io(_) | Error::Reqwest(_) | Error::Middleware(_) => RetryError::transient(self),
417 _ => RetryError::permanent(self),
418 }
419 }
420}
421
/// A result whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
424
/// A progress event emitted during a copy.
///
/// Events are sent on the optional `broadcast` sender passed to [`copy`]. Send failures (such as
/// having no receivers) are ignored by the local-copy code in this module.
#[derive(Debug, Clone)]
pub enum TransferEvent {
    /// A transfer has started.
    TransferStarted {
        /// The identifier of the transfer.
        id: u64,
        /// The local path of the transfer. For a local-to-local copy this is the destination
        /// path; NOTE(review): confirm which path the remote backends report.
        path: PathBuf,
        /// The number of blocks the transfer is split into.
        blocks: u64,
        /// The total size of the transfer in bytes; presumably `None` when it is unknown.
        size: Option<u64>,
    },
    /// A block of a transfer has started.
    BlockStarted {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// The size of the block in bytes; presumably `None` when it is unknown.
        size: Option<u64>,
    },
    /// A block of a transfer made progress.
    BlockProgress {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// The number of bytes transferred. NOTE(review): confirm whether this is cumulative for
        /// the block or the amount since the previous progress event.
        transferred: u64,
    },
    /// A block of a transfer has completed.
    BlockCompleted {
        /// The identifier of the transfer.
        id: u64,
        /// The index of the block within the transfer.
        block: u64,
        /// Whether the block failed.
        failed: bool,
    },
    /// A transfer has completed.
    TransferCompleted {
        /// The identifier of the transfer.
        id: u64,
        /// Whether the transfer failed.
        failed: bool,
    },
}
480
481async fn copy_local(
485 source: &Path,
486 destination: &Path,
487 cancel: CancellationToken,
488 events: Option<broadcast::Sender<TransferEvent>>,
489) -> Result<()> {
490 const ID: u64 = 0;
492 const BLOCK: u64 = 0;
494
495 let mut reader = StreamReader::new(TransferStream::new(
497 ReaderStream::new(BufReader::new(
498 OpenOptions::new().read(true).open(source).await?,
499 )),
500 ID,
501 BLOCK,
502 0,
503 events.clone(),
504 ));
505
506 let mut writer = BufWriter::new(
507 OpenOptions::new()
508 .create_new(true)
509 .write(true)
510 .open(destination)
511 .await?,
512 );
513
514 if let Some(events) = &events {
516 let size = std::fs::metadata(source)?.len();
517
518 events
519 .send(TransferEvent::TransferStarted {
520 id: ID,
521 path: destination.to_path_buf(),
522 blocks: 1,
523 size: Some(size),
524 })
525 .ok();
526
527 events
528 .send(TransferEvent::BlockStarted {
529 id: ID,
530 block: BLOCK,
531 size: Some(size),
532 })
533 .ok();
534 }
535
536 let result = tokio::select! {
538 _ = cancel.cancelled() => {
539 drop(writer);
540 std::fs::remove_file(destination).ok();
541 Err(Error::Canceled)
542 },
543 r = tokio::io::copy(&mut reader, &mut writer) => {
544 r?;
545 Ok(())
546 }
547 };
548
549 if let Some(events) = &events {
551 events
552 .send(TransferEvent::BlockCompleted {
553 id: ID,
554 block: BLOCK,
555 failed: result.is_err(),
556 })
557 .ok();
558
559 events
560 .send(TransferEvent::TransferCompleted {
561 id: ID,
562 failed: result.is_err(),
563 })
564 .ok();
565 }
566
567 result
568}
569
570pub async fn copy(
614 config: Config,
615 client: HttpClient,
616 source: impl Into<Location<'_>>,
617 destination: impl Into<Location<'_>>,
618 cancel: CancellationToken,
619 events: Option<broadcast::Sender<TransferEvent>>,
620) -> Result<()> {
621 let source = source.into();
622 let destination = destination.into();
623
624 match (source, destination) {
625 (Location::Path(source), Location::Path(destination)) => {
626 if destination.exists() {
627 return Err(Error::DestinationExists(destination.to_path_buf()));
628 }
629
630 Ok(copy_local(source, destination, cancel, events).await?)
632 }
633 (Location::Path(source), Location::Url(destination)) => {
634 if let Some(destination) = destination.to_local_path()? {
636 return copy_local(source, &destination, cancel, events).await;
637 }
638
639 if AzureBlobStorageBackend::is_supported_url(&config, &destination) {
640 let transfer =
641 FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
642 transfer.upload(source, destination).await
643 } else if S3StorageBackend::is_supported_url(&config, &destination) {
644 let transfer =
645 FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
646 transfer.upload(source, destination).await
647 } else if GoogleStorageBackend::is_supported_url(&config, &destination) {
648 let transfer =
649 FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
650 transfer.upload(source, destination).await
651 } else {
652 Err(Error::UnsupportedUrl(destination))
653 }
654 }
655 (Location::Url(source), Location::Path(destination)) => {
656 if destination.exists() {
657 return Err(Error::DestinationExists(destination.to_path_buf()));
658 }
659
660 if let Some(source) = source.to_local_path()? {
662 return copy_local(&source, destination, cancel, events).await;
663 }
664
665 if AzureBlobStorageBackend::is_supported_url(&config, &source) {
666 let transfer =
667 FileTransfer::new(AzureBlobStorageBackend::new(config, client, events), cancel);
668 transfer.download(source, destination).await
669 } else if S3StorageBackend::is_supported_url(&config, &source) {
670 let transfer =
671 FileTransfer::new(S3StorageBackend::new(config, client, events), cancel);
672 transfer.download(source, destination).await
673 } else if GoogleStorageBackend::is_supported_url(&config, &source) {
674 let transfer =
675 FileTransfer::new(GoogleStorageBackend::new(config, client, events), cancel);
676 transfer.download(source, destination).await
677 } else {
678 let transfer =
679 FileTransfer::new(GenericStorageBackend::new(config, client, events), cancel);
680 transfer.download(source, destination).await
681 }
682 }
683 (Location::Url(source), Location::Url(destination)) => {
684 if let (Some(source), Some(destination)) =
685 (source.to_local_path()?, destination.to_local_path()?)
686 {
687 return copy_local(&source, &destination, cancel, events).await;
689 }
690
691 Err(Error::RemoteCopyNotSupported)
692 }
693 }
694}