use anyhow::{Context, Result};
use dashmap::DashMap;
use futures::stream::{self, StreamExt};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use super::types::ResolutionMode;
use crate::cache::Cache;
use crate::git::GitRepo;
use crate::manifest::ResourceDependency;
use crate::source::SourceManager;
/// A single (source, version) requirement gathered for resolution, plus the
/// resolution results once known.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Logical source name as declared in the manifest.
    pub source: String,
    /// Git URL (or local filesystem path) backing the source.
    pub url: String,
    /// Requested version/ref; `None` means the default branch ("HEAD" key).
    pub version: Option<String>,
    /// Commit SHA once resolved; `None` until resolution runs.
    pub resolved_sha: Option<String>,
    /// Human-readable ref the SHA came from, once resolved.
    pub resolved_version: Option<String>,
    /// Whether `version` is a semver constraint or a literal git ref.
    pub resolution_mode: ResolutionMode,
}
impl VersionEntry {
    /// Human-readable label, e.g. `community@v1.0.0` (`HEAD` when no
    /// version was requested).
    pub fn format_display(&self) -> String {
        let shown = match self.version.as_deref() {
            Some(v) => v,
            None => "HEAD",
        };
        format!("{}@{}", self.source, shown)
    }

    /// Stable identifier used for progress tracking, e.g. `community:v1.0.0`
    /// (`HEAD` when no version was requested).
    pub fn unique_key(&self) -> String {
        let shown = match self.version.as_deref() {
            Some(v) => v,
            None => "HEAD",
        };
        format!("{}:{}", self.source, shown)
    }
}
/// Final resolution of one (source, version) pair.
#[derive(Debug, Clone)]
pub struct ResolvedVersion {
    /// Commit SHA the version resolved to.
    pub sha: String,
    /// The ref (tag, branch, or SHA string) that produced the SHA.
    pub resolved_ref: String,
}
/// Batch resolver that turns (source, version) requirements into commit SHAs,
/// reusing pre-synced bare repositories and batching git ref lookups.
pub struct VersionResolver {
    // Clone/fetch backend used to obtain bare repositories.
    cache: Cache,
    // Pending requirements keyed by (source, version-key); deduplicated.
    entries: Arc<DashMap<(String, String), VersionEntry>>,
    // Completed resolutions keyed by (source, version-key).
    resolved: Arc<DashMap<(String, String), ResolvedVersion>>,
    // Source name -> path of its pre-synced bare repository.
    bare_repos: Arc<DashMap<String, PathBuf>>,
    // Upper bound on concurrent source syncs in `pre_sync_sources`.
    max_concurrency: usize,
}
impl VersionResolver {
    /// Create a resolver with a default concurrency of
    /// `max(10, 2 * available CPU cores)` (4 cores assumed if unknown).
    pub fn new(cache: Cache) -> Self {
        let cores = std::thread::available_parallelism().map(std::num::NonZero::get).unwrap_or(4);
        let default_concurrency = std::cmp::max(10, cores * 2);
        Self {
            cache,
            entries: Arc::new(DashMap::new()),
            resolved: Arc::new(DashMap::new()),
            bare_repos: Arc::new(DashMap::new()),
            max_concurrency: default_concurrency,
        }
    }

    /// Create a resolver with an explicit concurrency limit.
    pub fn with_concurrency(cache: Cache, max_concurrency: usize) -> Self {
        Self {
            cache,
            entries: Arc::new(DashMap::new()),
            resolved: Arc::new(DashMap::new()),
            bare_repos: Arc::new(DashMap::new()),
            max_concurrency,
        }
    }

    /// Queue a (source, version) pair for resolution. Duplicate pairs are
    /// deduplicated; a missing version is keyed as "HEAD".
    pub fn add_version(
        &self,
        source: &str,
        url: &str,
        version: Option<&str>,
        resolution_mode: ResolutionMode,
    ) {
        let version_key = version.unwrap_or("HEAD").to_string();
        let key = (source.to_string(), version_key);
        self.entries.entry(key).or_insert_with(|| VersionEntry {
            source: source.to_string(),
            url: url.to_string(),
            version: version.map(std::string::ToString::to_string),
            resolved_sha: None,
            resolved_version: None,
            resolution_mode,
        });
    }

