use crate::project::grouped_environment::GroupedEnvironmentName;
use crate::{
config, consts,
environment::{
self, LockFileUsage, PerEnvironmentAndPlatform, PerGroup, PerGroupAndPlatform, PythonStatus,
},
load_lock_file,
lock_file::{
self, update, OutdatedEnvironments, PypiPackageIdentifier, PypiRecordsByName,
RepoDataRecordsByName,
},
prefix::Prefix,
progress::global_multi_progress,
project::{grouped_environment::GroupedEnvironment, Environment},
repodata::fetch_sparse_repodata_targets,
utils::BarrierCell,
EnvironmentName, Project,
};
use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
use indexmap::{IndexMap, IndexSet};
use indicatif::ProgressBar;
use itertools::Itertools;
use miette::{IntoDiagnostic, WrapErr};
use rattler::package_cache::PackageCache;
use rattler_conda_types::{Channel, MatchSpec, PackageName, Platform, RepoDataRecord};
use rattler_lock::{LockFile, PypiPackageData, PypiPackageEnvironmentData};
use rattler_repodata_gateway::sparse::SparseRepoData;
use rip::resolve::solve_options::SDistResolution;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
convert::identity,
future::{ready, Future},
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::Semaphore;
use tracing::Instrument;
impl Project {
    /// Returns a lock-file that is up-to-date with this project, updating it
    /// first when the given [`UpdateLockFileOptions`] allow it.
    ///
    /// Thin convenience wrapper around [`update::ensure_up_to_date_lock_file`].
    pub async fn up_to_date_lock_file(
        &self,
        options: UpdateLockFileOptions,
    ) -> miette::Result<LockFileDerivedData<'_>> {
        update::ensure_up_to_date_lock_file(self, options).await
    }
}
/// Options that control how [`ensure_up_to_date_lock_file`] checks and updates
/// the lock-file.
#[derive(Default)]
pub struct UpdateLockFileOptions {
    /// Controls whether staleness is checked and whether lock-file updates are
    /// allowed (see `should_check_if_out_of_date` / `allows_lock_file_updates`).
    pub lock_file_usage: LockFileUsage,
    /// When true, updating pypi dependencies (which requires installing a
    /// conda prefix) is an error instead.
    pub no_install: bool,
    /// Repodata already fetched by the caller; targets present here are not
    /// fetched again and are carried over into the result.
    pub existing_repo_data: IndexMap<(Channel, Platform), SparseRepoData>,
    /// Maximum number of solves to run concurrently; `None` selects a default
    /// derived from the available parallelism.
    pub max_concurrent_solves: Option<usize>,
}
/// A lock-file bundled with intermediate data (repodata, instantiated
/// prefixes) produced while bringing it up-to-date, so callers can reuse that
/// data instead of recomputing it.
pub struct LockFileDerivedData<'p> {
    /// The (possibly freshly written) lock-file.
    pub lock_file: LockFile,
    /// Package cache used for installing conda packages.
    pub package_cache: Arc<PackageCache>,
    /// Repodata fetched or carried over during the update.
    pub repo_data: IndexMap<(Channel, Platform), SparseRepoData>,
    /// Conda prefixes already updated, with their python status.
    pub updated_conda_prefixes: HashMap<Environment<'p>, (Prefix, PythonStatus)>,
    /// Prefixes whose pypi packages were also updated.
    pub updated_pypi_prefixes: HashMap<Environment<'p>, Prefix>,
}
impl<'p> LockFileDerivedData<'p> {
    /// Returns the fully updated prefix (conda and pypi packages) for the
    /// given environment, creating/updating it on first use and caching the
    /// result in `updated_pypi_prefixes`.
    pub async fn prefix(&mut self, environment: &Environment<'p>) -> miette::Result<Prefix> {
        // Reuse a prefix whose pypi packages were already updated.
        if let Some(prefix) = self.updated_pypi_prefixes.get(environment) {
            return Ok(prefix.clone());
        }
        let platform = Platform::current();
        let package_db = environment.project().pypi_package_db()?;
        // Make sure the conda packages are installed first; pypi packages are
        // applied on top of that prefix.
        let (prefix, python_status) = self.conda_prefix(environment).await?;
        let repodata_records = self
            .repodata_records(environment, platform)
            .unwrap_or_default();
        let pypi_records = self.pypi_records(environment, platform).unwrap_or_default();
        let env_variables = environment.project().get_env_variables(environment).await?;
        environment::update_prefix_pypi(
            environment.name(),
            &prefix,
            platform,
            package_db,
            &repodata_records,
            &pypi_records,
            &python_status,
            &environment.system_requirements(),
            SDistResolution::default(),
            env_variables.clone(),
        )
        .await?;
        // Cache so subsequent calls for the same environment are cheap.
        self.updated_pypi_prefixes
            .insert(environment.clone(), prefix.clone());
        Ok(prefix)
    }

    /// Returns the locked pypi records for `environment` on `platform`, if the
    /// lock-file contains any for that platform.
    ///
    /// Panics if the lock-file does not contain the environment; at this point
    /// the lock-file is expected to be up-to-date.
    fn pypi_records(
        &self,
        environment: &Environment<'p>,
        platform: Platform,
    ) -> Option<Vec<(PypiPackageData, PypiPackageEnvironmentData)>> {
        let locked_env = self
            .lock_file
            .environment(environment.name().as_str())
            .expect("the lock-file should be up-to-date so it should also include the environment");
        locked_env.pypi_packages_for_platform(platform)
    }

    /// Returns the locked conda repodata records for `environment` on
    /// `platform`, if the lock-file contains any for that platform.
    ///
    /// Panics if the lock-file does not contain the environment or the records
    /// cannot be extracted; both indicate an out-of-date lock-file.
    fn repodata_records(
        &self,
        environment: &Environment<'p>,
        platform: Platform,
    ) -> Option<Vec<RepoDataRecord>> {
        let locked_env = self
            .lock_file
            .environment(environment.name().as_str())
            .expect("the lock-file should be up-to-date so it should also include the environment");
        locked_env.conda_repodata_records_for_platform(platform).expect("since the lock-file is up to date we should be able to extract the repodata records from it")
    }

