use crate::environment::PythonStatus;
use crate::prefix::Prefix;
use crate::progress;
use crate::progress::ProgressBarMessageFormatter;
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use indexmap::IndexSet;
use indicatif::ProgressBar;
use itertools::Itertools;
use miette::{IntoDiagnostic, WrapErr};
use rip::resolve::solve_options::{ResolveOptions, SDistResolution};
use crate::consts::PROJECT_MANIFEST;
use crate::project::manifest::SystemRequirements;
use crate::pypi_marker_env::determine_marker_environment;
use crate::pypi_tags::{is_python_record, project_platform_tags};
use pep508_rs::MarkerEnvironment;
use rattler_conda_types::{Platform, RepoDataRecord};
use rattler_lock::{PypiPackageData, PypiPackageEnvironmentData};
use rip::artifacts::wheel::{InstallPaths, UnpackWheelOptions};
use rip::artifacts::Wheel;
use rip::index::PackageDb;
use rip::python_env::{
find_distributions_in_venv, uninstall_distribution, Distribution, PythonLocation, WheelTag,
WheelTags,
};
use rip::types::{
ArtifactHashes, ArtifactInfo, ArtifactName, Extra, HasArtifactName, NormalizedPackageName,
};
use rip::wheel_builder::WheelBuilder;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinError;
pub(crate) const PIXI_PYPI_INSTALLER: &str = env!("CARGO_PKG_NAME");
type CombinedPypiPackageData = (PypiPackageData, PypiPackageEnvironmentData);
#[allow(clippy::too_many_arguments)]
pub async fn update_python_distributions(
package_db: Arc<PackageDb>,
prefix: &Prefix,
conda_package: &[RepoDataRecord],
python_packages: &[CombinedPypiPackageData],
platform: Platform,
status: &PythonStatus,
system_requirements: &SystemRequirements,
sdist_resolution: SDistResolution,
env_variables: HashMap<String, String>,
) -> miette::Result<()> {
let Some(python_info) = status.current_info() else {
return Ok(());
};
let python_location = prefix.root().join(&python_info.path);
let python_version = (
python_info.short_version.0 as u32,
python_info.short_version.1 as u32,
0,
);
let install_paths = InstallPaths::for_venv(python_version, platform.is_windows());
let current_python_packages = find_distributions_in_venv(prefix.root(), &install_paths)
.into_diagnostic()
.context(
"failed to locate python packages that have not been installed as conda packages",
)?;
let python_packages = python_packages.iter().collect_vec();
let (python_distributions_to_remove, python_distributions_to_install) =
determine_python_distributions_to_remove_and_install(
prefix.root(),
current_python_packages,
python_packages,
);
let python_record = conda_package
.iter()
.find(|r| is_python_record(r))
.ok_or_else(|| miette::miette!("could not resolve pypi dependencies because no python interpreter is added to the dependencies of the project.\nMake sure to add a python interpreter to the [dependencies] section of the {PROJECT_MANIFEST}, or run:\n\n\tpixi add python"))?;
let marker_environment = Arc::new(determine_marker_environment(
platform,
python_record.as_ref(),
)?);
let compatible_tags = Arc::new(project_platform_tags(
platform,
system_requirements,
python_record.as_ref(),
));
let resolve_options = Arc::new(ResolveOptions {
sdist_resolution,
python_location: PythonLocation::Custom(python_location),
..Default::default()
});
let (package_stream, package_stream_pb) = stream_python_artifacts(
package_db,
marker_environment,
compatible_tags,
resolve_options,
python_distributions_to_install.clone(),
env_variables,
);
if !python_distributions_to_remove.is_empty() {
let site_package_path = install_paths.site_packages();
for python_distribution in python_distributions_to_remove {
uninstall_pixi_installed_distribution(prefix, site_package_path, &python_distribution)?;
}
}
let package_install_pb = install_python_distributions(
prefix,
install_paths,
&prefix.root().join(python_info.path()),
package_stream,
)
.await?;
for pb in package_install_pb
.into_iter()
.chain(package_stream_pb.into_iter())
{
pb.finish_and_clear();
}
Ok(())
}
async fn install_python_distributions(
prefix: &Prefix,
install_paths: InstallPaths,
python_executable_path: &Path,
package_stream: impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + Sized,
) -> miette::Result<Option<ProgressBar>> {
let len = {
let (lower_bound, upper_bound) = package_stream.size_hint();
upper_bound.unwrap_or(lower_bound)
};
if len == 0 {
return Ok(None);
}
let pb = progress::global_multi_progress().add(ProgressBar::new(len as u64));
pb.set_style(progress::default_progress_style());
pb.set_prefix("unpacking wheels");
pb.enable_steady_tick(Duration::from_millis(100));
let message_formatter = ProgressBarMessageFormatter::new(pb.clone());
let install_pb = pb.clone();
package_stream
.try_for_each_concurrent(Some(20), move |(hash, extras, wheel)| {
let install_paths = install_paths.clone();
let root = prefix.root().to_path_buf();
let message_formatter = message_formatter.clone();
let pb = install_pb.clone();
let python_executable_path = python_executable_path.to_owned();
async move {
let pb_task = message_formatter.start(wheel.name().to_string()).await;
let unpack_result = tokio::task::spawn_blocking(move || {
wheel
.unpack(
&root,
&install_paths,
&python_executable_path,
&UnpackWheelOptions {
installer: Some(PIXI_PYPI_INSTALLER.into()),
extras: Some(extras),
..Default::default()
},
)
.into_diagnostic()
.and_then(|unpacked_wheel| {
if let Some(hash) = hash {
std::fs::write(unpacked_wheel.dist_info.join("HASH"), hash)
.into_diagnostic()
} else {
Ok(())
}
})
})
.map_err(JoinError::try_into_panic)
.await;
pb_task.finish().await;
pb.inc(1);
match unpack_result {
Ok(unpack_result) => unpack_result,
Err(Ok(panic)) => std::panic::resume_unwind(panic),
Err(Err(e)) => Err(miette::miette!("{e}")),
}
}
})
.await?;
pb.set_style(progress::finished_progress_style());
pb.finish();
Ok(Some(pb))
}
fn stream_python_artifacts(
package_db: Arc<PackageDb>,
marker_environment: Arc<MarkerEnvironment>,
compatible_tags: Arc<WheelTags>,
resolve_options: Arc<ResolveOptions>,
packages_to_download: Vec<&CombinedPypiPackageData>,
env_variables: HashMap<String, String>,
) -> (
impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + '_,
Option<ProgressBar>,
) {
if packages_to_download.is_empty() {
return (stream::empty().left_stream(), None);
}
let pb =
progress::global_multi_progress().add(ProgressBar::new(packages_to_download.len() as u64));
pb.set_style(progress::default_progress_style());
pb.set_prefix("acquiring wheels");
pb.enable_steady_tick(Duration::from_millis(100));
let message_formatter = ProgressBarMessageFormatter::new(pb.clone());
let stream_pb = pb.clone();
let total_packages = packages_to_download.len();
let download_stream = stream::iter(packages_to_download)
.map(move |(pkg_data, pkg_env_data)| {
let pb = stream_pb.clone();
let message_formatter = message_formatter.clone();
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();
let env_variables = env_variables.clone();
async move {
let filename = pkg_data
.url
.path_segments()
.and_then(|s| s.last())
.expect("url is missing a path");
let name = NormalizedPackageName::from_str(&pkg_data.name)
.into_diagnostic()
.with_context(|| {
format!("'{}' is not a valid python package name", &pkg_data.name)
})?;
let artifact_name = ArtifactName::from_filename(filename, Some(pkg_data.url.clone()), &name)
.expect("failed to convert filename to artifact name");
let (artifact_name, is_direct_url) = if let ArtifactName::STree(mut stree) = artifact_name{
stree.version = pkg_data.version.clone();
(ArtifactName::STree(stree), true)
} else {
(artifact_name, false)
};
tracing::info!("downloading python package {filename}");
let pb_task = message_formatter.start(filename.to_string()).await;
let artifact_info = ArtifactInfo {
filename: artifact_name,
url: pkg_data.url.clone(),
hashes: pkg_data.hash.as_ref().map(|hash| ArtifactHashes {
sha256: hash.sha256().cloned(),
}),
requires_python: pkg_data.requires_python.clone(),
dist_info_metadata: Default::default(),
yanked: Default::default(),
is_direct_url,
};
let (wheel, _) = tokio::spawn({
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();
async move {
let wheel_builder = WheelBuilder::new(
package_db.clone(),
marker_environment,
Some(compatible_tags),
resolve_options.deref().clone(),
env_variables,
)
.into_diagnostic()
.context("error in construction of WheelBuilder for `pypi-dependencies` installation")?;
package_db.get_wheel(&artifact_info, Some(&wheel_builder)).await
}
})
.await.unwrap_or_else(|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(miette::miette!("operation was cancelled"))
})?;
pb_task.finish().await;
pb.inc(1);
if pb.position() == total_packages as u64 {
pb.set_style(progress::finished_progress_style());
pb.finish();
}
let hash = pkg_data
.hash
.as_ref()
.and_then(|h| h.sha256())
.map(|sha256| format!("sha256-{:x}", sha256));
Ok((
hash,
pkg_env_data
.extras
.iter()
.filter_map(|e| Extra::from_str(e).ok())
.collect(),
wheel,
))
}
})
.buffer_unordered(20)
.right_stream();
(download_stream, Some(pb))
}
pub fn remove_old_python_distributions(
prefix: &Prefix,
platform: Platform,
python_changed: &PythonStatus,
) -> miette::Result<()> {
let python_version = match python_changed {
PythonStatus::Removed { old } | PythonStatus::Changed { old, .. } => old,
PythonStatus::Added { .. } | PythonStatus::DoesNotExist | PythonStatus::Unchanged(_) => {
return Ok(())
}
};
let python_version = (
python_version.short_version.0 as u32,
python_version.short_version.1 as u32,
0,
);
let install_paths = InstallPaths::for_venv(python_version, platform.is_windows());
let current_python_packages = find_distributions_in_venv(prefix.root(), &install_paths)
.into_diagnostic()
.with_context(|| format!("failed to determine the python packages installed for a previous version of python ({}.{})", python_version.0, python_version.1))?
.into_iter().filter(|d| d.installer.as_deref() != Some("conda") && d.installer.is_some()).collect_vec();
let pb = progress::global_multi_progress()
.add(ProgressBar::new(current_python_packages.len() as u64));
pb.set_style(progress::default_progress_style());
pb.set_message("removing old python packages");
pb.enable_steady_tick(Duration::from_millis(100));
let site_package_path = install_paths.site_packages();
for python_package in current_python_packages {
pb.set_message(format!(
"{} {}",
&python_package.name, &python_package.version
));
uninstall_pixi_installed_distribution(prefix, site_package_path, &python_package)?;
pb.inc(1);
}
Ok(())
}
fn uninstall_pixi_installed_distribution(
prefix: &Prefix,
site_package_path: &Path,
python_package: &Distribution,
) -> miette::Result<()> {
tracing::info!(
"uninstalling python package {}-{}",
&python_package.name,
&python_package.version
);
let relative_dist_info = python_package
.dist_info
.strip_prefix(site_package_path)
.expect("the dist-info path must be a sub-path of the site-packages path");
let _ = std::fs::remove_file(prefix.root().join(&python_package.dist_info).join("HASH"));
uninstall_distribution(&prefix.root().join(site_package_path), relative_dist_info)
.into_diagnostic()
.with_context(|| format!("could not uninstall python package {}-{}. Manually remove the `.pixi/env` folder and try again.", &python_package.name, &python_package.version))?;
Ok(())
}
fn determine_python_distributions_to_remove_and_install<'p>(
prefix: &Path,
mut current_python_packages: Vec<Distribution>,
desired_python_packages: Vec<&'p CombinedPypiPackageData>,
) -> (Vec<Distribution>, Vec<&'p CombinedPypiPackageData>) {
let mut desired_python_packages = extract_locked_tags(desired_python_packages);
current_python_packages.retain(|current_python_packages| {
if current_python_packages.installer.is_none() {
return false;
}
if let Some(found_desired_packages_idx) =
desired_python_packages
.iter()
.position(|(pkg, artifact_name)| {
does_installed_match_locked_package(
prefix,
current_python_packages,
(pkg, artifact_name.as_ref()),
)
})
{
desired_python_packages.remove(found_desired_packages_idx);
false
} else {
current_python_packages.installer.as_deref() == Some(PIXI_PYPI_INSTALLER)
}
});
(
current_python_packages,
desired_python_packages
.into_iter()
.map(|(pkg, _)| pkg)
.collect(),
)
}
fn extract_locked_tags(
desired_python_packages: Vec<&CombinedPypiPackageData>,
) -> Vec<(&CombinedPypiPackageData, Option<IndexSet<WheelTag>>)> {
desired_python_packages
.into_iter()
.map(|pkg@(pkg_data, _pkg_env_data)| {
let Some(filename) = pkg_data.url.path_segments().and_then(|s| s.last()) else {
tracing::warn!(
"failed to determine the artifact name of the python package {}-{} from url {}: the url has no filename.",
&pkg_data.name, pkg_data.version, &pkg_data.url);
return (pkg, None);
};
let Ok(name) = NormalizedPackageName::from_str(&pkg_data.name) else {
tracing::warn!(
"failed to determine the artifact name of the python package {}-{} from url {}: {} is not a valid package name.",
&pkg_data.name, pkg_data.version, &pkg_data.url, &pkg_data.name);
return (pkg, None);
};
match ArtifactName::from_filename(filename, Some(pkg_data.url.clone()), &name) {
Ok(ArtifactName::Wheel(name)) => (pkg, Some(IndexSet::from_iter(name.all_tags_iter()))),
Ok(_) => (pkg, None),
Err(err) => {
tracing::warn!(
"failed to determine the artifact name of the python package {}-{}. Could not determine the name from the url {}: {err}",
&pkg_data.name, pkg_data.version, &pkg_data.url);
(pkg, None)
}
}
})
.collect()
}
fn does_installed_match_locked_package(
prefix_root: &Path,
installed_python_package: &Distribution,
locked_python_package: (&CombinedPypiPackageData, Option<&IndexSet<WheelTag>>),
) -> bool {
let ((pkg_data, _), artifact_tags) = locked_python_package;
if pkg_data.name != installed_python_package.name.as_str()
|| pkg_data.version != installed_python_package.version
{
return false;
}
if installed_python_package.installer.as_deref() == Some(PIXI_PYPI_INSTALLER) {
let expected_hash = pkg_data
.hash
.as_ref()
.and_then(|hash| hash.sha256())
.map(|sha256| format!("sha256-{:x}", sha256));
if let Some(expected_hash) = expected_hash {
let hash_path = prefix_root
.join(&installed_python_package.dist_info)
.join("HASH");
if let Ok(actual_hash) = std::fs::read_to_string(hash_path) {
return actual_hash == expected_hash;
}
}
}
match (artifact_tags, &installed_python_package.tags) {
(None, _) | (_, None) => {
false
}
(Some(locked_tags), Some(installed_tags)) => locked_tags == installed_tags,
}
}