    /// Resolve every queued entry to a commit SHA using the pre-synced bare
    /// repositories, batching ref lookups per source.
    ///
    /// # Errors
    /// Fails when a source was not pre-synced (call [`Self::pre_sync_sources`]
    /// first), when a version constraint matches no tag, or when a ref cannot
    /// be resolved to a SHA.
    pub async fn resolve_all(
        &self,
        progress: Option<std::sync::Arc<crate::utils::MultiPhaseProgress>>,
    ) -> Result<()> {
        // Group pending entries by source so each repository is consulted once.
        let mut by_source: HashMap<String, Vec<(String, VersionEntry)>> = HashMap::new();
        for entry_ref in self.entries.iter() {
            let (key, entry) = entry_ref.pair();
            by_source.entry(entry.source.clone()).or_default().push((key.1.clone(), entry.clone()));
        }
        let total_versions: usize = by_source.values().map(|v| v.len()).sum();
        let completed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        for (source, versions) in by_source {
            let repo_path = self
                .bare_repos
                .get(&source)
                .ok_or_else(|| {
                    anyhow::anyhow!("Repository for source '{source}' was not pre-synced. Call pre_sync_sources() first.")
                })?
                .clone();
            let repo = GitRepo::new(&repo_path);
            // List tags once per source, but only when at least one entry is
            // remote (local paths never need tag resolution).
            let tags_cache = if versions.iter().any(|(_, e)| !crate::utils::is_local_path(&e.url)) {
                repo.list_tags().await.ok()
            } else {
                None
            };
            let mut version_to_ref: Vec<(String, VersionEntry, String)> = Vec::new();
            let mut branch_checks_needed: Vec<(String, String)> = Vec::new();
            for (version_str, entry) in &versions {
                if let Some(ref pm) = progress {
                    let display = entry.format_display();
                    let key = entry.unique_key();
                    pm.mark_item_active(&display, &key);
                }
                let is_local = crate::utils::is_local_path(&entry.url);
                if is_local {
                    // Local paths are not git-resolved; tagged with a sentinel.
                    version_to_ref.push((version_str.clone(), entry.clone(), "local".to_string()));
                    continue;
                }
                let resolved_ref = if let Some(ref version) = entry.version {
                    if is_version_constraint(version) {
                        // Constraints pick the best matching tag from the
                        // tags listed above.
                        let tags = tags_cache.as_ref().ok_or_else(|| {
                            anyhow::anyhow!(
                                "Tags should have been pre-fetched for constraint '{version}'"
                            )
                        })?;
                        find_best_matching_tag(version, tags.clone())
                            .with_context(|| format!("Failed to resolve version constraint '{version}' for source '{source}'"))?
                    } else {
                        version.clone()
                    }
                } else {
                    // No version requested: use the remote default branch,
                    // falling back to "main" if it cannot be determined.
                    repo.get_default_branch().await.unwrap_or_else(|_| "main".to_string())
                };
                let ref_result = determine_ref_to_resolve(&resolved_ref, tags_cache.as_ref());
                match ref_result {
                    RefResolutionResult::DirectSha(sha) => {
                        // Already a full SHA: record immediately, no git lookup.
                        let key = (source.clone(), version_str.clone());
                        self.resolved.insert(
                            key,
                            ResolvedVersion {
                                sha: sha.clone(),
                                resolved_ref: resolved_ref.clone(),
                            },
                        );
                        if let Some(ref pm) = progress {
                            let completed = completed_counter
                                .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
                                + 1;
                            pm.mark_item_complete(
                                &entry.unique_key(),
                                Some(&entry.format_display()),
                                completed,
                                total_versions,
                                "Resolving dependencies",
                            );
                        }
                    }
                    RefResolutionResult::DirectRef(ref_name) => {
                        version_to_ref.push((version_str.clone(), entry.clone(), ref_name));
                    }
                    RefResolutionResult::NeedsBranchCheck { origin_ref } => {
                        branch_checks_needed.push((origin_ref.clone(), version_str.clone()));
                        version_to_ref.push((version_str.clone(), entry.clone(), origin_ref));
                    }
                }
            }
            if !branch_checks_needed.is_empty() {
                // Verify all candidate origin/<branch> refs in one batch.
                let refs_to_check: Vec<&str> = branch_checks_needed
                    .iter()
                    .map(|(origin_ref, _)| origin_ref.as_str())
                    .collect();
                let origin_exists = repo.resolve_refs_batch(&refs_to_check).await?;
                // Fall back to the bare branch name when origin/<branch> does
                // not exist. Only the ref string is mutated; the other tuple
                // fields are unused here (the original silenced them with a
                // `let _ = (version_str, entry);` placeholder).
                for (_, _, ref_name) in version_to_ref.iter_mut() {
                    if let Some(branch) = ref_name.strip_prefix("origin/") {
                        if origin_exists.get(ref_name.as_str()).and_then(|v| v.as_ref()).is_none() {
                            *ref_name = branch.to_string();
                        }
                    }
                }
            }
            // Resolve all remaining refs for this source in a single batch.
            let refs_to_resolve: Vec<&str> = version_to_ref
                .iter()
                .filter(|(_, _, ref_name)| ref_name != "local")
                .map(|(_, _, ref_name)| ref_name.as_str())
                .collect();
            let sha_results = if !refs_to_resolve.is_empty() {
                repo.resolve_refs_batch(&refs_to_resolve).await?
            } else {
                HashMap::new()
            };
            for (version_str, entry, ref_name) in version_to_ref {
                if ref_name == "local" {
                    // Nothing to resolve for local entries; just tick progress.
                    if let Some(ref pm) = progress {
                        let completed =
                            completed_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                        pm.mark_item_complete(
                            &entry.unique_key(),
                            Some(&entry.format_display()),
                            completed,
                            total_versions,
                            "Resolving dependencies",
                        );
                    }
                    continue;
                }
                let sha = sha_results.get(&ref_name).and_then(|v| v.clone());
                if let Some(sha_value) = sha {
                    tracing::debug!(
                        "RESOLVE: source='{}' version='{}' ref='{}' -> SHA={}",
                        source,
                        version_str,
                        ref_name,
                        &sha_value[..8.min(sha_value.len())]
                    );
                    let key = (source.clone(), version_str);
                    self.resolved.insert(
                        key,
                        ResolvedVersion {
                            sha: sha_value,
                            resolved_ref: ref_name,
                        },
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "Failed to resolve version '{}' (ref '{}') for source '{}'",
                        version_str,
                        ref_name,
                        source
                    ));
                }
                if let Some(ref pm) = progress {
                    let completed =
                        completed_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                    pm.mark_item_complete(
                        &entry.unique_key(),
                        Some(&entry.format_display()),
                        completed,
                        total_versions,
                        "Resolving dependencies",
                    );
                }
            }
        }
        Ok(())
    }

    /// Resolve a single (source, version) immediately, cloning the source if
    /// needed, record the result, and return the commit SHA.
    ///
    /// # Errors
    /// Fails when the repository cannot be prepared or the ref resolved.
    pub async fn resolve_single(
        &self,
        source: &str,
        url: &str,
        version: Option<&str>,
    ) -> Result<String> {
        let repo_path = self
            .cache
            .get_or_clone_source(source, url, None)
            .await
            .with_context(|| format!("Failed to prepare repository for source '{source}'"))?;
        let repo = GitRepo::new(&repo_path);
        let sha = repo.resolve_to_sha(version).await.with_context(|| {
            format!(
                "Failed to resolve version '{}' for source '{}'",
                version.unwrap_or("HEAD"),
                source
            )
        })?;
        let resolved_ref = if let Some(v) = version {
            v.to_string()
        } else {
            repo.get_default_branch().await.unwrap_or_else(|_| "main".to_string())
        };
        let version_key = version.unwrap_or("HEAD").to_string();
        let key = (source.to_string(), version_key);
        self.resolved.insert(
            key,
            ResolvedVersion {
                sha: sha.clone(),
                resolved_ref,
            },
        );
        Ok(sha)
    }

    /// SHA for a previously resolved (source, version), if any.
    pub fn get_resolved_sha(&self, source: &str, version: &str) -> Option<String> {
        let key = (source.to_string(), version.to_string());
        self.resolved.get(&key).map(|rv| rv.sha.clone())
    }

    /// Owned snapshot of all resolutions as (source, version) -> SHA.
    pub fn get_all_resolved(&self) -> HashMap<(String, String), String> {
        self.resolved.iter().map(|entry| (entry.key().clone(), entry.value().sha.clone())).collect()
    }

    /// Owned snapshot of all resolutions including the resolved ref.
    pub fn get_all_resolved_full(&self) -> HashMap<(String, String), ResolvedVersion> {
        self.resolved.iter().map(|entry| (entry.key().clone(), entry.value().clone())).collect()
    }

    /// Whether (source, version) has already been resolved.
    pub fn is_resolved(&self, source: &str, version: &str) -> bool {
        let key = (source.to_string(), version.to_string());
        self.resolved.contains_key(&key)
    }

    /// Clone or update the bare repository for every unique source, in
    /// parallel (bounded by `max_concurrency`), recording the resulting
    /// paths for later resolution.
    ///
    /// # Errors
    /// Aggregates all per-source sync failures into a single error.
    pub async fn pre_sync_sources(
        &self,
        progress: Option<std::sync::Arc<crate::utils::MultiPhaseProgress>>,
    ) -> Result<()> {
        // Deduplicate by source name; all entries for one source are
        // expected to share a URL, so the last one seen wins.
        let mut unique_sources: HashMap<String, String> = HashMap::new();
        for entry_ref in self.entries.iter() {
            let entry = entry_ref.value();
            unique_sources.insert(entry.source.clone(), entry.url.clone());
        }
        let total = unique_sources.len();
        if total == 0 {
            return Ok(());
        }
        let concurrency = std::cmp::min(self.max_concurrency, total);
        if let Some(ref pm) = progress {
            let window_size =
                crate::utils::progress::MultiPhaseProgress::calculate_window_size(concurrency);
            pm.start_phase_with_active_tracking(
                crate::utils::progress::InstallationPhase::SyncingSources,
                total,
                window_size,
            );
        }
        let completed = std::sync::atomic::AtomicUsize::new(0);
        // Sync sources concurrently; each task reports progress as it goes.
        let results: Vec<Result<(String, PathBuf), anyhow::Error>> = stream::iter(unique_sources)
            .map(|(source, url)| {
                let cache = self.cache.clone();
                let progress_clone = progress.clone();
                let completed_ref = &completed;
                let total_count = total;
                let display_name = format_source_display(&source, &url);
                async move {
                    if let Some(ref pm) = progress_clone {
                        pm.mark_item_active(&display_name, &source);
                    }
                    let repo_path =
                        cache.get_or_clone_source(&source, &url, None).await.with_context(
                            || format!("Failed to sync repository for source '{source}'"),
                        )?;
                    if let Some(ref pm) = progress_clone {
                        let done =
                            completed_ref.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                        pm.mark_item_complete(
                            &source,
                            Some(&display_name),
                            done,
                            total_count,
                            "Syncing sources",
                        );
                    }
                    Ok((source, repo_path))
                }
            })
            .buffer_unordered(concurrency)
            .collect()
            .await;
        if let Some(ref pm) = progress {
            pm.complete_phase_with_window(Some("Sources synced"));
        }
        // Record successes; collect every failure so all are reported.
        let mut errors = Vec::new();
        for result in results {
            match result {
                Ok((source, repo_path)) => {
                    self.bare_repos.insert(source, repo_path);
                }
                Err(e) => {
                    errors.push(e);
                }
            }
        }
        if !errors.is_empty() {
            if errors.len() == 1 {
                return Err(errors.into_iter().next().unwrap());
            }
            let error_messages: Vec<String> = errors.iter().map(|e| format!(" - {e}")).collect();
            return Err(anyhow::anyhow!(
                "Failed to sync {} sources:\n{}",
                errors.len(),
                error_messages.join("\n")
            ));
        }
        Ok(())
    }

    /// Path of the pre-synced bare repository for `source`, if known.
    pub fn get_bare_repo_path(&self, source: &str) -> Option<PathBuf> {
        self.bare_repos.get(source).map(|entry| entry.value().clone())
    }

    /// Record an externally prepared bare repository path for `source`.
    pub fn register_bare_repo(&self, source: String, repo_path: PathBuf) {
        self.bare_repos.insert(source, repo_path);
    }

    /// Drop all pending entries, resolutions, and repo registrations.
    pub fn clear(&self) {
        self.entries.clear();
        self.resolved.clear();
        self.bare_repos.clear();
    }

    /// Number of queued (source, version) entries.
    pub fn pending_count(&self) -> usize {
        self.entries.len()
    }

    /// Whether any entries are queued.
    pub fn has_entries(&self) -> bool {
        !self.entries.is_empty()
    }

    /// Number of completed resolutions.
    pub fn resolved_count(&self) -> usize {
        self.resolved.len()
    }
}
use super::types::ResolutionCore;
use std::path::Path;
/// High-level facade over [`VersionResolver`] that also prepares worktrees
/// and coordinates concurrent "prepare" requests per (source, version).
pub struct VersionResolutionService {
    version_resolver: VersionResolver,
    // Group key "source::version" -> Preparing(notify) | Ready(prepared).
    prepared_versions: std::sync::Arc<dashmap::DashMap<String, PreparedVersionState>>,
}
impl VersionResolutionService {
    /// Heuristic: classify a requested version string as a semver-style
    /// version (leading `^ ~ > < = v`, or the literal "latest") versus a
    /// plain git ref (branch, SHA, ...). `None` defaults to Version mode.
    ///
    /// NOTE(review): a branch whose name starts with 'v' (e.g. "velocity")
    /// would be classified as a version here — confirm this is intended.
    fn resolution_mode_from_version(version: Option<&str>) -> ResolutionMode {
        match version {
            Some(v) => {
                if v.starts_with('^')
                    || v.starts_with('~')
                    || v.starts_with('>')
                    || v.starts_with('<')
                    || v.starts_with('=')
                    || v.starts_with('v')
                    || v == "latest"
                {
                    ResolutionMode::Version
                } else {
                    ResolutionMode::GitRef
                }
            }
            None => ResolutionMode::Version,
        }
    }

