use super::package_identifier;
use crate::{
project::Environment, pypi_marker_env::determine_marker_environment,
pypi_tags::is_python_record,
};
use itertools::Itertools;
use miette::Diagnostic;
use pep440_rs::VersionSpecifiers;
use pep508_rs::Requirement;
use rattler_conda_types::{MatchSpec, ParseMatchSpecError, Platform};
use rattler_lock::{CondaPackage, Package, PypiPackage};
use rip::types::NormalizedPackageName;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use thiserror::Error;
#[derive(Debug, Error, Diagnostic)]
pub enum EnvironmentUnsat {
#[error("the channels in the lock-file do not match the environments channels")]
ChannelsMismatch,
}
#[derive(Debug, Error, Diagnostic)]
pub enum PlatformUnsat {
#[error("the requirement '{0}' could not be satisfied (required by '{1}')")]
UnsatisfiableMatchSpec(MatchSpec, String),
#[error("the requirement '{0}' could not be satisfied (required by '{1}')")]
UnsatisfiableRequirement(Requirement, String),
#[error("there was a duplicate entry for '{0}'")]
DuplicateEntry(String),
#[error("the requirement '{0}' failed to parse")]
FailedToParseMatchSpec(String, #[source] ParseMatchSpecError),
#[error("there are more conda packages in the lock-file than are used by the environment")]
TooManyCondaPackages,
#[error("there are more pypi packages in the lock-file than are used by the environment")]
TooManyPypiPackages,
#[error("there are PyPi dependencies but a python interpreter is missing from the lock-file")]
MissingPythonInterpreter,
#[error(
"a marker environment could not be derived from the python interpreter in the lock-file"
)]
FailedToDetermineMarkerEnvironment(#[source] Box<dyn Diagnostic + Send + Sync>),
#[error("{0} requires python version {1} but the python interpreter in the lock-file has version {2}")]
PythonVersionMismatch(String, VersionSpecifiers, Box<pep440_rs::Version>),
}
pub fn verify_environment_satisfiability(
environment: &Environment<'_>,
locked_environment: &rattler_lock::Environment,
) -> Result<(), EnvironmentUnsat> {
let channels = environment
.channels()
.into_iter()
.map(|channel| rattler_lock::Channel::from(channel.base_url().to_string()))
.collect_vec();
if !locked_environment.channels().eq(&channels) {
return Err(EnvironmentUnsat::ChannelsMismatch);
}
Ok(())
}
pub fn verify_platform_satisfiability(
environment: &Environment<'_>,
locked_environment: &rattler_lock::Environment,
platform: Platform,
) -> Result<(), PlatformUnsat> {
let conda_packages = locked_environment
.packages(platform)
.into_iter()
.flatten()
.filter_map(Package::into_conda)
.collect_vec();
verify_conda_platform_satisfiability(environment, &conda_packages, platform)?;
let pypi_packages = locked_environment
.packages(platform)
.into_iter()
.flatten()
.filter_map(Package::into_pypi)
.collect_vec();
verify_pypi_platform_satisfiability(environment, &conda_packages, &pypi_packages, platform)?;
Ok(())
}
pub fn verify_conda_platform_satisfiability(
environment: &Environment<'_>,
locked_environment: &Vec<CondaPackage>,
platform: Platform,
) -> Result<(), PlatformUnsat> {
let mut specs = environment
.dependencies(None, Some(platform))
.into_match_specs()
.map(|spec| (spec, "<environment>"))
.collect_vec();
let mut name_to_record = HashMap::new();
for (record_idx, record) in locked_environment.iter().enumerate() {
if name_to_record
.insert(
record.package_record().name.as_normalized().to_string(),
record_idx,
)
.is_some()
{
return Err(PlatformUnsat::DuplicateEntry(
record.package_record().name.as_normalized().to_string(),
));
}
}
let virtual_packages = environment
.virtual_packages(platform)
.into_iter()
.map(|vpkg| (vpkg.name.clone(), vpkg))
.collect::<HashMap<_, _>>();
let mut records_visited = HashSet::new();
while let Some((spec, source)) = specs.pop() {
let matching_record_idx = match &spec.name {
None => {
locked_environment.iter().position(|r| r.satisfies(&spec))
}
Some(name) => {
if let Some(vpkg) = virtual_packages.get(name) {
if spec
.version
.as_ref()
.map(|spec| spec.matches(&vpkg.version))
.unwrap_or(true)
&& spec
.build
.as_ref()
.map(|spec| spec.matches(&vpkg.build_string))
.unwrap_or(true)
{
continue;
}
}
name_to_record
.get(name.as_normalized())
.copied()
.and_then(|idx| {
let record = &locked_environment[idx];
if record.satisfies(&spec) {
Some(idx)
} else {
None
}
})
}
};
let Some(matching_record_idx) = matching_record_idx else {
return Err(PlatformUnsat::UnsatisfiableMatchSpec(
spec,
source.to_string(),
));
};
if !records_visited.insert(matching_record_idx) {
continue;
}
let record = &locked_environment[matching_record_idx];
let source = record
.file_name()
.unwrap_or_else(|| record.package_record().name.as_normalized());
for depends in &record.package_record().depends {
let spec = MatchSpec::from_str(depends.as_str())
.map_err(|e| PlatformUnsat::FailedToParseMatchSpec(depends.clone(), e))?;
specs.push((spec, source))
}
}
if records_visited.len() != locked_environment.len() {
return Err(PlatformUnsat::TooManyCondaPackages);
}
Ok(())
}
pub fn verify_pypi_platform_satisfiability(
environment: &Environment<'_>,
locked_conda_packages: &[CondaPackage],
locked_pypi_environment: &[PypiPackage],
platform: Platform,
) -> Result<(), PlatformUnsat> {
let mut requirements = environment
.pypi_dependencies(Some(platform))
.iter()
.flat_map(|(name, reqs)| {
reqs.iter()
.map(move |req| (req.as_pep508(name), "<environment>"))
})
.collect_vec();
if requirements.is_empty() {
return if !locked_pypi_environment.is_empty() {
Err(PlatformUnsat::TooManyPypiPackages)
} else {
Ok(())
};
}
let package_identifiers =
package_identifiers_from_locked_packages(locked_conda_packages, locked_pypi_environment);
let mut name_to_package_identifiers = HashMap::new();
for (idx, (identifier, _)) in package_identifiers.iter().enumerate() {
name_to_package_identifiers
.entry(identifier.name.clone())
.or_insert_with(Vec::new)
.push(idx);
}
let Some(python_record) = locked_conda_packages.iter().find(|r| is_python_record(*r)) else {
return Err(PlatformUnsat::MissingPythonInterpreter);
};
let marker_environment =
match determine_marker_environment(platform, python_record.package_record()) {
Ok(marker_environment) => marker_environment,
Err(e) => return Err(PlatformUnsat::FailedToDetermineMarkerEnvironment(e.into())),
};
let mut requirements_visited = requirements
.iter()
.map(|(req, _source)| req.clone())
.collect::<HashSet<_>>();
let mut packages_visited = HashSet::new();
while let Some((requirement, source)) = requirements.pop() {
let Ok(name) = NormalizedPackageName::from_str(requirement.name.as_str()) else {
return Err(PlatformUnsat::UnsatisfiableRequirement(
requirement,
source.to_string(),
));
};
let matched_package = name_to_package_identifiers
.get(&name)
.into_iter()
.flat_map(|idxs| idxs.iter().map(|idx| &package_identifiers[*idx]))
.find(|(identifier, _pypi_package_idx)| identifier.satisfies(&requirement));
let Some((_identifier, pypi_package_idx)) = matched_package else {
return Err(PlatformUnsat::UnsatisfiableRequirement(
requirement,
source.to_string(),
));
};
let Some(pypi_package_idx) = *pypi_package_idx else {
continue;
};
let pkg_data = locked_pypi_environment[pypi_package_idx].data().package;
packages_visited.insert(pypi_package_idx);
if let Some(required_python_version) = &pkg_data.requires_python {
if !required_python_version.contains(&marker_environment.python_full_version.version) {
return Err(PlatformUnsat::PythonVersionMismatch(
pkg_data.name.clone(),
required_python_version.clone(),
marker_environment
.python_full_version
.version
.clone()
.into(),
));
}
}
for dependency in pkg_data.requires_dist.iter() {
if !dependency.evaluate_markers(
&marker_environment,
requirement.extras.clone().unwrap_or_default(),
) {
continue;
}
if requirements_visited.get(dependency).is_some() {
continue;
}
requirements_visited.insert(dependency.clone());
requirements.push((dependency.clone(), &pkg_data.name));
}
}
if packages_visited.len() != locked_pypi_environment.len() {
return Err(PlatformUnsat::TooManyPypiPackages);
}
Ok(())
}
fn package_identifiers_from_locked_packages(
locked_conda_environment: &[CondaPackage],
locked_pypi_environment: &[PypiPackage],
) -> Vec<(package_identifier::PypiPackageIdentifier, Option<usize>)> {
let conda_package_identifiers = locked_conda_environment
.iter()
.map(package_identifier::PypiPackageIdentifier::from_locked_conda_dependency)
.filter_map(Result::ok)
.flatten()
.map(|pkg| (pkg, None));
let pypi_package_identifiers =
locked_pypi_environment
.iter()
.enumerate()
.filter_map(|(idx, pypi_package)| {
Some((
package_identifier::PypiPackageIdentifier::from_locked_pypi_dependency(
pypi_package,
)
.ok()?,
Some(idx),
))
});
itertools::chain(conda_package_identifiers, pypi_package_identifiers).collect()
}