use super::pypi_name_mapping;
use pep508_rs::{Requirement, VersionOrUrl};
use rattler_conda_types::{PackageUrl, RepoDataRecord};
use rattler_lock::CondaPackage;
use rip::resolve::PinnedPackage;
use rip::types::{Extra, NormalizedPackageName, ParsePackageNameError};
use std::{collections::HashSet, str::FromStr};
use thiserror::Error;
/// A Python package identified by its normalized name, its PEP 440 version
/// and a set of extras.
///
/// Values are built from conda repodata records, locked packages or `pypi`
/// package URLs by the constructors in the `impl` block.
#[derive(Debug)]
pub struct PypiPackageIdentifier {
/// Normalized name of the Python package.
pub name: NormalizedPackageName,
/// Version of the package, parsed as a `pep440_rs::Version`.
pub version: pep440_rs::Version,
/// Extras associated with the package; empty when the source carries none.
pub extras: HashSet<Extra>,
}
impl PypiPackageIdentifier {
/// Collects the identifiers found for a single repodata record.
///
/// Allocates a fresh vector and delegates the actual work to
/// `from_record_into`.
///
/// # Errors
///
/// Forwards any [`ConversionError`] reported by `from_record_into`.
pub fn from_record(record: &RepoDataRecord) -> Result<Vec<Self>, ConversionError> {
    let mut identifiers = Vec::new();
    Self::from_record_into(record, &mut identifiers).map(|()| identifiers)
}
/// Extracts the identifiers described by a locked package.
///
/// A locked pypi package always yields exactly one identifier; a locked
/// conda package yields whatever `from_locked_conda_dependency` returns.
///
/// # Errors
///
/// Forwards the [`ConversionError`] of the variant-specific constructor.
pub fn from_locked_package(
    package: &rattler_lock::Package,
) -> Result<Vec<Self>, ConversionError> {
    match package {
        rattler_lock::Package::Pypi(pypi) => {
            Self::from_locked_pypi_dependency(pypi).map(|identifier| vec![identifier])
        }
        rattler_lock::Package::Conda(conda) => Self::from_locked_conda_dependency(conda),
    }
}
/// Builds an identifier from a locked pypi package.
///
/// The name and version are read from the package data and the extras
/// from `package.extras()`.
///
/// # Errors
///
/// * [`ConversionError::PackageName`] if the locked name cannot be parsed.
/// * [`ConversionError::Extra`] for the first extra that cannot be parsed.
pub fn from_locked_pypi_dependency(
    package: &rattler_lock::PypiPackage,
) -> Result<Self, ConversionError> {
    let data = package.data();

    let raw_name = &data.package.name;
    let name = NormalizedPackageName::from_str(raw_name)
        .map_err(|err| ConversionError::PackageName(raw_name.clone(), err))?;

    let version = data.package.version.clone();

    // Parse the extras one by one so the offending string can be reported.
    let mut extras = HashSet::new();
    for raw_extra in package.extras().iter() {
        let extra = Extra::from_str(raw_extra)
            .map_err(|_| ConversionError::Extra(raw_extra.clone()))?;
        extras.insert(extra);
    }

    Ok(Self {
        name,
        version,
        extras,
    })
}
pub fn from_locked_conda_dependency(
package: &CondaPackage,
) -> Result<Vec<Self>, ConversionError> {
let record = package.package_record();
let mut result = Vec::new();
let mut has_pypi_purl = false;
for purl in record.purls.iter() {
if let Some(entry) = Self::try_from_purl(purl, &record.version.as_str())? {
result.push(entry);
has_pypi_purl = true;
}
}
if !has_pypi_purl && pypi_name_mapping::is_conda_forge_url(package.url()) {
let name = NormalizedPackageName::from_str(record.name.as_normalized()).ok();
let version = pep440_rs::Version::from_str(&record.version.as_str()).ok();
if let (Some(name), Some(version)) = (name, version) {
result.push(PypiPackageIdentifier {
name,
version,
extras: Default::default(),
});
}
}
Ok(result)
}
fn from_record_into(
record: &RepoDataRecord,
result: &mut Vec<Self>,
) -> Result<(), ConversionError> {
let mut has_pypi_purl = false;
for purl in record.package_record.purls.iter() {
if let Some(entry) = Self::try_from_purl(purl, &record.package_record.version.as_str())?
{
result.push(entry);
has_pypi_purl = true;
}
}
if !has_pypi_purl && pypi_name_mapping::is_conda_forge_record(record) {
let name = NormalizedPackageName::from_str(record.package_record.name.as_source()).ok();
let version =
pep440_rs::Version::from_str(&record.package_record.version.as_str()).ok();
if let (Some(name), Some(version)) = (name, version) {
result.push(PypiPackageIdentifier {
name,
version,
extras: Default::default(),
})
}
}
Ok(())
}
/// Collects the identifiers found for every record in `records`, in order.
///
/// # Errors
///
/// Stops at, and returns, the first [`ConversionError`] reported for a
/// record.
pub fn from_records(records: &[RepoDataRecord]) -> Result<Vec<Self>, ConversionError> {
    let mut identifiers = Vec::new();
    records
        .iter()
        .try_for_each(|record| Self::from_record_into(record, &mut identifiers))?;
    Ok(identifiers)
}
/// Converts `package_url` into an identifier if its type is `pypi`.
///
/// Returns `Ok(None)` for any other package type. `fallback_version` is
/// passed through to `from_pypi_purl`.
///
/// # Errors
///
/// Forwards the [`ConversionError`] of `from_pypi_purl`.
pub fn try_from_purl(
    package_url: &PackageUrl,
    fallback_version: &str,
) -> Result<Option<Self>, ConversionError> {
    if package_url.package_type() != "pypi" {
        return Ok(None);
    }
    Self::from_pypi_purl(package_url, fallback_version).map(Some)
}
/// Converts a `pypi` package URL into an identifier with no extras.
///
/// The version is taken from the purl when it has one and from
/// `fallback_version` otherwise.
///
/// # Panics
///
/// Panics if the type of `package_url` is not `pypi`.
///
/// # Errors
///
/// * [`ConversionError::PackageName`] if the purl name cannot be parsed.
/// * [`ConversionError::Version`] if the selected version string cannot
///   be parsed.
pub fn from_pypi_purl(
    package_url: &PackageUrl,
    fallback_version: &str,
) -> Result<Self, ConversionError> {
    assert_eq!(package_url.package_type(), "pypi");

    let raw_name = package_url.name();
    let name = NormalizedPackageName::from_str(raw_name)
        .map_err(|err| ConversionError::PackageName(raw_name.to_string(), err))?;

    let raw_version = match package_url.version() {
        Some(purl_version) => purl_version,
        None => fallback_version,
    };
    let version = pep440_rs::Version::from_str(raw_version)
        .map_err(|_| ConversionError::Version(raw_version.to_string()))?;

    Ok(Self {
        name,
        version,
        extras: HashSet::new(),
    })
}
/// Reports whether this package satisfies `requirement`.
///
/// The checks run in this order:
///
/// 1. The requirement name must parse and equal `self.name`.
/// 2. A URL requirement is accepted immediately; a version specifier must
///    contain `self.version`; no constraint passes through.
/// 3. Every extra named by the requirement must be present in
///    `self.extras`.
pub fn satisfies(&self, requirement: &Requirement) -> bool {
    let name_matches = NormalizedPackageName::from_str(&requirement.name)
        .map_or(false, |req_name| self.name == req_name);
    if !name_matches {
        return false;
    }

    match &requirement.version_or_url {
        // NOTE(review): a URL requirement returns before the extras are
        // checked — confirm that skipping them is intended.
        Some(VersionOrUrl::Url(_)) => return true,
        Some(VersionOrUrl::VersionSpecifier(spec)) if !spec.contains(&self.version) => {
            return false;
        }
        Some(VersionOrUrl::VersionSpecifier(_)) | None => {}
    }

    requirement
        .extras
        .iter()
        .flat_map(|names| names.iter())
        .all(|extra| self.extras.contains(extra.as_str()))
}
}
/// Errors produced while converting external package data into a
/// `PypiPackageIdentifier`.
#[derive(Error, Debug)]
pub enum ConversionError {
/// The string could not be parsed as a package name; carries the offending
/// string and the underlying parse error.
#[error("'{0}' is not a valid python package name")]
PackageName(String, #[source] ParsePackageNameError),
/// The string could not be parsed as a PEP 440 version.
#[error("'{0}' is not a valid python version")]
Version(String),
/// The string could not be parsed as an extra.
#[error("'{0}' is not a valid python extra")]
Extra(String),
}
impl From<PypiPackageIdentifier> for PinnedPackage {
fn from(value: PypiPackageIdentifier) -> Self {
PinnedPackage {
name: value.name,
version: value.version,
extras: value.extras,
artifacts: vec![],
url: None,
}
}
}