use anyhow::Result;
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::constants::default_lock_timeout;
use crate::core::ResourceIterator;
use crate::lockfile::LockFile;
use crate::manifest::Manifest;
use indicatif::ProgressBar;
use super::{InstallContext, install_resource_for_parallel};
pub async fn install_updated_resources(
updates: &[(String, Option<String>, String, String)], lockfile: &Arc<LockFile>,
manifest: &Manifest,
install_ctx: &InstallContext<'_>,
pb: Option<&ProgressBar>,
_quiet: bool,
) -> Result<usize> {
let project_dir = install_ctx.project_dir;
let cache = install_ctx.cache;
if updates.is_empty() {
return Ok(0);
}
let total = updates.len();
let mut entries_to_install = Vec::new();
for (name, source, _, _) in updates {
if let Some((resource_type, entry)) =
ResourceIterator::find_resource_by_name_and_source(lockfile, name, source.as_deref())
{
let tool = entry.tool.as_deref().unwrap_or("claude-code");
let artifact_path = manifest
.get_artifact_resource_path(tool, resource_type)
.ok_or_else(|| {
anyhow::anyhow!(
"Resource type '{}' is not supported by tool '{}' - check tool configuration",
resource_type,
tool
)
})?;
let target_dir = artifact_path.display().to_string();
entries_to_install.push((entry.clone(), target_dir));
}
}
if entries_to_install.is_empty() {
return Ok(0);
}
if let Some(pb) = pb {
pb.set_message("Preparing resources...");
}
let mut unique_worktrees = HashSet::new();
for (entry, _) in &entries_to_install {
if let Some(source_name) = &entry.source
&& let Some(url) = &entry.url
{
if let Some(sha) = entry.resolved_commit.as_ref().filter(|commit| {
commit.len() == 40 && commit.chars().all(|c| c.is_ascii_hexdigit())
}) {
unique_worktrees.insert((source_name.clone(), url.clone(), sha.clone()));
}
}
}
if !unique_worktrees.is_empty() {
use futures::future;
let worktree_futures: Vec<_> = unique_worktrees
.into_iter()
.map(|(source, url, sha)| {
async move {
cache
.get_or_create_worktree_for_sha(
&source,
&url,
&sha,
Some("update-pre-warm"),
)
.await
.ok(); }
})
.collect();
future::join_all(worktree_futures).await;
}
let installed_count = Arc::new(Mutex::new(0));
let pb = pb.map(Arc::new);
let cache = Arc::new(cache);
if let Some(ref pb) = pb {
pb.set_message(format!("Installing 0/{total} resources"));
}
let results: Vec<Result<(), anyhow::Error>> = stream::iter(entries_to_install)
.map(|(entry, resource_dir)| {
let project_dir = project_dir.to_path_buf();
let installed_count = Arc::clone(&installed_count);
let pb = pb.clone();
let cache = Arc::clone(&cache);
let lockfile = Arc::clone(lockfile);
async move {
let mut builder = InstallContext::builder(&project_dir, cache.as_ref())
.manifest(manifest)
.lockfile(&lockfile);
if let Some(patches) = install_ctx.project_patches {
builder = builder.project_patches(patches);
}
if let Some(patches) = install_ctx.private_patches {
builder = builder.private_patches(patches);
}
if let Some(size) = install_ctx.max_content_file_size {
builder = builder.max_content_file_size(size);
}
let context = builder.build();
install_resource_for_parallel(&entry, &resource_dir, &context).await?;
let timeout = default_lock_timeout();
let mut count = match tokio::time::timeout(timeout, installed_count.lock()).await {
Ok(guard) => guard,
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
anyhow::bail!("Timeout waiting for installed_count lock after {:?} - possible deadlock", timeout);
}
};
*count += 1;
if let Some(pb) = pb {
pb.set_message(format!("Installing {}/{} resources", *count, total));
pb.inc(1);
}
Ok::<(), anyhow::Error>(())
}
})
.buffered(usize::MAX) .collect()
.await;
for result in results {
result?;
}
let timeout = default_lock_timeout();
let final_count = match tokio::time::timeout(timeout, installed_count.lock()).await {
Ok(guard) => *guard,
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for installed_count lock after {:?}", timeout);
anyhow::bail!(
"Timeout waiting for installed_count lock after {:?} - possible deadlock",
timeout
);
}
};
Ok(final_count)
}