use anyhow::{Context, Result};
use futures::future::try_join_all;
use std::fs;
use std::path::PathBuf;
pub async fn copy_files_parallel(sources_and_destinations: &[(PathBuf, PathBuf)]) -> Result<()> {
if sources_and_destinations.is_empty() {
return Ok(());
}
let mut tasks = Vec::new();
for (src, dst) in sources_and_destinations {
let src = src.clone();
let dst = dst.clone();
let task = tokio::task::spawn_blocking(move || {
if let Some(parent) = dst.parent() {
super::dirs::ensure_dir(parent)?;
}
fs::copy(&src, &dst).with_context(|| {
format!("Failed to copy file from {} to {}", src.display(), dst.display())
})?;
Ok::<_, anyhow::Error>((src, dst))
});
tasks.push(task);
}
let results = try_join_all(tasks).await.context("Failed to join file copy tasks")?;
let mut errors = Vec::new();
for result in results {
if let Err(e) = result {
errors.push(e);
}
}
if !errors.is_empty() {
let error_msgs: Vec<String> =
errors.into_iter().map(|error| format!(" {error}")).collect();
return Err(anyhow::anyhow!(
"Failed to copy {} files:\n{}",
error_msgs.len(),
error_msgs.join("\n")
));
}
Ok(())
}
pub async fn copy_dirs_parallel(sources_and_destinations: &[(PathBuf, PathBuf)]) -> Result<()> {
if sources_and_destinations.is_empty() {
return Ok(());
}
let mut tasks = Vec::new();
for (src, dst) in sources_and_destinations {
let src = src.clone();
let dst = dst.clone();
let task = tokio::task::spawn_blocking(move || {
super::dirs::copy_dir(&src, &dst).map(|()| (src, dst))
});
tasks.push(task);
}
let results = try_join_all(tasks).await.context("Failed to join directory copy tasks")?;
let mut errors = Vec::new();
for result in results {
if let Err(e) = result {
errors.push(e);
}
}
if !errors.is_empty() {
let error_msgs: Vec<String> =
errors.into_iter().map(|error| format!(" {error}")).collect();
return Err(anyhow::anyhow!(
"Failed to copy {} directories:\n{}",
error_msgs.len(),
error_msgs.join("\n")
));
}
Ok(())
}
pub async fn read_files_parallel(paths: &[PathBuf]) -> Result<Vec<(PathBuf, String)>> {
if paths.is_empty() {
return Ok(Vec::new());
}
let mut tasks = Vec::new();
for (index, path) in paths.iter().enumerate() {
let path = path.clone();
let task = tokio::task::spawn_blocking(move || {
fs::read_to_string(&path).map(|content| (index, path, content))
});
tasks.push(task);
}
let results = try_join_all(tasks).await.context("Failed to join file read tasks")?;
let mut successes = Vec::new();
let mut errors = Vec::new();
for result in results {
match result {
Ok((index, path, content)) => successes.push((index, path, content)),
Err(e) => errors.push(e),
}
}
if !errors.is_empty() {
let error_msgs: Vec<String> =
errors.into_iter().map(|error| format!(" {error}")).collect();
return Err(anyhow::anyhow!(
"Failed to read {} files:\n{}",
error_msgs.len(),
error_msgs.join("\n")
));
}
successes.sort_by_key(|(index, _, _)| *index);
let ordered_results: Vec<(PathBuf, String)> =
successes.into_iter().map(|(_, path, content)| (path, content)).collect();
Ok(ordered_results)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn test_copy_files_parallel() {
let temp = tempdir().unwrap();
let src1 = temp.path().join("src1.txt");
let src2 = temp.path().join("src2.txt");
let dst1 = temp.path().join("dst").join("dst1.txt");
let dst2 = temp.path().join("dst").join("dst2.txt");
std::fs::write(&src1, "content1").unwrap();
std::fs::write(&src2, "content2").unwrap();
let pairs = vec![(src1.clone(), dst1.clone()), (src2.clone(), dst2.clone())];
copy_files_parallel(&pairs).await.unwrap();
assert!(dst1.exists());
assert!(dst2.exists());
assert_eq!(std::fs::read_to_string(&dst1).unwrap(), "content1");
assert_eq!(std::fs::read_to_string(&dst2).unwrap(), "content2");
}
#[tokio::test]
async fn test_copy_dirs_parallel() {
let temp = tempdir().unwrap();
let src1 = temp.path().join("src1");
let src2 = temp.path().join("src2");
let dst1 = temp.path().join("dst1");
let dst2 = temp.path().join("dst2");
super::super::dirs::ensure_dir(&src1).unwrap();
super::super::dirs::ensure_dir(&src2).unwrap();
std::fs::write(src1.join("file1.txt"), "content1").unwrap();
std::fs::write(src2.join("file2.txt"), "content2").unwrap();
let pairs = vec![(src1.clone(), dst1.clone()), (src2.clone(), dst2.clone())];
copy_dirs_parallel(&pairs).await.unwrap();
assert!(dst1.join("file1.txt").exists());
assert!(dst2.join("file2.txt").exists());
}
#[tokio::test]
async fn test_read_files_parallel() {
let temp = tempdir().unwrap();
let file1 = temp.path().join("read1.txt");
let file2 = temp.path().join("read2.txt");
std::fs::write(&file1, "content1").unwrap();
std::fs::write(&file2, "content2").unwrap();
let paths = vec![file1.clone(), file2.clone()];
let results = read_files_parallel(&paths).await.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].0, file1);
assert_eq!(results[0].1, "content1");
assert_eq!(results[1].0, file2);
assert_eq!(results[1].1, "content2");
}
#[tokio::test]
async fn test_parallel_operations_empty() -> anyhow::Result<()> {
copy_files_parallel(&[]).await?;
copy_dirs_parallel(&[]).await?;
let result = read_files_parallel(&[]).await?;
assert!(result.is_empty());
Ok(())
}
#[tokio::test]
async fn test_copy_files_parallel_errors() {
let temp = tempdir().unwrap();
let src = temp.path().join("nonexistent.txt");
let dst = temp.path().join("dest.txt");
let pairs = vec![(src, dst)];
let result = copy_files_parallel(&pairs).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_read_files_parallel_mixed() {
let temp = tempdir().unwrap();
let valid = temp.path().join("valid.txt");
let invalid = temp.path().join("invalid.txt");
std::fs::write(&valid, "content").unwrap();
let paths = vec![valid, invalid];
let result = read_files_parallel(&paths).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_copy_dirs_parallel_errors() {
let temp = tempdir().unwrap();
let src = temp.path().join("nonexistent");
let dst = temp.path().join("dest");
let pairs = vec![(src, dst)];
let result = copy_dirs_parallel(&pairs).await;
assert!(result.is_err());
}
}