    /// Create a service with the resolver's default concurrency.
    pub fn new(cache: crate::cache::Cache) -> Self {
        Self {
            version_resolver: VersionResolver::new(cache),
            prepared_versions: std::sync::Arc::new(dashmap::DashMap::new()),
        }
    }

    /// Create a service with an explicit resolver concurrency limit.
    pub fn with_concurrency(cache: crate::cache::Cache, max_concurrency: usize) -> Self {
        Self {
            version_resolver: VersionResolver::with_concurrency(cache, max_concurrency),
            prepared_versions: std::sync::Arc::new(dashmap::DashMap::new()),
        }
    }

    /// Register every dependency's (source, version), sync all sources,
    /// resolve versions to SHAs, and pre-create worktrees. Local-path
    /// sources are marked `Ready` immediately without any git work.
    ///
    /// # Errors
    /// Fails when a dependency references an unknown source or when any
    /// sync/resolve/worktree step fails.
    pub async fn pre_sync_sources(
        &self,
        core: &ResolutionCore,
        deps: &[(String, ResourceDependency)],
        progress: Option<std::sync::Arc<crate::utils::MultiPhaseProgress>>,
    ) -> Result<()> {
        // Start from a clean slate so stale entries don't leak between runs.
        self.version_resolver.clear();
        for (_name, dep) in deps {
            if let Some(source) = dep.get_source() {
                let version = dep.get_version();
                let source_url = core
                    .source_manager
                    .get_source_url(source)
                    .ok_or_else(|| anyhow::anyhow!("Source '{}' not found", source))?;
                self.version_resolver.add_version(
                    source,
                    &source_url,
                    version,
                    dep.resolution_mode(),
                );
            }
        }
        self.version_resolver.pre_sync_sources(progress.clone()).await?;
        self.version_resolver.resolve_all(progress).await?;
        // Local-path sources get a Ready entry pointing straight at the path;
        // resolved_commit stays empty since there is no git SHA.
        for (_name, dep) in deps {
            if let Some(source) = dep.get_source() {
                let source_url = core
                    .source_manager
                    .get_source_url(source)
                    .ok_or_else(|| anyhow::anyhow!("Source '{}' not found", source))?;
                if crate::utils::is_local_path(&source_url) {
                    let version_key = dep.get_version().unwrap_or("HEAD");
                    let group_key = format!("{}::{}", source, version_key);
                    self.prepared_versions.insert(
                        group_key,
                        PreparedVersionState::Ready(PreparedSourceVersion {
                            worktree_path: PathBuf::from(&source_url),
                            resolved_version: Some("local".to_string()),
                            resolved_commit: String::new(),
                            resource_variants: dashmap::DashMap::new(),
                        }),
                    );
                }
            }
        }
        // Materialize a worktree for every resolved (source, version).
        let worktree_manager =
            WorktreeManager::new(&core.cache, &core.source_manager, &self.version_resolver);
        let prepared = worktree_manager.create_worktrees_for_resolved_versions().await?;
        for (key, value) in prepared {
            self.prepared_versions.insert(key, PreparedVersionState::Ready(value));
        }
        Ok(())
    }

