use crate::constants::{
FALLBACK_CORE_COUNT, MIN_PARALLELISM, PARALLELISM_CORE_MULTIPLIER, default_lock_timeout,
};
use crate::lockfile::ResourceId;
use crate::utils::progress::{InstallationPhase, MultiPhaseProgress};
use anyhow::Result;
mod cleanup;
mod config_check;
mod context;
pub mod project_lock;
mod resource;
mod selective;
mod skills;
#[cfg(test)]
mod tests;
pub use cleanup::cleanup_removed_artifacts;
pub use config_check::{ConfigValidation, validate_config};
pub use context::InstallContext;
pub use project_lock::ProjectLock;
pub use selective::install_updated_resources;
use resource::{
apply_resource_patches, compute_file_checksum, read_source_content, render_resource_content,
should_skip_installation, should_skip_trusted, validate_markdown_content,
write_resource_to_disk,
};
use skills::{collect_skill_patches, compute_skill_directory_checksum, install_skill_directory};
/// Per-resource outcome produced by the parallel installation pipeline.
///
/// The `Ok` tuple holds, in order: the resource id, the `actually_installed`
/// flag, the file checksum, the optional context checksum, the patches that
/// were applied, and the optional token count.
/// The `Err` tuple pairs the id of the failing resource with its error.
type InstallResult = Result<
(
crate::lockfile::ResourceId,
bool,
String,
Option<String>,
crate::manifest::patches::AppliedPatches,
Option<u64>, // approximate token count
),
(crate::lockfile::ResourceId, anyhow::Error),
>;
/// Aggregated outcome of an installation run, with one entry per resource in
/// each of the per-resource vectors.
#[derive(Debug, Clone)]
pub struct InstallationResults {
/// Number of resources counted as installed. `process_install_results` sets
/// this to the number of resources that completed without an error.
pub installed_count: usize,
/// File checksum recorded for each resource.
pub checksums: Vec<(crate::lockfile::ResourceId, String)>,
/// Optional context checksum recorded for each resource.
pub context_checksums: Vec<(crate::lockfile::ResourceId, Option<String>)>,
/// Patches that were applied to each resource.
pub applied_patches:
Vec<(crate::lockfile::ResourceId, crate::manifest::patches::AppliedPatches)>,
/// Optional approximate token count for each resource.
pub token_counts: Vec<(crate::lockfile::ResourceId, Option<u64>)>,
}
impl InstallationResults {
/// Builds a result set from its parts; every argument is stored as-is in
/// the field of the same name.
pub fn new(
installed_count: usize,
checksums: Vec<(crate::lockfile::ResourceId, String)>,
context_checksums: Vec<(crate::lockfile::ResourceId, Option<String>)>,
applied_patches: Vec<(
crate::lockfile::ResourceId,
crate::manifest::patches::AppliedPatches,
)>,
token_counts: Vec<(crate::lockfile::ResourceId, Option<u64>)>,
) -> Self {
Self {
installed_count,
checksums,
context_checksums,
applied_patches,
token_counts,
}
}
/// Returns `true` when `installed_count` is zero.
pub fn is_empty(&self) -> bool {
self.installed_count == 0
}
/// Returns `installed_count`.
pub fn len(&self) -> usize {
self.installed_count
}
}
use futures::stream::{self, StreamExt};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::cache::Cache;
use crate::core::ResourceIterator;
use crate::lockfile::{LockFile, LockedResource};
use crate::manifest::Manifest;
use indicatif::ProgressBar;
use std::collections::HashSet;
/// Installs one locked resource into the project directory.
///
/// The destination is `entry.installed_at` (relative to the project directory)
/// when that field is non-empty. Otherwise it is derived from `resource_dir`:
/// skills map to a directory named after the resource, every other resource
/// type to `<name>.md`.
///
/// # Returns
///
/// A tuple of `(actually_installed, checksum, context_checksum,
/// applied_patches, token_count)`. Skills always report `None` for the context
/// checksum and the token count.
///
/// # Errors
///
/// Propagates failures from checksum computation, reading, validating,
/// patching and rendering the source content, and from writing the result.
pub async fn install_resource(
    entry: &LockedResource,
    resource_dir: &str,
    context: &InstallContext<'_>,
) -> Result<(bool, String, Option<String>, crate::manifest::patches::AppliedPatches, Option<u64>)> {
    let is_skill = entry.resource_type == crate::core::ResourceType::Skill;

    let dest_path = if !entry.installed_at.is_empty() {
        context.project_dir.join(&entry.installed_at)
    } else if is_skill {
        context.project_dir.join(resource_dir).join(&entry.name)
    } else {
        context.project_dir.join(resource_dir).join(format!("{}.md", entry.name))
    };

    if let Some(trusted) = should_skip_trusted(entry, &dest_path, context) {
        return Ok(trusted);
    }

    // Only hash what is already on disk: a skill must be an existing directory,
    // any other resource just has to exist.
    let has_existing_artifact = dest_path.exists() && (!is_skill || dest_path.is_dir());
    let existing_checksum = if !has_existing_artifact {
        None
    } else if is_skill {
        let dir = dest_path.clone();
        // Hashing touches the filesystem, so keep it off the async executor.
        tokio::task::spawn_blocking(move || LockFile::compute_directory_checksum(&dir))
            .await??
            .into()
    } else {
        let file = dest_path.clone();
        tokio::task::spawn_blocking(move || LockFile::compute_checksum(&file)).await??.into()
    };

    if let Some((checksum, context_checksum, patches, token_count)) =
        should_skip_installation(entry, &dest_path, existing_checksum.as_ref(), context)
    {
        return Ok((false, checksum, context_checksum, patches, token_count));
    }

    if entry.is_local() {
        tracing::debug!(
            "Processing local dependency: {} (early-exit optimization skipped)",
            entry.name
        );
    }

    let should_install = entry.install.unwrap_or(true);

    if is_skill {
        let content_changed = existing_checksum.as_ref() != Some(&entry.checksum);
        let skill_patches = collect_skill_patches(entry, context);
        let written = install_skill_directory(
            entry,
            &dest_path,
            &skill_patches,
            should_install,
            content_changed,
            context,
        )
        .await?;
        // Recompute the directory checksum only when something was written;
        // otherwise the lockfile value is reused.
        let dir_checksum = if written {
            compute_skill_directory_checksum(entry, context).await?
        } else {
            entry.checksum.clone()
        };
        return Ok((written, dir_checksum, None, skill_patches, None));
    }

    let source = read_source_content(entry, context).await?;
    validate_markdown_content(&source)?;
    let (patched, applied_patches) = apply_resource_patches(&source, entry, context)?;
    let (rendered, _templating_was_applied, context_checksum) =
        render_resource_content(&patched, entry, context).await?;

    let token_count = crate::tokens::count_tokens(&rendered);
    let exceeded_threshold =
        context.token_warning_threshold.filter(|&limit| token_count as u64 > limit);
    if let Some(threshold) = exceeded_threshold {
        let formatted = crate::tokens::format_token_count(token_count);
        let threshold_formatted = crate::tokens::format_token_count(threshold as usize);
        tracing::warn!(
            "Resource '{}' has ~{} tokens (threshold: {})",
            entry.name,
            formatted,
            threshold_formatted
        );
    }

    let file_checksum = compute_file_checksum(&rendered);
    let content_changed = existing_checksum.as_ref() != Some(&file_checksum);
    let written =
        write_resource_to_disk(&dest_path, &rendered, should_install, content_changed, context)
            .await?;

    Ok((written, file_checksum, context_checksum, applied_patches, Some(token_count as u64)))
}
/// Installs a single resource while reporting it on a progress bar.
///
/// Sets the bar's message to `Installing <name>` and then delegates to
/// [`install_resource`], returning its result unchanged.
pub async fn install_resource_with_progress(
entry: &LockedResource,
resource_dir: &str,
context: &InstallContext<'_>,
pb: &ProgressBar,
) -> Result<(bool, String, Option<String>, crate::manifest::patches::AppliedPatches, Option<u64>)> {
pb.set_message(format!("Installing {}", entry.name));
install_resource(entry, resource_dir, context).await
}
/// Crate-internal entry point used by the parallel installation pipeline.
///
/// Forwards directly to [`install_resource`] and returns its result unchanged.
pub(crate) async fn install_resource_for_parallel(
entry: &LockedResource,
resource_dir: &str,
context: &InstallContext<'_>,
) -> Result<(bool, String, Option<String>, crate::manifest::patches::AppliedPatches, Option<u64>)> {
install_resource(entry, resource_dir, context).await
}
/// Selects which lockfile resources an installation run processes.
pub enum ResourceFilter {
/// Process every entry returned by `ResourceIterator::collect_all_entries`.
All,
/// Process only the listed resources. The first two tuple fields are the
/// resource name and its optional source; `collect_install_entries` ignores
/// the remaining two (presumably old and new versions — TODO confirm against
/// callers).
Updated(Vec<(String, Option<String>, String, String)>),
}
/// Resolves `filter` into the list of resources to install, each paired with
/// its target directory.
///
/// For [`ResourceFilter::Updated`], names that cannot be found in the lockfile
/// are skipped, and resources whose tool has no artifact path for their type
/// are skipped with a warning. A resource without an explicit tool falls back
/// to `"claude-code"`.
///
/// The result is ordered by resource type and then by name.
fn collect_install_entries(
    filter: &ResourceFilter,
    lockfile: &LockFile,
    manifest: &Manifest,
) -> Vec<(LockedResource, String)> {
    let mut selected: Vec<(LockedResource, String)> = match filter {
        ResourceFilter::All => ResourceIterator::collect_all_entries(lockfile, manifest)
            .into_iter()
            .map(|(locked, dir)| (locked.clone(), dir.into_owned()))
            .collect(),
        ResourceFilter::Updated(updates) => updates
            .iter()
            .filter_map(|(name, source, _, _)| {
                let (resource_type, locked) = ResourceIterator::find_resource_by_name_and_source(
                    lockfile,
                    name,
                    source.as_deref(),
                )?;
                let tool = locked.tool.as_deref().unwrap_or("claude-code");
                match manifest.get_artifact_resource_path(tool, resource_type) {
                    Some(artifact_path) => {
                        Some((locked.clone(), artifact_path.display().to_string()))
                    }
                    None => {
                        tracing::warn!(
                            name = %name,
                            tool = %tool,
                            resource_type = %resource_type,
                            "Skipping resource: tool does not support this resource type"
                        );
                        None
                    }
                }
            })
            .collect(),
    };

    // Sorting an empty vector is a no-op, so no separate empty check is needed.
    selected.sort_by(|(left, _), (right, _)| {
        left.resource_type.cmp(&right.resource_type).then_with(|| left.name.cmp(&right.name))
    });
    selected
}
/// Creates, ahead of installation, the worktrees that the given entries need.
///
/// Only entries that have a source, a URL, and a resolved commit that is a full
/// 40-character hexadecimal SHA are considered; each distinct
/// `(source, url, sha)` triple is requested from the cache once.
///
/// Pre-warming is best-effort: a failure for one worktree is logged at debug
/// level and does not abort the others or the caller.
///
/// `max_concurrency` is clamped to at least 1, matching
/// `execute_parallel_installation`, so that a limit of zero cannot leave the
/// buffered stream unable to start any future.
async fn pre_warm_worktrees(
    entries: &[(LockedResource, String)],
    cache: &Cache,
    filter: &ResourceFilter,
    max_concurrency: usize,
) {
    let mut unique_worktrees = HashSet::new();
    for (entry, _) in entries {
        let (Some(source_name), Some(url)) = (&entry.source, &entry.url) else {
            continue;
        };
        let full_sha = entry
            .resolved_commit
            .as_ref()
            .filter(|commit| commit.len() == 40 && commit.chars().all(|c| c.is_ascii_hexdigit()));
        if let Some(sha) = full_sha {
            unique_worktrees.insert((source_name.clone(), url.clone(), sha.clone()));
        }
    }
    if unique_worktrees.is_empty() {
        return;
    }

    // Label passed through to the cache call.
    let label = match filter {
        ResourceFilter::All => "pre-warm",
        ResourceFilter::Updated(_) => "update-pre-warm",
    };
    let concurrency = max_concurrency.max(1);
    let total = unique_worktrees.len();
    tracing::debug!(
        "Starting worktree pre-warming for {} worktrees with concurrency {}",
        total,
        concurrency
    );

    stream::iter(unique_worktrees)
        .map(|(source, url, sha)| {
            let cache = cache.clone();
            async move {
                // Slicing is safe: `sha` was verified above to be 40 ASCII hex characters.
                let display_name = format!("{}@{}", source, &sha[..8]);
                tracing::trace!("Pre-warming worktree: {}", display_name);
                let start = std::time::Instant::now();
                if let Err(err) =
                    cache.get_or_create_worktree_for_sha(&source, &url, &sha, Some(label)).await
                {
                    tracing::debug!(
                        "Pre-warming worktree {} failed, continuing: {:#}",
                        display_name,
                        err
                    );
                }
                let elapsed = start.elapsed();
                tracing::trace!("Worktree {} took {:?}", display_name, elapsed);
            }
        })
        .buffer_unordered(concurrency)
        .collect::<Vec<_>>()
        .await;
    tracing::debug!("Completed worktree pre-warming");
}
#[allow(clippy::too_many_arguments)]
async fn execute_parallel_installation(
entries: Vec<(LockedResource, String)>,
project_dir: &Path,
cache: &Cache,
manifest: &Manifest,
lockfile: &Arc<LockFile>,
force_refresh: bool,
verbose: bool,
max_concurrency: Option<usize>,
progress: Option<Arc<MultiPhaseProgress>>,
old_lockfile: Option<&LockFile>,
trust_lockfile_checksums: bool,
token_warning_threshold: Option<u64>,
) -> Vec<InstallResult> {
let installed_count = Arc::new(Mutex::new(0));
let type_counts =
Arc::new(Mutex::new(std::collections::HashMap::<crate::core::ResourceType, usize>::new()));
let concurrency = max_concurrency.unwrap_or(usize::MAX).max(1);
let total = entries.len();
stream::iter(entries)
.map(|(entry, resource_dir)| {
let project_dir = project_dir.to_path_buf();
let installed_count = Arc::clone(&installed_count);
let type_counts = Arc::clone(&type_counts);
let cache = cache.clone();
let progress = progress.clone();
let entry_type = entry.resource_type;
async move {
if let Some(ref pm) = progress {
pm.mark_resource_active(&entry);
}
let install_context = InstallContext::with_common_options_and_trust(
&project_dir,
&cache,
Some(manifest),
Some(lockfile),
force_refresh,
verbose,
old_lockfile,
trust_lockfile_checksums,
token_warning_threshold,
);
let res =
install_resource_for_parallel(&entry, &resource_dir, &install_context).await;
match res {
Ok((actually_installed, file_checksum, context_checksum, applied_patches, token_count)) => {
let timeout = default_lock_timeout();
let mut count = match tokio::time::timeout(timeout, installed_count.lock()).await {
Ok(guard) => guard,
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
return Err((entry.id(), anyhow::anyhow!("Timeout waiting for installed_count lock after {:?} - possible deadlock", timeout)));
}
};
*count += 1;
if actually_installed {
let mut type_guard = match tokio::time::timeout(timeout, type_counts.lock()).await {
Ok(guard) => guard,
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for type_counts lock after {:?}", timeout);
return Err((entry.id(), anyhow::anyhow!("Timeout waiting for type_counts lock after {:?} - possible deadlock", timeout)));
}
};
*type_guard.entry(entry_type).or_insert(0) += 1;
}
if let Some(ref pm) = progress {
pm.mark_resource_complete(&entry, *count, total);
}
Ok((
entry.id(),
actually_installed,
file_checksum,
context_checksum,
applied_patches,
token_count,
))
}
Err(err) => {
let timeout = default_lock_timeout();
let mut count = match tokio::time::timeout(timeout, installed_count.lock()).await {
Ok(guard) => guard,
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
return Err((entry.id(), anyhow::anyhow!("Timeout waiting for installed_count lock after {:?} - possible deadlock", timeout)));
}
};
*count += 1;
if let Some(ref pm) = progress {
pm.mark_resource_complete(&entry, *count, total);
}
Err((entry.id(), err))
}
}
}
})
.buffered(concurrency)
.collect()
.await
}
fn process_install_results(
results: Vec<InstallResult>,
progress: Option<Arc<MultiPhaseProgress>>,
) -> Result<InstallationResults> {
let mut errors = Vec::new();
let mut checksums = Vec::new();
let mut context_checksums = Vec::new();
let mut applied_patches_list = Vec::new();
let mut token_counts = Vec::new();
for result in results {
match result {
Ok((id, _installed, file_checksum, context_checksum, applied_patches, token_count)) => {
checksums.push((id.clone(), file_checksum));
context_checksums.push((id.clone(), context_checksum));
applied_patches_list.push((id.clone(), applied_patches));
token_counts.push((id, token_count));
}
Err((id, error)) => {
errors.push((id, error));
}
}
}
if let Some(ref pm) = progress {
if !errors.is_empty() {
pm.complete_phase_with_window(Some(&format!(
"Failed to install {} resources",
errors.len()
)));
} else {
let installed_count = checksums.len();
if installed_count > 0 {
pm.complete_phase_with_window(Some(&format!(
"Installed {installed_count} resources"
)));
}
}
}
if !errors.is_empty() {
let mut unique_errors: std::collections::HashMap<ResourceId, anyhow::Error> =
std::collections::HashMap::new();
for (id, error) in errors {
unique_errors.entry(id).or_insert(error);
}
let error_msgs: Vec<String> = unique_errors
.into_iter()
.map(|(id, error)| {
let mut current_error: &dyn std::error::Error = error.as_ref();
loop {
if let Some(template_error) =
current_error.downcast_ref::<crate::templating::TemplateError>()
{
return format!(
" {}:\n{}",
id, template_error.format_with_context()
);
}
match current_error.source() {
Some(source) => current_error = source,
None => break,
}
}
format!(" {}: {:#}", id, error) })
.collect();
return Err(anyhow::anyhow!(
"Installation incomplete: {} resource(s) could not be set up\n{}",
error_msgs.len(),
error_msgs.join("\n\n")
));
}
let installed_count = checksums.len();
Ok(InstallationResults::new(
installed_count,
checksums,
context_checksums,
applied_patches_list,
token_counts,
))
}
#[allow(clippy::too_many_arguments)]
pub async fn install_resources(
filter: ResourceFilter,
lockfile: &Arc<LockFile>,
manifest: &Manifest,
project_dir: &Path,
cache: Cache,
force_refresh: bool,
max_concurrency: Option<usize>,
progress: Option<Arc<MultiPhaseProgress>>,
verbose: bool,
old_lockfile: Option<&LockFile>,
trust_lockfile_checksums: bool,
token_warning_threshold: Option<u64>,
) -> Result<InstallationResults> {
let all_entries = collect_install_entries(&filter, lockfile, manifest);
if all_entries.is_empty() {
return Ok(InstallationResults::new(0, Vec::new(), Vec::new(), Vec::new(), Vec::new()));
}
let total = all_entries.len();
let concurrency = max_concurrency.unwrap_or_else(|| {
let cores = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(FALLBACK_CORE_COUNT);
std::cmp::max(MIN_PARALLELISM, cores * PARALLELISM_CORE_MULTIPLIER)
});
let window_size =
crate::utils::progress::MultiPhaseProgress::calculate_window_size(concurrency);
pre_warm_worktrees(&all_entries, &cache, &filter, concurrency).await;
if let Some(ref pm) = progress {
pm.start_phase_with_active_tracking(
InstallationPhase::InstallingResources,
total,
window_size,
);
}
let results = execute_parallel_installation(
all_entries,
project_dir,
&cache,
manifest,
lockfile,
force_refresh,
verbose,
max_concurrency,
progress.clone(),
old_lockfile,
trust_lockfile_checksums,
token_warning_threshold,
)
.await;
process_install_results(results, progress)
}
pub async fn finalize_installation(
lockfile: &mut LockFile,
manifest: &Manifest,
project_dir: &Path,
cache: &Cache,
old_lockfile: Option<&LockFile>,
quiet: bool,
no_lock: bool,
) -> Result<(usize, usize)> {
use anyhow::Context;
let mut hook_count = 0;
let mut server_count = 0;
if !lockfile.hooks.is_empty() {
let hooks_changed = crate::hooks::install_hooks(lockfile, project_dir, cache).await?;
hook_count = lockfile.hooks.len();
if !quiet {
if hook_count == 1 {
if hooks_changed == 1 {
println!("✓ Configured 1 hook (1 changed)");
} else {
println!("✓ Configured 1 hook ({hooks_changed} changed)");
}
} else {
println!("✓ Configured {hook_count} hooks ({hooks_changed} changed)");
}
}
}
if !lockfile.mcp_servers.is_empty() {
use crate::mcp::handlers::McpHandler;
use std::collections::HashMap;
let mut servers_by_type: HashMap<String, Vec<crate::lockfile::LockedResource>> =
HashMap::new();
{
for server in &lockfile.mcp_servers {
let tool = server.tool.clone().unwrap_or_else(|| "claude-code".to_string());
servers_by_type.entry(tool).or_default().push(server.clone());
}
}
let mut all_mcp_patches: Vec<(String, crate::manifest::patches::AppliedPatches)> =
Vec::new();
let mut total_mcp_changed = 0;
for (artifact_type, servers) in servers_by_type {
if let Some(handler) = crate::mcp::handlers::get_mcp_handler(&artifact_type) {
let artifact_base = manifest
.get_tool_config(&artifact_type)
.map(|c| &c.path)
.ok_or_else(|| {
anyhow::anyhow!(
"Tool '{}' is not configured. Please define it in [default-tools] section.",
artifact_type
)
})?;
let artifact_base = project_dir.join(artifact_base);
let server_entries = servers.clone();
let (applied_patches_list, changed_count) = handler
.configure_mcp_servers(
project_dir,
&artifact_base,
&server_entries,
cache,
manifest,
)
.await
.with_context(|| {
format!(
"Failed to configure MCP servers for artifact type '{}'",
artifact_type
)
})?;
all_mcp_patches.extend(applied_patches_list);
total_mcp_changed += changed_count;
server_count += servers.len();
}
}
for (name, applied_patches) in all_mcp_patches {
lockfile.update_resource_applied_patches(&name, &applied_patches);
}
let mcp_servers_changed = total_mcp_changed;
if server_count > 0 && !quiet {
if server_count == 1 {
if mcp_servers_changed == 1 {
println!("✓ Configured 1 MCP server (1 changed)");
} else {
println!("✓ Configured 1 MCP server ({mcp_servers_changed} changed)");
}
} else {
println!("✓ Configured {server_count} MCP servers ({mcp_servers_changed} changed)");
}
}
}
if let Some(old) = old_lockfile {
if let Ok(removed) = cleanup_removed_artifacts(old, lockfile, project_dir).await {
if !removed.is_empty() && !quiet {
println!("✓ Cleaned up {} moved or removed artifact(s)", removed.len());
}
}
}
if !no_lock {
let (public_lock, private_lock) = lockfile.split_by_privacy();
public_lock.save(&project_dir.join("agpm.lock")).with_context(|| {
format!("Failed to save lockfile to {}", project_dir.join("agpm.lock").display())
})?;
private_lock.save(project_dir).with_context(|| "Failed to save private lockfile")?;
}
Ok((hook_count, server_count))
}
/// Returns the names of all lockfile resources that list `resource_name`
/// among their dependencies.
///
/// Each parent is reported by its manifest alias when it has one, otherwise by
/// its name. Entries are enumerated against a default (empty) manifest.
pub fn find_parent_resources(lockfile: &LockFile, resource_name: &str) -> Vec<String> {
    use crate::core::ResourceIterator;

    let default_manifest = crate::manifest::Manifest::default();
    ResourceIterator::collect_all_entries(lockfile, &default_manifest)
        .into_iter()
        .filter(|(candidate, _dir)| candidate.dependencies.iter().any(|dep| dep == resource_name))
        .map(|(parent, _dir)| parent.manifest_alias.as_ref().unwrap_or(&parent.name).clone())
        .collect()
}