use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use anyhow::{Context, Result};
use dashmap::DashMap;
use futures::future::join_all;
use tokio::sync::{Mutex, MutexGuard};
use crate::core::ResourceType;
use crate::lockfile::lockfile_dependency_ref::LockfileDependencyRef;
use crate::manifest::{DetailedDependency, ResourceDependency};
use crate::metadata::MetadataExtractor;
use crate::utils;
use crate::version::conflict::ConflictDetector;
use crate::version::constraints::VersionConstraint;
use super::dependency_graph::{DependencyGraph, DependencyNode};
use super::pattern_expander::generate_dependency_name;
use super::types::{DependencyKey, TransitiveContext, apply_manifest_override};
use super::version_resolver::{PreparedSourceVersion, VersionResolutionService};
use super::{PatternExpansionService, ResourceFetchingService, is_file_relative_path};
use crate::constants::{batch_operation_timeout, default_lock_timeout};
async fn acquire_mutex_with_timeout<'a, T>(
mutex: &'a Mutex<T>,
name: &str,
) -> Result<MutexGuard<'a, T>> {
let timeout = default_lock_timeout();
match tokio::time::timeout(timeout, mutex.lock()).await {
Ok(guard) => Ok(guard),
Err(_) => {
eprintln!("[DEADLOCK] Timeout waiting for mutex '{}' after {:?}", name, timeout);
anyhow::bail!(
"Timeout waiting for Mutex '{}' after {:?} - possible deadlock",
name,
timeout
)
}
}
}
fn is_semver_version(version: Option<&str>) -> bool {
match version {
Some(v) => VersionConstraint::parse(v).is_ok_and(|c| c.is_semver()),
None => false,
}
}
fn should_replace_existing(existing: &ResourceDependency, new: &ResourceDependency) -> bool {
let existing_is_semver = is_semver_version(existing.get_version());
let new_is_semver = is_semver_version(new.get_version());
if new_is_semver && !existing_is_semver {
tracing::debug!(
"Preferring semver '{}' over git ref '{}'",
new.get_version().unwrap_or("none"),
existing.get_version().unwrap_or("none")
);
return true;
}
false
}
pub struct ResolutionServices<'a> {
pub version_service: &'a VersionResolutionService,
pub pattern_service: &'a PatternExpansionService,
}
pub struct TransitiveResolutionParams<'a> {
pub ctx: &'a mut TransitiveContext<'a>,
pub core: &'a super::ResolutionCore,
pub base_deps: &'a [(String, ResourceDependency, ResourceType)],
pub enable_transitive: bool,
pub prepared_versions: &'a Arc<DashMap<String, PreparedSourceVersion>>,
pub pattern_alias_map: &'a Arc<DashMap<(ResourceType, String), String>>,
pub services: &'a ResolutionServices<'a>,
pub progress: Option<std::sync::Arc<crate::utils::MultiPhaseProgress>>,
}
struct TransitiveDepProcessingParams<'a> {
ctx: &'a TransitiveContext<'a>,
core: &'a super::ResolutionCore,
parent_dep: &'a ResourceDependency,
dep_resource_type: ResourceType,
parent_resource_type: ResourceType,
parent_name: &'a str,
dep_spec: &'a crate::manifest::DependencySpec,
version_service: &'a VersionResolutionService,
prepared_versions: &'a Arc<DashMap<String, PreparedSourceVersion>>,
}
struct TransitiveProcessingContext<'a> {
input: TransitiveInput,
shared: TransitiveSharedState<'a>,
resolution: TransitiveResolutionContext<'a>,
progress: Option<Arc<utils::MultiPhaseProgress>>,
}
#[derive(Debug, Clone)]
struct TransitiveInput {
name: String,
dep: ResourceDependency,
resource_type: ResourceType,
variant_hash: String,
}
type QueueEntry = (String, ResourceDependency, Option<ResourceType>, String);
type CanonicalPathKey = (ResourceType, String, Option<String>, Option<String>, String);
struct TransitiveSharedState<'a> {
graph: Arc<tokio::sync::Mutex<DependencyGraph>>,
all_deps: Arc<DashMap<DependencyKey, ResourceDependency>>,
processed: Arc<DashMap<DependencyKey, ()>>,
queue: Arc<tokio::sync::Mutex<Vec<QueueEntry>>>,
queue_len: Arc<AtomicUsize>,
pattern_alias_map: Arc<DashMap<(ResourceType, String), String>>,
completed_counter: Arc<AtomicUsize>,
dependency_map: &'a Arc<DashMap<DependencyKey, Vec<String>>>,
custom_names: &'a Arc<DashMap<DependencyKey, String>>,
prepared_versions: &'a Arc<DashMap<String, PreparedSourceVersion>>,
canonical_path_index: Arc<DashMap<CanonicalPathKey, String>>,
}
struct TransitiveResolutionContext<'a> {
ctx_base: &'a super::types::ResolutionContext<'a>,
manifest_overrides: &'a super::types::ManifestOverrideIndex,
core: &'a super::ResolutionCore,
services: &'a ResolutionServices<'a>,
}
async fn process_transitive_dependency_spec(
params: TransitiveDepProcessingParams<'_>,
) -> Result<(ResourceDependency, String)> {
let parent_file_path = ResourceFetchingService::get_canonical_path(
params.core,
params.parent_dep,
params.version_service,
)
.await
.with_context(|| {
format!("Failed to get parent path for transitive dependencies of '{}'", params.parent_name)
})?;
let trans_canonical =
resolve_transitive_path(&parent_file_path, ¶ms.dep_spec.path, params.parent_name)?;
let trans_dep = create_transitive_dependency(
params.ctx,
params.parent_dep,
params.dep_resource_type,
params.parent_resource_type,
params.parent_name,
params.dep_spec,
&parent_file_path,
&trans_canonical,
params.prepared_versions,
)
.await?;
let trans_name = if trans_dep.get_source().is_none() {
let manifest_dir = params
.ctx
.base
.manifest
.manifest_dir
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Manifest directory not available"))?;
let source_context = crate::resolver::source_context::SourceContext::local(manifest_dir);
generate_dependency_name(trans_dep.get_path(), &source_context)
} else {
let source_name = trans_dep
.get_source()
.ok_or_else(|| anyhow::anyhow!("Git dependency missing source name"))?;
let source_context = crate::resolver::source_context::SourceContext::remote(source_name);
generate_dependency_name(trans_dep.get_path(), &source_context)
};
Ok((trans_dep, trans_name))
}
fn resolve_transitive_path(
parent_file_path: &Path,
dep_path: &str,
parent_name: &str,
) -> Result<PathBuf> {
let is_pattern = dep_path.contains('*') || dep_path.contains('?') || dep_path.contains('[');
if is_pattern {
let parent_dir = parent_file_path.parent().ok_or_else(|| {
anyhow::anyhow!(
"Failed to resolve transitive dependency '{}' for '{}': parent file has no directory",
dep_path,
parent_name
)
})?;
let resolved = parent_dir.join(dep_path);
let mut result = PathBuf::new();
for component in resolved.components() {
match component {
std::path::Component::RootDir => result.push(component),
std::path::Component::ParentDir => {
result.pop();
}
std::path::Component::CurDir => {}
_ => result.push(component),
}
}
Ok(result)
} else if is_file_relative_path(dep_path) || !dep_path.contains('/') {
let parent_dir = parent_file_path.parent().ok_or_else(|| {
anyhow::anyhow!(
"Failed to resolve transitive dependency '{}' for '{}': parent file has no directory",
dep_path,
parent_name
)
})?;
let resolved = parent_dir.join(dep_path);
resolved.canonicalize().map_err(|e| {
let file_error = crate::core::file_error::FileOperationError::new(
crate::core::file_error::FileOperationContext::new(
crate::core::file_error::FileOperation::Canonicalize,
&resolved,
format!("resolving transitive dependency '{}' for '{}'", dep_path, parent_name),
"transitive_resolver::resolve_transitive_path",
),
e,
);
anyhow::Error::from(file_error)
})
} else {
resolve_repo_relative_path(parent_file_path, dep_path, parent_name)
}
}
fn resolve_repo_relative_path(
parent_file_path: &Path,
dep_path: &str,
parent_name: &str,
) -> Result<PathBuf> {
let repo_root = parent_file_path
.ancestors()
.find(|p| {
let git_path = p.join(".git");
git_path.is_file()
})
.or_else(|| parent_file_path.ancestors().nth(2)) .ok_or_else(|| {
anyhow::anyhow!(
"Failed to find repository root for transitive dependency '{}'",
dep_path
)
})?;
let full_path = repo_root.join(dep_path);
full_path.canonicalize().with_context(|| {
format!(
"Failed to resolve repo-relative transitive dependency '{}' for '{}': {} (repo root: {})",
dep_path,
parent_name,
full_path.display(),
repo_root.display()
)
})
}
#[allow(clippy::too_many_arguments)]
async fn create_transitive_dependency(
ctx: &TransitiveContext<'_>,
parent_dep: &ResourceDependency,
dep_resource_type: ResourceType,
parent_resource_type: ResourceType,
_parent_name: &str,
dep_spec: &crate::manifest::DependencySpec,
parent_file_path: &Path,
trans_canonical: &Path,
prepared_versions: &Arc<DashMap<String, PreparedSourceVersion>>,
) -> Result<ResourceDependency> {
use super::types::{OverrideKey, normalize_lookup_path};
let mut dep = if parent_dep.get_source().is_none() {
create_path_only_transitive_dep(
ctx,
parent_dep,
dep_resource_type,
parent_resource_type,
dep_spec,
trans_canonical,
)?
} else {
create_git_backed_transitive_dep(
ctx,
parent_dep,
dep_resource_type,
parent_resource_type,
dep_spec,
parent_file_path,
trans_canonical,
prepared_versions,
)
.await?
};
let normalized_path = normalize_lookup_path(dep.get_path());
let source = dep.get_source().map(std::string::ToString::to_string);
let tool = dep
.get_tool()
.map(str::to_string)
.unwrap_or_else(|| ctx.base.manifest.get_default_tool(dep_resource_type));
let variant_hash =
super::lockfile_builder::compute_merged_variant_hash(ctx.base.manifest, &dep);
let override_key = OverrideKey {
resource_type: dep_resource_type,
normalized_path: normalized_path.clone(),
source,
tool,
variant_hash,
};
if let Some(override_info) = ctx.manifest_overrides.get(&override_key) {
apply_manifest_override(&mut dep, override_info, &normalized_path);
}
Ok(dep)
}
fn create_path_only_transitive_dep(
ctx: &TransitiveContext<'_>,
parent_dep: &ResourceDependency,
dep_resource_type: ResourceType,
parent_resource_type: ResourceType,
dep_spec: &crate::manifest::DependencySpec,
trans_canonical: &Path,
) -> Result<ResourceDependency> {
let manifest_dir = ctx.base.manifest.manifest_dir.as_ref().ok_or_else(|| {
anyhow::anyhow!("Manifest directory not available for path-only transitive dep")
})?;
let dep_path_str = match manifest_dir.canonicalize() {
Ok(canonical_manifest) => {
utils::compute_relative_path(&canonical_manifest, trans_canonical)
}
Err(e) => {
eprintln!(
"Warning: Could not canonicalize manifest directory {}: {}. Using non-canonical path.",
manifest_dir.display(),
e
);
utils::compute_relative_path(manifest_dir, trans_canonical)
}
};
let trans_tool = determine_transitive_tool(
ctx,
parent_dep,
dep_spec,
parent_resource_type,
dep_resource_type,
);
Ok(ResourceDependency::Detailed(Box::new(DetailedDependency {
source: None,
path: utils::normalize_path_for_storage(dep_path_str),
version: None,
branch: None,
rev: None,
command: None,
args: None,
target: None,
filename: None,
dependencies: None,
tool: trans_tool,
flatten: None,
install: dep_spec.install.or(Some(true)),
template_vars: Some(super::lockfile_builder::build_merged_variant_inputs(
ctx.base.manifest,
parent_dep,
)),
})))
}
#[allow(clippy::too_many_arguments)]
async fn create_git_backed_transitive_dep(
ctx: &TransitiveContext<'_>,
parent_dep: &ResourceDependency,
dep_resource_type: ResourceType,
parent_resource_type: ResourceType,
dep_spec: &crate::manifest::DependencySpec,
parent_file_path: &Path,
trans_canonical: &Path,
_prepared_versions: &Arc<DashMap<String, PreparedSourceVersion>>,
) -> Result<ResourceDependency> {
let source_name = parent_dep
.get_source()
.ok_or_else(|| anyhow::anyhow!("Expected source for Git-backed dependency"))?;
let source_url = ctx
.base
.source_manager
.get_source_url(source_name)
.ok_or_else(|| anyhow::anyhow!("Source '{source_name}' not found"))?;
let repo_relative = if utils::is_local_path(&source_url) {
strip_local_source_prefix(&source_url, trans_canonical)?
} else {
strip_git_worktree_prefix_from_parent(parent_file_path, trans_canonical)?
};
let trans_tool = determine_transitive_tool(
ctx,
parent_dep,
dep_spec,
parent_resource_type,
dep_resource_type,
);
Ok(ResourceDependency::Detailed(Box::new(DetailedDependency {
source: Some(source_name.to_string()),
path: utils::normalize_path_for_storage(repo_relative.to_string_lossy().to_string()),
version: dep_spec
.version
.clone()
.or_else(|| parent_dep.get_version().map(|v| v.to_string())),
branch: None,
rev: None,
command: None,
args: None,
target: None,
filename: None,
dependencies: None,
tool: trans_tool,
flatten: None,
install: dep_spec.install.or(Some(true)),
template_vars: Some(super::lockfile_builder::build_merged_variant_inputs(
ctx.base.manifest,
parent_dep,
)),
})))
}
fn strip_local_source_prefix(source_url: &str, trans_canonical: &Path) -> Result<PathBuf> {
let source_url_path = PathBuf::from(source_url);
let source_path = source_url_path.canonicalize().map_err(|e| {
let file_error = crate::core::file_error::FileOperationError::new(
crate::core::file_error::FileOperationContext::new(
crate::core::file_error::FileOperation::Canonicalize,
&source_url_path,
"canonicalizing local source path for transitive dependency".to_string(),
"transitive_resolver::strip_local_source_prefix",
),
e,
);
anyhow::Error::from(file_error)
})?;
let trans_str = trans_canonical.to_string_lossy();
let is_pattern = trans_str.contains('*') || trans_str.contains('?') || trans_str.contains('[');
if is_pattern {
let parent_dir = trans_canonical.parent().ok_or_else(|| {
anyhow::anyhow!("Pattern path has no parent directory: {}", trans_canonical.display())
})?;
let filename = trans_canonical.file_name().ok_or_else(|| {
anyhow::anyhow!("Pattern path has no filename: {}", trans_canonical.display())
})?;
let canonical_dir = parent_dir.canonicalize().map_err(|e| {
let file_error = crate::core::file_error::FileOperationError::new(
crate::core::file_error::FileOperationContext::new(
crate::core::file_error::FileOperation::Canonicalize,
parent_dir,
"canonicalizing pattern directory for local source".to_string(),
"transitive_resolver::strip_local_source_prefix",
),
e,
);
anyhow::Error::from(file_error)
})?;
let canonical_pattern = canonical_dir.join(filename);
canonical_pattern
.strip_prefix(&source_path)
.with_context(|| {
format!(
"Transitive pattern dep outside parent's source: {} not under {}",
canonical_pattern.display(),
source_path.display()
)
})
.map(|p| p.to_path_buf())
} else {
trans_canonical
.strip_prefix(&source_path)
.with_context(|| {
format!(
"Transitive dep resolved outside parent's source directory: {} not under {}",
trans_canonical.display(),
source_path.display()
)
})
.map(|p| p.to_path_buf())
}
}
fn strip_git_worktree_prefix_from_parent(
parent_file_path: &Path,
trans_canonical: &Path,
) -> Result<PathBuf> {
let worktree_root = parent_file_path
.ancestors()
.find(|p| {
let git_path = p.join(".git");
git_path.is_file()
})
.ok_or_else(|| {
anyhow::anyhow!(
"Failed to find worktree root from parent file: {}",
parent_file_path.display()
)
})?;
let canonical_worktree = worktree_root.canonicalize().map_err(|e| {
let file_error = crate::core::file_error::FileOperationError::new(
crate::core::file_error::FileOperationContext::new(
crate::core::file_error::FileOperation::Canonicalize,
worktree_root,
"canonicalizing worktree root for transitive dependency".to_string(),
"transitive_resolver::strip_git_worktree_prefix_from_parent",
),
e,
);
anyhow::Error::from(file_error)
})?;
let trans_str = trans_canonical.to_string_lossy();
let is_pattern = trans_str.contains('*') || trans_str.contains('?') || trans_str.contains('[');
if is_pattern {
let parent_dir = trans_canonical.parent().ok_or_else(|| {
anyhow::anyhow!("Pattern path has no parent directory: {}", trans_canonical.display())
})?;
let filename = trans_canonical.file_name().ok_or_else(|| {
anyhow::anyhow!("Pattern path has no filename: {}", trans_canonical.display())
})?;
let canonical_dir = parent_dir.canonicalize().map_err(|e| {
let file_error = crate::core::file_error::FileOperationError::new(
crate::core::file_error::FileOperationContext::new(
crate::core::file_error::FileOperation::Canonicalize,
parent_dir,
"canonicalizing pattern directory for Git worktree".to_string(),
"transitive_resolver::strip_git_worktree_prefix_from_parent",
),
e,
);
anyhow::Error::from(file_error)
})?;
let canonical_pattern = canonical_dir.join(filename);
canonical_pattern
.strip_prefix(&canonical_worktree)
.with_context(|| {
format!(
"Transitive pattern dep outside parent's worktree: {} not under {}",
canonical_pattern.display(),
canonical_worktree.display()
)
})
.map(|p| p.to_path_buf())
} else {
trans_canonical
.strip_prefix(&canonical_worktree)
.with_context(|| {
format!(
"Transitive dep outside parent's worktree: {} not under {}",
trans_canonical.display(),
canonical_worktree.display()
)
})
.map(|p| p.to_path_buf())
}
}
fn determine_transitive_tool(
ctx: &TransitiveContext<'_>,
parent_dep: &ResourceDependency,
dep_spec: &crate::manifest::DependencySpec,
parent_resource_type: ResourceType,
dep_resource_type: ResourceType,
) -> Option<String> {
if let Some(explicit_tool) = &dep_spec.tool {
Some(explicit_tool.clone())
} else {
let parent_tool = parent_dep
.get_tool()
.map(str::to_string)
.unwrap_or_else(|| ctx.base.manifest.get_default_tool(parent_resource_type));
if ctx.base.manifest.is_resource_supported(&parent_tool, dep_resource_type) {
Some(parent_tool)
} else {
Some(ctx.base.manifest.get_default_tool(dep_resource_type))
}
}
}
fn build_ordered_result(
all_deps: Arc<DashMap<DependencyKey, ResourceDependency>>,
ordered_nodes: Vec<DependencyNode>,
) -> Result<Vec<(String, ResourceDependency, ResourceType)>> {
let mut result = Vec::new();
let mut added_keys = HashSet::new();
tracing::debug!(
"Transitive resolution - topological order has {} nodes, all_deps has {} entries",
ordered_nodes.len(),
all_deps.len()
);
for node in ordered_nodes {
tracing::debug!(
"Processing ordered node: {}/{} (source: {:?})",
node.resource_type,
node.name,
node.source
);
for entry in all_deps.iter() {
let (key, dep) = (entry.key(), entry.value());
if key.0 == node.resource_type && key.1 == node.name && key.2 == node.source {
tracing::debug!(
" -> Found match in all_deps, adding to result with type {:?}",
node.resource_type
);
result.push((node.name.clone(), dep.clone(), node.resource_type));
added_keys.insert(key.clone());
break;
}
}
}
for entry in all_deps.iter() {
let (key, dep) = (entry.key(), entry.value());
if !added_keys.contains(key) && !dep.is_pattern() {
tracing::debug!(
"Adding non-graph dependency: {}/{} (source: {:?}) with type {:?}",
key.0,
key.1,
key.2,
key.0
);
result.push((key.1.clone(), dep.clone(), key.0));
}
}
tracing::debug!("Transitive resolution returning {} dependencies", result.len());
Ok(result)
}
pub fn group_key(source: &str, version: &str) -> String {
format!("{source}::{version}")
}
async fn process_single_transitive_dependency<'a>(
ctx: TransitiveProcessingContext<'a>,
) -> Result<()> {
let source = ctx.input.dep.get_source().map(std::string::ToString::to_string);
let tool =
Some(ctx.input.dep.get_tool().map(std::string::ToString::to_string).unwrap_or_else(|| {
ctx.resolution.ctx_base.manifest.get_default_tool(ctx.input.resource_type)
}));
let canonical_name = if source.is_none() {
let manifest_dir = ctx
.resolution
.ctx_base
.manifest
.manifest_dir
.as_deref()
.unwrap_or(std::path::Path::new("."));
let source_context = crate::resolver::source_context::SourceContext::local(manifest_dir);
generate_dependency_name(ctx.input.dep.get_path(), &source_context)
} else {
let source_name = source.as_deref().unwrap_or("unknown");
let source_context = crate::resolver::source_context::SourceContext::remote(source_name);
generate_dependency_name(ctx.input.dep.get_path(), &source_context)
};
let key = (
ctx.input.resource_type,
ctx.input.name.clone(),
source.clone(),
tool.clone(),
ctx.input.variant_hash.clone(),
);
let display_name = if source.is_some() {
if let Some(version) = ctx.input.dep.get_version() {
format!("{}@{}", ctx.input.name, version)
} else {
format!("{}@HEAD", ctx.input.name)
}
} else {
ctx.input.name.clone()
};
let progress_key = format!("{}:{}", ctx.input.resource_type, &display_name);
if let Some(ref pm) = ctx.progress {
pm.mark_item_active(&display_name, &progress_key);
}
tracing::debug!(
"[TRANSITIVE] Processing: '{}' (type: {:?}, source: {:?})",
ctx.input.name,
ctx.input.resource_type,
source
);
let is_stale = ctx
.shared
.all_deps
.get(&key)
.map(|current_dep| current_dep.get_version() != ctx.input.dep.get_version())
.unwrap_or(false);
if is_stale {
tracing::debug!("[TRANSITIVE] Skipped stale: '{}'", ctx.input.name);
if let Some(ref pm) = ctx.progress {
let completed = ctx.shared.completed_counter.fetch_add(1, Ordering::SeqCst) + 1;
let total = completed + ctx.shared.queue_len.load(Ordering::SeqCst);
pm.mark_item_complete(
&progress_key,
Some(&display_name),
completed,
total,
"Scanning dependencies",
);
}
return Ok(());
}
if ctx.shared.processed.contains_key(&key) {
tracing::debug!("[TRANSITIVE] Already processed: '{}'", ctx.input.name);
if let Some(ref pm) = ctx.progress {
let completed = ctx.shared.completed_counter.fetch_add(1, Ordering::SeqCst) + 1;
let total = completed + ctx.shared.queue_len.load(Ordering::SeqCst);
pm.mark_item_complete(
&progress_key,
Some(&display_name),
completed,
total,
"Scanning dependencies",
);
}
return Ok(());
}
ctx.shared.processed.insert(key.clone(), ());
if ctx.input.dep.is_pattern() {
tracing::debug!("[TRANSITIVE] Expanding pattern: '{}'", ctx.input.name);
match ctx
.resolution
.services
.pattern_service
.expand_pattern(
ctx.resolution.core,
&ctx.input.dep,
ctx.input.resource_type,
ctx.shared.prepared_versions.as_ref(),
)
.await
{
Ok(concrete_deps) => {
let mut items_to_queue = Vec::new();
for (concrete_name, concrete_dep) in concrete_deps {
ctx.shared.pattern_alias_map.insert(
(ctx.input.resource_type, concrete_name.clone()),
ctx.input.name.clone(),
);
let concrete_source =
concrete_dep.get_source().map(std::string::ToString::to_string);
let concrete_tool =
concrete_dep.get_tool().map(std::string::ToString::to_string);
let concrete_variant_hash =
super::lockfile_builder::compute_merged_variant_hash(
ctx.resolution.ctx_base.manifest,
&concrete_dep,
);
let concrete_key = (
ctx.input.resource_type,
concrete_name.clone(),
concrete_source,
concrete_tool,
concrete_variant_hash.clone(),
);
match ctx.shared.all_deps.entry(concrete_key) {
dashmap::mapref::entry::Entry::Vacant(e) => {
e.insert(concrete_dep.clone());
items_to_queue.push((
concrete_name,
concrete_dep,
Some(ctx.input.resource_type),
concrete_variant_hash,
));
}
dashmap::mapref::entry::Entry::Occupied(mut e) => {
let existing = e.get();
if should_replace_existing(existing, &concrete_dep) {
tracing::debug!(
"[PATTERN] Replacing existing dep '{}' with semver version",
concrete_name
);
e.insert(concrete_dep.clone());
items_to_queue.push((
concrete_name,
concrete_dep,
Some(ctx.input.resource_type),
concrete_variant_hash,
));
}
}
}
}
if !items_to_queue.is_empty() {
let items_count = items_to_queue.len();
let mut queue =
acquire_mutex_with_timeout(&ctx.shared.queue, "transitive_queue").await?;
queue.extend(items_to_queue);
ctx.shared.queue_len.fetch_add(items_count, Ordering::SeqCst);
}
}
Err(e) => {
anyhow::bail!("Failed to expand pattern '{}': {}", ctx.input.dep.get_path(), e);
}
}
if let Some(ref pm) = ctx.progress {
let completed = ctx.shared.completed_counter.fetch_add(1, Ordering::SeqCst) + 1;
let total = completed + ctx.shared.queue_len.load(Ordering::SeqCst);
pm.mark_item_complete(
&progress_key,
Some(&display_name),
completed,
total,
"Scanning dependencies",
);
}
return Ok(());
}
let content = if ctx.input.resource_type == ResourceType::Skill {
let skill_md_dep = create_skill_md_dependency(&ctx.input.dep);
ResourceFetchingService::fetch_content(
ctx.resolution.core,
&skill_md_dep,
ctx.resolution.services.version_service,
)
.await
.with_context(|| {
format!(
"Failed to fetch SKILL.md for skill '{}' ({})",
ctx.input.name,
ctx.input.dep.get_path()
)
})?
} else {
ResourceFetchingService::fetch_content(
ctx.resolution.core,
&ctx.input.dep,
ctx.resolution.services.version_service,
)
.await
.with_context(|| {
format!(
"Failed to fetch resource '{}' ({}) for transitive deps",
ctx.input.name,
ctx.input.dep.get_path()
)
})?
};
tracing::debug!(
"[TRANSITIVE] Fetched content for '{}' ({} bytes)",
ctx.input.name,
content.len()
);
let variant_inputs_value = super::lockfile_builder::build_merged_variant_inputs(
ctx.resolution.ctx_base.manifest,
&ctx.input.dep,
);
let variant_inputs = Some(&variant_inputs_value);
let path = if ctx.input.resource_type == ResourceType::Skill {
PathBuf::from(format!("{}/SKILL.md", ctx.input.dep.get_path().trim_end_matches('/')))
} else {
PathBuf::from(ctx.input.dep.get_path())
};
let metadata = MetadataExtractor::extract(
&path,
&content,
variant_inputs,
ctx.resolution.ctx_base.operation_context.map(|arc| arc.as_ref()),
)?;
tracing::debug!(
"[DEBUG] Extracted metadata for '{}': has_deps={}",
ctx.input.name,
metadata.get_dependencies().is_some()
);
if let Some(deps_map) = metadata.get_dependencies() {
tracing::debug!(
"[DEBUG] Found {} dependency type(s) for '{}': {:?}",
deps_map.len(),
ctx.input.name,
deps_map.keys().collect::<Vec<_>>()
);
let mut items_to_queue = Vec::new();
let mut graph_edges: Vec<(DependencyNode, DependencyNode)> = Vec::new();
let declared_count = metadata.dependency_count();
let declared_deps: Vec<(String, String)> = deps_map
.iter()
.flat_map(|(rtype, specs)| specs.iter().map(move |s| (rtype.clone(), s.path.clone())))
.collect();
for (dep_resource_type_str, dep_specs) in deps_map {
let dep_resource_type: ResourceType =
dep_resource_type_str.parse().unwrap_or(ResourceType::Snippet);
for dep_spec in dep_specs {
let mut dummy_conflict_detector = ConflictDetector::new();
let temp_ctx = super::types::TransitiveContext {
base: *ctx.resolution.ctx_base,
dependency_map: ctx.shared.dependency_map,
transitive_custom_names: ctx.shared.custom_names,
conflict_detector: &mut dummy_conflict_detector,
manifest_overrides: ctx.resolution.manifest_overrides,
};
let (trans_dep, trans_name) =
process_transitive_dependency_spec(TransitiveDepProcessingParams {
ctx: &temp_ctx,
core: ctx.resolution.core,
parent_dep: &ctx.input.dep,
dep_resource_type,
parent_resource_type: ctx.input.resource_type,
parent_name: &ctx.input.name,
dep_spec,
version_service: ctx.resolution.services.version_service,
prepared_versions: ctx.shared.prepared_versions,
})
.await?;
let trans_source = trans_dep.get_source().map(std::string::ToString::to_string);
let trans_tool = Some(
trans_dep.get_tool().map(std::string::ToString::to_string).unwrap_or_else(
|| ctx.resolution.ctx_base.manifest.get_default_tool(dep_resource_type),
),
);
let trans_variant_hash = super::lockfile_builder::compute_merged_variant_hash(
ctx.resolution.ctx_base.manifest,
&trans_dep,
);
let canonical_path = super::types::normalize_lookup_path(trans_dep.get_path());
let canonical_lookup_key = (
dep_resource_type,
canonical_path.clone(),
trans_source.clone(),
trans_tool.clone(),
trans_variant_hash.clone(),
);
let effective_name = if let Some(manifest_alias) =
ctx.shared.canonical_path_index.get(&canonical_lookup_key)
{
let alias = manifest_alias.value().clone();
tracing::debug!(
"[TRANSITIVE] Transitive dep '{}' matches manifest dep '{}' - using alias for deduplication",
trans_name,
alias
);
alias
} else {
trans_name.clone()
};
let trans_key = (
dep_resource_type,
effective_name.clone(),
trans_source.clone(),
trans_tool.clone(),
trans_variant_hash.clone(),
);
let graph_dep_name = trans_name.clone();
tracing::debug!(
"[TRANSITIVE] Found transitive dep '{}' (type: {:?}, tool: {:?}, parent: {})",
trans_name,
dep_resource_type,
trans_tool,
ctx.input.name
);
if let Some(custom_name) = &dep_spec.name {
ctx.shared.custom_names.insert(trans_key.clone(), custom_name.clone());
tracing::debug!(
"Storing custom name '{}' for transitive dep '{}'",
custom_name,
trans_name
);
}
let from_node = DependencyNode::with_source(
ctx.input.resource_type,
&canonical_name,
source.clone(),
);
let to_node = DependencyNode::with_source(
dep_resource_type,
&graph_dep_name,
trans_source.clone(),
);
graph_edges.push((from_node, to_node));
let from_key = (
ctx.input.resource_type,
ctx.input.name.clone(),
source.clone(),
tool.clone(),
ctx.input.variant_hash.clone(),
);
let dep_ref =
LockfileDependencyRef::local(dep_resource_type, graph_dep_name.clone(), None)
.to_string();
tracing::debug!(
"[DEBUG] Adding to dependency_map: parent='{}' (type={:?}, source={:?}, tool={:?}, hash={}), child='{}' (type={:?})",
ctx.input.name,
ctx.input.resource_type,
source,
tool,
&ctx.input.variant_hash[..8],
dep_ref,
dep_resource_type
);
ctx.shared.dependency_map.entry(from_key).or_default().push(dep_ref);
match ctx.shared.all_deps.entry(trans_key) {
dashmap::mapref::entry::Entry::Vacant(e) => {
tracing::debug!(
"Adding transitive dep '{}' (parent: {})",
trans_name,
ctx.input.name
);
e.insert(trans_dep.clone());
items_to_queue.push((
trans_name,
trans_dep,
Some(dep_resource_type),
trans_variant_hash,
));
}
dashmap::mapref::entry::Entry::Occupied(mut e) => {
let existing = e.get();
if should_replace_existing(existing, &trans_dep) {
tracing::debug!(
"[TRANSITIVE] Replacing existing dep '{}' (version: {:?}) with semver version {:?}",
trans_name,
existing.get_version(),
trans_dep.get_version()
);
e.insert(trans_dep.clone());
items_to_queue.push((
trans_name,
trans_dep,
Some(dep_resource_type),
trans_variant_hash,
));
} else {
tracing::debug!(
"[TRANSITIVE] Keeping existing dep '{}' (version: {:?} vs new {:?})",
trans_name,
existing.get_version(),
trans_dep.get_version()
);
}
}
}
}
}
let resolved_count = graph_edges.len();
if !graph_edges.is_empty() {
let mut graph =
acquire_mutex_with_timeout(&ctx.shared.graph, "dependency_graph").await?;
for (from_node, to_node) in graph_edges {
graph.add_dependency(from_node, to_node);
}
}
if !items_to_queue.is_empty() {
let items_count = items_to_queue.len();
let mut queue =
acquire_mutex_with_timeout(&ctx.shared.queue, "transitive_queue").await?;
queue.extend(items_to_queue);
ctx.shared.queue_len.fetch_add(items_count, Ordering::SeqCst);
}
if resolved_count < declared_count {
return Err(crate::core::AgpmError::DependencyResolutionMismatch {
resource: ctx.input.name.clone(),
declared_count,
resolved_count,
declared_deps,
}
.into());
}
}
if let Some(ref pm) = ctx.progress {
let completed = ctx.shared.completed_counter.fetch_add(1, Ordering::SeqCst) + 1;
let total = completed + ctx.shared.queue_len.load(Ordering::SeqCst);
pm.mark_item_complete(
&progress_key,
Some(&display_name),
completed,
total,
"Scanning dependencies",
);
}
Ok(())
}
pub async fn resolve_with_services(
params: TransitiveResolutionParams<'_>,
) -> Result<Vec<(String, ResourceDependency, ResourceType)>> {
let TransitiveResolutionParams {
ctx,
core,
base_deps,
enable_transitive,
prepared_versions,
pattern_alias_map,
services,
progress,
} = params;
ctx.dependency_map.clear();
if !enable_transitive {
return Ok(base_deps.to_vec());
}
let graph = Arc::new(tokio::sync::Mutex::new(DependencyGraph::new()));
let all_deps: Arc<DashMap<DependencyKey, ResourceDependency>> = Arc::new(DashMap::new());
let processed: Arc<DashMap<DependencyKey, ()>> = Arc::new(DashMap::new()); let canonical_path_index: Arc<DashMap<CanonicalPathKey, String>> = Arc::new(DashMap::new());
type QueueItem = (String, ResourceDependency, Option<ResourceType>, String);
#[allow(clippy::type_complexity)]
let queue: Arc<tokio::sync::Mutex<Vec<QueueItem>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let queue_len = Arc::new(AtomicUsize::new(0));
{
let mut queue_guard = acquire_mutex_with_timeout(&queue, "transitive_queue").await?;
for (name, dep, resource_type) in base_deps {
let source = dep.get_source().map(std::string::ToString::to_string);
let tool = Some(
dep.get_tool()
.map(std::string::ToString::to_string)
.unwrap_or_else(|| ctx.base.manifest.get_default_tool(*resource_type)),
);
let merged_variant_inputs =
super::lockfile_builder::build_merged_variant_inputs(ctx.base.manifest, dep);
let variant_hash = crate::utils::compute_variant_inputs_hash(&merged_variant_inputs)
.unwrap_or_else(|_| crate::utils::EMPTY_VARIANT_INPUTS_HASH.to_string());
tracing::debug!(
"[DEBUG] Adding base dep to queue: '{}' (type: {:?}, source: {:?}, tool: {:?}, is_local: {})",
name,
resource_type,
source,
tool,
dep.is_local()
);
queue_guard.push((
name.clone(),
dep.clone(),
Some(*resource_type),
variant_hash.clone(),
));
all_deps.insert(
(*resource_type, name.clone(), source.clone(), tool.clone(), variant_hash.clone()),
dep.clone(),
);
let canonical_path = super::types::normalize_lookup_path(dep.get_path());
canonical_path_index
.insert((*resource_type, canonical_path, source, tool, variant_hash), name.clone());
}
queue_len.store(queue_guard.len(), Ordering::SeqCst);
}
let completed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let cores = std::thread::available_parallelism().map(std::num::NonZero::get).unwrap_or(4);
let max_concurrent = std::cmp::max(10, cores * 2);
let ctx_dependency_map = ctx.dependency_map;
let ctx_custom_names = ctx.transitive_custom_names;
let ctx_base = &ctx.base;
let ctx_manifest_overrides = ctx.manifest_overrides;
loop {
let batch: Vec<QueueEntry> = {
let mut q = acquire_mutex_with_timeout(&queue, "transitive_queue").await?;
let current_queue_len = q.len();
let batch_size = std::cmp::min(max_concurrent, current_queue_len);
if batch_size == 0 {
break; }
let mut batch_vec =
q.drain(current_queue_len.saturating_sub(batch_size)..).collect::<Vec<_>>();
batch_vec.reverse(); queue_len.fetch_sub(batch_vec.len(), Ordering::SeqCst);
batch_vec
};
let batch_futures: Vec<_> = batch
.into_iter()
.map(|(name, dep, resource_type, variant_hash)| {
let graph_clone = Arc::clone(&graph);
let all_deps_clone = Arc::clone(&all_deps);
let processed_clone = Arc::clone(&processed);
let queue_clone = Arc::clone(&queue);
let queue_len_clone = Arc::clone(&queue_len);
let pattern_alias_map_clone = Arc::clone(pattern_alias_map);
let progress_clone = progress.clone();
let counter_clone = Arc::clone(&completed_counter);
let prepared_versions_clone = Arc::clone(prepared_versions);
let dependency_map_clone = ctx_dependency_map;
let custom_names_clone = ctx_custom_names;
let manifest_overrides_clone = ctx_manifest_overrides;
let canonical_path_index_clone = Arc::clone(&canonical_path_index);
async move {
let resource_type = resource_type
.expect("resource_type should always be threaded through queue");
let ctx = TransitiveProcessingContext {
input: TransitiveInput {
name,
dep,
resource_type,
variant_hash,
},
shared: TransitiveSharedState {
graph: graph_clone,
all_deps: all_deps_clone,
processed: processed_clone,
queue: queue_clone,
queue_len: queue_len_clone,
pattern_alias_map: pattern_alias_map_clone,
completed_counter: counter_clone,
dependency_map: dependency_map_clone,
custom_names: custom_names_clone,
prepared_versions: &prepared_versions_clone,
canonical_path_index: canonical_path_index_clone,
},
resolution: TransitiveResolutionContext {
ctx_base,
manifest_overrides: manifest_overrides_clone,
core,
services,
},
progress: progress_clone,
};
process_single_transitive_dependency(ctx).await
}
})
.collect();
let timeout_duration = batch_operation_timeout();
let results = tokio::time::timeout(timeout_duration, join_all(batch_futures))
.await
.with_context(|| {
format!(
"Batch transitive resolution timed out after {:?} - possible deadlock",
timeout_duration
)
})?;
for result in results {
result?;
}
}
acquire_mutex_with_timeout(&graph, "dependency_graph").await?.detect_cycles()?;
let ordered_nodes =
acquire_mutex_with_timeout(&graph, "dependency_graph").await?.topological_order()?;
build_ordered_result(all_deps, ordered_nodes)
}
fn create_skill_md_dependency(dep: &ResourceDependency) -> ResourceDependency {
match dep {
ResourceDependency::Simple(path) => {
let skill_md_path = format!("{}/SKILL.md", path.trim_end_matches('/'));
ResourceDependency::Simple(skill_md_path)
}
ResourceDependency::Detailed(detailed) => {
let skill_md_path = format!("{}/SKILL.md", detailed.path.trim_end_matches('/'));
ResourceDependency::Detailed(Box::new(DetailedDependency {
path: skill_md_path,
source: detailed.source.clone(),
version: detailed.version.clone(),
branch: detailed.branch.clone(),
rev: detailed.rev.clone(),
command: detailed.command.clone(),
args: detailed.args.clone(),
target: detailed.target.clone(),
filename: detailed.filename.clone(),
dependencies: detailed.dependencies.clone(),
tool: detailed.tool.clone(),
flatten: detailed.flatten,
install: detailed.install,
template_vars: detailed.template_vars.clone(),
}))
}
}
}