    /// Look up an already-prepared version; `None` while still preparing or
    /// unknown.
    pub fn get_prepared_version(&self, group_key: &str) -> Option<PreparedSourceVersion> {
        self.prepared_versions.get(group_key).and_then(|entry| {
            if let PreparedVersionState::Ready(prepared) = entry.value() {
                Some(prepared.clone())
            } else {
                None
            }
        })
    }

    /// Borrowed handle to the full state map (Preparing and Ready entries).
    pub fn prepared_versions(
        &self,
    ) -> &std::sync::Arc<dashmap::DashMap<String, PreparedVersionState>> {
        &self.prepared_versions
    }

    /// Owned clone of the shared state-map handle.
    pub fn prepared_versions_arc(
        &self,
    ) -> std::sync::Arc<dashmap::DashMap<String, PreparedVersionState>> {
        std::sync::Arc::clone(&self.prepared_versions)
    }

    /// Fresh snapshot containing only the `Ready` entries.
    pub fn prepared_versions_ready(
        &self,
    ) -> std::sync::Arc<dashmap::DashMap<String, PreparedSourceVersion>> {
        let ready_map = dashmap::DashMap::new();
        for entry in self.prepared_versions.iter() {
            if let PreparedVersionState::Ready(prepared) = entry.value() {
                ready_map.insert(entry.key().clone(), prepared.clone());
            }
        }
        std::sync::Arc::new(ready_map)
    }