    /// Returns a prefix with the conda packages of the environment installed,
    /// creating/updating it on first use and caching the result in
    /// `updated_conda_prefixes`.
    async fn conda_prefix(
        &mut self,
        environment: &Environment<'p>,
    ) -> miette::Result<(Prefix, PythonStatus)> {
        // Reuse a previously updated prefix if we have one.
        if let Some((prefix, python_status)) = self.updated_conda_prefixes.get(environment) {
            return Ok((prefix.clone(), python_status.clone()));
        }
        let prefix = Prefix::new(environment.dir());
        let platform = Platform::current();
        // The currently installed packages are passed along so the update can
        // take them into account.
        let installed_packages = prefix
            .find_installed_packages(None)
            .await
            .with_context(|| {
                format!(
                    "failed to determine the currently installed packages for '{}'",
                    environment.name(),
                )
            })?;
        let records = self
            .repodata_records(environment, platform)
            .unwrap_or_default();
        let python_status = environment::update_prefix_conda(
            GroupedEnvironmentName::Environment(environment.name().clone()),
            &prefix,
            self.package_cache.clone(),
            environment.project().authenticated_client().clone(),
            installed_packages,
            &records,
            platform,
        )
        .await?;
        // Cache so subsequent calls for the same environment are cheap.
        self.updated_conda_prefixes
            .insert(environment.clone(), (prefix.clone(), python_status.clone()));
        Ok((prefix, python_status))
    }
}
/// Mutable state shared by the tasks that bring the lock-file up-to-date.
///
/// "locked_*" fields hold data read from the existing lock-file; "solved_*"
/// fields hold [`BarrierCell`]s that are filled in as solve tasks complete so
/// dependent tasks can await them.
#[derive(Default)]
struct UpdateContext<'p> {
    /// All repodata available for this update, keyed by channel and platform.
    repo_data: Arc<IndexMap<(Channel, Platform), SparseRepoData>>,
    /// Conda records from the existing lock-file, per environment+platform.
    locked_repodata_records: PerEnvironmentAndPlatform<'p, Arc<RepoDataRecordsByName>>,
    /// Conda records from the existing lock-file, merged per group+platform.
    locked_grouped_repodata_records: PerGroupAndPlatform<'p, Arc<RepoDataRecordsByName>>,
    /// Pypi records from the existing lock-file, per environment+platform.
    locked_pypi_records: PerEnvironmentAndPlatform<'p, Arc<PypiRecordsByName>>,
    /// Freshly solved conda records per environment+platform (filled as
    /// extraction tasks complete).
    solved_repodata_records:
        PerEnvironmentAndPlatform<'p, Arc<BarrierCell<Arc<RepoDataRecordsByName>>>>,
    /// Freshly solved conda records per group+platform (filled as group solve
    /// tasks complete).
    grouped_solved_repodata_records:
        PerGroupAndPlatform<'p, Arc<BarrierCell<Arc<RepoDataRecordsByName>>>>,
    /// Conda prefixes instantiated per group (awaited by pypi solves).
    instantiated_conda_prefixes: PerGroup<'p, Arc<BarrierCell<(Prefix, PythonStatus)>>>,
    /// Freshly solved pypi records per environment+platform.
    solved_pypi_records: PerEnvironmentAndPlatform<'p, Arc<BarrierCell<Arc<PypiRecordsByName>>>>,
    /// Freshly solved pypi records per group+platform.
    grouped_solved_pypi_records: PerGroupAndPlatform<'p, Arc<BarrierCell<Arc<PypiRecordsByName>>>>,
}
impl<'p> UpdateContext<'p> {
    /// Returns a future resolving to the freshest conda records for the given
    /// environment and platform: an in-flight solve if one was spawned,
    /// otherwise the records from the existing lock-file. `None` if neither
    /// exists.
    pub fn get_latest_repodata_records(
        &self,
        environment: &Environment<'p>,
        platform: Platform,
    ) -> Option<impl Future<Output = Arc<RepoDataRecordsByName>>> {
        self.solved_repodata_records
            .get(environment)
            .and_then(|records| records.get(&platform))
            .map(|records| {
                let records = records.clone();
                // Pending solve: wait for the cell to be filled.
                Either::Left(async move { records.wait().await.clone() })
            })
            .or_else(|| {
                // No pending solve: fall back to the locked records.
                self.locked_repodata_records
                    .get(environment)
                    .and_then(|records| records.get(&platform))
                    .cloned()
                    .map(ready)
                    .map(Either::Right)
            })
    }

    /// Like [`Self::get_latest_repodata_records`], but for an entire solve
    /// group.
    pub fn get_latest_group_repodata_records(
        &self,
        group: &GroupedEnvironment<'p>,
        platform: Platform,
    ) -> Option<impl Future<Output = Arc<RepoDataRecordsByName>>> {
        // Prefer an in-flight group solve if one exists.
        if let Some(pending_records) = self
            .grouped_solved_repodata_records
            .get(group)
            .and_then(|records| records.get(&platform))
            .cloned()
        {
            return Some((async move { pending_records.wait().await.clone() }).left_future());
        }
        // Otherwise fall back to the records from the existing lock-file.
        let locked_records = self
            .locked_grouped_repodata_records
            .get(group)
            .and_then(|records| records.get(&platform))?
            .clone();
        Some(ready(locked_records).right_future())
    }

    /// Removes and returns the freshest conda records for the given
    /// environment and platform, preferring freshly solved records over the
    /// locked ones.
    ///
    /// Panics if a solve cell exists but is still shared or not yet filled.
    pub fn take_latest_repodata_records(
        &mut self,
        environment: &Environment<'p>,
        platform: Platform,
    ) -> Option<RepoDataRecordsByName> {
        self.solved_repodata_records
            .get_mut(environment)
            .and_then(|records| records.remove(&platform))
            .map(|cell| {
                Arc::into_inner(cell)
                    .expect("records must not be shared")
                    .into_inner()
                    .expect("records must be available")
            })
            .or_else(|| {
                self.locked_repodata_records
                    .get_mut(environment)
                    .and_then(|records| records.remove(&platform))
            })
            // Unwrap the Arc, cloning only if another reference still exists.
            .map(|records| Arc::try_unwrap(records).unwrap_or_else(|arc| (*arc).clone()))
    }

