pub async fn install_resources_parallel_with_progress(
    lockfile: &Arc<LockFile>,
    manifest: &Manifest,
    install_ctx: &InstallContext<'_>,
    max_concurrency: Option<usize>,
    progress_sender: Option<UnboundedSender<InstallProgress>>,
) -> Result<usize>
Install resources in parallel with detailed progress updates via async channels.
This function performs parallel resource installation while providing real-time progress updates through an async channel. It’s designed for UI implementations that need detailed visibility into parallel installation operations, showing which specific dependencies are being processed at any given time.
§Arguments
- lockfile - Lockfile containing all resources to install
- manifest - Project manifest providing configuration context
- project_dir - Root directory for resource installation
- cache - Cache instance for Git repository and worktree management
- force_refresh - Whether to force refresh of cached repositories
- max_concurrency - Optional limit on concurrent operations (None = unlimited)
- progress_sender - Optional channel sender for progress updates
§Progress Updates
When progress_sender is provided, the function sends InstallProgress
updates that include:
- Active dependencies currently being processed
- Completed count (successful and failed installations)
- Total dependency count for completion calculation
Updates are sent at key points:
- When a dependency starts processing (added to active_deps)
- When a dependency completes (removed from active_deps, completed_count incremented)
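The update points above can be illustrated with a minimal sketch. It is not the crate's implementation: the `ProgressTracker` type is hypothetical, the field types of `InstallProgress` are assumed (only the field names appear in this documentation), and a standard-library channel stands in for the tokio channel so the sketch has no dependencies.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;

// Assumed shape: only the three field names come from the documentation.
#[derive(Debug, Clone, PartialEq)]
struct InstallProgress {
    active_deps: Vec<String>,
    completed_count: usize,
    total_count: usize,
}

// Hypothetical tracker that publishes a snapshot after every state change.
struct ProgressTracker {
    state: Mutex<InstallProgress>,
    sender: Option<Sender<InstallProgress>>,
}

impl ProgressTracker {
    fn new(total_count: usize, sender: Option<Sender<InstallProgress>>) -> Self {
        let state = InstallProgress {
            active_deps: Vec::new(),
            completed_count: 0,
            total_count,
        };
        Self { state: Mutex::new(state), sender }
    }

    // Called when a dependency starts processing.
    fn start(&self, name: &str) {
        let mut state = self.state.lock().unwrap();
        state.active_deps.push(name.to_string());
        self.publish(&state);
    }

    // Called when a dependency finishes, whether it succeeded or failed.
    fn finish(&self, name: &str) {
        let mut state = self.state.lock().unwrap();
        state.active_deps.retain(|dep| dep != name);
        state.completed_count += 1;
        self.publish(&state);
    }

    fn publish(&self, snapshot: &InstallProgress) {
        if let Some(sender) = &self.sender {
            // A closed receiver must not fail the installation.
            let _ = sender.send(snapshot.clone());
        }
    }
}

// Drives two dependencies through the tracker and collects every snapshot.
fn demo_updates() -> Vec<InstallProgress> {
    let (tx, rx) = channel();
    let tracker = ProgressTracker::new(2, Some(tx));
    tracker.start("agent-a");
    tracker.start("agent-b");
    tracker.finish("agent-a");
    tracker.finish("agent-b");
    drop(tracker); // drops the sender so the iterator below terminates
    rx.into_iter().collect()
}
```

Calling `demo_updates()` yields one snapshot per start or finish event, so a UI can redraw from each snapshot without keeping its own state.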
§Channel-Based Architecture
use agpm_cli::installer::{install_resources_parallel_with_progress, InstallProgress};
use agpm_cli::lockfile::LockFile;
use agpm_cli::manifest::Manifest;
use agpm_cli::cache::Cache;
use tokio::sync::mpsc;
use std::path::Path;
let (tx, mut rx) = mpsc::unbounded_channel::<InstallProgress>();
let lockfile = LockFile::load(Path::new("agpm.lock"))?;
let manifest = Manifest::load(Path::new("agpm.toml"))?;
let cache = Cache::new()?;
// Spawn installation task
let install_task = tokio::spawn(async move {
    install_resources_parallel_with_progress(
        &lockfile,
        &manifest,
        Path::new("."),
        &cache,
        false,
        Some(8), // Max 8 concurrent operations
        Some(tx) // Progress updates
    ).await
});
// Handle progress updates
tokio::spawn(async move {
    while let Some(progress) = rx.recv().await {
        println!("Progress: {}/{}, Active: {:?}",
            progress.completed_count,
            progress.total_count,
            progress.active_deps
        );
    }
});
let count = install_task.await??;
println!("Installed {} resources", count);
§Concurrency Control
The function implements the same parallel processing architecture as
install_resources_parallel but adds channel-based progress reporting:
- Pre-warming of Git worktrees for optimal parallelism
- Configurable concurrency limits via max_concurrency
- Thread-safe progress tracking with atomic updates
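As a rough illustration of how an optional concurrency limit and atomic counters can fit together, here is a sketch that swaps the async task model for plain OS threads from the standard library. The function `run_bounded` and its return value are hypothetical and do not reflect the crate's internals; `None` is modeled as one worker per job.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Runs every job on at most `max_concurrency` workers (None = one worker per job).
// Returns (jobs completed, peak number of jobs running at the same time).
fn run_bounded(jobs: Vec<String>, max_concurrency: Option<usize>) -> (usize, usize) {
    let total = jobs.len();
    // Always keep at least one worker, and never more workers than jobs.
    let workers = max_concurrency.unwrap_or(total).clamp(1, total.max(1));
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let running = Arc::clone(&running);
            let peak = Arc::clone(&peak);
            let done = Arc::clone(&done);
            thread::spawn(move || loop {
                // Take the next job; the lock is released before the work starts.
                let job = queue.lock().unwrap().pop_front();
                let Some(_name) = job else { break };
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::yield_now(); // stand-in for the real installation work
                running.fetch_sub(1, Ordering::SeqCst);
                done.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}
```

Because the number of workers is capped up front, the peak never exceeds the limit, and the counters need no lock because each update is a single atomic operation.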
§Performance Characteristics
Progress updates are designed to have minimal performance impact:
- Updates are sent asynchronously without blocking installation
- Failed channel sends are silently ignored to prevent installation failures
- State updates are batched to reduce contention
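The "failed sends are ignored" behavior can be sketched as follows. This assumes a standard-library channel in place of the tokio one (both expose a non-blocking `send` that returns an error once the receiver is gone); `report` and `install_all` are hypothetical helpers, and the progress payload is simplified to a completed count.

```rust
use std::sync::mpsc::{channel, Sender};

// Sends an update without blocking; returns whether a receiver was still listening.
fn report(sender: Option<&Sender<usize>>, completed: usize) -> bool {
    match sender {
        Some(tx) => tx.send(completed).is_ok(),
        None => false,
    }
}

// Hypothetical install loop: keeps going even when nobody reads progress.
fn install_all(count: usize, sender: Option<Sender<usize>>) -> usize {
    let mut installed = 0;
    for _ in 0..count {
        installed += 1; // stand-in for installing one resource
        // The delivery result is deliberately discarded.
        let _delivered = report(sender.as_ref(), installed);
    }
    installed
}
```

With this shape, a UI that exits early or drops its receiver only loses updates; the installation result is unaffected.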
§Returns
Returns the number of resources that were installed or updated. Resources that already existed with identical content are not counted.
§Errors
Returns an error if any resource installation fails. The error includes details about all failed installations with specific error context. Progress updates continue until the error occurs.
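One way to picture the return value and error behavior is the sketch below. It assumes each per-resource outcome is either "changed", "unchanged", or a failure message; the `InstallErrors` type and `summarize` function are hypothetical and only illustrate counting modified resources while collecting every failure into one error.

```rust
use std::fmt;

// Hypothetical aggregate error listing every failed resource as (name, reason).
#[derive(Debug)]
struct InstallErrors(Vec<(String, String)>);

impl fmt::Display for InstallErrors {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} resource(s) failed to install:", self.0.len())?;
        for (name, reason) in &self.0 {
            write!(f, "\n  {name}: {reason}")?;
        }
        Ok(())
    }
}

impl std::error::Error for InstallErrors {}

// Each outcome is Ok(true) if content was written, Ok(false) if it was already
// identical, or Err(reason) if the installation failed.
fn summarize(outcomes: Vec<(String, Result<bool, String>)>) -> Result<usize, InstallErrors> {
    let mut changed = 0;
    let mut failures = Vec::new();
    for (name, outcome) in outcomes {
        match outcome {
            Ok(true) => changed += 1,
            Ok(false) => {}
            Err(reason) => failures.push((name, reason)),
        }
    }
    if failures.is_empty() {
        Ok(changed)
    } else {
        Err(InstallErrors(failures))
    }
}
```

A caller that only needs the count can use `summarize(outcomes)?`, while a UI can format the error to list each failed resource on its own line.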