    /// Alias for [`Self::prepared_versions_ready`].
    pub fn prepared_versions_ready_arc(
        &self,
    ) -> std::sync::Arc<dashmap::DashMap<String, PreparedSourceVersion>> {
        self.prepared_versions_ready()
    }

    /// Get the prepared worktree for `source@version`, preparing it if
    /// needed. Concurrent callers coordinate through a `Notify` stored in
    /// the map: exactly one caller prepares, the rest wait (with a timeout
    /// guard against lost notifications) and then re-check.
    ///
    /// # Errors
    /// Propagates failures from the preparing task.
    pub async fn get_or_prepare_version(
        &self,
        core: &ResolutionCore,
        source_name: &str,
        version: Option<&str>,
    ) -> Result<PreparedSourceVersion> {
        let version_key = version.unwrap_or("HEAD");
        let group_key = format!("{}::{}", source_name, version_key);
        let timeout_duration = crate::constants::pending_state_timeout();
        loop {
            // Inspect-or-claim under the entry guard; the guard is released
            // before any await below.
            let action = {
                let entry = self.prepared_versions.entry(group_key.clone());
                match entry {
                    dashmap::mapref::entry::Entry::Occupied(occ) => {
                        match occ.get() {
                            PreparedVersionState::Ready(prepared) => {
                                return Ok(prepared.clone());
                            }
                            PreparedVersionState::Preparing(notify) => {
                                // Someone else is preparing: wait on their Notify.
                                let notify = notify.clone();
                                drop(occ);
                                Some(notify)
                            }
                        }
                    }
                    dashmap::mapref::entry::Entry::Vacant(vac) => {
                        // We are the preparer: publish a Preparing marker.
                        let notify = std::sync::Arc::new(tokio::sync::Notify::new());
                        vac.insert(PreparedVersionState::Preparing(notify.clone()));
                        None
                    }
                }
            };
            match action {
                Some(notify) => {
                    tracing::debug!(
                        target: "version_resolver",
                        "get_or_prepare_version: waiting for {} @ {} (another task preparing)",
                        source_name,
                        version_key
                    );
                    let notified = notify.notified();
                    tokio::pin!(notified);
                    match tokio::time::timeout(timeout_duration, &mut notified).await {
                        Ok(()) => {
                            // Woken: loop back and re-check the map state.
                            continue;
                        }
                        Err(_) => {
                            // Timed out: the result may still have landed.
                            if let Some(prepared) = self.get_prepared_version(&group_key) {
                                return Ok(prepared);
                            }
                            tracing::warn!(
                                target: "version_resolver",
                                "get_or_prepare_version: timeout waiting for {} @ {}, retrying",
                                source_name,
                                version_key
                            );
                            continue;
                        }
                    }
                }
                None => {
                    let result =
                        self.do_prepare_version(core, source_name, version, &group_key).await;
                    match result {
                        Ok(prepared) => {
                            return Ok(prepared);
                        }
                        Err(e) => {
                            // On failure, clear the Preparing marker and wake
                            // waiters so they can retry (or fail themselves).
                            if let Some((_, PreparedVersionState::Preparing(notify))) =
                                self.prepared_versions.remove(&group_key)
                            {
                                notify.notify_waiters();
                            }
                            return Err(e);
                        }
                    }
                }
            }
        }
    }

    /// Actually prepare `source@version`: resolve the SHA, create the
    /// worktree, flip the map entry from Preparing to Ready, and wake
    /// waiters. Assumes the caller has already inserted the Preparing
    /// marker for `group_key`.
    async fn do_prepare_version(
        &self,
        core: &ResolutionCore,
        source_name: &str,
        version: Option<&str>,
        group_key: &str,
    ) -> Result<PreparedSourceVersion> {
        let version_key = version.unwrap_or("HEAD");
        tracing::debug!(
            target: "version_resolver",
            "do_prepare_version: starting for {} @ {}",
            source_name,
            version_key
        );
        let source_url = core
            .source_manager
            .get_source_url(source_name)
            .ok_or_else(|| anyhow::anyhow!("Source '{}' not found", source_name))?;
        if crate::utils::is_local_path(&source_url) {
            // Local paths need no git work: publish a Ready entry immediately.
            let prepared = PreparedSourceVersion {
                worktree_path: PathBuf::from(&source_url),
                resolved_version: Some("local".to_string()),
                resolved_commit: String::new(),
                resource_variants: dashmap::DashMap::new(),
            };
            if let Some(mut entry) = self.prepared_versions.get_mut(group_key) {
                if let PreparedVersionState::Preparing(notify) = entry.value() {
                    let notify = notify.clone();
                    *entry.value_mut() = PreparedVersionState::Ready(prepared.clone());
                    // Release the map guard before waking waiters.
                    drop(entry);
                    notify.notify_waiters();
                }
            }
            return Ok(prepared);
        }
        let resolution_mode = Self::resolution_mode_from_version(version);
        self.version_resolver.add_version(source_name, &source_url, version, resolution_mode);
        if self.version_resolver.get_bare_repo_path(source_name).is_none() {
            // Derive the bare repo path from the cache layout.
            // NOTE(review): assumes bare repos live under
            // sources/{owner}_{repo}.git — confirm against Cache.
            let (owner, repo) = crate::git::parse_git_url(&source_url)
                .unwrap_or(("direct".to_string(), "repo".to_string()));
            let bare_repo_path =
                core.cache.cache_dir().join("sources").join(format!("{owner}_{repo}.git"));
            self.version_resolver.register_bare_repo(source_name.to_string(), bare_repo_path);
        }
        tracing::debug!(
            target: "version_resolver",
            "do_prepare_version: calling resolve_all for {} @ {}",
            source_name,
            version_key
        );
        self.version_resolver.resolve_all(None).await?;
        let resolved_version_data = self
            .version_resolver
            .get_all_resolved_full()
            .get(&(source_name.to_string(), version_key.to_string()))
            .ok_or_else(|| {
                anyhow::anyhow!("Failed to resolve version for {} @ {}", source_name, version_key)
            })?
            .clone();
        let sha = resolved_version_data.sha.clone();
        let resolved_ref = resolved_version_data.resolved_ref.clone();
        tracing::debug!(
            target: "version_resolver",
            "do_prepare_version: creating worktree for {} @ {} (SHA: {})",
            source_name,
            version_key,
            &sha[..8.min(sha.len())]
        );
        let worktree_path =
            core.cache.get_or_create_worktree_for_sha(source_name, &source_url, &sha, None).await?;
        let prepared = PreparedSourceVersion {
            worktree_path,
            resolved_version: Some(resolved_ref),
            resolved_commit: sha,
            resource_variants: dashmap::DashMap::new(),
        };
        if let Some(mut entry) = self.prepared_versions.get_mut(group_key) {
            if let PreparedVersionState::Preparing(notify) = entry.value() {
                let notify = notify.clone();
                *entry.value_mut() = PreparedVersionState::Ready(prepared.clone());
                drop(entry);
                notify.notify_waiters();
            }
        }
        tracing::debug!(
            target: "version_resolver",
            "do_prepare_version: completed for {} @ {}",
            source_name,
            version_key
        );
        Ok(prepared)
    }

