agpm_cli/utils/fs/parallel.rs
1//! Parallel file operations for concurrent processing of multiple files and directories.
2//!
3//! This module provides async functions that perform file operations in parallel
//! using Tokio's blocking thread pool (`tokio::task::spawn_blocking`). These functions are optimized for:
5//! - Processing multiple files or directories simultaneously
6//! - Avoiding blocking the async runtime with I/O operations
7//! - Scaling with available CPU cores
8//!
9//! # Examples
10//!
11//! ```rust,no_run
12//! use agpm_cli::utils::fs::parallel::copy_files_parallel;
13//! use std::path::PathBuf;
14//!
15//! # async fn example() -> anyhow::Result<()> {
16//! let copy_operations = vec![
17//! (PathBuf::from("src/agent1.md"), PathBuf::from("output/agent1.md")),
18//! (PathBuf::from("src/agent2.md"), PathBuf::from("output/agent2.md")),
19//! ];
20//!
//! copy_files_parallel(&copy_operations).await?;
22//! println!("All files copied successfully!");
23//! # Ok(())
24//! # }
25//! ```
26
27use anyhow::{Context, Result};
28use futures::future::try_join_all;
29use std::fs;
30use std::path::PathBuf;
31
32/// Copies multiple files concurrently using parallel processing.
33///
34/// This function performs multiple file copy operations in parallel, which can
35/// significantly improve performance when copying many files, especially on
36/// systems with fast storage and multiple CPU cores.
37///
38/// # Arguments
39///
40/// * `sources_and_destinations` - A slice of (source, destination) path pairs
41///
42/// # Returns
43///
44/// - `Ok(())` if all files were copied successfully
45/// - `Err` if any copy operation fails, with details about all failures
46///
47/// # Examples
48///
49/// ```rust,no_run
50/// use agpm_cli::utils::fs::parallel::copy_files_parallel;
51/// use std::path::PathBuf;
52///
53/// # async fn example() -> anyhow::Result<()> {
54/// let copy_operations = vec![
55/// (PathBuf::from("src/agent1.md"), PathBuf::from("output/agent1.md")),
56/// (PathBuf::from("src/agent2.md"), PathBuf::from("output/agent2.md")),
57/// (PathBuf::from("src/snippet.md"), PathBuf::from("output/snippet.md")),
58/// ];
59///
60/// copy_files_parallel(©_operations).await?;
61/// println!("All files copied successfully!");
62/// # Ok(())
63/// # }
64/// ```
65///
66/// # Features
67///
68/// - **Parallel execution**: Uses thread pool for concurrent operations
69/// - **Automatic directory creation**: Creates destination directories as needed
70/// - **Atomic behavior**: Either all files copy successfully or none do
71/// - **Progress tracking**: Can be combined with progress bars for user feedback
72///
73/// # Performance Characteristics
74///
75/// - Best for many small to medium files
76/// - Scales with available CPU cores and I/O bandwidth
77/// - May not improve performance for very large files (I/O bound)
78/// - Respects filesystem limits on concurrent operations
79///
80/// # Error Handling
81///
82/// All copy operations must succeed for the function to return `Ok(())`. If any
83/// operation fails, detailed error information is provided for troubleshooting.
84///
85/// # See Also
86///
87/// - [`copy_dirs_parallel`] for directory copying
88/// - [`super::atomic::atomic_write_multiple`] for writing multiple files
89pub async fn copy_files_parallel(sources_and_destinations: &[(PathBuf, PathBuf)]) -> Result<()> {
90 if sources_and_destinations.is_empty() {
91 return Ok(());
92 }
93
94 let mut tasks = Vec::new();
95
96 for (src, dst) in sources_and_destinations {
97 let src = src.clone();
98 let dst = dst.clone();
99 let task = tokio::task::spawn_blocking(move || {
100 // Ensure destination directory exists
101 if let Some(parent) = dst.parent() {
102 super::dirs::ensure_dir(parent)?;
103 }
104
105 // Copy file
106 fs::copy(&src, &dst).with_context(|| {
107 format!("Failed to copy file from {} to {}", src.display(), dst.display())
108 })?;
109
110 Ok::<_, anyhow::Error>((src, dst))
111 });
112 tasks.push(task);
113 }
114
115 let results = try_join_all(tasks).await.context("Failed to join file copy tasks")?;
116
117 let mut errors = Vec::new();
118
119 for result in results {
120 if let Err(e) = result {
121 errors.push(e);
122 }
123 }
124
125 if !errors.is_empty() {
126 let error_msgs: Vec<String> =
127 errors.into_iter().map(|error| format!(" {error}")).collect();
128 return Err(anyhow::anyhow!(
129 "Failed to copy {} files:\n{}",
130 error_msgs.len(),
131 error_msgs.join("\n")
132 ));
133 }
134
135 Ok(())
136}
137
138/// Copies multiple directories concurrently.
139///
140/// This function performs multiple directory copy operations in parallel,
141/// which can improve performance when copying several separate directory trees.
142/// Each directory is copied recursively using the same logic as [`super::dirs::copy_dir`].
143///
144/// # Arguments
145///
146/// * `sources_and_destinations` - A slice of (source, destination) directory pairs
147///
148/// # Returns
149///
150/// - `Ok(())` if all directories were copied successfully
151/// - `Err` if any copy operation fails, with details about all failures
152///
153/// # Examples
154///
155/// ```rust,no_run
156/// use agpm_cli::utils::fs::parallel::copy_dirs_parallel;
157/// use std::path::PathBuf;
158///
159/// # async fn example() -> anyhow::Result<()> {
160/// let copy_operations = vec![
161/// (PathBuf::from("cache/agents"), PathBuf::from("output/agents")),
162/// (PathBuf::from("cache/snippets"), PathBuf::from("output/snippets")),
163/// (PathBuf::from("cache/templates"), PathBuf::from("output/templates")),
164/// ];
165///
166/// copy_dirs_parallel(©_operations).await?;
167/// println!("All directories copied successfully!");
168/// # Ok(())
169/// # }
170/// ```
171///
172/// # Features
173///
174/// - **Recursive copying**: Each directory is copied with all subdirectories
175/// - **Parallel execution**: Multiple directory trees copied concurrently
176/// - **Atomic behavior**: Either all directories copy successfully or operation fails
177/// - **Automatic creation**: Destination directories are created as needed
178///
179/// # Use Cases
180///
181/// - Copying multiple resource categories simultaneously
182/// - Batch operations on directory structures
183/// - Backup operations for multiple directories
184/// - Installation processes involving multiple components
185///
186/// # Performance Considerations
187///
188/// - Best performance with multiple separate directory trees
189/// - May not improve performance if directories share the same disk
190/// - Memory usage scales with number of directories and their sizes
191/// - Respects filesystem concurrent operation limits
192///
193/// # See Also
194///
195/// - [`super::dirs::copy_dir`] for single directory copying
196/// - [`copy_files_parallel`] for individual file copying
197pub async fn copy_dirs_parallel(sources_and_destinations: &[(PathBuf, PathBuf)]) -> Result<()> {
198 if sources_and_destinations.is_empty() {
199 return Ok(());
200 }
201
202 let mut tasks = Vec::new();
203
204 for (src, dst) in sources_and_destinations {
205 let src = src.clone();
206 let dst = dst.clone();
207 let task = tokio::task::spawn_blocking(move || {
208 super::dirs::copy_dir(&src, &dst).map(|()| (src, dst))
209 });
210 tasks.push(task);
211 }
212
213 let results = try_join_all(tasks).await.context("Failed to join directory copy tasks")?;
214
215 let mut errors = Vec::new();
216
217 for result in results {
218 if let Err(e) = result {
219 errors.push(e);
220 }
221 }
222
223 if !errors.is_empty() {
224 let error_msgs: Vec<String> =
225 errors.into_iter().map(|error| format!(" {error}")).collect();
226 return Err(anyhow::anyhow!(
227 "Failed to copy {} directories:\n{}",
228 error_msgs.len(),
229 error_msgs.join("\n")
230 ));
231 }
232
233 Ok(())
234}
235
236/// Reads multiple files concurrently and returns their contents.
237///
238/// This function reads multiple text files in parallel, which can improve
239/// performance when processing many files, especially on systems with
240/// fast storage and multiple CPU cores.
241///
242/// # Arguments
243///
244/// * `paths` - A slice of file paths to read
245///
246/// # Returns
247///
248/// A vector of tuples containing each file path and its content as a UTF-8 string,
249/// in the same order as the input paths. Returns an error if any file fails
250/// to be read or contains invalid UTF-8.
251///
252/// # Examples
253///
254/// ```rust,no_run
255/// use agpm_cli::utils::fs::parallel::read_files_parallel;
256/// use std::path::PathBuf;
257///
258/// # async fn example() -> anyhow::Result<()> {
259/// let config_files = vec![
260/// PathBuf::from("agpm.toml"),
261/// PathBuf::from("agents/agent1.md"),
262/// PathBuf::from("snippets/snippet1.md"),
263/// ];
264///
265/// let results = read_files_parallel(&config_files).await?;
266/// for (path, content) in results {
267/// println!("{}: {} characters", path.display(), content.len());
268/// }
269/// # Ok(())
270/// # }
271/// ```
272///
273/// # Performance
274///
275/// This function uses `tokio::task::spawn_blocking` to perform file I/O
276/// on separate threads:
277/// - Multiple files read simultaneously
278/// - Non-blocking for the async runtime
279/// - Scales with available CPU cores and I/O bandwidth
280/// - Maintains result ordering
281///
282/// # UTF-8 Handling
283///
284/// All files must contain valid UTF-8 text. If any file contains invalid
285/// UTF-8 bytes, the operation will fail with a descriptive error.
286///
287/// # Error Handling
288///
289/// If any file fails to be read (due to permissions, missing file, or
290/// invalid UTF-8), the entire operation fails. This ensures consistency
291/// when processing related files that should all be available.
292///
293/// # See Also
294///
295/// - [`super::metadata::calculate_checksums_parallel`] for file integrity verification
296/// - [`super::atomic::atomic_write_multiple`] for batch writing operations
297pub async fn read_files_parallel(paths: &[PathBuf]) -> Result<Vec<(PathBuf, String)>> {
298 if paths.is_empty() {
299 return Ok(Vec::new());
300 }
301
302 let mut tasks = Vec::new();
303
304 for (index, path) in paths.iter().enumerate() {
305 let path = path.clone();
306 let task = tokio::task::spawn_blocking(move || {
307 fs::read_to_string(&path).map(|content| (index, path, content))
308 });
309 tasks.push(task);
310 }
311
312 let results = try_join_all(tasks).await.context("Failed to join file read tasks")?;
313
314 let mut successes = Vec::new();
315 let mut errors = Vec::new();
316
317 for result in results {
318 match result {
319 Ok((index, path, content)) => successes.push((index, path, content)),
320 Err(e) => errors.push(e),
321 }
322 }
323
324 if !errors.is_empty() {
325 let error_msgs: Vec<String> =
326 errors.into_iter().map(|error| format!(" {error}")).collect();
327 return Err(anyhow::anyhow!(
328 "Failed to read {} files:\n{}",
329 error_msgs.len(),
330 error_msgs.join("\n")
331 ));
332 }
333
334 // Sort results by original index to maintain order
335 successes.sort_by_key(|(index, _, _)| *index);
336 let ordered_results: Vec<(PathBuf, String)> =
337 successes.into_iter().map(|(_, path, content)| (path, content)).collect();
338
339 Ok(ordered_results)
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_copy_files_parallel() {
        let temp = tempdir().unwrap();
        let src1 = temp.path().join("src1.txt");
        let src2 = temp.path().join("src2.txt");
        let dst1 = temp.path().join("dst").join("dst1.txt");
        let dst2 = temp.path().join("dst").join("dst2.txt");

        std::fs::write(&src1, "content1").unwrap();
        std::fs::write(&src2, "content2").unwrap();

        let pairs = vec![(src1.clone(), dst1.clone()), (src2.clone(), dst2.clone())];
        copy_files_parallel(&pairs).await.unwrap();

        assert!(dst1.exists());
        assert!(dst2.exists());
        assert_eq!(std::fs::read_to_string(&dst1).unwrap(), "content1");
        assert_eq!(std::fs::read_to_string(&dst2).unwrap(), "content2");
    }

    #[tokio::test]
    async fn test_copy_dirs_parallel() {
        let temp = tempdir().unwrap();
        let src1 = temp.path().join("src1");
        let src2 = temp.path().join("src2");
        let dst1 = temp.path().join("dst1");
        let dst2 = temp.path().join("dst2");

        super::super::dirs::ensure_dir(&src1).unwrap();
        super::super::dirs::ensure_dir(&src2).unwrap();
        std::fs::write(src1.join("file1.txt"), "content1").unwrap();
        std::fs::write(src2.join("file2.txt"), "content2").unwrap();

        let pairs = vec![(src1.clone(), dst1.clone()), (src2.clone(), dst2.clone())];
        copy_dirs_parallel(&pairs).await.unwrap();

        // Verify content, not just existence, so a truncated/empty copy is caught.
        assert_eq!(std::fs::read_to_string(dst1.join("file1.txt")).unwrap(), "content1");
        assert_eq!(std::fs::read_to_string(dst2.join("file2.txt")).unwrap(), "content2");
    }

    #[tokio::test]
    async fn test_read_files_parallel() {
        let temp = tempdir().unwrap();
        let file1 = temp.path().join("read1.txt");
        let file2 = temp.path().join("read2.txt");

        std::fs::write(&file1, "content1").unwrap();
        std::fs::write(&file2, "content2").unwrap();

        let paths = vec![file1.clone(), file2.clone()];
        let results = read_files_parallel(&paths).await.unwrap();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, file1);
        assert_eq!(results[0].1, "content1");
        assert_eq!(results[1].0, file2);
        assert_eq!(results[1].1, "content2");
    }

    #[tokio::test]
    async fn test_parallel_operations_empty() -> anyhow::Result<()> {
        // Test parallel operations with empty inputs
        copy_files_parallel(&[]).await?;

        copy_dirs_parallel(&[]).await?;

        let result = read_files_parallel(&[]).await?;
        assert!(result.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn test_copy_files_parallel_errors() {
        let temp = tempdir().unwrap();
        let src = temp.path().join("nonexistent.txt");
        let dst = temp.path().join("dest.txt");

        let pairs = vec![(src, dst)];
        let err = copy_files_parallel(&pairs).await.unwrap_err();

        assert!(err.to_string().contains("Failed to copy 1 files"));
    }

    #[tokio::test]
    async fn test_copy_files_parallel_reports_every_failure() {
        let temp = tempdir().unwrap();
        let good_src = temp.path().join("good.txt");
        let good_dst = temp.path().join("out").join("good.txt");
        std::fs::write(&good_src, "good").unwrap();

        let pairs = vec![
            (temp.path().join("missing1.txt"), temp.path().join("out").join("m1.txt")),
            (good_src, good_dst.clone()),
            (temp.path().join("missing2.txt"), temp.path().join("out").join("m2.txt")),
        ];
        let err = copy_files_parallel(&pairs).await.unwrap_err();

        // Both failures are aggregated, not just the first one.
        assert!(err.to_string().contains("Failed to copy 2 files"));
        // The operation is not atomic: the successful copy is left in place.
        assert_eq!(std::fs::read_to_string(&good_dst).unwrap(), "good");
    }

    #[tokio::test]
    async fn test_read_files_parallel_mixed() {
        let temp = tempdir().unwrap();
        let valid = temp.path().join("valid.txt");
        let invalid = temp.path().join("invalid.txt");

        std::fs::write(&valid, "content").unwrap();

        let paths = vec![valid, invalid];
        let result = read_files_parallel(&paths).await;

        // Should fail if any file cannot be read, and report exactly one failure.
        let err = result.unwrap_err();
        assert!(err.to_string().contains("Failed to read 1 files"));
    }

    #[tokio::test]
    async fn test_copy_dirs_parallel_errors() {
        let temp = tempdir().unwrap();
        let src = temp.path().join("nonexistent");
        let dst = temp.path().join("dest");

        let pairs = vec![(src, dst)];
        let err = copy_dirs_parallel(&pairs).await.unwrap_err();

        assert!(err.to_string().contains("Failed to copy 1 directories"));
    }
}