agpm_cli/installer/selective.rs
1//! Selective installation support for updated resources.
2//!
3//! This module provides targeted installation of only resources that have been
4//! updated, rather than reinstalling all resources. It's designed for efficient
5//! update operations where only a subset of dependencies have changed.
6
7use anyhow::Result;
8use futures::stream::{self, StreamExt};
9use std::collections::HashSet;
10use std::sync::Arc;
11use tokio::sync::Mutex;
12
13use crate::constants::default_lock_timeout;
14use crate::core::ResourceIterator;
15use crate::lockfile::LockFile;
16use crate::manifest::Manifest;
17use indicatif::ProgressBar;
18
19use super::{InstallContext, install_resource_for_parallel};
20
21/// Install only specific updated resources in parallel (selective installation).
22///
23/// This function provides targeted installation of only the resources that have
24/// been updated, rather than reinstalling all resources. It's designed for
25/// efficient update operations where only a subset of dependencies have changed.
26/// The function uses the same parallel processing architecture as full installations
27/// but operates on a filtered set of resources.
28///
29/// # Arguments
30///
31/// * `updates` - Vector of tuples containing (name, `old_version`, `new_version`) for each updated resource
32/// * `lockfile` - Lockfile containing all available resources (updated resources must exist here)
33/// * `manifest` - Project manifest providing configuration and target directories
34/// * `project_dir` - Root directory where resources will be installed
35/// * `cache` - Cache instance for Git repository and worktree management
36/// * `pb` - Optional progress bar for user feedback during installation
37/// * `_quiet` - Quiet mode flag (currently unused, maintained for API compatibility)
38///
39/// # Update Tuple Format
40///
41/// Each update tuple contains:
42/// - `name`: Resource name as defined in the lockfile
43/// - `old_version`: Previous version (used for logging and user feedback)
44/// - `new_version`: New version that will be installed
45///
46/// # Selective Processing
47///
48/// The function implements selective resource processing:
49/// 1. **Filtering**: Only processes resources listed in the `updates` vector
50/// 2. **Lookup**: Finds corresponding entries in the lockfile for each update
51/// 3. **Validation**: Ensures all specified resources exist before processing
52/// 4. **Installation**: Uses the same parallel architecture as full installations
53///
54/// # Examples
55///
56/// ```rust,no_run
57/// use agpm_cli::installer::{install_updated_resources, InstallContext};
58/// use agpm_cli::lockfile::LockFile;
59/// use agpm_cli::manifest::Manifest;
60/// use agpm_cli::cache::Cache;
61/// use indicatif::ProgressBar;
62/// use std::path::Path;
63/// use std::sync::Arc;
64///
65/// # async fn example() -> anyhow::Result<()> {
66/// let lockfile = Arc::new(LockFile::load(Path::new("agpm.lock"))?);
67/// let manifest = Manifest::load(Path::new("agpm.toml"))?;
68/// let cache = Cache::new()?;
69/// let pb = ProgressBar::new(3);
70///
71/// // Create installation context
72/// let project_dir = Path::new(".");
73/// let context = InstallContext::builder(project_dir, &cache).build();
74///
75/// // Define which resources to update
76/// let updates = vec![
77/// ("ai-agent".to_string(), None, "v1.0.0".to_string(), "v1.1.0".to_string()),
78/// ("helper-tool".to_string(), Some("community".to_string()), "v2.0.0".to_string(), "v2.1.0".to_string()),
79/// ("data-processor".to_string(), None, "v1.5.0".to_string(), "v1.6.0".to_string()),
80/// ];
81/// let count = install_updated_resources(
82/// &updates,
83/// &lockfile,
84/// &manifest,
85/// &context,
86/// Some(&pb),
87/// false
88/// ).await?;
89///
90/// println!("Updated {} resources", count);
91/// # Ok(())
92/// # }
93/// ```
94///
95/// # Performance Benefits
96///
97/// Selective installation provides significant performance benefits:
98/// - **Reduced processing**: Only installs resources that have actually changed
99/// - **Faster execution**: Avoids redundant operations on unchanged resources
100/// - **Network efficiency**: Only fetches Git data for repositories with updates
101/// - **Disk efficiency**: Minimizes file system operations and cache usage
102///
103/// # Integration with Update Command
104///
105/// This function is typically used by the `agpm update` command after dependency
106/// resolution determines which resources have new versions available:
107///
108/// ```text
109/// Update Flow:
110/// 1. Resolve dependencies → identify version changes
111/// 2. Update lockfile → record new versions and checksums
112/// 3. Selective installation → install only changed resources
113/// ```
114///
115/// # Returns
116///
117/// Returns the total number of resources that were successfully installed.
118/// This represents the actual number of files that were updated on disk.
119///
120/// # Errors
121///
122/// Returns an error if:
123/// - Any specified resource name is not found in the lockfile
124/// - Git repository access fails for resources being updated
125/// - File system operations fail during installation
126/// - Any individual resource installation encounters an error
127///
128/// The function uses atomic error handling - if any resource fails, the entire
129/// operation fails and detailed error information is provided.
130pub async fn install_updated_resources(
131 updates: &[(String, Option<String>, String, String)], // (name, source, old_version, new_version)
132 lockfile: &Arc<LockFile>,
133 manifest: &Manifest,
134 install_ctx: &InstallContext<'_>,
135 pb: Option<&ProgressBar>,
136 _quiet: bool,
137) -> Result<usize> {
138 let project_dir = install_ctx.project_dir;
139 let cache = install_ctx.cache;
140 if updates.is_empty() {
141 return Ok(0);
142 }
143
144 let total = updates.len();
145
146 // Collect all entries to install
147 let mut entries_to_install = Vec::new();
148 for (name, source, _, _) in updates {
149 if let Some((resource_type, entry)) =
150 ResourceIterator::find_resource_by_name_and_source(lockfile, name, source.as_deref())
151 {
152 // Get artifact configuration path
153 let tool = entry.tool.as_deref().unwrap_or("claude-code");
154 let artifact_path = manifest
155 .get_artifact_resource_path(tool, resource_type)
156 .ok_or_else(|| {
157 anyhow::anyhow!(
158 "Resource type '{}' is not supported by tool '{}' - check tool configuration",
159 resource_type,
160 tool
161 )
162 })?;
163 let target_dir = artifact_path.display().to_string();
164 entries_to_install.push((entry.clone(), target_dir));
165 }
166 }
167
168 if entries_to_install.is_empty() {
169 return Ok(0);
170 }
171
172 // Pre-warm the cache by creating all needed worktrees upfront
173 if let Some(pb) = pb {
174 pb.set_message("Preparing resources...");
175 }
176
177 // Collect unique (source, url, sha) triples to pre-create worktrees
178 let mut unique_worktrees = HashSet::new();
179 for (entry, _) in &entries_to_install {
180 if let Some(source_name) = &entry.source
181 && let Some(url) = &entry.url
182 {
183 // Only pre-warm if we have a valid SHA
184 if let Some(sha) = entry.resolved_commit.as_ref().filter(|commit| {
185 commit.len() == 40 && commit.chars().all(|c| c.is_ascii_hexdigit())
186 }) {
187 unique_worktrees.insert((source_name.clone(), url.clone(), sha.clone()));
188 }
189 }
190 }
191
192 // Pre-create all worktrees in parallel
193 if !unique_worktrees.is_empty() {
194 use futures::future;
195 let worktree_futures: Vec<_> = unique_worktrees
196 .into_iter()
197 .map(|(source, url, sha)| {
198 async move {
199 cache
200 .get_or_create_worktree_for_sha(
201 &source,
202 &url,
203 &sha,
204 Some("update-pre-warm"),
205 )
206 .await
207 .ok(); // Ignore errors during pre-warming
208 }
209 })
210 .collect();
211
212 // Execute all worktree creations in parallel
213 future::join_all(worktree_futures).await;
214 }
215
216 // Create thread-safe progress tracking
217 let installed_count = Arc::new(Mutex::new(0));
218 let pb = pb.map(Arc::new);
219 let cache = Arc::new(cache);
220
221 // Set initial progress
222 if let Some(ref pb) = pb {
223 pb.set_message(format!("Installing 0/{total} resources"));
224 }
225
226 // Use concurrent stream processing for parallel installation
227 let results: Vec<Result<(), anyhow::Error>> = stream::iter(entries_to_install)
228 .map(|(entry, resource_dir)| {
229 let project_dir = project_dir.to_path_buf();
230 let installed_count = Arc::clone(&installed_count);
231 let pb = pb.clone();
232 let cache = Arc::clone(&cache);
233 let lockfile = Arc::clone(lockfile);
234
235 async move {
236 // Install the resource
237 let mut builder = InstallContext::builder(&project_dir, cache.as_ref())
238 .manifest(manifest)
239 .lockfile(&lockfile);
240
241 // Add optional fields from the passed install_ctx
242 if let Some(patches) = install_ctx.project_patches {
243 builder = builder.project_patches(patches);
244 }
245 if let Some(patches) = install_ctx.private_patches {
246 builder = builder.private_patches(patches);
247 }
248 if let Some(size) = install_ctx.max_content_file_size {
249 builder = builder.max_content_file_size(size);
250 }
251
252 let context = builder.build();
253 install_resource_for_parallel(&entry, &resource_dir, &context).await?;
254
255 // Update progress
256 let timeout = default_lock_timeout();
257 let mut count = match tokio::time::timeout(timeout, installed_count.lock()).await {
258 Ok(guard) => guard,
259 Err(_) => {
260 eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
261 anyhow::bail!("Timeout waiting for installed_count lock after {:?} - possible deadlock", timeout);
262 }
263 };
264 *count += 1;
265
266 if let Some(pb) = pb {
267 pb.set_message(format!("Installing {}/{} resources", *count, total));
268 pb.inc(1);
269 }
270
271 Ok::<(), anyhow::Error>(())
272 }
273 })
274 .buffered(usize::MAX) // Allow unlimited task concurrency while preserving input order for deterministic checksums
275 .collect()
276 .await;
277
278 // Check all results for errors
279 for result in results {
280 result?;
281 }
282
283 let timeout = default_lock_timeout();
284 let final_count = match tokio::time::timeout(timeout, installed_count.lock()).await {
285 Ok(guard) => *guard,
286 Err(_) => {
287 eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
288 anyhow::bail!(
289 "Timeout waiting for installed_count lock after {:?} - possible deadlock",
290 timeout
291 );
292 }
293 };
294 Ok(final_count)
295}