use std::fs;
use std::path::Path;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use cloud_copy::Alphanumeric;
use cloud_copy::AzureConfig;
use cloud_copy::Config;
use cloud_copy::ContentDigest;
use cloud_copy::Error;
use cloud_copy::HashAlgorithm;
use cloud_copy::HttpClient;
use cloud_copy::Location;
use cloud_copy::S3Config;
use cloud_copy::TransferEvent;
use cloud_copy::rewrite_url;
use futures::FutureExt;
use futures::future::LocalBoxFuture;
use pretty_assertions::assert_eq;
use rand::Rng;
use tempfile::NamedTempFile;
use tempfile::tempdir;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::BufWriter;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use url::Url;
use walkdir::WalkDir;
const ONE_MEBIBYTE: usize = 1024 * 1024;
const TEST_BUCKET_NAME: &str = "cloud-copy-test";
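/// Fills `file` with `size` bytes of random data, writing through a buffered
/// writer in 4 KiB chunks.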
async fn write_random_bytes(file: File, mut size: usize) -> Result<()> {
let mut rng = rand::rng();
let mut buffer = [0; 4096];
let mut writer = BufWriter::new(file);
while size > 0 {
let to_write = size.min(buffer.len());
let buffer = &mut buffer[..to_write];
rng.fill(buffer);
size -= writer
.write(buffer)
.await
.context("failed to write random file")?;
}
writer.flush().await.context("failed to flush file")?;
Ok(())
}
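/// Compares two local files for byte-for-byte equality, first by length and
/// then by streaming both files in 4 KiB chunks.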
async fn same_file_content(first: &Path, second: &Path) -> Result<bool> {
if first
.metadata()
.context("failed to read metadata of first file")?
.len()
!= second
.metadata()
.context("failed to read metadata of second file")?
.len()
{
return Ok(false);
}
let mut first = BufReader::new(
File::open(first)
.await
.context("failed to open first file for comparison")?,
);
let mut second = BufReader::new(
File::open(second)
.await
.context("failed to open second file for comparison")?,
);
let mut first_buffer = [0; 4096];
let mut second_buffer = [0; 4096];
loop {
let read = first
.read(&mut first_buffer)
.await
.context("failed to read first file for comparison")?;
if read == 0 {
break;
}
second
.read_exact(&mut second_buffer[0..read])
.await
.context("failed to read second file for comparison")?;
// Only compare the bytes that were actually read in this iteration.
if first_buffer[..read] != second_buffer[..read] {
return Ok(false);
}
}
Ok(true)
}
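/// Recursively populates `dir` with five random files per level, creating one
/// nested subdirectory per remaining `depth`.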
fn create_random_files<'a>(dir: &'a Path, depth: usize) -> LocalBoxFuture<'a, Result<()>> {
async move {
let mut rng = rand::rng();
for i in 0..5 {
let file = File::create(dir.join(format!("file-{i}")))
.await
.context("failed to create test file")?;
let size = rng.random_range(0..ONE_MEBIBYTE);
write_random_bytes(file, size).await?;
}
if depth > 0 {
let subdir = dir.join(format!("dir-{depth}"));
fs::create_dir(&subdir).context("failed to create test directory")?;
create_random_files(&subdir, depth - 1).await?;
}
Ok(())
}
.boxed_local()
}
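/// Walks `dir` and returns the sorted, prefix-stripped paths of every entry.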
fn walk_dir(dir: &Path) -> Result<Vec<PathBuf>> {
let mut entries = Vec::new();
for entry in WalkDir::new(dir) {
let entry = entry.context("failed to walk directory")?;
entries.push(
entry
.path()
.strip_prefix(dir)
.context("failed to strip directory prefix")?
.to_path_buf(),
);
}
entries.sort();
Ok(entries)
}
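/// Builds the set of destination URLs exercised by each test: an `s3://` URL,
/// path-style and virtual-hosted-style LocalStack URLs, an `az://` URL, and an
/// HTTP form of the Azure blob endpoint pointed at Azurite, all scoped to the
/// given test name.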
fn urls(test: &str) -> Vec<Url> {
vec![
format!("s3://{TEST_BUCKET_NAME}/1/{test}").parse().unwrap(),
format!("http://s3.us-east-1.localhost.localstack.cloud:4566/{TEST_BUCKET_NAME}/2/{test}")
.parse()
.unwrap(),
format!("http://{TEST_BUCKET_NAME}.s3.us-east-1.localhost.localstack.cloud:4566/3/{test}")
.parse()
.unwrap(),
format!("az://devstoreaccount1/{TEST_BUCKET_NAME}/1/{test}").parse().unwrap(),
format!("http://devstoreaccount1.blob.core.windows.net.localhost:10000/{TEST_BUCKET_NAME}/2/{test}").parse().unwrap(),
]
}
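/// Azure configuration pointing at Azurite, using the well-known Azurite
/// development-storage account name and key.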
fn azure_config() -> AzureConfig {
AzureConfig::default().with_use_azurite(true).with_auth(
"devstoreaccount1",
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
)
}
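/// S3 configuration pointing at LocalStack with its dummy test credentials.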
fn s3_config() -> S3Config {
S3Config::default()
.with_use_localstack(true)
.with_auth("test", "test")
}
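/// Builds a `Config` wired to both emulators with the given overwrite behavior.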
fn config(overwrite: bool) -> Config {
Config::builder()
.with_overwrite(overwrite)
.with_azure(azure_config())
.with_s3(s3_config())
.build()
}
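/// Uploads a random file of `size` bytes to every test URL, downloads it back,
/// and verifies the round-tripped contents match.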
async fn roundtrip_file(test: &str, size: usize) -> Result<()> {
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
let (file, source) = NamedTempFile::new()
.context("failed to create temp file")?
.into_parts();
write_random_bytes(file.into(), size)
.await
.context("failed to write random bytes into file")?;
let destination = NamedTempFile::new()
.context("failed to create temp file")?
.into_temp_path();
for url in urls(test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload file")?;
cloud_copy::copy(
config.clone(),
client.clone(),
&url,
&*destination,
cancel.clone(),
None,
)
.await
.context("failed to download file")?;
if !same_file_content(&source, &destination)
.await
.context("failed to compare files")?
{
bail!("the downloaded file is not equal to the uploaded");
}
}
Ok(())
}
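/// Downloads from a plain HTTPS URL (neither S3 nor Azure) to a local file.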
#[tokio::test]
async fn copy_generic_url() -> Result<()> {
let destination = NamedTempFile::new()
.context("failed to create temp file")?
.into_temp_path();
fs::remove_file(&destination).context("failed to delete destination file")?;
let cancel = CancellationToken::new();
cloud_copy::copy(
config(true),
Default::default(),
"https://example.com",
&*destination,
cancel.clone(),
None,
)
.await
.context("failed to download file")?;
Ok(())
}
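/// Verifies that copies fail with the expected error when the remote or local
/// destination already exists and overwriting is disabled, and succeed again
/// once overwriting is re-enabled.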
#[tokio::test]
async fn no_overwrite() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
let source = NamedTempFile::new()
.context("failed to create temp file")?
.into_temp_path();
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload file")?;
}
let config = crate::config(false);
for url in urls(&test) {
match cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
&url,
cancel.clone(),
None,
)
.await
{
Ok(_) => panic!("copy operation should fail for `{url}`"),
Err(Error::RemoteDestinationExists(_)) => {}
Err(e) => panic!("unexpected error `{e}` for `{url}`"),
}
}
for url in urls(&test) {
match cloud_copy::copy(
config.clone(),
client.clone(),
&url,
&*source,
cancel.clone(),
None,
)
.await
{
Ok(_) => panic!("copy operation should fail for `{url}`"),
Err(Error::LocalDestinationExists(_)) => {}
Err(e) => panic!("unexpected error `{e}` for `{url}`"),
}
}
let config = crate::config(true);
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload file")?;
}
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&url,
&*source,
cancel.clone(),
None,
)
.await
.context("failed to download file")?;
}
Ok(())
}
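/// Round-trips a zero-byte file through every test URL.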
#[tokio::test]
async fn roundtrip_empty_file() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
roundtrip_file(&test, 0).await
}
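/// Round-trips a randomly sized file of up to 10 MiB.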
#[tokio::test]
async fn roundtrip_small_file() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
roundtrip_file(&test, rand::rng().random_range(1..10 * ONE_MEBIBYTE)).await
}
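/// Round-trips a randomly sized file between 10 MiB and 50 MiB.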
#[tokio::test]
async fn roundtrip_medium_file() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
roundtrip_file(
&test,
rand::rng().random_range(10 * ONE_MEBIBYTE..50 * ONE_MEBIBYTE),
)
.await
}
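/// Round-trips a randomly sized file between 50 MiB and 100 MiB.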
#[tokio::test]
async fn roundtrip_large_file() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
roundtrip_file(
&test,
rand::rng().random_range(50 * ONE_MEBIBYTE..100 * ONE_MEBIBYTE),
)
.await
}
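/// Uploads a randomly populated directory tree to every test URL (with and
/// without a trailing slash), downloads it back, and verifies the directory
/// structure and file contents match.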
#[tokio::test]
async fn roundtrip_directory() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
let source = tempdir().context("failed to create temporary directory")?;
let destination = tempdir().context("failed to create temporary directory")?;
create_random_files(source.path(), 3)
.await
.context("failed to populate temporary directory")?;
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
for url in urls(&test)
.into_iter()
.chain(urls(&test).into_iter().map(|mut u| {
{
let mut segments = u.path_segments_mut().unwrap();
segments.pop_if_empty();
segments.push("");
}
u
}))
{
cloud_copy::copy(
config.clone(),
client.clone(),
source.path(),
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload directory")?;
fs::remove_dir_all(destination.path()).context("failed to delete destination directory")?;
cloud_copy::copy(
config.clone(),
client.clone(),
&url,
destination.path(),
cancel.clone(),
None,
)
.await
.context("failed to download directory")?;
let source_entries = walk_dir(source.path()).context("failed to walk source directory")?;
let destination_entries =
walk_dir(destination.path()).context("failed to walk destination directory")?;
if source_entries.len() != destination_entries.len() {
bail!("source directory and download directory have a different number of entries");
}
for (orig, new) in source_entries.into_iter().zip(destination_entries) {
if orig != new {
bail!(
"original entry `{orig}` does not match the new entry `{new}`",
orig = orig.display(),
new = new.display()
);
}
let source = source.path().join(orig);
let destination = destination.path().join(new);
match (source.is_dir(), destination.is_dir()) {
(true, true) => continue,
(false, false) => {
if !same_file_content(&source, &destination)
.await
.context("failed to compare files")?
{
bail!(
"contents of uploaded file `{source}` does not match the contents of \
downloaded file `{destination}`",
source = source.display(),
destination = destination.display()
);
}
}
_ => bail!(
"source entry `{source}` does not match the type of download entry \
`{destination}`",
source = source.display(),
destination = destination.display()
),
}
}
}
Ok(())
}
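/// Verifies that downloads go through the cache directory: without
/// `link_to_cache` the destination is a plain copy (one link), and with it the
/// destination is hard-linked to the cached file (two links).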
#[cfg(unix)]
#[tokio::test]
async fn link_to_cache() -> Result<()> {
use std::os::unix::fs::MetadataExt;
let cache_dir = tempdir().context("failed to create temp directory")?;
let destination = NamedTempFile::new()
.context("failed to create destination file")?
.into_temp_path();
fs::remove_file(&destination).context("failed to delete destination file")?;
let cancel = CancellationToken::new();
let config = Config::default();
let client = HttpClient::new_with_cache(config.clone(), &cache_dir);
cloud_copy::copy(
config,
client.clone(),
"https://example.com",
&*destination,
cancel.clone(),
None,
)
.await
.context("failed to download file")?;
assert!(destination.is_file(), "destination is not a file");
fs::remove_file(&destination).context("failed to delete destination file")?;
cloud_copy::copy(
Default::default(),
client.clone(),
"https://example.com",
&*destination,
cancel.clone(),
None,
)
.await
.context("failed to download file")?;
assert_eq!(
destination
.metadata()
.context("failed to read metadata of destination")?
.nlink(),
1,
"expected only a single link to the file"
);
cloud_copy::copy(
Config::builder()
.with_overwrite(true)
.with_link_to_cache(true)
.build(),
client,
"https://example.com",
&*destination,
cancel,
None,
)
.await
.context("failed to download file")?;
assert_eq!(
destination
.metadata()
.context("failed to read metadata of destination")?
.nlink(),
2,
"expected two links to the file"
);
Ok(())
}
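/// Uploads and downloads a small single-block file while subscribing to
/// transfer events, then checks the event payloads and that the reported URLs
/// match the rewritten source and destination URLs.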
#[tokio::test]
async fn events() -> Result<()> {
const FILE_SIZE: u64 = 1024;
let test = format!("{random}", random = Alphanumeric::new(10));
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
let (file, source) = NamedTempFile::new()
.context("failed to create temp file")?
.into_parts();
write_random_bytes(file.into(), FILE_SIZE as usize)
.await
.context("failed to write random bytes into file")?;
let destination = NamedTempFile::new()
.context("failed to create temp file")?
.into_temp_path();
let (events_tx, mut events_rx) = broadcast::channel(1000);
let (urls_tx, urls_rx) = oneshot::channel();
tokio::spawn(async move {
let mut urls = Vec::new();
loop {
match events_rx.recv().await {
Ok(event) => match event {
TransferEvent::TransferStarted {
source,
destination,
blocks,
size,
..
} => {
assert_eq!(blocks, 1, "unexpected number of blocks");
assert_eq!(size, Some(FILE_SIZE), "unexpected file size");
if let Location::Url(url) = source {
urls.push(url);
}
if let Location::Url(url) = destination {
urls.push(url);
}
}
TransferEvent::BlockStarted { block, size, .. } => {
assert_eq!(block, 0, "unexpected block id");
assert_eq!(size, Some(FILE_SIZE), "unexpected block size");
}
TransferEvent::BlockProgress { .. } => continue,
TransferEvent::BlockCompleted { block, failed, .. } => {
assert_eq!(block, 0, "unexpected block id");
assert!(!failed, "block failed to transfer");
}
TransferEvent::TransferCompleted { failed, .. } => {
assert!(!failed, "file failed to transfer");
}
},
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => panic!("event channel lagged"),
}
}
urls_tx.send(urls).expect("failed to send urls");
});
let expected = urls(&test);
for url in &expected {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
url,
cancel.clone(),
Some(events_tx.clone()),
)
.await
.context("failed to upload file")?;
cloud_copy::copy(
config.clone(),
client.clone(),
url,
&*destination,
cancel.clone(),
Some(events_tx.clone()),
)
.await
.context("failed to download file")?;
if !same_file_content(&source, &destination)
.await
.context("failed to compare files")?
{
bail!("the downloaded file is not equal to the uploaded");
}
}
drop(events_tx);
let urls = urls_rx.await.expect("failed to receive URLs from the event task");
assert!(!urls.is_empty());
for (i, expected) in expected.iter().enumerate() {
let expected = rewrite_url(&config, expected).expect("URL should rewrite");
assert_eq!(&urls[i * 2], expected.as_ref(), "unexpected upload URL");
assert_eq!(
&urls[(i * 2) + 1],
expected.as_ref(),
"unexpected download URL"
);
}
Ok(())
}
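/// Uploads ten files under each test URL and verifies that `cloud_copy::walk`
/// lists exactly those relative paths.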
#[tokio::test]
async fn walk() -> Result<()> {
const FILE_SIZE: u64 = 1024;
let test = format!("{random}", random = Alphanumeric::new(10));
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
let (file, source) = NamedTempFile::new()
.context("failed to create temp file")?
.into_parts();
write_random_bytes(file.into(), FILE_SIZE as usize)
.await
.context("failed to write random bytes into file")?;
for url in urls(&test) {
for i in 0..10 {
let mut url = url.clone();
url.path_segments_mut().unwrap().push(&i.to_string());
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
url,
cancel.clone(),
None,
)
.await
.context("failed to upload file")?;
}
let files = cloud_copy::walk(config.clone(), client.clone(), url.clone())
.await
.expect("should walk");
assert_eq!(
files,
&["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
"unexpected walk output for URL `{url}`"
);
}
Ok(())
}
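/// Uploads a file under each configured hash algorithm and verifies the
/// reported content digest: an ETag when no digest was requested, otherwise
/// the locally calculated digest.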
#[tokio::test]
async fn digests() -> Result<()> {
const FILE_SIZE: u64 = 1024;
let test = format!("{random}", random = Alphanumeric::new(10));
let client = HttpClient::default();
let cancel = CancellationToken::new();
let (file, source) = NamedTempFile::new()
.context("failed to create temp file")?
.into_parts();
write_random_bytes(file.into(), FILE_SIZE as usize)
.await
.context("failed to write random bytes into file")?;
for algorithm in [
HashAlgorithm::None,
HashAlgorithm::Sha256,
HashAlgorithm::Blake3,
] {
let config = Config::builder()
.with_overwrite(true)
.with_hash_algorithm(algorithm)
.with_azure(azure_config())
.with_s3(s3_config())
.build();
let expected_digest = algorithm
.calculate_content_digest(&source, &cancel)
.await
.unwrap()
.map(|v| ContentDigest::parse_header(&v).unwrap());
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
url.clone(),
cancel.clone(),
None,
)
.await
.context("failed to upload file")?;
let digest =
cloud_copy::get_content_digest(config.clone(), client.clone(), url.clone()).await?;
match (&expected_digest, &digest) {
(None, Some(ContentDigest::ETag(v))) => {
assert!(!v.starts_with("W/"));
}
(Some(expected), Some(actual)) => {
assert_eq!(expected, actual, "digest mismatch");
}
_ => panic!("unexpected content digest `{digest:?}` for `{url}`"),
}
}
}
Ok(())
}
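/// Uploads a directory tree and verifies `cloud_copy::exists` returns true for
/// every uploaded entry, false for a missing entry, and behaves the same way
/// for plain HTTPS URLs.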
#[tokio::test]
async fn exists() -> Result<()> {
let test = format!("{random}", random = Alphanumeric::new(10));
let source = tempdir().context("failed to create temporary directory")?;
create_random_files(source.path(), 2)
.await
.context("failed to populate temporary directory")?;
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
source.path(),
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload directory")?;
}
for entry in WalkDir::new(source.path()) {
let entry = entry.context("walk should succeed")?;
let path = entry
.path()
.strip_prefix(source.path())
.context("should be relative")?;
for mut url in urls(&test) {
url.path_segments_mut()
.expect("missing path segments")
.pop_if_empty()
.push("");
let url = url
.join(path.as_os_str().to_str().context("should be UTF-8")?)
.context("url should join")?;
let exists = cloud_copy::exists(config.clone(), client.clone(), url.clone())
.await
.context("checking for existence")?;
assert!(exists, "URL `{url}` should exist");
let url = url.join("does-not-exist").context("url should join")?;
let exists = cloud_copy::exists(config.clone(), client.clone(), url.clone())
.await
.context("checking for existence")?;
assert!(!exists, "URL `{url}` should not exist");
}
}
assert!(
cloud_copy::exists(
config.clone(),
client.clone(),
"https://google.com".parse().context("should parse")?
)
.await
.context("checking for existence")?
);
assert!(
!cloud_copy::exists(
config.clone(),
client.clone(),
"https://google.com/404".parse().context("should parse")?
)
.await
.context("checking for existence")?
);
Ok(())
}
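/// Uploads a fixed-size file and verifies `cloud_copy::size` reports its size.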
#[tokio::test]
async fn size() -> Result<()> {
const FILE_SIZE: u64 = 1024;
let test = format!("{random}", random = Alphanumeric::new(10));
let (file, source) = NamedTempFile::new()
.context("failed to create temp file")?
.into_parts();
write_random_bytes(file.into(), FILE_SIZE as usize)
.await
.context("failed to write random bytes into file")?;
let config = config(true);
let client = HttpClient::default();
let cancel = CancellationToken::new();
for url in urls(&test) {
cloud_copy::copy(
config.clone(),
client.clone(),
&*source,
&url,
cancel.clone(),
None,
)
.await
.context("failed to upload directory")?;
let size = cloud_copy::size(config.clone(), client.clone(), url).await?;
assert_eq!(size, Some(FILE_SIZE));
}
Ok(())
}