    /// Prepare a version for later retrieval, discarding the handle.
    pub async fn prepare_additional_version(
        &self,
        core: &ResolutionCore,
        source_name: &str,
        version: Option<&str>,
    ) -> Result<()> {
        self.get_or_prepare_version(core, source_name, version).await?;
        Ok(())
    }

    /// List the tags available in the repository at `repo_path`.
    ///
    /// # Errors
    /// Fails when the tags cannot be listed.
    pub async fn get_available_versions(
        _core: &ResolutionCore,
        repo_path: &Path,
    ) -> Result<Vec<String>> {
        let repo = GitRepo::new(repo_path);
        let tags = repo.list_tags().await.context("Failed to list tags")?;
        let versions = tags;
        Ok(versions)
    }

    /// Bare repo path known to the underlying resolver, if any.
    pub fn get_bare_repo_path(&self, source: &str) -> Option<PathBuf> {
        self.version_resolver.get_bare_repo_path(source)
    }

    /// Test-only access to the underlying resolver.
    #[cfg(test)]
    pub fn version_resolver(&self) -> &VersionResolver {
        &self.version_resolver
    }
}
/// Classify a version string into the kind of git ref resolution it needs.
///
/// - A 40-character hex string is treated as a full commit SHA and needs no
///   further resolution.
/// - A name present in `tags_cache` resolves directly as a tag ref.
/// - A bare name (no `/`, not `HEAD`) is assumed to be a branch and is first
///   tried as `origin/<name>`; the caller verifies existence and may fall
///   back to the unqualified name.
/// - Anything else (`HEAD`, already-qualified refs) resolves as-is.
fn determine_ref_to_resolve(
    version: &str,
    tags_cache: Option<&Vec<String>>,
) -> RefResolutionResult {
    // Full commit SHA: exactly 40 hex digits.
    if version.len() == 40 && version.chars().all(|c| c.is_ascii_hexdigit()) {
        return RefResolutionResult::DirectSha(version.to_string());
    }
    // Compare against cached tags without allocating a temporary String
    // (the original used `tags.contains(&version.to_string())`).
    let is_tag = tags_cache.is_some_and(|tags| tags.iter().any(|t| t == version));
    if is_tag {
        RefResolutionResult::DirectRef(version.to_string())
    } else if !version.contains('/') && version != "HEAD" {
        RefResolutionResult::NeedsBranchCheck {
            origin_ref: format!("origin/{version}"),
        }
    } else {
        RefResolutionResult::DirectRef(version.to_string())
    }
}
/// Outcome of [`determine_ref_to_resolve`].
#[derive(Debug, Clone)]
enum RefResolutionResult {
    /// The version string is already a full commit SHA.
    DirectSha(String),
    /// The version string can be resolved as-is (tag, qualified ref, HEAD).
    DirectRef(String),
    /// Looks like a branch name; check `origin/<branch>` first.
    NeedsBranchCheck {
        origin_ref: String,
    },
}
use crate::version::constraints::{ConstraintSet, VersionConstraint};
use semver::Version;
/// Returns `true` when `version` expresses a semver *constraint* (`^1.2`,
/// `~1.0`, `>=2`, `*`, or a comma-separated set) rather than an exact tag
/// or git ref.
#[must_use]
pub fn is_version_constraint(version: &str) -> bool {
    // Detection looks at the version part only, ignoring any
    // repository-specific tag prefix.
    let (_prefix, version_str) = crate::version::split_prefix_and_version(version);
    if version_str == "*" {
        return true;
    }
    let has_operator = matches!(
        version_str.chars().next(),
        Some('^' | '~' | '>' | '<' | '=')
    );
    has_operator || version_str.contains(',')
}
/// Sort `(tag, version)` pairs newest-first; ties on the parsed version are
/// broken by ascending tag name so the order is fully deterministic.
pub fn sort_versions_deterministic(pairs: &mut [(String, Version)]) {
    pairs.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
}
/// Parse git tags into `(tag, semver)` pairs, dropping tags that are not
/// valid semantic versions, and return them sorted newest-first.
#[must_use]
pub fn parse_tags_to_versions(tags: Vec<String>) -> Vec<(String, Version)> {
    let mut versions: Vec<(String, Version)> = tags
        .into_iter()
        .filter_map(|tag| {
            // Ignore any repo-specific prefix plus an optional leading v/V.
            let (_prefix, version_str) = crate::version::split_prefix_and_version(&tag);
            let cleaned = version_str.trim_start_matches('v').trim_start_matches('V');
            let parsed = Version::parse(cleaned).ok()?;
            Some((tag, parsed))
        })
        .collect();
    sort_versions_deterministic(&mut versions);
    versions
}
/// Find the newest tag satisfying `constraint_str`.
///
/// Tags whose repository-specific prefix differs from the constraint's are
/// ignored; `*` matches the newest valid semver tag.
///
/// # Errors
/// Fails when no tag shares the constraint's prefix, no tag parses as a
/// semantic version, the constraint itself cannot be parsed, or nothing
/// satisfies it.
pub fn find_best_matching_tag(constraint_str: &str, tags: Vec<String>) -> Result<String> {
    let (constraint_prefix, version_str) = crate::version::split_prefix_and_version(constraint_str);
    // Keep only tags with the same prefix as the constraint.
    let filtered_tags: Vec<String> = tags
        .into_iter()
        .filter(|tag| {
            let (tag_prefix, _) = crate::version::split_prefix_and_version(tag);
            tag_prefix.as_ref() == constraint_prefix.as_ref()
        })
        .collect();
    if filtered_tags.is_empty() {
        anyhow::bail!("No tags found with matching prefix for constraint: {constraint_str}");
    }
    // Sorted newest-first, so index 0 is the latest valid version.
    let tag_versions = parse_tags_to_versions(filtered_tags);
    if tag_versions.is_empty() {
        anyhow::bail!("No valid semantic version tags found for constraint: {constraint_str}");
    }
    if version_str == "*" {
        return Ok(tag_versions[0].0.clone());
    }
    let constraint = VersionConstraint::parse(version_str)?;
    let mut constraint_set = ConstraintSet::new();
    constraint_set.add(constraint)?;
    let versions: Vec<Version> = tag_versions.iter().map(|(_, v)| v.clone()).collect();
    if let Some(best_version) = constraint_set.find_best_match(&versions) {
        // Map the winning version back to its original tag name.
        if let Some((tag_name, _)) = tag_versions.into_iter().find(|(_, v)| v == best_version) {
            return Ok(tag_name);
        }
    }
    Err(anyhow::anyhow!("No tag found matching constraint: {constraint_str}"))
}
/// A checked-out worktree for one resolved (source, version), shared by all
/// resources that install from it.
#[derive(Clone, Debug)]
pub struct PreparedSourceVersion {
    /// Filesystem location of the worktree (or the local source path).
    pub worktree_path: std::path::PathBuf,
    /// Ref the version resolved to ("local" for local-path sources).
    pub resolved_version: Option<String>,
    /// Resolved commit SHA (empty for local-path sources).
    pub resolved_commit: String,
    /// Per-resource payloads keyed by resource identifier.
    /// NOTE(review): key/value semantics are opaque here — confirm against callers.
    pub resource_variants: dashmap::DashMap<String, Option<serde_json::Value>>,
}
impl Default for PreparedSourceVersion {
    /// An empty placeholder: no path, no resolution data, no variants.
    ///
    /// NOTE(review): every field is `Default`, so this impl could be
    /// replaced by `#[derive(Default)]` on the struct.
    fn default() -> Self {
        Self {
            worktree_path: std::path::PathBuf::default(),
            resolved_version: Option::default(),
            resolved_commit: String::default(),
            resource_variants: dashmap::DashMap::default(),
        }
    }
}
/// Lifecycle of a (source, version) preparation in the shared map.
#[derive(Clone)]
pub enum PreparedVersionState {
    /// Being prepared by some task; waiters block on the `Notify`.
    Preparing(std::sync::Arc<tokio::sync::Notify>),
    /// Preparation finished; the worktree is ready for use.
    Ready(PreparedSourceVersion),
}
/// Borrow-based helper that creates worktrees for every version a
/// [`VersionResolver`] has resolved.
pub struct WorktreeManager<'a> {
    // Provides worktree creation for specific SHAs.
    cache: &'a Cache,
    // Maps source names to their URLs.
    source_manager: &'a SourceManager,
    // Supplies the resolved (source, version) -> SHA snapshot.
    version_resolver: &'a VersionResolver,
}
impl<'a> WorktreeManager<'a> {
    /// Bundle the borrowed collaborators used to materialize worktrees.
    pub fn new(
        cache: &'a Cache,
        source_manager: &'a SourceManager,
        version_resolver: &'a VersionResolver,
    ) -> Self {
        Self {
            cache,
            source_manager,
            version_resolver,
        }
    }

