agpm_cli/installer/
selective.rs

1//! Selective installation support for updated resources.
2//!
3//! This module provides targeted installation of only resources that have been
4//! updated, rather than reinstalling all resources. It's designed for efficient
5//! update operations where only a subset of dependencies have changed.
6
7use anyhow::Result;
8use futures::stream::{self, StreamExt};
9use std::collections::HashSet;
10use std::sync::Arc;
11use tokio::sync::Mutex;
12
13use crate::constants::default_lock_timeout;
14use crate::core::ResourceIterator;
15use crate::lockfile::LockFile;
16use crate::manifest::Manifest;
17use indicatif::ProgressBar;
18
19use super::{InstallContext, install_resource_for_parallel};
20
21/// Install only specific updated resources in parallel (selective installation).
22///
23/// This function provides targeted installation of only the resources that have
24/// been updated, rather than reinstalling all resources. It's designed for
25/// efficient update operations where only a subset of dependencies have changed.
26/// The function uses the same parallel processing architecture as full installations
27/// but operates on a filtered set of resources.
28///
29/// # Arguments
30///
31/// * `updates` - Vector of tuples containing (name, `old_version`, `new_version`) for each updated resource
32/// * `lockfile` - Lockfile containing all available resources (updated resources must exist here)
33/// * `manifest` - Project manifest providing configuration and target directories
34/// * `project_dir` - Root directory where resources will be installed
35/// * `cache` - Cache instance for Git repository and worktree management
36/// * `pb` - Optional progress bar for user feedback during installation
37/// * `_quiet` - Quiet mode flag (currently unused, maintained for API compatibility)
38///
39/// # Update Tuple Format
40///
41/// Each update tuple contains:
42/// - `name`: Resource name as defined in the lockfile
43/// - `old_version`: Previous version (used for logging and user feedback)
44/// - `new_version`: New version that will be installed
45///
46/// # Selective Processing
47///
48/// The function implements selective resource processing:
49/// 1. **Filtering**: Only processes resources listed in the `updates` vector
50/// 2. **Lookup**: Finds corresponding entries in the lockfile for each update
51/// 3. **Validation**: Ensures all specified resources exist before processing
52/// 4. **Installation**: Uses the same parallel architecture as full installations
53///
54/// # Examples
55///
56/// ```rust,no_run
57/// use agpm_cli::installer::{install_updated_resources, InstallContext};
58/// use agpm_cli::lockfile::LockFile;
59/// use agpm_cli::manifest::Manifest;
60/// use agpm_cli::cache::Cache;
61/// use indicatif::ProgressBar;
62/// use std::path::Path;
63/// use std::sync::Arc;
64///
65/// # async fn example() -> anyhow::Result<()> {
66/// let lockfile = Arc::new(LockFile::load(Path::new("agpm.lock"))?);
67/// let manifest = Manifest::load(Path::new("agpm.toml"))?;
68/// let cache = Cache::new()?;
69/// let pb = ProgressBar::new(3);
70///
71/// // Create installation context
72/// let project_dir = Path::new(".");
73/// let context = InstallContext::builder(project_dir, &cache).build();
74///
75/// // Define which resources to update
76/// let updates = vec![
77///     ("ai-agent".to_string(), None, "v1.0.0".to_string(), "v1.1.0".to_string()),
78///     ("helper-tool".to_string(), Some("community".to_string()), "v2.0.0".to_string(), "v2.1.0".to_string()),
79///     ("data-processor".to_string(), None, "v1.5.0".to_string(), "v1.6.0".to_string()),
80/// ];
81/// let count = install_updated_resources(
82///     &updates,
83///     &lockfile,
84///     &manifest,
85///     &context,
86///     Some(&pb),
87///     false
88/// ).await?;
89///
90/// println!("Updated {} resources", count);
91/// # Ok(())
92/// # }
93/// ```
94///
95/// # Performance Benefits
96///
97/// Selective installation provides significant performance benefits:
98/// - **Reduced processing**: Only installs resources that have actually changed
99/// - **Faster execution**: Avoids redundant operations on unchanged resources
100/// - **Network efficiency**: Only fetches Git data for repositories with updates
101/// - **Disk efficiency**: Minimizes file system operations and cache usage
102///
103/// # Integration with Update Command
104///
105/// This function is typically used by the `agpm update` command after dependency
106/// resolution determines which resources have new versions available:
107///
108/// ```text
109/// Update Flow:
110/// 1. Resolve dependencies → identify version changes
111/// 2. Update lockfile → record new versions and checksums
112/// 3. Selective installation → install only changed resources
113/// ```
114///
115/// # Returns
116///
117/// Returns the total number of resources that were successfully installed.
118/// This represents the actual number of files that were updated on disk.
119///
120/// # Errors
121///
122/// Returns an error if:
123/// - Any specified resource name is not found in the lockfile
124/// - Git repository access fails for resources being updated
125/// - File system operations fail during installation
126/// - Any individual resource installation encounters an error
127///
128/// The function uses atomic error handling - if any resource fails, the entire
129/// operation fails and detailed error information is provided.
130pub async fn install_updated_resources(
131    updates: &[(String, Option<String>, String, String)], // (name, source, old_version, new_version)
132    lockfile: &Arc<LockFile>,
133    manifest: &Manifest,
134    install_ctx: &InstallContext<'_>,
135    pb: Option<&ProgressBar>,
136    _quiet: bool,
137) -> Result<usize> {
138    let project_dir = install_ctx.project_dir;
139    let cache = install_ctx.cache;
140    if updates.is_empty() {
141        return Ok(0);
142    }
143
144    let total = updates.len();
145
146    // Collect all entries to install
147    let mut entries_to_install = Vec::new();
148    for (name, source, _, _) in updates {
149        if let Some((resource_type, entry)) =
150            ResourceIterator::find_resource_by_name_and_source(lockfile, name, source.as_deref())
151        {
152            // Get artifact configuration path
153            let tool = entry.tool.as_deref().unwrap_or("claude-code");
154            let artifact_path = manifest
155                .get_artifact_resource_path(tool, resource_type)
156                .ok_or_else(|| {
157                    anyhow::anyhow!(
158                        "Resource type '{}' is not supported by tool '{}' - check tool configuration",
159                        resource_type,
160                        tool
161                    )
162                })?;
163            let target_dir = artifact_path.display().to_string();
164            entries_to_install.push((entry.clone(), target_dir));
165        }
166    }
167
168    if entries_to_install.is_empty() {
169        return Ok(0);
170    }
171
172    // Pre-warm the cache by creating all needed worktrees upfront
173    if let Some(pb) = pb {
174        pb.set_message("Preparing resources...");
175    }
176
177    // Collect unique (source, url, sha) triples to pre-create worktrees
178    let mut unique_worktrees = HashSet::new();
179    for (entry, _) in &entries_to_install {
180        if let Some(source_name) = &entry.source
181            && let Some(url) = &entry.url
182        {
183            // Only pre-warm if we have a valid SHA
184            if let Some(sha) = entry.resolved_commit.as_ref().filter(|commit| {
185                commit.len() == 40 && commit.chars().all(|c| c.is_ascii_hexdigit())
186            }) {
187                unique_worktrees.insert((source_name.clone(), url.clone(), sha.clone()));
188            }
189        }
190    }
191
192    // Pre-create all worktrees in parallel
193    if !unique_worktrees.is_empty() {
194        use futures::future;
195        let worktree_futures: Vec<_> = unique_worktrees
196            .into_iter()
197            .map(|(source, url, sha)| {
198                async move {
199                    cache
200                        .get_or_create_worktree_for_sha(
201                            &source,
202                            &url,
203                            &sha,
204                            Some("update-pre-warm"),
205                        )
206                        .await
207                        .ok(); // Ignore errors during pre-warming
208                }
209            })
210            .collect();
211
212        // Execute all worktree creations in parallel
213        future::join_all(worktree_futures).await;
214    }
215
216    // Create thread-safe progress tracking
217    let installed_count = Arc::new(Mutex::new(0));
218    let pb = pb.map(Arc::new);
219    let cache = Arc::new(cache);
220
221    // Set initial progress
222    if let Some(ref pb) = pb {
223        pb.set_message(format!("Installing 0/{total} resources"));
224    }
225
226    // Use concurrent stream processing for parallel installation
227    let results: Vec<Result<(), anyhow::Error>> = stream::iter(entries_to_install)
228        .map(|(entry, resource_dir)| {
229            let project_dir = project_dir.to_path_buf();
230            let installed_count = Arc::clone(&installed_count);
231            let pb = pb.clone();
232            let cache = Arc::clone(&cache);
233            let lockfile = Arc::clone(lockfile);
234
235            async move {
236                // Install the resource
237                let mut builder = InstallContext::builder(&project_dir, cache.as_ref())
238                    .manifest(manifest)
239                    .lockfile(&lockfile);
240
241                // Add optional fields from the passed install_ctx
242                if let Some(patches) = install_ctx.project_patches {
243                    builder = builder.project_patches(patches);
244                }
245                if let Some(patches) = install_ctx.private_patches {
246                    builder = builder.private_patches(patches);
247                }
248                if let Some(size) = install_ctx.max_content_file_size {
249                    builder = builder.max_content_file_size(size);
250                }
251
252                let context = builder.build();
253                install_resource_for_parallel(&entry, &resource_dir, &context).await?;
254
255                // Update progress
256                let timeout = default_lock_timeout();
257                let mut count = match tokio::time::timeout(timeout, installed_count.lock()).await {
258                    Ok(guard) => guard,
259                    Err(_) => {
260                        eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
261                        anyhow::bail!("Timeout waiting for installed_count lock after {:?} - possible deadlock", timeout);
262                    }
263                };
264                *count += 1;
265
266                if let Some(pb) = pb {
267                    pb.set_message(format!("Installing {}/{} resources", *count, total));
268                    pb.inc(1);
269                }
270
271                Ok::<(), anyhow::Error>(())
272            }
273        })
274        .buffered(usize::MAX) // Allow unlimited task concurrency while preserving input order for deterministic checksums
275        .collect()
276        .await;
277
278    // Check all results for errors
279    for result in results {
280        result?;
281    }
282
283    let timeout = default_lock_timeout();
284    let final_count = match tokio::time::timeout(timeout, installed_count.lock()).await {
285        Ok(guard) => *guard,
286        Err(_) => {
287            eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
288            anyhow::bail!(
289                "Timeout waiting for installed_count lock after {:?} - possible deadlock",
290                timeout
291            );
292        }
293    };
294    Ok(final_count)
295}