use anyhow::Result;
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::core::ResourceIterator;
use crate::lockfile::LockFile;
use crate::manifest::Manifest;
use crate::utils::progress::ProgressBar;
use super::{InstallContext, install_resource_for_parallel};
pub async fn install_updated_resources(
updates: &[(String, Option<String>, String, String)], lockfile: &Arc<LockFile>,
manifest: &Manifest,
install_ctx: &InstallContext<'_>,
pb: Option<&ProgressBar>,
_quiet: bool,
) -> Result<usize> {
let project_dir = install_ctx.project_dir;
let cache = install_ctx.cache;
if updates.is_empty() {
return Ok(0);
}
let total = updates.len();
let mut entries_to_install = Vec::new();
for (name, source, _, _) in updates {
if let Some((resource_type, entry)) =
ResourceIterator::find_resource_by_name_and_source(lockfile, name, source.as_deref())
{
let tool = entry.tool.as_deref().unwrap_or("claude-code");
let artifact_path = manifest
.get_artifact_resource_path(tool, resource_type)
.expect("Resource type should be supported by configured tools");
let target_dir = artifact_path.display().to_string();
entries_to_install.push((entry.clone(), target_dir));
}
}
if entries_to_install.is_empty() {
return Ok(0);
}
if let Some(pb) = pb {
pb.set_message("Preparing resources...");
}
let mut unique_worktrees = HashSet::new();
for (entry, _) in &entries_to_install {
if let Some(source_name) = &entry.source
&& let Some(url) = &entry.url
{
if let Some(sha) = entry.resolved_commit.as_ref().filter(|commit| {
commit.len() == 40 && commit.chars().all(|c| c.is_ascii_hexdigit())
}) {
unique_worktrees.insert((source_name.clone(), url.clone(), sha.clone()));
}
}
}
if !unique_worktrees.is_empty() {
use futures::future;
let worktree_futures: Vec<_> = unique_worktrees
.into_iter()
.map(|(source, url, sha)| {
async move {
cache
.get_or_create_worktree_for_sha(
&source,
&url,
&sha,
Some("update-pre-warm"),
)
.await
.ok(); }
})
.collect();
future::join_all(worktree_futures).await;
}
let installed_count = Arc::new(Mutex::new(0));
let pb = pb.map(Arc::new);
let cache = Arc::new(cache);
if let Some(ref pb) = pb {
pb.set_message(format!("Installing 0/{total} resources"));
}
let results: Vec<Result<(), anyhow::Error>> = stream::iter(entries_to_install)
.map(|(entry, resource_dir)| {
let project_dir = project_dir.to_path_buf();
let installed_count = Arc::clone(&installed_count);
let pb = pb.clone();
let cache = Arc::clone(&cache);
let lockfile = Arc::clone(lockfile);
async move {
let context = InstallContext::new(
&project_dir,
cache.as_ref(),
false,
false, Some(manifest),
Some(&lockfile),
None, install_ctx.project_patches,
install_ctx.private_patches,
install_ctx.gitignore_lock,
install_ctx.max_content_file_size,
);
install_resource_for_parallel(&entry, &resource_dir, &context).await?;
let mut count = installed_count.lock().await;
*count += 1;
if let Some(pb) = pb {
pb.set_message(format!("Installing {}/{} resources", *count, total));
pb.inc(1);
}
Ok::<(), anyhow::Error>(())
}
})
.buffered(usize::MAX) .collect()
.await;
for result in results {
result?;
}
let final_count = *installed_count.lock().await;
Ok(final_count)
}