    /// Grouping key for a prepared version: `source::version`.
    pub fn group_key(source: &str, version: &str) -> String {
        format!("{source}::{version}")
    }

    /// Create (or reuse) a worktree for every resolved (source, version)
    /// pair concurrently and return them keyed by [`Self::group_key`].
    ///
    /// # Errors
    /// Fails when a source URL is unknown, a worktree cannot be created, or
    /// the whole batch exceeds the configured timeout (deadlock guard).
    pub async fn create_worktrees_for_resolved_versions(
        &self,
    ) -> Result<HashMap<String, PreparedSourceVersion>> {
        use crate::core::AgpmError;
        use futures::future::join_all;
        // get_all_resolved_full() already returns an owned snapshot; the
        // extra `.clone()` the original code made here was redundant.
        let resolved_full = self.version_resolver.get_all_resolved_full();
        let mut prepared_versions = HashMap::new();
        let mut futures = Vec::new();
        for ((source_name, version_key), resolved_version) in resolved_full {
            let sha = resolved_version.sha;
            let resolved_ref = resolved_version.resolved_ref;
            let repo_key = Self::group_key(&source_name, &version_key);
            let cache_clone = self.cache.clone();
            let source_name_clone = source_name.clone();
            let source_url_clone = self
                .source_manager
                .get_source_url(&source_name)
                .ok_or_else(|| AgpmError::SourceNotFound {
                    name: source_name.clone(),
                })?
                .to_string();
            let sha_clone = sha.clone();
            let resolved_ref_clone = resolved_ref.clone();
            // Each future owns its inputs so the batch can run concurrently.
            let future = async move {
                let worktree_path = cache_clone
                    .get_or_create_worktree_for_sha(
                        &source_name_clone,
                        &source_url_clone,
                        &sha_clone,
                        Some(&source_name_clone),
                    )
                    .await?;
                Ok::<_, anyhow::Error>((
                    repo_key,
                    PreparedSourceVersion {
                        worktree_path,
                        resolved_version: Some(resolved_ref_clone),
                        resolved_commit: sha_clone,
                        resource_variants: dashmap::DashMap::new(),
                    },
                ))
            };
            futures.push(future);
        }
        // Bound the whole batch so a wedged git operation cannot hang forever.
        let timeout_duration = crate::constants::batch_operation_timeout();
        let results =
            tokio::time::timeout(timeout_duration, join_all(futures)).await.with_context(|| {
                format!(
                    "Worktree creation batch timed out after {:?} - possible deadlock",
                    timeout_duration
                )
            })?;
        for result in results {
            let (key, prepared) = result?;
            prepared_versions.insert(key, prepared);
        }
        Ok(prepared_versions)
    }
}
/// Render a human-friendly "name (host/path)" label for a source.
///
/// Strips an `https://`/`http://` scheme and a trailing `.git`; rewrites
/// `git@host:org/repo.git` SSH URLs to `host/org/repo`; any other URL
/// (local paths, `file://`, ...) is shown verbatim.
fn format_source_display(source: &str, url: &str) -> String {
    if let Some(ssh_rest) = url.strip_prefix("git@") {
        // SSH form: host:path -> host/path, then drop the .git suffix.
        let path = ssh_rest.replace(':', "/");
        return format!("{source} ({})", path.trim_end_matches(".git"));
    }
    let without_scheme = url
        .strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"));
    match without_scheme {
        Some(rest) => format!("{source} ({})", rest.trim_end_matches(".git")),
        None => format!("{source} ({url})"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `add_version` must deduplicate identical (source, version) pairs.
    #[tokio::test]
    async fn test_version_resolver_deduplication() {
        let temp_dir = TempDir::new().unwrap();
        let cache = Cache::with_dir(temp_dir.path().to_path_buf()).unwrap();
        let resolver = VersionResolver::new(cache);
        resolver.add_version(
            "source1",
            "https://example.com/repo.git",
            Some("v1.0.0"),
            ResolutionMode::Version,
        );
        resolver.add_version(
            "source1",
            "https://example.com/repo.git",
            Some("v1.0.0"),
            ResolutionMode::Version,
        );
        resolver.add_version(
            "source1",
            "https://example.com/repo.git",
            Some("v1.0.0"),
            ResolutionMode::Version,
        );
        // Three identical registrations collapse into a single entry.
        assert_eq!(resolver.pending_count(), 1);
    }

    /// Sanity-check the "40 hex chars" SHA shape used by the resolver's
    /// direct-SHA fast path.
    #[tokio::test]
    async fn test_sha_optimization() {
        let temp_dir = TempDir::new().unwrap();
        let cache = Cache::with_dir(temp_dir.path().to_path_buf()).unwrap();
        let _resolver = VersionResolver::new(cache);
        let full_sha = "a".repeat(40);
        assert_eq!(full_sha.len(), 40);
        assert!(full_sha.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// Resolutions inserted into the map are retrievable via the public
    /// accessors, and unknown versions report as unresolved.
    #[tokio::test]
    async fn test_resolved_retrieval() {
        let temp_dir = TempDir::new().unwrap();
        let cache = Cache::with_dir(temp_dir.path().to_path_buf()).unwrap();
        let resolver = VersionResolver::new(cache);
        let key = ("test_source".to_string(), "v1.0.0".to_string());
        let sha = "1234567890abcdef1234567890abcdef12345678";
        resolver.resolved.insert(
            key,
            ResolvedVersion {
                sha: sha.to_string(),
                resolved_ref: "v1.0.0".to_string(),
            },
        );
        assert!(resolver.is_resolved("test_source", "v1.0.0"));
        assert_eq!(resolver.get_resolved_sha("test_source", "v1.0.0"), Some(sha.to_string()));
        assert!(!resolver.is_resolved("test_source", "v2.0.0"));
    }

    /// Group keys use the "source::version" format.
    #[tokio::test]
    async fn test_worktree_group_key() {
        assert_eq!(WorktreeManager::group_key("source", "version"), "source::version");
        assert_eq!(WorktreeManager::group_key("community", "v1.0.0"), "community::v1.0.0");
    }

    /// Display formatting for https/http/ssh/file/relative source URLs.
    #[test]
    fn test_format_source_display() {
        assert_eq!(
            format_source_display("community", "https://github.com/org/repo.git"),
            "community (github.com/org/repo)"
        );
        assert_eq!(
            format_source_display("other", "https://gitlab.com/org/repo"),
            "other (gitlab.com/org/repo)"
        );
        assert_eq!(
            format_source_display("test", "http://example.com/repo.git"),
            "test (example.com/repo)"
        );
        assert_eq!(
            format_source_display("ssh-source", "git@github.com:org/repo.git"),
            "ssh-source (github.com/org/repo)"
        );
        assert_eq!(
            format_source_display("local", "file:///path/to/repo"),
            "local (file:///path/to/repo)"
        );
        assert_eq!(format_source_display("relative", "../some/path"), "relative (../some/path)");
    }
}