    /// Removes and returns the freshest pypi records for the given environment
    /// and platform, preferring freshly solved records over the locked ones.
    ///
    /// Panics if a solve cell exists but is still shared or not yet filled.
    pub fn take_latest_pypi_records(
        &mut self,
        environment: &Environment<'p>,
        platform: Platform,
    ) -> Option<PypiRecordsByName> {
        self.solved_pypi_records
            .get_mut(environment)
            .and_then(|records| records.remove(&platform))
            .map(|cell| {
                Arc::into_inner(cell)
                    .expect("records must not be shared")
                    .into_inner()
                    .expect("records must be available")
            })
            .or_else(|| {
                self.locked_pypi_records
                    .get_mut(environment)
                    .and_then(|records| records.remove(&platform))
            })
            // Unwrap the Arc, cloning only if another reference still exists.
            .map(|records| Arc::try_unwrap(records).unwrap_or_else(|arc| (*arc).clone()))
    }

    /// Drains all instantiated conda prefixes, keeping only those that belong
    /// to stand-alone environments (prefixes of real groups are dropped).
    ///
    /// Panics if a prefix cell is still shared or not yet filled.
    pub fn take_instantiated_conda_prefixes(
        &mut self,
    ) -> HashMap<Environment<'p>, (Prefix, PythonStatus)> {
        self.instantiated_conda_prefixes
            .drain()
            .filter_map(|(env, cell)| match env {
                GroupedEnvironment::Environment(env) => {
                    let prefix = Arc::into_inner(cell)
                        .expect("prefixes must not be shared")
                        .into_inner()
                        .expect("prefix must be available");
                    Some((env, prefix))
                }
                _ => None,
            })
            .collect()
    }

    /// Returns a future resolving to the conda prefix of the given group once
    /// it has been instantiated, or `None` if no instantiation was scheduled.
    pub fn get_conda_prefix(
        &self,
        environment: &GroupedEnvironment<'p>,
    ) -> Option<impl Future<Output = (Prefix, PythonStatus)>> {
        let cell = self.instantiated_conda_prefixes.get(environment)?.clone();
        Some(async move { cell.wait().await.clone() })
    }
}
/// Returns the default maximum number of concurrent solves: the available
/// parallelism minus two (leaving headroom for other work), clamped to the
/// range `1..=4`.
///
/// Falls back to 1 when the available parallelism cannot be determined.
fn default_max_concurrent_solves() -> usize {
    let available_parallelism = std::thread::available_parallelism().map_or(1, |n| n.get());
    // `clamp` replaces the manual `.min(4).max(1)` chain (clippy: manual_clamp).
    available_parallelism.saturating_sub(2).clamp(1, 4)
}
/// Ensures the lock-file is up-to-date with the project, solving any outdated
/// environments and writing the result back to disk.
///
/// Returns a [`LockFileDerivedData`] bundling the (possibly new) lock-file
/// with the repodata and prefixes created along the way so callers can reuse
/// them.
///
/// Fixes applied in this revision:
/// - `ordered_platforms.get_index_of(¤t_platform)` contained a mojibake
///   character (`&curren` of `&current_platform` corrupted into `¤`), which
///   does not compile; restored to `&current_platform`.
/// - Typo in an `expect` message ("should exists" -> "should exist").
pub async fn ensure_up_to_date_lock_file(
    project: &Project,
    options: UpdateLockFileOptions,
) -> miette::Result<LockFileDerivedData<'_>> {
    let lock_file = load_lock_file(project).await?;
    let current_platform = Platform::current();
    let package_cache = Arc::new(PackageCache::new(config::get_cache_dir()?.join("pkgs")));

    // Determine how many solves may run concurrently.
    let max_concurrent_solves = options
        .max_concurrent_solves
        .unwrap_or_else(default_max_concurrent_solves);
    let solve_semaphore = Arc::new(Semaphore::new(max_concurrent_solves));

    // Early out if the caller explicitly asked not to check freshness.
    if !options.lock_file_usage.should_check_if_out_of_date() {
        tracing::info!("skipping check if lock-file is up-to-date");
        return Ok(LockFileDerivedData {
            lock_file,
            package_cache,
            repo_data: options.existing_repo_data,
            updated_conda_prefixes: Default::default(),
            updated_pypi_prefixes: Default::default(),
        });
    }

    // Figure out which environment/platform combinations are out of date.
    let outdated = OutdatedEnvironments::from_project_and_lock_file(project, &lock_file);
    if outdated.is_empty() {
        tracing::info!("the lock-file is up-to-date");
        return Ok(LockFileDerivedData {
            lock_file,
            package_cache,
            repo_data: options.existing_repo_data,
            updated_conda_prefixes: Default::default(),
            updated_pypi_prefixes: Default::default(),
        });
    }

    // Something is out of date; bail if updating is not allowed.
    if !options.lock_file_usage.allows_lock_file_updates() {
        miette::bail!("lock-file not up-to-date with the project");
    }

    // Collect the (channel, platform) pairs for which repodata is needed.
    // NoArch is always added alongside each platform.
    let mut fetch_targets = IndexSet::new();
    for (environment, platforms) in outdated.conda.iter() {
        for channel in environment.channels() {
            for platform in platforms {
                fetch_targets.insert((channel.clone(), *platform));
            }
            fetch_targets.insert((channel.clone(), Platform::NoArch));
        }
    }

    // Fetch only the repodata the caller did not already provide, then merge.
    let mut repo_data = fetch_sparse_repodata_targets(
        fetch_targets
            .into_iter()
            .filter(|target| !options.existing_repo_data.contains_key(target)),
        project.authenticated_client(),
    )
    .await?;
    repo_data.extend(options.existing_repo_data);

    // Index the conda records currently in the lock-file per environment and
    // platform.
    let locked_repodata_records = project
        .environments()
        .into_iter()
        .flat_map(|env| {
            lock_file
                .environment(env.name().as_str())
                .into_iter()
                .map(move |locked_env| {
                    locked_env.conda_repodata_records().map(|records| {
                        (
                            env.clone(),
                            records
                                .into_iter()
                                .map(|(platform, records)| {
                                    (
                                        platform,
                                        Arc::new(RepoDataRecordsByName::from_iter(records)),
                                    )
                                })
                                .collect(),
                        )
                    })
                })
        })
        .collect::<Result<HashMap<_, HashMap<_, _>>, _>>()
        .into_diagnostic()?;

    // Same, but for the locked pypi records.
    let locked_pypi_records = project
        .environments()
        .into_iter()
        .flat_map(|env| {
            lock_file
                .environment(env.name().as_str())
                .into_iter()
                .map(move |locked_env| {
                    (
                        env.clone(),
                        locked_env
                            .pypi_packages()
                            .into_iter()
                            .map(|(platform, records)| {
                                (platform, Arc::new(PypiRecordsByName::from_iter(records)))
                            })
                            .collect(),
                    )
                })
        })
        .collect::<HashMap<_, HashMap<_, _>>>();

    // Every distinct solve group (stand-alone environments count as their own
    // group).
    let all_grouped_environments = project
        .environments()
        .into_iter()
        .map(GroupedEnvironment::from)
        .unique()
        .collect_vec();

    // Merge the locked conda records of each group's member environments, per
    // platform.
    let locked_grouped_repodata_records = all_grouped_environments
        .iter()
        .filter_map(|group| {
            let records = match group {
                GroupedEnvironment::Environment(env) => locked_repodata_records.get(env)?.clone(),
                GroupedEnvironment::Group(group) => {
                    let mut by_platform = HashMap::new();
                    for env in group.environments() {
                        let Some(records) = locked_repodata_records.get(&env) else {
                            continue;
                        };
                        for (platform, records) in records.iter() {
                            by_platform
                                .entry(*platform)
                                .or_insert_with(Vec::new)
                                .extend(records.records.iter().cloned());
                        }
                    }
                    by_platform
                        .into_iter()
                        .map(|(platform, records)| {
                            (
                                platform,
                                Arc::new(RepoDataRecordsByName::from_iter(records)),
                            )
                        })
                        .collect()
                }
            };
            Some((group.clone(), records))
        })
        .collect();

    let mut context = UpdateContext {
        repo_data: Arc::new(repo_data),
        locked_repodata_records,
        locked_grouped_repodata_records,
        locked_pypi_records,
        solved_repodata_records: HashMap::new(),
        instantiated_conda_prefixes: HashMap::new(),
        solved_pypi_records: HashMap::new(),
        grouped_solved_repodata_records: HashMap::new(),
        grouped_solved_pypi_records: HashMap::new(),
    };

    // Spawn tasks to solve the outdated conda environments.
    let mut pending_futures = FuturesUnordered::new();
    for (environment, platforms) in outdated.conda {
        // Move the current platform to the front so it is solved first.
        let mut ordered_platforms = platforms.into_iter().collect::<IndexSet<_>>();
        if let Some(current_platform_index) = ordered_platforms.get_index_of(&current_platform) {
            ordered_platforms.move_index(current_platform_index, 0);
        }
        let source = GroupedEnvironment::from(environment.clone());
        for platform in ordered_platforms {
            // Solve each group only once per platform; member environments
            // share the group's solution through a BarrierCell.
            let group_solve_records = if let Some(cell) = context
                .grouped_solved_repodata_records
                .get(&source)
                .and_then(|platforms| platforms.get(&platform))
            {
                cell.clone()
            } else {
                let locked_group_records = context
                    .locked_grouped_repodata_records
                    .get(&source)
                    .and_then(|records| records.get(&platform))
                    .cloned()
                    .unwrap_or_default();
                let group_solve_task = spawn_solve_conda_environment_task(
                    source.clone(),
                    locked_group_records,
                    context.repo_data.clone(),
                    platform,
                    solve_semaphore.clone(),
                )
                .boxed_local();
                pending_futures.push(group_solve_task);
                let cell = Arc::new(BarrierCell::new());
                let previous_cell = context
                    .grouped_solved_repodata_records
                    .entry(source.clone())
                    .or_default()
                    .insert(platform, cell.clone());
                assert!(
                    previous_cell.is_none(),
                    "a cell has already been added to update conda records"
                );
                cell
            };
            // Extract this environment's records from the group solution.
            let records_future =
                spawn_extract_conda_environment_task(environment.clone(), platform, async move {
                    group_solve_records.wait().await.clone()
                })
                .boxed_local();
            pending_futures.push(records_future);
            let previous_cell = context
                .solved_repodata_records
                .entry(environment.clone())
                .or_default()
                .insert(platform, Arc::default());
            assert!(
                previous_cell.is_none(),
                "a cell has already been added to update conda records"
            );
        }
    }

    // Instantiate a conda prefix for every group that needs pypi solving;
    // the pypi resolver is given the prefix's python interpreter path.
    for (environment, platforms) in outdated.pypi.iter() {
        // Skip environments without pypi dependencies on any outdated
        // platform.
        if !platforms
            .iter()
            .any(|p| !environment.pypi_dependencies(Some(*p)).is_empty())
        {
            continue;
        }
        if options.no_install {
            miette::bail!("Cannot update pypi dependencies without first installing a conda prefix that includes python.");
        }
        let group = GroupedEnvironment::from(environment.clone());
        // One prefix per group is enough.
        if context.instantiated_conda_prefixes.contains_key(&group) {
            continue;
        }
        let records_future = context
            .get_latest_group_repodata_records(&group, current_platform)
            .expect("conda records should be available now or in the future");
        let environment_name = environment.name().clone();
        let pypi_env_task =
            spawn_create_prefix_task(group.clone(), package_cache.clone(), records_future)
                .map_err(move |e| {
                    e.context(format!(
                        "failed to instantiate a prefix for '{}'",
                        environment_name
                    ))
                })
                .boxed_local();
        pending_futures.push(pypi_env_task);
        let previous_cell = context
            .instantiated_conda_prefixes
            .insert(group, Arc::new(BarrierCell::new()));
        assert!(
            previous_cell.is_none(),
            "cannot update the same group twice"
        )
    }

    // Spawn tasks to solve the outdated pypi dependencies per
    // environment+platform.
    for (environment, platform) in outdated
        .pypi
        .into_iter()
        .flat_map(|(env, platforms)| platforms.into_iter().map(move |p| (env.clone(), p)))
    {
        let dependencies = environment.pypi_dependencies(Some(platform));
        if dependencies.is_empty() {
            // No pypi dependencies: record an empty solution immediately.
            pending_futures.push(
                ready(Ok(TaskResult::PypiSolved(
                    environment.name().clone(),
                    platform,
                    Arc::default(),
                )))
                .boxed_local(),
            );
        } else {
            let group = GroupedEnvironment::from(environment.clone());
            // Solve each group only once per platform.
            let grouped_pypi_records = if let Some(cell) = context
                .grouped_solved_pypi_records
                .get(&group)
                .and_then(|records| records.get(&platform))
            {
                cell.clone()
            } else {
                let repodata_future = context
                    .get_latest_group_repodata_records(&group, platform)
                    .expect("conda records should be available now or in the future");
                let prefix_future = context
                    .get_conda_prefix(&group)
                    .expect("prefix should be available now or in the future");
                let env_variables = project.get_env_variables(&environment).await?;
                let pypi_solve_future = spawn_solve_pypi_task(
                    group.clone(),
                    platform,
                    repodata_future,
                    prefix_future,
                    SDistResolution::default(),
                    env_variables,
                );
                pending_futures.push(pypi_solve_future.boxed_local());
                let cell = Arc::new(BarrierCell::new());
                let previous_cell = context
                    .grouped_solved_pypi_records
                    .entry(group)
                    .or_default()
                    .insert(platform, cell.clone());
                assert!(
                    previous_cell.is_none(),
                    "a cell has already been added to update pypi records"
                );
                cell
            };
            // Extract this environment's records from the group solution.
            let pypi_records_future = async move { grouped_pypi_records.wait().await.clone() };
            let conda_records_future = context
                .get_latest_repodata_records(&environment, platform)
                .expect("must have conda records available");
            let records_future = spawn_extract_pypi_environment_task(
                environment.clone(),
                platform,
                conda_records_future,
                pypi_records_future,
            )
            .boxed_local();
            pending_futures.push(records_future);
        }
        let previous_cell = context
            .solved_pypi_records
            .entry(environment)
            .or_default()
            .insert(platform, Arc::default());
        assert!(
            previous_cell.is_none(),
            "a cell has already been added to extract pypi records"
        );
    }

    // Top-level progress bar covering all spawned tasks.
    let top_level_progress =
        global_multi_progress().add(ProgressBar::new(pending_futures.len() as u64));
    top_level_progress.set_style(indicatif::ProgressStyle::default_bar()
        .template("{spinner:.cyan} {prefix:20!} [{elapsed_precise}] [{bar:40!.bright.yellow/dim.white}] {pos:>4}/{len:4} {wide_msg:.dim}").unwrap()
        .progress_chars("━━╾─"));
    top_level_progress.enable_steady_tick(Duration::from_millis(50));
    top_level_progress.set_prefix("updating lock-file");

    // Drive the tasks to completion, storing each result into the matching
    // BarrierCell so dependent tasks can continue.
    while let Some(result) = pending_futures.next().await {
        top_level_progress.inc(1);
        match result? {
            TaskResult::CondaGroupSolved(group_name, platform, records, duration) => {
                let group = GroupedEnvironment::from_name(project, &group_name)
                    .expect("group should exist");
                context
                    .grouped_solved_repodata_records
                    .get_mut(&group)
                    .expect("the entry for this environment should exist")
                    .get_mut(&platform)
                    .expect("the entry for this platform should exist")
                    .set(Arc::new(records))
                    .expect("records should not be solved twice");
                match group_name {
                    GroupedEnvironmentName::Group(_) => {
                        tracing::info!(
                            "resolved conda environment for solve group '{}' '{}' in {}",
                            group_name.fancy_display(),
                            consts::PLATFORM_STYLE.apply_to(platform),
                            humantime::format_duration(duration)
                        );
                    }
                    GroupedEnvironmentName::Environment(env_name) => {
                        tracing::info!(
                            "resolved conda environment for environment '{}' '{}' in {}",
                            env_name.fancy_display(),
                            consts::PLATFORM_STYLE.apply_to(platform),
                            humantime::format_duration(duration)
                        );
                    }
                }
            }
            TaskResult::CondaSolved(environment, platform, records) => {
                let environment = project
                    .environment(&environment)
                    .expect("environment should exist");
                context
                    .solved_repodata_records
                    .get_mut(&environment)
                    .expect("the entry for this environment should exist")
                    .get_mut(&platform)
                    .expect("the entry for this platform should exist")
                    .set(records)
                    .expect("records should not be solved twice");
                let group = GroupedEnvironment::from(environment.clone());
                if matches!(group, GroupedEnvironment::Group(_)) {
                    tracing::info!(
                        "extracted conda packages for '{}' '{}' from the '{}' group",
                        environment.name().fancy_display(),
                        consts::PLATFORM_STYLE.apply_to(platform),
                        group.name().fancy_display(),
                    );
                }
            }
            TaskResult::CondaPrefixUpdated(group_name, prefix, python_status, duration) => {
                let group = GroupedEnvironment::from_name(project, &group_name)
                    .expect("grouped environment should exist");
                context
                    .instantiated_conda_prefixes
                    .get_mut(&group)
                    .expect("the entry for this environment should exist")
                    .set((prefix, *python_status))
                    .expect("prefix should not be instantiated twice");
                tracing::info!(
                    "updated conda packages in the '{}' prefix in {}",
                    group.name().fancy_display(),
                    humantime::format_duration(duration)
                );
            }
            TaskResult::PypiGroupSolved(group_name, platform, records, duration) => {
                let group = GroupedEnvironment::from_name(project, &group_name)
                    .expect("group should exist");
                context
                    .grouped_solved_pypi_records
                    .get_mut(&group)
                    .expect("the entry for this environment should exist")
                    .get_mut(&platform)
                    .expect("the entry for this platform should exist")
                    .set(Arc::new(records))
                    .expect("records should not be solved twice");
                match group_name {
                    GroupedEnvironmentName::Group(_) => {
                        tracing::info!(
                            "resolved pypi packages for solve group '{}' '{}' in {}",
                            group_name.fancy_display(),
                            consts::PLATFORM_STYLE.apply_to(platform),
                            humantime::format_duration(duration),
                        );
                    }
                    GroupedEnvironmentName::Environment(env_name) => {
                        tracing::info!(
                            "resolved pypi packages for environment '{}' '{}' in {}",
                            env_name.fancy_display(),
                            consts::PLATFORM_STYLE.apply_to(platform),
                            humantime::format_duration(duration),
                        );
                    }
                }
            }
            TaskResult::PypiSolved(environment, platform, records) => {
                let environment = project
                    .environment(&environment)
                    .expect("environment should exist");
                context
                    .solved_pypi_records
                    .get_mut(&environment)
                    .expect("the entry for this environment should exist")
                    .get_mut(&platform)
                    .expect("the entry for this platform should exist")
                    .set(records)
                    .expect("records should not be solved twice");
                let group = GroupedEnvironment::from(environment.clone());
                if matches!(group, GroupedEnvironment::Group(_)) {
                    tracing::info!(
                        "extracted pypi packages for '{}' '{}' from the '{}' group",
                        environment.name().fancy_display(),
                        consts::PLATFORM_STYLE.apply_to(platform),
                        group.name().fancy_display(),
                    );
                }
            }
        }
    }

    // Build the new lock-file from the latest (solved or locked) records of
    // every environment.
    let mut builder = LockFile::builder();
    for environment in project.environments() {
        builder.set_channels(
            environment.name().as_str(),
            environment
                .channels()
                .into_iter()
                .map(|channel| rattler_lock::Channel::from(channel.base_url().to_string())),
        );
        for platform in environment.platforms() {
            if let Some(records) = context.take_latest_repodata_records(&environment, platform) {
                for record in records.into_inner() {
                    builder.add_conda_package(environment.name().as_str(), platform, record.into());
                }
            }
            if let Some(records) = context.take_latest_pypi_records(&environment, platform) {
                for (pkg_data, pkg_env_data) in records.into_inner() {
                    builder.add_pypi_package(
                        environment.name().as_str(),
                        platform,
                        pkg_data,
                        pkg_env_data,
                    );
                }
            }
        }
    }

    // Write the new lock-file to disk.
    let lock_file = builder.finish();
    lock_file
        .to_path(&project.lock_file_path())
        .into_diagnostic()
        .context("failed to write lock-file to disk")?;
    top_level_progress.finish_and_clear();
    Ok(LockFileDerivedData {
        lock_file,
        package_cache,
        updated_conda_prefixes: context.take_instantiated_conda_prefixes(),
        updated_pypi_prefixes: HashMap::default(),
        repo_data: Arc::into_inner(context.repo_data)
            .expect("repo data should not be shared anymore"),
    })
}
/// Messages produced by the spawned update tasks and consumed by the driver
/// loop in [`ensure_up_to_date_lock_file`].
enum TaskResult {
    /// Conda dependencies were solved for a whole group on a platform.
    CondaGroupSolved(
        GroupedEnvironmentName,
        Platform,
        RepoDataRecordsByName,
        Duration,
    ),
    /// The per-environment subset was extracted from a group's conda solution.
    CondaSolved(EnvironmentName, Platform, Arc<RepoDataRecordsByName>),
    /// A conda prefix was installed/updated for a group.
    CondaPrefixUpdated(GroupedEnvironmentName, Prefix, Box<PythonStatus>, Duration),
    /// Pypi dependencies were solved for a whole group on a platform.
    PypiGroupSolved(
        GroupedEnvironmentName,
        Platform,
        PypiRecordsByName,
        Duration,
    ),
    /// The per-environment subset was extracted from a group's pypi solution.
    PypiSolved(EnvironmentName, Platform, Arc<PypiRecordsByName>),
}
/// Solves the conda dependencies of a grouped environment for a single
/// platform on a spawned tokio task.
///
/// `concurrency_semaphore` bounds the number of solves running at once.
/// Progress is reported through a [`SolveProgressBar`]. Panics from the
/// spawned task are propagated; cancellation is turned into an error.
async fn spawn_solve_conda_environment_task(
    group: GroupedEnvironment<'_>,
    existing_repodata_records: Arc<RepoDataRecordsByName>,
    sparse_repo_data: Arc<IndexMap<(Channel, Platform), SparseRepoData>>,
    platform: Platform,
    concurrency_semaphore: Arc<Semaphore>,
) -> miette::Result<TaskResult> {
    // Capture everything the spawned task needs before moving it in.
    let dependencies = group.dependencies(None, Some(platform));
    let virtual_packages = group.virtual_packages(platform);
    let group_name = group.name();
    let channels = group.channels().into_iter().cloned().collect_vec();
    let sparse_repo_data = sparse_repo_data.clone();
    let has_pypi_dependencies = group.has_pypi_dependencies();
    tokio::spawn(
        async move {
            // Bound the number of concurrent solves.
            let _permit = concurrency_semaphore
                .acquire()
                .await
                .expect("the semaphore is never closed");
            let pb = SolveProgressBar::new(
                global_multi_progress().add(ProgressBar::hidden()),
                platform,
                group_name.clone(),
            );
            pb.start();
            let start = Instant::now();
            // Turn the dependency map into match specs for the solver.
            let match_specs = dependencies
                .iter_specs()
                .map(|(name, constraint)| {
                    MatchSpec::from_nameless(constraint.clone(), Some(name.clone()))
                })
                .collect_vec();
            let package_names = dependencies.names().cloned().collect_vec();
            pb.set_message("loading repodata");
            let available_packages = load_sparse_repo_data_async(
                package_names.clone(),
                sparse_repo_data,
                channels,
                platform,
            )
            .await?;
            pb.set_message("resolving conda");
            let mut records = lock_file::resolve_conda(
                match_specs,
                virtual_packages,
                existing_repodata_records.records.clone(),
                available_packages,
            )
            .await
            .with_context(|| {
                format!(
                    "failed to solve the conda requirements of '{}' '{}'",
                    group_name.fancy_display(),
                    consts::PLATFORM_STYLE.apply_to(platform)
                )
            })?;
            // Only amend pypi purls when the group actually has pypi
            // dependencies.
            if has_pypi_dependencies {
                lock_file::pypi::amend_pypi_purls(&mut records).await?;
            }
            let records_by_name = RepoDataRecordsByName::from(records);
            let end = Instant::now();
            pb.finish();
            Ok(TaskResult::CondaGroupSolved(
                group_name,
                platform,
                records_by_name,
                end - start,
            ))
        }
        .instrument(tracing::info_span!(
            "resolve_conda",
            group = %group.name().as_str(),
            platform = %platform
        )),
    )
    .await
    // Propagate a panic from the task; map cancellation to an error.
    .unwrap_or_else(|e| match e.try_into_panic() {
        Ok(panic) => std::panic::resume_unwind(panic),
        Err(_err) => Err(miette::miette!("the operation was cancelled")),
    })
}
/// Derives the conda records of a single environment from the solved records
/// of the group it belongs to.
///
/// A stand-alone environment reuses the group solution as-is; for a real solve
/// group only the subset selected by the environment's own dependencies (given
/// its virtual packages) is kept.
async fn spawn_extract_conda_environment_task(
    environment: Environment<'_>,
    platform: Platform,
    solve_group_records: impl Future<Output = Arc<RepoDataRecordsByName>>,
) -> miette::Result<TaskResult> {
    let grouped = GroupedEnvironment::from(environment.clone());
    let solved_group_records = solve_group_records.await;
    let records = if matches!(grouped, GroupedEnvironment::Environment(_)) {
        // A stand-alone environment *is* its own group; reuse the solution.
        solved_group_records.clone()
    } else {
        // Restrict the group solution to what this environment asks for.
        let virtual_package_names = grouped
            .virtual_packages(platform)
            .into_iter()
            .map(|vp| vp.name)
            .collect::<HashSet<_>>();
        let requested = environment.dependencies(None, Some(platform));
        Arc::new(solved_group_records.subset(
            requested.into_iter().map(|(name, _)| name),
            &virtual_package_names,
        ))
    };
    Ok(TaskResult::CondaSolved(
        environment.name().clone(),
        platform,
        records,
    ))
}
/// Derives the pypi records of a single environment from the solved pypi
/// records of the group it belongs to.
///
/// A stand-alone environment reuses the group solution as-is; for a real solve
/// group only the subset selected by the environment's own pypi dependencies
/// is kept, with pypi identifiers carried by the solved conda records treated
/// as already provided.
async fn spawn_extract_pypi_environment_task(
    environment: Environment<'_>,
    platform: Platform,
    conda_records: impl Future<Output = Arc<RepoDataRecordsByName>>,
    solve_group_records: impl Future<Output = Arc<PypiRecordsByName>>,
) -> miette::Result<TaskResult> {
    let grouped = GroupedEnvironment::from(environment.clone());
    let requested = environment.pypi_dependencies(Some(platform));
    let records = if matches!(grouped, GroupedEnvironment::Environment(_)) {
        // A stand-alone environment *is* its own group; reuse the solution.
        solve_group_records.await.clone()
    } else {
        // Index the pypi identifiers carried by the solved conda records;
        // records whose identifiers fail to parse are silently skipped.
        let mut conda_package_identifiers = HashMap::new();
        for record in conda_records.await.records.iter() {
            if let Ok(identifiers) = PypiPackageIdentifier::from_record(record) {
                for identifier in identifiers {
                    conda_package_identifiers.insert(identifier.name.clone().into(), identifier);
                }
            }
        }
        Arc::new(
            solve_group_records
                .await
                .subset(requested.into_keys(), &conda_package_identifiers),
        )
    };
    Ok(TaskResult::PypiSolved(
        environment.name().clone(),
        platform,
        records,
    ))
}
/// Solves the pypi dependencies of a grouped environment for a single
/// platform.
///
/// Awaits both the solved conda records and an instantiated prefix (the
/// resolver is handed the prefix's python interpreter path) before running the
/// resolution on a spawned tokio task. Returns an empty solution immediately
/// when the group has no pypi dependencies for the platform.
async fn spawn_solve_pypi_task(
    environment: GroupedEnvironment<'_>,
    platform: Platform,
    repodata_records: impl Future<Output = Arc<RepoDataRecordsByName>>,
    prefix: impl Future<Output = (Prefix, PythonStatus)>,
    sdist_resolution: SDistResolution,
    env_variables: &HashMap<String, String>,
) -> miette::Result<TaskResult> {
    // Nothing to solve without pypi dependencies.
    let dependencies = environment.pypi_dependencies(Some(platform));
    if dependencies.is_empty() {
        return Ok(TaskResult::PypiGroupSolved(
            environment.name().clone(),
            platform,
            PypiRecordsByName::default(),
            Duration::from_millis(0),
        ));
    }
    let system_requirements = environment.system_requirements();
    let package_db = environment.project().pypi_package_db()?;
    // Wait until both the conda solution and the prefix are available.
    let (repodata_records, (prefix, python_status)) = tokio::join!(repodata_records, prefix);
    let environment_name = environment.name().clone();
    let envs = env_variables.clone();
    let (pypi_packages, duration) = tokio::spawn(
        async move {
            let pb = SolveProgressBar::new(
                global_multi_progress().add(ProgressBar::hidden()),
                platform,
                environment_name,
            );
            pb.start();
            let start = Instant::now();
            let records = lock_file::resolve_pypi(
                package_db,
                dependencies,
                system_requirements,
                &repodata_records.records,
                &[],
                platform,
                &pb.pb,
                // Path to the python interpreter inside the prefix, if any.
                python_status
                    .location()
                    .map(|path| prefix.root().join(path))
                    .as_deref(),
                sdist_resolution,
                envs,
            )
            .await?;
            let end = Instant::now();
            pb.finish();
            Ok((PypiRecordsByName::from_iter(records), end - start))
        }
        .instrument(tracing::info_span!(
            "resolve_pypi",
            group = %environment.name().as_str(),
            platform = %platform
        )),
    )
    .await
    // Propagate a panic from the task; map cancellation to an error.
    .unwrap_or_else(|e| match e.try_into_panic() {
        Ok(panic) => std::panic::resume_unwind(panic),
        Err(_err) => Err(miette::miette!("the operation was cancelled")),
    })?;
    Ok(TaskResult::PypiGroupSolved(
        environment.name().clone(),
        platform,
        pypi_packages,
        duration,
    ))
}
/// Installs/updates the conda prefix of a grouped environment with the given
/// conda records for the current platform.
///
/// Returns [`TaskResult::CondaPrefixUpdated`] with the resulting python status
/// and the time the installation took. Panics from the spawned tasks are
/// propagated; cancellation is turned into an error.
async fn spawn_create_prefix_task(
    group: GroupedEnvironment<'_>,
    package_cache: Arc<PackageCache>,
    conda_records: impl Future<Output = Arc<RepoDataRecordsByName>>,
) -> miette::Result<TaskResult> {
    let group_name = group.name().clone();
    let prefix = group.prefix();
    let client = group.project().authenticated_client().clone();
    // Scan the prefix for installed packages on a separate task while the
    // conda records are still being produced.
    let installed_packages_future = tokio::spawn({
        let prefix = prefix.clone();
        async move { prefix.find_installed_packages(None).await }
    })
    .unwrap_or_else(|e| match e.try_into_panic() {
        Ok(panic) => std::panic::resume_unwind(panic),
        Err(_err) => Err(miette::miette!("the operation was cancelled")),
    });
    // Wait for both the records and the installed-package scan.
    let (conda_records, installed_packages) =
        tokio::try_join!(conda_records.map(Ok), installed_packages_future)?;
    let (python_status, duration) = tokio::spawn({
        let prefix = prefix.clone();
        let group_name = group_name.clone();
        async move {
            let start = Instant::now();
            let python_status = environment::update_prefix_conda(
                group_name,
                &prefix,
                package_cache,
                client,
                installed_packages,
                &conda_records.records,
                Platform::current(),
            )
            .await?;
            let end = Instant::now();
            Ok((python_status, end - start))
        }
    })
    .await
    .unwrap_or_else(|e| match e.try_into_panic() {
        Ok(panic) => std::panic::resume_unwind(panic),
        Err(_err) => Err(miette::miette!("the operation was cancelled")),
    })?;
    Ok(TaskResult::CondaPrefixUpdated(
        group_name,
        prefix,
        Box::new(python_status),
        duration,
    ))
}
/// Loads repodata records for the given package names (recursively, see
/// `SparseRepoData::load_records_recursive`) from the cached sparse repodata,
/// for every given channel on `platform` plus the NoArch platform.
///
/// The loading is CPU-bound, so it runs on a blocking task. A panic on that
/// task is propagated; cancellation becomes an error.
pub async fn load_sparse_repo_data_async(
    package_names: Vec<PackageName>,
    sparse_repo_data: Arc<IndexMap<(Channel, Platform), SparseRepoData>>,
    channels: Vec<Channel>,
    platform: Platform,
) -> miette::Result<Vec<Vec<RepoDataRecord>>> {
    let blocking_task = tokio::task::spawn_blocking(move || {
        // Every (channel, platform/NoArch) combination we have repodata for.
        let targets = channels
            .into_iter()
            .cartesian_product(vec![platform, Platform::NoArch])
            .filter_map(|key| sparse_repo_data.get(&key));
        SparseRepoData::load_records_recursive(targets, package_names, None).into_diagnostic()
    });
    let loaded = match blocking_task.await {
        Ok(result) => result,
        Err(join_err) => match join_err.try_into_panic() {
            Ok(panic) => std::panic::resume_unwind(panic),
            Err(_cancelled) => Err(miette::miette!("the operation was cancelled")),
        },
    };
    loaded.with_context(|| {
        format!(
            "failed to load repodata records for platform '{}'",
            platform.as_str()
        )
    })
}
/// Progress bar shown while a single environment/platform combination is
/// being solved.
#[derive(Clone)]
pub(crate) struct SolveProgressBar {
    /// The underlying indicatif progress bar.
    pb: ProgressBar,
    /// The platform being solved, shown in the bar's template.
    platform: Platform,
    /// Name of the (grouped) environment being solved.
    environment_name: GroupedEnvironmentName,
}
impl SolveProgressBar {
    /// Creates the bar in its idle style and starts a 100ms steady tick.
    pub fn new(
        pb: ProgressBar,
        platform: Platform,
        environment_name: GroupedEnvironmentName,
    ) -> Self {
        pb.set_style(
            indicatif::ProgressStyle::with_template(&format!(
                " ({:>12}) {:<9} ..",
                environment_name.fancy_display(),
                consts::PLATFORM_STYLE.apply_to(platform),
            ))
            .unwrap(),
        );
        pb.enable_steady_tick(Duration::from_millis(100));
        Self {
            pb,
            platform,
            environment_name,
        }
    }

    /// Switches to the active style (spinner, elapsed time, message) and
    /// resets the elapsed timer.
    pub fn start(&self) {
        self.pb.reset_elapsed();
        self.pb.set_style(
            indicatif::ProgressStyle::with_template(&format!(
                " {{spinner:.dim}} {:>12}: {:<9} [{{elapsed_precise}}] {{msg:.dim}}",
                self.environment_name.fancy_display(),
                consts::PLATFORM_STYLE.apply_to(self.platform),
            ))
            .unwrap(),
        );
    }

    /// Sets the message shown next to the spinner.
    pub fn set_message(&self, msg: impl Into<Cow<'static, str>>) {
        self.pb.set_message(msg);
    }

    /// Switches to the finished style (checkmark) and removes the bar.
    pub fn finish(&self) {
        self.pb.set_style(
            indicatif::ProgressStyle::with_template(&format!(
                " {} ({:>12}) {:<9} [{{elapsed_precise}}]",
                console::style(console::Emoji("✔", "↳")).green(),
                self.environment_name.fancy_display(),
                consts::PLATFORM_STYLE.apply_to(self.platform),
            ))
            .unwrap(),
        );
        self.pb.finish_and_clear();
    }
}