use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use itertools::Itertools;
use owo_colors::OwoColorize;
use tracing::{debug, trace, warn};
use uv_auth::CredentialsCache;
use uv_cache::{Cache, CacheBucket};
use uv_cache_key::cache_digest;
use uv_client::{BaseClientBuilder, FlatIndexClient, RegistryClientBuilder};
use uv_configuration::{
Concurrency, Constraints, DependencyGroupsWithDefaults, DryRun, ExtrasSpecification,
GitLfsSetting, Reinstall, TargetTriple, Upgrade,
};
use uv_dispatch::{BuildDispatch, SharedState};
use uv_distribution::{DistributionDatabase, LoweredExtraBuildDependencies, LoweredRequirement};
use uv_distribution_types::{
ExtraBuildRequirement, ExtraBuildRequires, Index, Requirement, RequiresPython, Resolution,
UnresolvedRequirement, UnresolvedRequirementSpecification,
};
use uv_fs::{CWD, LockedFile, LockedFileError, LockedFileMode, Simplified};
use uv_git::ResolvedRepositoryReference;
use uv_installer::{InstallationStrategy, SatisfiesResult, SitePackages};
use uv_normalize::{DEV_DEPENDENCIES, DefaultGroups, ExtraName, GroupName, PackageName};
use uv_pep440::{TildeVersionSpecifier, Version, VersionSpecifiers};
use uv_pep508::MarkerTreeContents;
use uv_preview::Preview;
use uv_pypi_types::{ConflictItem, ConflictKind, ConflictSet, Conflicts};
use uv_python::{
BrokenLink, EnvironmentPreference, Interpreter, InvalidEnvironmentKind, PythonDownloads,
PythonEnvironment, PythonInstallation, PythonPreference, PythonRequest, PythonSource,
PythonVariant, PythonVersionFile, VersionFileDiscoveryOptions, VersionRequest,
};
use uv_requirements::upgrade::{LockedRequirements, read_lock_requirements};
use uv_requirements::{NamedRequirementsResolver, RequirementsSpecification};
use uv_resolver::{
FlatIndex, Installable, Lock, OptionsBuilder, Preference, PythonRequirement,
ResolverEnvironment, ResolverOutput,
};
use uv_scripts::Pep723ItemRef;
use uv_settings::PythonInstallMirrors;
use uv_static::EnvVars;
use uv_torch::{TorchSource, TorchStrategy};
use uv_types::{BuildIsolation, EmptyInstalledPackages, HashStrategy, SourceTreeEditablePolicy};
use uv_virtualenv::remove_virtualenv;
use uv_warnings::{warn_user, warn_user_once};
use uv_workspace::dependency_groups::DependencyGroupError;
use uv_workspace::pyproject::{ExtraBuildDependency, PyProjectToml};
use uv_workspace::{RequiresPythonSources, Workspace, WorkspaceCache};
use crate::commands::pip::loggers::{InstallLogger, ResolveLogger};
use crate::commands::pip::operations::{Changelog, Modifications};
use crate::commands::project::install_target::InstallTarget;
use crate::commands::reporters::{PythonDownloadReporter, ResolverReporter};
use crate::commands::{capitalize, conjunction, pip};
use crate::printer::Printer;
use crate::settings::{
FrozenSource, InstallerSettingsRef, LockCheckSource, ResolverInstallerSettings,
ResolverSettings,
};
// Subcommand implementations and shared helpers for uv's project workflows.
pub(crate) mod add;
pub(crate) mod audit;
pub(crate) mod environment;
pub(crate) mod export;
pub(crate) mod format;
pub(crate) mod init;
// Private: consumed within this module via the `InstallTarget` import above.
mod install_target;
pub(crate) mod lock;
pub(crate) mod lock_target;
pub(crate) mod remove;
pub(crate) mod run;
pub(crate) mod sync;
pub(crate) mod tree;
pub(crate) mod version;
/// The flag, environment variable, or configuration setting that required a
/// lockfile to be present, used to name the trigger in error messages.
#[derive(Debug, Clone, Copy)]
pub(crate) enum MissingLockfileSource {
    Frozen,
    FrozenEnv,
    FrozenConfiguration,
    Locked,
    LockedEnv,
    LockedConfiguration,
    Check,
}

impl std::fmt::Display for MissingLockfileSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the user-facing label once, then emit it with a single write.
        let label = match self {
            Self::Frozen => "`--frozen`",
            Self::FrozenEnv => "`UV_FROZEN=1`",
            Self::FrozenConfiguration => "`frozen` (workspace configuration)",
            Self::Locked => "`--locked`",
            Self::LockedEnv => "`UV_LOCKED=1`",
            Self::LockedConfiguration => "`locked` (workspace configuration)",
            Self::Check => "`--check`",
        };
        f.write_str(label)
    }
}
/// Map each lock-check trigger (`--locked`, `UV_LOCKED=1`, workspace
/// configuration, or `--check`) onto its error-message label.
impl From<LockCheckSource> for MissingLockfileSource {
    fn from(source: LockCheckSource) -> Self {
        match source {
            LockCheckSource::LockedCli => Self::Locked,
            LockCheckSource::LockedEnv => Self::LockedEnv,
            LockCheckSource::LockedConfiguration => Self::LockedConfiguration,
            LockCheckSource::Check => Self::Check,
        }
    }
}
/// Map each frozen-mode trigger (`--frozen`, `UV_FROZEN=1`, or workspace
/// configuration) onto its error-message label.
impl From<FrozenSource> for MissingLockfileSource {
    fn from(source: FrozenSource) -> Self {
        match source {
            FrozenSource::Cli => Self::Frozen,
            FrozenSource::Env => Self::FrozenEnv,
            FrozenSource::Configuration => Self::FrozenConfiguration,
        }
    }
}
/// Errors raised while discovering, locking, resolving, or syncing a project
/// (or PEP 723 script) environment.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ProjectError {
    // Lockfile state mismatches (`--frozen` / `--locked` / `--check`).
    #[error(
        "The lockfile at `uv.lock` needs to be updated, but `{2}` was provided. To update the lockfile, run `uv lock`."
    )]
    LockMismatch(Option<Box<Lock>>, Box<Lock>, LockCheckSource),
    #[error(
        "Unable to find lockfile at `{1}`, but {0} was provided. To create a lockfile, run `uv lock` or `uv sync` without the flag."
    )]
    MissingLockfile(MissingLockfileSource, PathBuf),
    #[error(
        "The lockfile at `uv.lock` needs to be updated, but `--frozen` was provided: Missing workspace member `{0}`. To update the lockfile, run `uv lock`."
    )]
    LockWorkspaceMismatch(PackageName),
    // Lockfile schema versioning.
    #[error(
        "The lockfile at `uv.lock` uses an unsupported schema version (v{1}, but only v{0} is supported). Downgrade to a compatible uv version, or remove the `uv.lock` prior to running `uv lock` or `uv sync`."
    )]
    UnsupportedLockVersion(u32, u32),
    #[error(
        "Failed to parse `uv.lock`, which uses an unsupported schema version (v{1}, but only v{0} is supported). Downgrade to a compatible uv version, or remove the `uv.lock` prior to running `uv lock` or `uv sync`."
    )]
    UnparsableLockVersion(u32, u32, #[source] toml::de::Error),
    #[error("Failed to serialize `uv.lock`")]
    LockSerialization(#[from] toml_edit::ser::Error),
    // Interpreter-vs-requirement incompatibilities. The fields carry the
    // offending version, the requirement, and (for project variants) the
    // per-member sources plus a flag for multi-package workspaces.
    #[error(
        "The current Python version ({0}) is not compatible with the locked Python requirement: `{1}`"
    )]
    LockedPythonIncompatibility(Version, RequiresPython),
    #[error(
        "The current Python platform is not compatible with the lockfile's supported environments: {0}"
    )]
    LockedPlatformIncompatibility(String),
    #[error(transparent)]
    Conflict(#[from] ConflictError),
    #[error(
        "The requested interpreter resolved to Python {_0}, which is incompatible with the project's Python requirement: `{_1}`{}",
        format_optional_requires_python_sources(_2, *_3)
    )]
    RequestedPythonProjectIncompatibility(Version, RequiresPython, RequiresPythonSources, bool),
    #[error(
        "The Python request from `{python_request}` resolved to Python {version}, which is incompatible with the project's Python requirement: `{requires_python}`{}\nUse `uv python pin` to update the `.python-version` file to a compatible version",
        format_optional_requires_python_sources(requires_python_sources, *workspace),
    )]
    DotPythonVersionProjectIncompatibility {
        python_request: String,
        version: Version,
        requires_python: RequiresPython,
        requires_python_sources: Box<RequiresPythonSources>,
        workspace: bool,
    },
    #[error(
        "The resolved Python interpreter (Python {_0}) is incompatible with the project's Python requirement: `{_1}`{}",
        format_optional_requires_python_sources(_2, *_3)
    )]
    RequiresPythonProjectIncompatibility(Version, RequiresPython, RequiresPythonSources, bool),
    // Script (PEP 723) analogues of the project incompatibilities above.
    #[error(
        "The requested interpreter resolved to Python {0}, which is incompatible with the script's Python requirement: `{1}`"
    )]
    RequestedPythonScriptIncompatibility(Version, RequiresPython),
    #[error(
        "The Python request from `{0}` resolved to Python {1}, which is incompatible with the script's Python requirement: `{2}`"
    )]
    DotPythonVersionScriptIncompatibility(String, Version, RequiresPython),
    #[error(
        "The resolved Python interpreter (Python {0}) is incompatible with the script's Python requirement: `{1}`"
    )]
    RequiresPythonScriptIncompatibility(Version, RequiresPython),
    // Missing extras / dependency groups.
    #[error("Group `{0}` is not defined in the project's `dependency-groups` table")]
    MissingGroupProject(GroupName),
    #[error("Group `{0}` is not defined in any project's `dependency-groups` table")]
    MissingGroupProjects(GroupName),
    #[error("PEP 723 scripts do not support dependency groups, but group `{0}` was specified")]
    MissingGroupScript(GroupName),
    #[error(
        "Default group `{0}` (from `tool.uv.default-groups`) is not defined in the project's `dependency-groups` table"
    )]
    MissingDefaultGroup(GroupName),
    #[error("Extra `{0}` is not defined in the project's `optional-dependencies` table")]
    MissingExtraProject(ExtraName),
    #[error("Extra `{0}` is not defined in any project's `optional-dependencies` table")]
    MissingExtraProjects(ExtraName),
    #[error("PEP 723 scripts do not support optional dependencies, but extra `{0}` was specified")]
    MissingExtraScript(ExtraName),
    // Environment-marker / requires-python validation.
    #[error("Supported environments must be disjoint, but the following markers overlap: `{0}` and `{1}`.\n\n{hint}{colon} replace `{1}` with `{2}`.", hint = "hint".bold().cyan(), colon = ":".bold())]
    OverlappingMarkers(String, String, String),
    #[error("Environment markers `{0}` don't overlap with Python requirement `{1}`")]
    DisjointEnvironment(MarkerTreeContents, VersionSpecifiers),
    #[error(
        "Found conflicting Python requirements:\n{}",
        format_requires_python_sources(_0)
    )]
    DisjointRequiresPython(BTreeMap<(PackageName, Option<GroupName>), VersionSpecifiers>),
    #[error("Environment marker is empty")]
    EmptyEnvironment,
    #[error("Project virtual environment directory `{0}` cannot be used because {1}")]
    InvalidProjectEnvironmentDir(PathBuf, String),
    // Parsing / serialization of project files.
    #[error("Failed to parse `uv.lock`")]
    UvLockParse(#[source] toml::de::Error),
    #[error("Failed to parse `pyproject.toml`")]
    PyprojectTomlParse(#[source] toml::de::Error),
    #[error("Failed to update `pyproject.toml`")]
    PyprojectTomlUpdate,
    #[error("Failed to parse PEP 723 script metadata")]
    Pep723ScriptTomlParse(#[source] toml::de::Error),
    #[error("Failed to find `site-packages` directory for environment")]
    NoSitePackages,
    #[error("Attempted to drop a temporary virtual environment while still in-use")]
    DroppedEnvironment,
    // Transparent wrappers around errors from other uv crates and std.
    #[error(transparent)]
    DependencyGroup(#[from] DependencyGroupError),
    #[error(transparent)]
    Client(#[from] uv_client::Error),
    #[error(transparent)]
    ClientBuild(#[from] uv_client::ClientBuildError),
    #[error(transparent)]
    Python(#[from] uv_python::Error),
    #[error(transparent)]
    Virtualenv(#[from] uv_virtualenv::Error),
    #[error(transparent)]
    HashStrategy(#[from] uv_types::HashStrategyError),
    #[error(transparent)]
    Tags(#[from] uv_platform_tags::TagsError),
    #[error(transparent)]
    FlatIndex(#[from] uv_client::FlatIndexError),
    #[error(transparent)]
    Lock(#[from] uv_resolver::LockError),
    #[error(transparent)]
    Operation(#[from] pip::operations::Error),
    #[error(transparent)]
    Interpreter(#[from] uv_python::InterpreterError),
    #[error(transparent)]
    Tool(#[from] uv_tool::Error),
    #[error(transparent)]
    Name(#[from] uv_normalize::InvalidNameError),
    #[error(transparent)]
    Requirements(#[from] uv_requirements::Error),
    #[error(transparent)]
    Metadata(#[from] uv_distribution::MetadataError),
    #[error(transparent)]
    Lowering(#[from] uv_distribution::LoweringError),
    #[error(transparent)]
    Workspace(#[from] uv_workspace::WorkspaceError),
    #[error(transparent)]
    PyprojectMut(#[from] uv_workspace::pyproject_mut::Error),
    #[error(transparent)]
    ExtraBuildRequires(#[from] uv_distribution_types::ExtraBuildRequiresError),
    #[error(transparent)]
    Fmt(#[from] std::fmt::Error),
    #[error(transparent)]
    CacheInfo(#[from] uv_cache_info::CacheInfoError),
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    RetryParsing(#[from] uv_client::RetryParsingError),
    #[error(transparent)]
    Accelerator(#[from] uv_torch::AcceleratorError),
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
}
/// A violation of a declared conflict set: mutually-exclusive extras and/or
/// groups were enabled together.
#[derive(Debug)]
pub(crate) struct ConflictError {
    // The declared conflict set that was violated.
    pub(crate) set: ConflictSet,
    // The items (package, extra, or group) that were enabled simultaneously.
    pub(crate) conflicts: Vec<ConflictItem>,
    // The enabled groups; used to annotate defaults in the error message.
    pub(crate) groups: DependencyGroupsWithDefaults,
}
impl std::fmt::Display for ConflictError {
    /// Render the conflict with phrasing specialized to the kinds involved:
    /// an all-extras message, an all-groups message, or a mixed-kind message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Format the declared conflict set, e.g. "`pkg[x]`, `pkg:group`".
        let set = self
            .set
            .iter()
            .map(|item| match item.kind() {
                ConflictKind::Project => format!("{}", item.package()),
                ConflictKind::Extra(extra) => format!("`{}[{}]`", item.package(), extra),
                ConflictKind::Group(group) => format!("`{}:{}`", item.package(), group),
            })
            .join(", ");
        if self
            .conflicts
            .iter()
            .all(|conflict| matches!(conflict.kind(), ConflictKind::Extra(..)))
        {
            // Every conflicting item is an extra.
            write!(
                f,
                "Extras {} are incompatible with the declared conflicts: {{{set}}}",
                conjunction(
                    self.conflicts
                        .iter()
                        .map(|conflict| match conflict.kind() {
                            ConflictKind::Extra(extra) => format!("`{extra}`"),
                            // Guarded by the `all(..Extra..)` check above.
                            ConflictKind::Group(..) | ConflictKind::Project => unreachable!(),
                        })
                        .collect()
                )
            )
        } else if self
            .conflicts
            .iter()
            .all(|conflict| matches!(conflict.kind(), ConflictKind::Group(..)))
        {
            // Every conflicting item is a dependency group.
            // NOTE(review): this arm says "the conflicts" while the other two
            // say "the declared conflicts" — confirm whether intentional.
            write!(
                f,
                "Groups {} are incompatible with the conflicts: {{{set}}}",
                conjunction(
                    self.conflicts
                        .iter()
                        .map(|conflict| match conflict.kind() {
                            ConflictKind::Group(group)
                                if self.groups.contains_because_default(group) =>
                                format!("`{group}` (enabled by default)"),
                            ConflictKind::Group(group) => format!("`{group}`"),
                            // Guarded by the `all(..Group..)` check above.
                            ConflictKind::Extra(..) | ConflictKind::Project => unreachable!(),
                        })
                        .collect()
                )
            )
        } else {
            // Mixed kinds: label each item, capitalizing only the first.
            write!(
                f,
                "{} are incompatible with the declared conflicts: {{{set}}}",
                conjunction(
                    self.conflicts
                        .iter()
                        .enumerate()
                        .map(|(i, conflict)| {
                            let conflict = match conflict.kind() {
                                ConflictKind::Project => {
                                    format!("package `{}`", conflict.package())
                                }
                                ConflictKind::Extra(extra) => format!("extra `{extra}`"),
                                ConflictKind::Group(group)
                                    if self.groups.contains_because_default(group) =>
                                {
                                    format!("group `{group}` (enabled by default)")
                                }
                                ConflictKind::Group(group) => format!("group `{group}`"),
                            };
                            if i == 0 {
                                capitalize(&conflict)
                            } else {
                                conflict
                            }
                        })
                        .collect()
                )
            )
        }
    }
}

impl std::error::Error for ConflictError {}
/// Newtype over [`SharedState`] for universal (platform-agnostic) operations;
/// forked into a [`PlatformState`] for platform-specific work.
#[derive(Default, Clone)]
pub(crate) struct UniversalState(SharedState);

impl std::ops::Deref for UniversalState {
    type Target = SharedState;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl UniversalState {
    /// Fork the shared state for a platform-specific operation.
    pub(crate) fn fork(&self) -> PlatformState {
        PlatformState(self.0.fork())
    }
}
/// Newtype over [`SharedState`] for platform-specific operations; the dual of
/// [`UniversalState`].
#[derive(Default, Clone)]
pub(crate) struct PlatformState(SharedState);

impl std::ops::Deref for PlatformState {
    type Target = SharedState;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl PlatformState {
    /// Fork the shared state back into a universal operation.
    pub(crate) fn fork(&self) -> UniversalState {
        UniversalState(self.0.fork())
    }

    /// Consume the wrapper, yielding the inner [`SharedState`].
    pub(crate) fn into_inner(self) -> SharedState {
        self.0
    }
}
/// Compute the intersection of the `requires-python` constraints declared
/// across the workspace (for the given dependency groups).
///
/// Returns `Ok(None)` if no member declares a constraint, and
/// [`ProjectError::DisjointRequiresPython`] if the declared constraints have
/// no common intersection. Also warns (once per message) about `~=`
/// specifiers written without a patch version, whose implied bounds are
/// often not what the author intended.
pub(crate) fn find_requires_python(
    workspace: &Workspace,
    groups: &DependencyGroupsWithDefaults,
) -> Result<Option<RequiresPython>, ProjectError> {
    let requires_python = workspace.requires_python(groups)?;
    if requires_python.is_empty() {
        return Ok(None);
    }
    for ((package, group), specifiers) in &requires_python {
        // Only single-specifier constraints of the form `~=X.Y` are flagged.
        if let [spec] = &specifiers[..] {
            if let Some(spec) = TildeVersionSpecifier::from_specifier_ref(spec) {
                if spec.has_patch() {
                    // `~=X.Y.Z` is unambiguous; nothing to warn about.
                    continue;
                }
                let (lower, upper) = spec.bounding_specifiers();
                let spec_0 = spec.with_patch_version(0);
                let (lower_0, upper_0) = spec_0.bounding_specifiers();
                warn_user_once!(
                    "The `requires-python` specifier (`{spec}`) in `{package}{group}` \
                    uses the tilde specifier (`~=`) without a patch version. This will be \
                    interpreted as `{lower}, {upper}`. Did you mean `{spec_0}` to constrain the \
                    version as `{lower_0}, {upper_0}`? We recommend only using \
                    the tilde specifier with a patch version to avoid ambiguity.",
                    group = if let Some(group) = group {
                        format!(":{group}")
                    } else {
                        String::new()
                    },
                );
            }
        }
    }
    match RequiresPython::intersection(requires_python.iter().map(|(.., specifiers)| specifiers)) {
        Some(requires_python) => Ok(Some(requires_python)),
        None => Err(ProjectError::DisjointRequiresPython(requires_python)),
    }
}
/// Validate that `interpreter` satisfies the project's `requires-python`.
///
/// On failure, the returned error variant reflects how the Python request was
/// made (`source`) and carries the subset of workspace members whose
/// constraints exclude the interpreter's version, plus whether the workspace
/// has more than one package (to adjust message phrasing).
pub(crate) fn validate_project_requires_python(
    interpreter: &Interpreter,
    workspace: Option<&Workspace>,
    groups: &DependencyGroupsWithDefaults,
    requires_python: &RequiresPython,
    source: &PythonRequestSource,
) -> Result<(), ProjectError> {
    if requires_python.contains(interpreter.python_version()) {
        return Ok(());
    }
    // Collect only the members whose constraint the interpreter fails, for a
    // targeted error message; lookup errors here are deliberately ignored.
    let conflicting_requires = workspace
        .and_then(|workspace| workspace.requires_python(groups).ok())
        .into_iter()
        .flatten()
        .filter(|(.., requires)| !requires.contains(interpreter.python_version()))
        .collect::<RequiresPythonSources>();
    let workspace_non_trivial = workspace
        .map(|workspace| workspace.packages().len() > 1)
        .unwrap_or(false);
    match source {
        PythonRequestSource::UserRequest => {
            Err(ProjectError::RequestedPythonProjectIncompatibility(
                interpreter.python_version().clone(),
                requires_python.clone(),
                conflicting_requires,
                workspace_non_trivial,
            ))
        }
        PythonRequestSource::DotPythonVersion(file) => {
            Err(ProjectError::DotPythonVersionProjectIncompatibility {
                python_request: file.path().user_display().to_string(),
                version: interpreter.python_version().clone(),
                requires_python: requires_python.clone(),
                requires_python_sources: Box::new(conflicting_requires),
                workspace: workspace_non_trivial,
            })
        }
        PythonRequestSource::RequiresPython => {
            Err(ProjectError::RequiresPythonProjectIncompatibility(
                interpreter.python_version().clone(),
                requires_python.clone(),
                conflicting_requires,
                workspace_non_trivial,
            ))
        }
    }
}
/// Validate that `interpreter` satisfies a PEP 723 script's `requires-python`,
/// returning an error variant matched to how the request was made (`source`).
fn validate_script_requires_python(
    interpreter: &Interpreter,
    requires_python: &RequiresPython,
    source: &PythonRequestSource,
) -> Result<(), ProjectError> {
    if requires_python.contains(interpreter.python_version()) {
        return Ok(());
    }
    match source {
        PythonRequestSource::UserRequest => {
            Err(ProjectError::RequestedPythonScriptIncompatibility(
                interpreter.python_version().clone(),
                requires_python.clone(),
            ))
        }
        PythonRequestSource::DotPythonVersion(file) => {
            Err(ProjectError::DotPythonVersionScriptIncompatibility(
                file.file_name().to_string(),
                interpreter.python_version().clone(),
                requires_python.clone(),
            ))
        }
        PythonRequestSource::RequiresPython => {
            Err(ProjectError::RequiresPythonScriptIncompatibility(
                interpreter.python_version().clone(),
                requires_python.clone(),
            ))
        }
    }
}
/// The interpreter or environment discovered for a PEP 723 script.
#[derive(Debug, Clone)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum ScriptInterpreter {
    // A bare interpreter: no usable script environment was found.
    Interpreter(Interpreter),
    // An existing, usable virtual environment for the script.
    Environment(PythonEnvironment),
}
impl ScriptInterpreter {
    /// Determine the environment root directory for the given script.
    ///
    /// Defaults to a cache-backed location keyed by the script's path (or URL
    /// / raw contents for remote and stdin scripts). An active `VIRTUAL_ENV`
    /// takes precedence only when `active` is `Some(true)`; when `active` is
    /// `None`, a mismatching `VIRTUAL_ENV` triggers a one-time warning.
    pub(crate) fn root(script: Pep723ItemRef<'_>, active: Option<bool>, cache: &Cache) -> PathBuf {
        // Read `$VIRTUAL_ENV`, normalizing a relative value against the CWD.
        fn from_virtual_env_variable() -> Option<PathBuf> {
            let value = std::env::var_os(EnvVars::VIRTUAL_ENV)?;
            if value.is_empty() {
                return None;
            }
            let path = PathBuf::from(value);
            if path.is_absolute() {
                return Some(path);
            }
            Some(CWD.join(path))
        }
        let cache_env = {
            let entry = match script {
                Pep723ItemRef::Script(script) => {
                    let digest = cache_digest(&script.path);
                    // Prefix the digest with the file stem (when it yields a
                    // valid cache name) for a human-readable directory name.
                    if let Some(file_name) = script
                        .path
                        .file_stem()
                        .and_then(|name| name.to_str())
                        .and_then(cache_name)
                    {
                        format!("{file_name}-{digest}")
                    } else {
                        digest
                    }
                }
                Pep723ItemRef::Remote(.., url) => cache_digest(url),
                Pep723ItemRef::Stdin(metadata) => cache_digest(&metadata.raw),
            };
            cache
                .shard(CacheBucket::Environments, entry)
                .into_path_buf()
        };
        if let Some(from_virtual_env) = from_virtual_env_variable() {
            // Only worth deciding when `VIRTUAL_ENV` differs from the cache
            // location (a missing path is treated as "different").
            if !uv_fs::is_same_file_allow_missing(&from_virtual_env, &cache_env).unwrap_or(false) {
                match active {
                    Some(true) => {
                        debug!(
                            "Using active virtual environment `{}` instead of script environment `{}`",
                            from_virtual_env.user_display(),
                            cache_env.user_display()
                        );
                        return from_virtual_env;
                    }
                    Some(false) => {}
                    None => {
                        warn_user_once!(
                            "`VIRTUAL_ENV={}` does not match the script environment path `{}` and will be ignored; use `--active` to target the active environment instead",
                            from_virtual_env.user_display(),
                            cache_env.user_display()
                        );
                    }
                }
            }
        } else {
            if active.unwrap_or_default() {
                debug!(
                    "Use of the active virtual environment was requested, but `VIRTUAL_ENV` is not set"
                );
            }
        }
        cache_env
    }

    /// Discover the interpreter (or existing environment) to use for a script.
    ///
    /// Reuses the script's environment when it exists and is usable (or when
    /// `keep_incompatible` is set, with a warning); otherwise finds or
    /// downloads an interpreter matching the resolved Python request. A
    /// `requires-python` violation here is reported as a warning, not an
    /// error.
    pub(crate) async fn discover(
        script: Pep723ItemRef<'_>,
        python_request: Option<PythonRequest>,
        client_builder: &BaseClientBuilder<'_>,
        python_preference: PythonPreference,
        python_downloads: PythonDownloads,
        install_mirrors: &PythonInstallMirrors,
        keep_incompatible: bool,
        no_config: bool,
        active: Option<bool>,
        cache: &Cache,
        printer: Printer,
        preview: Preview,
    ) -> Result<Self, ProjectError> {
        // Scripts are resolved without a surrounding workspace.
        let workspace = None;
        let ScriptPython {
            source,
            python_request,
            requires_python,
        } = ScriptPython::from_request(python_request, workspace, script, no_config).await?;
        let root = Self::root(script, active, cache);
        match PythonEnvironment::from_root(&root, cache) {
            Ok(venv) => {
                match environment_is_usable(
                    &venv,
                    EnvironmentKind::Script,
                    python_request.as_ref(),
                    python_preference,
                    requires_python
                        .as_ref()
                        .map(|(requires_python, _)| requires_python),
                    cache,
                ) {
                    Ok(()) => return Ok(Self::Environment(venv)),
                    // `--no-sync`: keep the incompatible environment anyway.
                    Err(err) if keep_incompatible => {
                        warn_user!(
                            "Using incompatible environment (`{}`) due to `--no-sync` ({err})",
                            root.user_display().cyan(),
                        );
                        return Ok(Self::Environment(venv));
                    }
                    // Otherwise, fall through to interpreter discovery below.
                    Err(err) => {
                        debug!("{err}");
                    }
                }
            }
            Err(uv_python::Error::MissingEnvironment(_)) => {}
            Err(err) => warn!("Ignoring existing script environment: {err}"),
        }
        let reporter = PythonDownloadReporter::single(printer);
        let interpreter = PythonInstallation::find_or_download(
            python_request.as_ref(),
            EnvironmentPreference::Any,
            python_preference,
            python_downloads,
            client_builder,
            cache,
            Some(&reporter),
            install_mirrors.python_install_mirror.as_deref(),
            install_mirrors.pypy_install_mirror.as_deref(),
            install_mirrors.python_downloads_json_url.as_deref(),
            preview,
        )
        .await?
        .into_interpreter();
        // Validate against the applicable `requires-python`, but downgrade
        // any incompatibility to a warning for scripts.
        if let Err(err) = match requires_python {
            Some((requires_python, RequiresPythonSource::Project)) => {
                validate_project_requires_python(
                    &interpreter,
                    workspace,
                    &DependencyGroupsWithDefaults::none(),
                    &requires_python,
                    &source,
                )
            }
            Some((requires_python, RequiresPythonSource::Script)) => {
                validate_script_requires_python(&interpreter, &requires_python, &source)
            }
            None => Ok(()),
        } {
            warn_user!("{err}");
        }
        Ok(Self::Interpreter(interpreter))
    }

    /// Consume `self`, yielding the underlying [`Interpreter`].
    pub(crate) fn into_interpreter(self) -> Interpreter {
        match self {
            Self::Interpreter(interpreter) => interpreter,
            Self::Environment(venv) => venv.into_interpreter(),
        }
    }

    /// Acquire an exclusive cross-process lock for the script, keyed by a
    /// digest of its path, URL, or raw contents, and stored in the system
    /// temporary directory.
    pub(crate) async fn lock(script: Pep723ItemRef<'_>) -> Result<LockedFile, LockedFileError> {
        match script {
            Pep723ItemRef::Script(script) => {
                LockedFile::acquire(
                    std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&script.path))),
                    LockedFileMode::Exclusive,
                    script.path.simplified_display(),
                )
                .await
            }
            Pep723ItemRef::Remote(.., url) => {
                LockedFile::acquire(
                    std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(url))),
                    LockedFileMode::Exclusive,
                    url.to_string(),
                )
                .await
            }
            Pep723ItemRef::Stdin(metadata) => {
                LockedFile::acquire(
                    std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&metadata.raw))),
                    LockedFileMode::Exclusive,
                    "stdin".to_string(),
                )
                .await
            }
        }
    }
}
/// Whether an environment belongs to a PEP 723 script or to a project; used
/// to phrase user-facing diagnostics.
#[derive(Debug)]
pub(crate) enum EnvironmentKind {
    Script,
    Project,
}

impl std::fmt::Display for EnvironmentKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Emit the lowercase noun used in diagnostics.
        f.write_str(match self {
            Self::Script => "script",
            Self::Project => "project",
        })
    }
}
/// Reasons an existing environment cannot be reused, as checked by
/// [`environment_is_usable`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum EnvironmentIncompatibilityError {
    #[error("The {0} environment's Python version does not satisfy the request: `{1}`")]
    PythonRequest(EnvironmentKind, PythonRequest),
    #[error("The {0} environment's Python version does not meet the Python requirement: `{1}`")]
    RequiresPython(EnvironmentKind, RequiresPython),
    // Fields: (kind, actual interpreter version, version recorded at creation).
    #[error(
        "The interpreter in the {0} environment has a different version ({1}) than it was created with ({2})"
    )]
    PyenvVersionConflict(EnvironmentKind, Version, Version),
    #[error("The {0} environment's Python interpreter does not meet the Python preference: `{1}`")]
    PythonPreference(EnvironmentKind, PythonPreference),
}
/// Check whether an existing environment can be reused for the given request.
///
/// Checks are applied in order — `pyvenv.cfg`-vs-interpreter version
/// conflict, the explicit Python request, the `requires-python` bound, and
/// the Python preference — and the first failure is returned.
fn environment_is_usable(
    environment: &PythonEnvironment,
    kind: EnvironmentKind,
    python_request: Option<&PythonRequest>,
    python_preference: PythonPreference,
    requires_python: Option<&RequiresPython>,
    cache: &Cache,
) -> Result<(), EnvironmentIncompatibilityError> {
    if let Some((cfg_version, int_version)) = environment.get_pyvenv_version_conflict() {
        return Err(EnvironmentIncompatibilityError::PyenvVersionConflict(
            kind,
            int_version,
            cfg_version,
        ));
    }
    if let Some(request) = python_request {
        if request.satisfied(environment.interpreter(), cache) {
            debug!("The {kind} environment's Python version satisfies the request: `{request}`");
        } else {
            return Err(EnvironmentIncompatibilityError::PythonRequest(
                kind,
                request.clone(),
            ));
        }
    }
    if let Some(requires_python) = requires_python {
        if requires_python.contains(environment.interpreter().python_version()) {
            trace!(
                "The {kind} environment's Python version meets the Python requirement: `{requires_python}`"
            );
        } else {
            return Err(EnvironmentIncompatibilityError::RequiresPython(
                kind,
                requires_python.clone(),
            ));
        }
    }
    // Reject environments whose interpreter the preference would not allow.
    if python_preference.allows_installation(&PythonInstallation::new(
        PythonSource::DiscoveredEnvironment,
        environment.interpreter().clone(),
    )) {
        trace!(
            "The virtual environment's Python interpreter meets the Python preference: `{}`",
            python_preference
        );
    } else {
        return Err(EnvironmentIncompatibilityError::PythonPreference(
            kind,
            python_preference,
        ));
    }
    Ok(())
}
/// The interpreter or environment discovered for a project.
#[derive(Debug)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum ProjectInterpreter {
    // A bare interpreter: no usable project environment was found.
    Interpreter(Interpreter),
    // An existing, usable project virtual environment.
    Environment(PythonEnvironment),
}
impl ProjectInterpreter {
    /// Discover the interpreter (or existing environment) to use for a
    /// workspace.
    ///
    /// Reuses the project environment when it exists and is usable (or when
    /// `keep_incompatible` is set, with a warning). Invalid environment
    /// directories produce hard errors or warnings depending on the failure;
    /// otherwise, an interpreter is found or downloaded and validated against
    /// the workspace's `requires-python` (an error here, unlike for scripts).
    pub(crate) async fn discover(
        workspace: &Workspace,
        groups: &DependencyGroupsWithDefaults,
        workspace_python: WorkspacePython,
        client_builder: &BaseClientBuilder<'_>,
        python_preference: PythonPreference,
        python_downloads: PythonDownloads,
        install_mirrors: &PythonInstallMirrors,
        keep_incompatible: bool,
        active: Option<bool>,
        cache: &Cache,
        printer: Printer,
        preview: Preview,
    ) -> Result<Self, ProjectError> {
        let WorkspacePython {
            source,
            python_request,
            requires_python,
        } = workspace_python;
        let root = workspace.venv(active);
        match PythonEnvironment::from_root(&root, cache) {
            Ok(venv) => {
                match environment_is_usable(
                    &venv,
                    EnvironmentKind::Project,
                    python_request.as_ref(),
                    python_preference,
                    requires_python.as_ref(),
                    cache,
                ) {
                    Ok(()) => return Ok(Self::Environment(venv)),
                    // `--no-sync`: keep the incompatible environment anyway.
                    Err(err) if keep_incompatible => {
                        warn_user!(
                            "Using incompatible environment (`{}`) due to `--no-sync` ({err})",
                            root.user_display().cyan(),
                        );
                        return Ok(Self::Environment(venv));
                    }
                    // Otherwise, fall through to interpreter discovery below.
                    Err(err) => {
                        debug!("{err}");
                    }
                }
            }
            // No environment yet: create one below.
            Err(uv_python::Error::MissingEnvironment(_)) => {}
            Err(uv_python::Error::InvalidEnvironment(inner)) => {
                match inner.kind {
                    // The path exists but is not a directory: refuse to use it.
                    InvalidEnvironmentKind::NotDirectory => {
                        return Err(ProjectError::InvalidProjectEnvironmentDir(
                            root,
                            inner.kind.to_string(),
                        ));
                    }
                    InvalidEnvironmentKind::MissingExecutable(_) => {
                        // A non-empty directory without a `pyvenv.cfg` is not
                        // ours to replace; error rather than clobber it.
                        if fs_err::read_dir(&root).is_ok_and(|mut dir| dir.next().is_some()) {
                            if !root.join("pyvenv.cfg").try_exists().unwrap_or_default() {
                                return Err(ProjectError::InvalidProjectEnvironmentDir(
                                    root,
                                    "it is not a valid Python environment (no Python executable was found)"
                                        .to_string(),
                                ));
                            }
                        }
                    }
                    // An empty directory is safe to use.
                    InvalidEnvironmentKind::Empty => {}
                }
            }
            Err(uv_python::Error::Query(uv_python::InterpreterError::NotFound(_))) => {}
            // A symlinked interpreter whose target no longer exists: warn and
            // proceed to create a fresh environment.
            Err(uv_python::Error::Query(uv_python::InterpreterError::BrokenLink(BrokenLink {
                path,
                unix,
                venv: _,
            }))) => {
                if unix {
                    let target_path = fs_err::read_link(&path)?;
                    warn_user!(
                        "Ignoring existing virtual environment linked to non-existent Python interpreter: {} -> {}",
                        path.user_display().cyan(),
                        target_path.user_display().cyan(),
                    );
                } else {
                    warn_user!(
                        "Ignoring existing virtual environment linked to non-existent Python interpreter: {}",
                        path.user_display().cyan(),
                    );
                }
            }
            Err(err) => return Err(err.into()),
        }
        let reporter = PythonDownloadReporter::single(printer);
        let python = PythonInstallation::find_or_download(
            python_request.as_ref(),
            EnvironmentPreference::OnlySystem,
            python_preference,
            python_downloads,
            client_builder,
            cache,
            Some(&reporter),
            install_mirrors.python_install_mirror.as_deref(),
            install_mirrors.pypy_install_mirror.as_deref(),
            install_mirrors.python_downloads_json_url.as_deref(),
            preview,
        )
        .await?;
        let managed = python.source().is_managed();
        let implementation = python.implementation();
        let interpreter = python.into_interpreter();
        // Report which interpreter was selected; managed installations omit
        // the executable path.
        if managed {
            writeln!(
                printer.stderr(),
                "Using {} {}{}",
                implementation.pretty(),
                interpreter.python_version().cyan(),
                interpreter.variant().display_suffix().cyan(),
            )?;
        } else {
            writeln!(
                printer.stderr(),
                "Using {} {}{} interpreter at: {}",
                implementation.pretty(),
                interpreter.python_version(),
                interpreter.variant().display_suffix(),
                interpreter.sys_executable().user_display().cyan()
            )?;
        }
        if let Some(requires_python) = requires_python.as_ref() {
            validate_project_requires_python(
                &interpreter,
                Some(workspace),
                groups,
                requires_python,
                &source,
            )?;
        }
        Ok(Self::Interpreter(interpreter))
    }

    /// Consume `self`, yielding the underlying [`Interpreter`].
    pub(crate) fn into_interpreter(self) -> Interpreter {
        match self {
            Self::Interpreter(interpreter) => interpreter,
            Self::Environment(venv) => venv.into_interpreter(),
        }
    }

    /// Acquire an exclusive cross-process lock for the workspace, keyed by a
    /// digest of its install path and stored in the system temporary
    /// directory.
    pub(crate) async fn lock(workspace: &Workspace) -> Result<LockedFile, LockedFileError> {
        LockedFile::acquire(
            std::env::temp_dir().join(format!(
                "uv-{}.lock",
                cache_digest(workspace.install_path())
            )),
            LockedFileMode::Exclusive,
            workspace.install_path().simplified_display(),
        )
        .await
    }
}
/// Where a `requires-python` constraint originated.
#[derive(Debug, Clone)]
pub(crate) enum RequiresPythonSource {
    // From a PEP 723 script's inline metadata.
    Script,
    // From the surrounding project/workspace.
    Project,
}
/// How the Python request was determined.
#[derive(Debug, Clone)]
pub(crate) enum PythonRequestSource {
    // An explicit request from the user.
    UserRequest,
    // A discovered Python version file.
    DotPythonVersion(PythonVersionFile),
    // Derived from `requires-python` metadata.
    RequiresPython,
}

impl std::fmt::Display for PythonRequestSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UserRequest => write!(f, "explicit request"),
            Self::DotPythonVersion(file) => {
                write!(f, "version file at `{}`", file.path().user_display())
            }
            Self::RequiresPython => write!(f, "`requires-python` metadata"),
        }
    }
}
/// The resolved Python request and requirement for a workspace.
#[derive(Debug, Clone)]
pub(crate) struct WorkspacePython {
    // How the request below was determined.
    pub(crate) source: PythonRequestSource,
    // The Python request to satisfy, if any.
    pub(crate) python_request: Option<PythonRequest>,
    // The workspace's intersected `requires-python`, if declared.
    pub(crate) requires_python: Option<RequiresPython>,
}
impl WorkspacePython {
    /// Resolve the Python request for a workspace.
    ///
    /// Precedence: an explicit user request, then a discovered Python version
    /// file (a global version file is ignored if its pinned version falls
    /// outside `requires-python`), then a request derived from
    /// `requires-python` itself.
    pub(crate) async fn from_request(
        python_request: Option<PythonRequest>,
        workspace: Option<&Workspace>,
        groups: &DependencyGroupsWithDefaults,
        project_dir: &Path,
        no_config: bool,
    ) -> Result<Self, ProjectError> {
        let requires_python = workspace
            .map(|workspace| find_requires_python(workspace, groups))
            .transpose()?
            .flatten();
        let workspace_root = workspace.map(Workspace::install_path);
        let (source, python_request) = if let Some(request) = python_request {
            let source = PythonRequestSource::UserRequest;
            let request = Some(request);
            (source, request)
        } else if let Some(file) = PythonVersionFile::discover(
            project_dir,
            &VersionFileDiscoveryOptions::default()
                .with_stop_discovery_at(workspace_root.map(PathBuf::as_ref))
                .with_no_config(no_config),
        )
        .await?
        .filter(|file| {
            // Non-global version files always apply; a global file is ignored
            // when its pinned version is excluded by `requires-python`.
            if !file.is_global() {
                return true;
            }
            match (file.version(), requires_python.as_ref()) {
                (Some(request), Some(requires_python)) => request
                    .as_pep440_version()
                    .is_none_or(|version| requires_python.contains(&version)),
                _ => true,
            }
        }) {
            let source = PythonRequestSource::DotPythonVersion(file.clone());
            let request = file.version().cloned();
            (source, request)
        } else {
            let request = requires_python
                .clone()
                .and_then(PythonRequest::from_requires_python);
            let source = PythonRequestSource::RequiresPython;
            (source, request)
        };
        if let Some(python_request) = python_request.as_ref() {
            debug!(
                "Using Python request `{}` from {source}",
                python_request.to_canonical_string()
            );
        }
        Ok(Self {
            source,
            python_request,
            requires_python,
        })
    }
}
/// The resolved Python request and requirement for a PEP 723 script.
#[derive(Debug, Clone)]
pub(crate) struct ScriptPython {
    // How the request below was determined.
    pub(crate) source: PythonRequestSource,
    // The Python request to satisfy, if any.
    pub(crate) python_request: Option<PythonRequest>,
    // The applicable `requires-python`, tagged with whether it came from the
    // script's inline metadata or the surrounding project.
    pub(crate) requires_python: Option<(RequiresPython, RequiresPythonSource)>,
}
impl ScriptPython {
    /// Resolve the Python request for a PEP 723 script.
    ///
    /// Precedence: an explicit user request, then a discovered Python version
    /// file (ignored when it doesn't intersect the script's
    /// `requires-python`, or — for global files — the workspace's), then the
    /// script's own `requires-python`, then the workspace's. The returned
    /// requirement is the script's when present, falling back to the
    /// project's.
    pub(crate) async fn from_request(
        python_request: Option<PythonRequest>,
        workspace: Option<&Workspace>,
        script: Pep723ItemRef<'_>,
        no_config: bool,
    ) -> Result<Self, ProjectError> {
        let script_requires_python = script
            .metadata()
            .requires_python
            .as_ref()
            .map(RequiresPython::from_specifiers);
        let workspace_requires_python = workspace
            .map(|workspace| find_requires_python(workspace, &DependencyGroupsWithDefaults::none()))
            .transpose()?
            .flatten();
        let workspace_root = workspace.map(Workspace::install_path);
        // Start version-file discovery from the script's directory, falling
        // back to the current working directory (e.g., for stdin scripts).
        let project_dir = script.path().and_then(Path::parent).unwrap_or(&**CWD);
        let (source, python_request) = if let Some(request) = python_request {
            (PythonRequestSource::UserRequest, Some(request))
        } else if let Some(file) = PythonVersionFile::discover(
            project_dir,
            &VersionFileDiscoveryOptions::default()
                .with_stop_discovery_at(workspace_root.map(PathBuf::as_ref))
                .with_no_config(no_config),
        )
        .await?
        .filter(|file| {
            // Ignore a version file that can't satisfy the script's own
            // `requires-python`.
            match (file.version(), script_requires_python.as_ref()) {
                (Some(request), Some(requires_python)) => {
                    request.intersects_requires_python(requires_python)
                }
                _ => true,
            }
        })
        .filter(|file| {
            // Additionally ignore a *global* version file that can't satisfy
            // the workspace's `requires-python`.
            if !file.is_global() {
                return true;
            }
            match (file.version(), workspace_requires_python.as_ref()) {
                (Some(request), Some(requires_python)) => {
                    request.intersects_requires_python(requires_python)
                }
                _ => true,
            }
        }) {
            (
                PythonRequestSource::DotPythonVersion(file.clone()),
                file.version().cloned(),
            )
        } else if let Some(specifiers) = script.metadata().requires_python.as_ref() {
            let request = PythonRequest::Version(VersionRequest::from_specifiers(
                specifiers.clone(),
                PythonVariant::Default,
            ));
            (PythonRequestSource::RequiresPython, Some(request))
        } else {
            let request = workspace_requires_python
                .clone()
                .and_then(PythonRequest::from_requires_python);
            (PythonRequestSource::RequiresPython, request)
        };
        // Prefer the script's own requirement over the workspace's.
        let requires_python = if let Some(requires_python) = script_requires_python {
            Some((requires_python, RequiresPythonSource::Script))
        } else {
            workspace_requires_python
                .map(|requires_python| (requires_python, RequiresPythonSource::Project))
        };
        if let Some(python_request) = python_request.as_ref() {
            debug!(
                "Using Python request `{}` from {source}",
                python_request.to_canonical_string()
            );
        }
        Ok(Self {
            source,
            python_request,
            requires_python,
        })
    }
}
/// The virtual environment for a project, along with how it was obtained.
///
/// The `Would*` variants are produced in dry-run mode: they hold a throwaway
/// environment in a temporary directory (kept alive by the [`tempfile::TempDir`]
/// guard) alongside the path a real run would have used.
#[derive(Debug)]
enum ProjectEnvironment {
    /// An existing, compatible environment was found and reused.
    Existing(PythonEnvironment),
    /// An incompatible environment existed and was recreated.
    Replaced(PythonEnvironment),
    /// No environment existed; one was created.
    Created(PythonEnvironment),
    /// Dry run: the environment at the given path would have been replaced.
    WouldReplace(
        PathBuf,
        PythonEnvironment,
        #[allow(unused)] tempfile::TempDir,
    ),
    /// Dry run: an environment would have been created at the given path.
    WouldCreate(
        PathBuf,
        PythonEnvironment,
        #[allow(unused)] tempfile::TempDir,
    ),
}
impl ProjectEnvironment {
    /// Initialize a virtual environment for the project, reusing a compatible
    /// existing environment when possible.
    ///
    /// Takes a best-effort advisory lock, discovers an interpreter, then reuses,
    /// creates, or replaces the environment as needed. In dry-run mode, the
    /// environment is created in a temporary directory and reported via the
    /// `Would*` variants instead of touching the real path.
    pub(crate) async fn get_or_init(
        workspace: &Workspace,
        groups: &DependencyGroupsWithDefaults,
        python: Option<PythonRequest>,
        install_mirrors: &PythonInstallMirrors,
        client_builder: &BaseClientBuilder<'_>,
        python_preference: PythonPreference,
        python_downloads: PythonDownloads,
        no_sync: bool,
        no_config: bool,
        active: Option<bool>,
        cache: &Cache,
        dry_run: DryRun,
        printer: Printer,
        preview: Preview,
    ) -> Result<Self, ProjectError> {
        // Lock failures are non-fatal: warn and proceed unlocked.
        let _lock = ProjectInterpreter::lock(workspace)
            .await
            .inspect_err(|err| {
                warn!("Failed to acquire project environment lock: {err}");
            })
            .ok();
        let workspace_python = WorkspacePython::from_request(
            python,
            Some(workspace),
            groups,
            workspace.install_path().as_ref(),
            no_config,
        )
        .await?;
        // Allow transparent patch-version upgrades only when the request does
        // not pin an exact patch version.
        let upgradeable = workspace_python
            .python_request
            .as_ref()
            .is_none_or(|request| !request.includes_patch());
        match ProjectInterpreter::discover(
            workspace,
            groups,
            workspace_python,
            client_builder,
            python_preference,
            python_downloads,
            install_mirrors,
            no_sync,
            active,
            cache,
            printer,
            preview,
        )
        .await?
        {
            // A compatible environment already exists: reuse it as-is.
            ProjectInterpreter::Environment(environment) => Ok(Self::Existing(environment)),
            // Only an interpreter was found: an environment must be (re)created.
            ProjectInterpreter::Interpreter(interpreter) => {
                let root = workspace.venv(active);
                // Decide whether an existing directory must be removed first.
                // A `pyvenv.cfg` marks a virtual environment, which is safe to
                // replace; a non-empty directory without one is not ours to delete.
                let replace = match (root.try_exists(), root.join("pyvenv.cfg").try_exists()) {
                    (_, Ok(true)) => true,
                    (Ok(false), Ok(false)) => false,
                    (Ok(true), Ok(false)) => {
                        if root.read_dir().is_ok_and(|mut dir| dir.next().is_none()) {
                            false
                        } else {
                            return Err(ProjectError::InvalidProjectEnvironmentDir(
                                root,
                                "it is not a compatible environment but cannot be recreated because it is not a virtual environment".to_string(),
                            ));
                        }
                    }
                    (_, Err(err)) | (Err(err), _) => {
                        return Err(ProjectError::InvalidProjectEnvironmentDir(
                            root,
                            format!(
                                "it is not a compatible environment but cannot be recreated because uv cannot determine if it is a virtual environment: {err}"
                            ),
                        ));
                    }
                };
                // Use the project name, falling back to the directory name, as
                // the shell prompt for the environment.
                let prompt = workspace
                    .pyproject_toml()
                    .project
                    .as_ref()
                    .map(|p| p.name.to_string())
                    .or_else(|| {
                        workspace
                            .install_path()
                            .file_name()
                            .map(|f| f.to_string_lossy().to_string())
                    })
                    .map(uv_virtualenv::Prompt::Static)
                    .unwrap_or(uv_virtualenv::Prompt::None);
                // Dry run: create the environment in a temp dir and report what
                // would have happened at the real path.
                if dry_run.enabled() {
                    let temp_dir = cache.venv_dir()?;
                    let environment = uv_virtualenv::create_venv(
                        temp_dir.path(),
                        interpreter,
                        prompt,
                        false,
                        uv_virtualenv::OnExisting::Remove(
                            uv_virtualenv::RemovalReason::ManagedEnvironment,
                        ),
                        false,
                        false,
                        upgradeable,
                    )?;
                    return Ok(if replace {
                        Self::WouldReplace(root, environment, temp_dir)
                    } else {
                        Self::WouldCreate(root, environment, temp_dir)
                    });
                }
                if replace {
                    match remove_virtualenv(&root) {
                        Ok(()) => {
                            writeln!(
                                printer.stderr(),
                                "Removed virtual environment at: {}",
                                root.user_display().cyan()
                            )?;
                        }
                        // Already gone (e.g., a concurrent removal): not an error.
                        Err(uv_virtualenv::Error::Io(err))
                            if err.kind() == std::io::ErrorKind::NotFound => {}
                        Err(err) => return Err(err.into()),
                    }
                }
                writeln!(
                    printer.stderr(),
                    "Creating virtual environment at: {}",
                    root.user_display().cyan()
                )?;
                let environment = uv_virtualenv::create_venv(
                    &root,
                    interpreter,
                    prompt,
                    false,
                    uv_virtualenv::OnExisting::Remove(
                        uv_virtualenv::RemovalReason::ManagedEnvironment,
                    ),
                    false,
                    false,
                    upgradeable,
                )?;
                if replace {
                    Ok(Self::Replaced(environment))
                } else {
                    Ok(Self::Created(environment))
                }
            }
        }
    }

    /// Convert to the contained [`PythonEnvironment`].
    ///
    /// Errors for the dry-run (`Would*`) variants, whose temporary environment
    /// would otherwise be deleted on drop.
    pub(crate) fn into_environment(self) -> Result<PythonEnvironment, ProjectError> {
        match self {
            Self::Existing(environment) => Ok(environment),
            Self::Replaced(environment) => Ok(environment),
            Self::Created(environment) => Ok(environment),
            Self::WouldReplace(..) => Err(ProjectError::DroppedEnvironment),
            Self::WouldCreate(..) => Err(ProjectError::DroppedEnvironment),
        }
    }

    /// The path the environment would be created at during a dry run, if any.
    pub(crate) fn dry_run_target(&self) -> Option<&Path> {
        match self {
            Self::WouldReplace(path, _, _) | Self::WouldCreate(path, _, _) => Some(path),
            Self::Created(_) | Self::Existing(_) | Self::Replaced(_) => None,
        }
    }
}
impl std::ops::Deref for ProjectEnvironment {
    type Target = PythonEnvironment;

    /// Borrow the underlying [`PythonEnvironment`], regardless of how it was obtained.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Existing(environment)
            | Self::Replaced(environment)
            | Self::Created(environment)
            | Self::WouldReplace(_, environment, _)
            | Self::WouldCreate(_, environment, _) => environment,
        }
    }
}
/// The virtual environment for a PEP 723 script, along with how it was obtained.
///
/// Mirrors [`ProjectEnvironment`]: the `Would*` variants are produced in
/// dry-run mode and hold a throwaway environment in a temporary directory
/// (kept alive by the [`tempfile::TempDir`] guard) alongside the path a real
/// run would have used.
#[derive(Debug)]
enum ScriptEnvironment {
    /// An existing, compatible environment was found and reused.
    Existing(PythonEnvironment),
    /// An existing environment was removed and recreated.
    Replaced(PythonEnvironment),
    /// No environment existed; one was created.
    Created(PythonEnvironment),
    /// Dry run: the environment at the given path would have been replaced.
    WouldReplace(
        PathBuf,
        PythonEnvironment,
        #[allow(unused)] tempfile::TempDir,
    ),
    /// Dry run: an environment would have been created at the given path.
    WouldCreate(
        PathBuf,
        PythonEnvironment,
        #[allow(unused)] tempfile::TempDir,
    ),
}
impl ScriptEnvironment {
    /// Initialize a virtual environment for a PEP 723 script.
    ///
    /// Takes a best-effort advisory lock, discovers an interpreter, then reuses
    /// or recreates the script's cached environment. In dry-run mode, the
    /// environment is created in a temporary directory and reported via the
    /// `Would*` variants instead of touching the real path.
    pub(crate) async fn get_or_init(
        script: Pep723ItemRef<'_>,
        python_request: Option<PythonRequest>,
        client_builder: &BaseClientBuilder<'_>,
        python_preference: PythonPreference,
        python_downloads: PythonDownloads,
        install_mirrors: &PythonInstallMirrors,
        no_sync: bool,
        no_config: bool,
        active: Option<bool>,
        cache: &Cache,
        dry_run: DryRun,
        printer: Printer,
        preview: Preview,
    ) -> Result<Self, ProjectError> {
        // Lock failures are non-fatal: warn and proceed unlocked.
        let _lock = ScriptInterpreter::lock(script)
            .await
            .inspect_err(|err| {
                warn!("Failed to acquire script environment lock: {err}");
            })
            .ok();
        // Allow transparent patch-version upgrades only when the request does
        // not pin an exact patch version.
        let upgradeable = python_request
            .as_ref()
            .is_none_or(|request| !request.includes_patch());
        match ScriptInterpreter::discover(
            script,
            python_request,
            client_builder,
            python_preference,
            python_downloads,
            install_mirrors,
            no_sync,
            no_config,
            active,
            cache,
            printer,
            preview,
        )
        .await?
        {
            // A compatible environment already exists: reuse it as-is.
            ScriptInterpreter::Environment(environment) => Ok(Self::Existing(environment)),
            // Only an interpreter was found: the environment must be (re)created.
            ScriptInterpreter::Interpreter(interpreter) => {
                let root = ScriptInterpreter::root(script, active, cache);
                // Use the script's file name as the shell prompt, when available.
                let prompt = script
                    .path()
                    .and_then(|path| path.file_name())
                    .map(|f| f.to_string_lossy().to_string())
                    .map(uv_virtualenv::Prompt::Static)
                    .unwrap_or(uv_virtualenv::Prompt::None);
                // Dry run: create the environment in a temp dir and report what
                // would have happened at the real path.
                if dry_run.enabled() {
                    let temp_dir = cache.venv_dir()?;
                    let environment = uv_virtualenv::create_venv(
                        temp_dir.path(),
                        interpreter,
                        prompt,
                        false,
                        uv_virtualenv::OnExisting::Remove(
                            uv_virtualenv::RemovalReason::ManagedEnvironment,
                        ),
                        false,
                        false,
                        upgradeable,
                    )?;
                    return Ok(if root.exists() {
                        Self::WouldReplace(root, environment, temp_dir)
                    } else {
                        Self::WouldCreate(root, environment, temp_dir)
                    });
                }
                // Script environments are always recreated; track whether one
                // was actually removed so we can report `Replaced` vs `Created`.
                let replaced = match remove_virtualenv(&root) {
                    Ok(()) => {
                        debug!(
                            "Removed virtual environment at: {}",
                            root.user_display().cyan()
                        );
                        true
                    }
                    // Nothing to remove: this is a fresh creation.
                    Err(uv_virtualenv::Error::Io(err))
                        if err.kind() == std::io::ErrorKind::NotFound =>
                    {
                        false
                    }
                    Err(err) => return Err(err.into()),
                };
                debug!(
                    "Creating script environment at: {}",
                    root.user_display().cyan()
                );
                let environment = uv_virtualenv::create_venv(
                    &root,
                    interpreter,
                    prompt,
                    false,
                    uv_virtualenv::OnExisting::Remove(
                        uv_virtualenv::RemovalReason::ManagedEnvironment,
                    ),
                    false,
                    false,
                    upgradeable,
                )?;
                Ok(if replaced {
                    Self::Replaced(environment)
                } else {
                    Self::Created(environment)
                })
            }
        }
    }

    /// Convert to the contained [`PythonEnvironment`].
    ///
    /// Errors for the dry-run (`Would*`) variants, whose temporary environment
    /// would otherwise be deleted on drop.
    pub(crate) fn into_environment(self) -> Result<PythonEnvironment, ProjectError> {
        match self {
            Self::Existing(environment) => Ok(environment),
            Self::Replaced(environment) => Ok(environment),
            Self::Created(environment) => Ok(environment),
            Self::WouldReplace(..) => Err(ProjectError::DroppedEnvironment),
            Self::WouldCreate(..) => Err(ProjectError::DroppedEnvironment),
        }
    }

    /// The path the environment would be created at during a dry run, if any.
    pub(crate) fn dry_run_target(&self) -> Option<&Path> {
        match self {
            Self::WouldReplace(path, _, _) | Self::WouldCreate(path, _, _) => Some(path),
            Self::Created(_) | Self::Existing(_) | Self::Replaced(_) => None,
        }
    }
}
impl std::ops::Deref for ScriptEnvironment {
    type Target = PythonEnvironment;

    /// Borrow the underlying [`PythonEnvironment`], regardless of how it was obtained.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Existing(environment)
            | Self::Replaced(environment)
            | Self::Created(environment)
            | Self::WouldReplace(_, environment, _)
            | Self::WouldCreate(_, environment, _) => environment,
        }
    }
}
/// Resolve package names for any unnamed requirements (e.g., bare URLs or paths)
/// in the given specifications.
///
/// Already-named requirements pass through untouched; unnamed requirements are
/// resolved by fetching and inspecting their metadata, which requires a registry
/// client and build dispatch. Returns the combined list of named requirements.
pub(crate) async fn resolve_names(
    requirements: Vec<UnresolvedRequirementSpecification>,
    interpreter: &Interpreter,
    settings: &ResolverInstallerSettings,
    client_builder: &BaseClientBuilder<'_>,
    state: &SharedState,
    concurrency: &Concurrency,
    cache: &Cache,
    workspace_cache: &WorkspaceCache,
    printer: Printer,
    preview: Preview,
    lfs: GitLfsSetting,
) -> Result<Vec<Requirement>, uv_requirements::Error> {
    // Split the input into already-named requirements and unnamed ones that
    // still need name resolution.
    let (mut requirements, unnamed): (Vec<_>, Vec<_>) = requirements
        .into_iter()
        .map(|spec| {
            spec.requirement
                .augment_requirement(None, None, None, lfs.into(), None)
        })
        .partition_map(|requirement| match requirement {
            UnresolvedRequirement::Named(requirement) => itertools::Either::Left(requirement),
            UnresolvedRequirement::Unnamed(requirement) => itertools::Either::Right(requirement),
        });
    // Nothing unnamed: skip the (expensive) client/dispatch setup entirely.
    if unnamed.is_empty() {
        return Ok(requirements);
    }
    // Destructure the settings; fields bound to `_` are irrelevant for pure
    // name resolution.
    let ResolverInstallerSettings {
        resolver:
            ResolverSettings {
                build_options,
                config_setting,
                config_settings_package,
                dependency_metadata,
                exclude_newer,
                fork_strategy: _,
                index_locations,
                index_strategy,
                keyring_provider,
                link_mode,
                build_isolation,
                extra_build_dependencies,
                extra_build_variables,
                prerelease: _,
                resolution: _,
                sources,
                torch_backend,
                upgrade: _,
            },
        compile_bytecode: _,
        reinstall: _,
    } = settings;
    let client_builder = client_builder.clone().keyring(*keyring_provider);
    // NOTE(review): errors constructing the torch strategy are discarded here
    // via `.ok()`, unlike in `resolve_environment`, which propagates them with
    // `?` — presumably intentional best-effort for name resolution; confirm.
    let torch_backend = torch_backend
        .map(|mode| {
            let source = if uv_auth::PyxTokenStore::from_settings()
                .is_ok_and(|store| store.has_credentials())
            {
                TorchSource::Pyx
            } else {
                TorchSource::default()
            };
            TorchStrategy::from_mode(mode, source, interpreter.platform().os())
        })
        .transpose()
        .ok()
        .flatten();
    // Client build errors are wrapped as I/O errors to fit this function's
    // error type.
    let client = RegistryClientBuilder::new(client_builder, cache.clone())
        .index_locations(index_locations.clone())
        .index_strategy(*index_strategy)
        .torch_backend(torch_backend.clone())
        .markers(interpreter.markers())
        .platform(interpreter.platform())
        .build()
        .map_err(std::io::Error::other)?;
    // `environment` must outlive `build_isolation`, which may borrow it.
    let environment;
    let build_isolation = match build_isolation {
        uv_configuration::BuildIsolation::Isolate => BuildIsolation::Isolated,
        uv_configuration::BuildIsolation::Shared => {
            environment = PythonEnvironment::from_interpreter(interpreter.clone());
            BuildIsolation::Shared(&environment)
        }
        uv_configuration::BuildIsolation::SharedPackage(packages) => {
            environment = PythonEnvironment::from_interpreter(interpreter.clone());
            BuildIsolation::SharedPackage(&environment, packages)
        }
    };
    let hasher = HashStrategy::default();
    let flat_index = FlatIndex::default();
    let build_constraints = Constraints::default();
    let build_hasher = HashStrategy::default();
    let extra_build_requires =
        LoweredExtraBuildDependencies::from_non_lowered(extra_build_dependencies.clone())
            .into_inner();
    let build_dispatch = BuildDispatch::new(
        &client,
        cache,
        &build_constraints,
        interpreter,
        index_locations,
        &flat_index,
        dependency_metadata,
        state.clone(),
        *index_strategy,
        config_setting,
        config_settings_package,
        build_isolation,
        &extra_build_requires,
        extra_build_variables,
        *link_mode,
        build_options,
        &build_hasher,
        exclude_newer.clone(),
        sources.clone(),
        SourceTreeEditablePolicy::Project,
        workspace_cache.clone(),
        concurrency.clone(),
        preview,
    );
    // Resolve names for the unnamed requirements and append them to the named set.
    requirements.extend(
        NamedRequirementsResolver::new(
            &hasher,
            state.index(),
            DistributionDatabase::new(
                &client,
                &build_dispatch,
                concurrency.downloads_semaphore.clone(),
            ),
        )
        .with_reporter(Arc::new(ResolverReporter::from(printer)))
        .resolve(unnamed.into_iter())
        .await?,
    );
    Ok(requirements)
}
/// Where resolution preferences (previously pinned versions) come from.
#[derive(Debug, Clone)]
pub(crate) enum PreferenceLocation<'lock> {
    /// Preferences are derived from an existing lockfile.
    Lock {
        lock: &'lock Lock,
        install_path: &'lock Path,
    },
    /// Preferences are provided as explicit entries.
    Entries(Vec<Preference>),
}
/// The requirements to resolve into an environment, plus optional resolution
/// preferences to bias version selection.
#[derive(Debug, Clone)]
pub(crate) struct EnvironmentSpecification<'lock> {
    /// The requirements, constraints, overrides, etc. to resolve.
    requirements: RequirementsSpecification,
    /// Optional preferences (e.g., from a lockfile) for the resolver.
    preferences: Option<PreferenceLocation<'lock>>,
}
impl From<RequirementsSpecification> for EnvironmentSpecification<'_> {
fn from(requirements: RequirementsSpecification) -> Self {
Self {
requirements,
preferences: None,
}
}
}
impl<'lock> EnvironmentSpecification<'lock> {
    /// Attach resolution preferences, returning the updated specification.
    #[must_use]
    pub(crate) fn with_preferences(self, preferences: PreferenceLocation<'lock>) -> Self {
        let Self { requirements, .. } = self;
        Self {
            requirements,
            preferences: Some(preferences),
        }
    }
}
/// Resolve a set of requirements for the given interpreter (and optional target
/// platform), returning the resolver output without installing anything.
///
/// Sets up the registry client, flat index, and build dispatch; seeds the
/// resolver with preferences from the lockfile or explicit entries, if
/// provided; and runs a resolution scoped to the interpreter's (or target
/// platform's) markers and tags.
pub(crate) async fn resolve_environment(
    spec: EnvironmentSpecification<'_>,
    interpreter: &Interpreter,
    python_platform: Option<&TargetTriple>,
    source_tree_editable_policy: SourceTreeEditablePolicy,
    build_constraints: Constraints,
    settings: &ResolverSettings,
    client_builder: &BaseClientBuilder<'_>,
    state: &PlatformState,
    logger: Box<dyn ResolveLogger>,
    concurrency: &Concurrency,
    cache: &Cache,
    workspace_cache: &WorkspaceCache,
    printer: Printer,
    preview: Preview,
) -> Result<ResolverOutput, ProjectError> {
    // Surface requirements-file settings (e.g., `--index-url` lines) that uv
    // will ignore in favor of its own configuration.
    warn_on_requirements_txt_setting(&spec.requirements, settings);
    let ResolverSettings {
        index_locations,
        index_strategy,
        keyring_provider,
        resolution,
        prerelease,
        fork_strategy,
        dependency_metadata,
        config_setting,
        config_settings_package,
        build_isolation,
        extra_build_dependencies,
        extra_build_variables,
        exclude_newer,
        link_mode,
        upgrade: _,
        build_options,
        sources,
        torch_backend,
    } = settings;
    let RequirementsSpecification {
        project,
        requirements,
        constraints,
        overrides,
        excludes,
        source_trees,
        ..
    } = spec.requirements;
    let client_builder = client_builder.clone().keyring(*keyring_provider);
    // Tags and markers reflect the target platform when cross-resolving,
    // otherwise the interpreter's own platform.
    let tags = pip::resolution_tags(None, python_platform, interpreter)?;
    let marker_env = pip::resolution_markers(None, python_platform, interpreter);
    let python_requirement = PythonRequirement::from_interpreter(interpreter);
    let torch_backend = torch_backend
        .map(|mode| {
            // Prefer the Pyx torch source when Pyx credentials are available.
            let source = if uv_auth::PyxTokenStore::from_settings()
                .is_ok_and(|store| store.has_credentials())
            {
                TorchSource::Pyx
            } else {
                TorchSource::default()
            };
            TorchStrategy::from_mode(
                mode,
                source,
                python_platform
                    .map(|t| t.platform())
                    .as_ref()
                    .unwrap_or(interpreter.platform())
                    .os(),
            )
        })
        .transpose()?;
    let client = RegistryClientBuilder::new(client_builder, cache.clone())
        .index_locations(index_locations.clone())
        .index_strategy(*index_strategy)
        .torch_backend(torch_backend.clone())
        .markers(interpreter.markers())
        .platform(interpreter.platform())
        .build()?;
    // `environment` must outlive `build_isolation`, which may borrow it.
    let environment;
    let build_isolation = match build_isolation {
        uv_configuration::BuildIsolation::Isolate => BuildIsolation::Isolated,
        uv_configuration::BuildIsolation::Shared => {
            environment = PythonEnvironment::from_interpreter(interpreter.clone());
            BuildIsolation::Shared(&environment)
        }
        uv_configuration::BuildIsolation::SharedPackage(packages) => {
            environment = PythonEnvironment::from_interpreter(interpreter.clone());
            BuildIsolation::SharedPackage(&environment, packages)
        }
    };
    let options = OptionsBuilder::new()
        .resolution_mode(*resolution)
        .prerelease_mode(*prerelease)
        .fork_strategy(*fork_strategy)
        .exclude_newer(exclude_newer.clone())
        .index_strategy(*index_strategy)
        .build_options(build_options.clone())
        .build();
    let extras = ExtrasSpecification::default();
    let groups = BTreeMap::new();
    let hasher = HashStrategy::default();
    let build_hasher = HashStrategy::default();
    let reinstall = Reinstall::default();
    let upgrade = Upgrade::default();
    // Seed the resolver with preferences: versions from the lockfile (also
    // priming the Git resolution cache) or explicit entries.
    let preferences = match spec.preferences {
        Some(PreferenceLocation::Lock { lock, install_path }) => {
            let LockedRequirements { preferences, git } =
                read_lock_requirements(lock, install_path, &upgrade)?;
            for ResolvedRepositoryReference { reference, sha } in git {
                debug!("Inserting Git reference into resolver: `{reference:?}` at `{sha}`");
                state.git().insert(reference, sha);
            }
            preferences
        }
        Some(PreferenceLocation::Entries(entries)) => entries,
        None => vec![],
    };
    // Fetch distributions from any configured `--find-links` style indexes.
    let flat_index = {
        let client = FlatIndexClient::new(client.cached_client(), client.connectivity(), cache);
        let entries = client
            .fetch_all(index_locations.flat_indexes().map(Index::url))
            .await?;
        FlatIndex::from_entries(entries, Some(&tags), &hasher, build_options)
    };
    let extra_build_requires =
        LoweredExtraBuildDependencies::from_non_lowered(extra_build_dependencies.clone())
            .into_inner();
    let resolve_dispatch = BuildDispatch::new(
        &client,
        cache,
        &build_constraints,
        interpreter,
        index_locations,
        &flat_index,
        dependency_metadata,
        state.clone().into_inner(),
        *index_strategy,
        config_setting,
        config_settings_package,
        build_isolation,
        &extra_build_requires,
        extra_build_variables,
        *link_mode,
        build_options,
        &build_hasher,
        exclude_newer.clone(),
        sources.clone(),
        source_tree_editable_policy,
        workspace_cache.clone(),
        concurrency.clone(),
        preview,
    );
    // Run the resolution; only the resolver output is returned (the hash
    // strategy in `.1` is discarded).
    Ok(pip::operations::resolve(
        requirements,
        constraints,
        overrides,
        excludes,
        source_trees,
        project,
        BTreeSet::default(),
        &extras,
        &groups,
        preferences,
        EmptyInstalledPackages,
        &hasher,
        &reinstall,
        &upgrade,
        Some(&tags),
        ResolverEnvironment::specific(marker_env),
        python_requirement,
        interpreter.markers(),
        Conflicts::empty(),
        &client,
        &flat_index,
        state.index(),
        &resolve_dispatch,
        concurrency,
        options,
        logger,
        printer,
    )
    .await?
    .0)
}
/// Install a pre-computed [`Resolution`] into the given virtual environment.
///
/// Sets up the registry client, flat index, and build dispatch, installs the
/// resolution (applying the requested modifications), reports any resolution
/// diagnostics, and returns the environment.
pub(crate) async fn sync_environment(
    venv: PythonEnvironment,
    resolution: &Resolution,
    modifications: Modifications,
    build_constraints: Constraints,
    settings: InstallerSettingsRef<'_>,
    client_builder: &BaseClientBuilder<'_>,
    state: &PlatformState,
    logger: Box<dyn InstallLogger>,
    installer_metadata: bool,
    concurrency: &Concurrency,
    cache: &Cache,
    printer: Printer,
    preview: Preview,
) -> Result<PythonEnvironment, ProjectError> {
    let InstallerSettingsRef {
        index_locations,
        index_strategy,
        keyring_provider,
        dependency_metadata,
        config_setting,
        config_settings_package,
        build_isolation,
        extra_build_dependencies,
        extra_build_variables,
        exclude_newer,
        link_mode,
        compile_bytecode,
        reinstall,
        build_options,
        sources,
    } = settings;
    let client_builder = client_builder.clone().keyring(keyring_provider);
    let site_packages = SitePackages::from_environment(&venv)?;
    let interpreter = venv.interpreter();
    let tags = venv.interpreter().tags()?;
    let client = RegistryClientBuilder::new(client_builder, cache.clone())
        .index_locations(index_locations.clone())
        .index_strategy(index_strategy)
        .markers(interpreter.markers())
        .platform(interpreter.platform())
        .build()?;
    // Shared build isolation uses the target environment itself.
    let build_isolation = match build_isolation {
        uv_configuration::BuildIsolation::Isolate => BuildIsolation::Isolated,
        uv_configuration::BuildIsolation::Shared => BuildIsolation::Shared(&venv),
        uv_configuration::BuildIsolation::SharedPackage(packages) => {
            BuildIsolation::SharedPackage(&venv, packages)
        }
    };
    let build_hasher = HashStrategy::default();
    let dry_run = DryRun::default();
    let hasher = HashStrategy::default();
    let workspace_cache = WorkspaceCache::default();
    // Fetch distributions from any configured `--find-links` style indexes.
    let flat_index = {
        let client = FlatIndexClient::new(client.cached_client(), client.connectivity(), cache);
        let entries = client
            .fetch_all(index_locations.flat_indexes().map(Index::url))
            .await?;
        FlatIndex::from_entries(entries, Some(tags), &hasher, build_options)
    };
    let extra_build_requires =
        LoweredExtraBuildDependencies::from_non_lowered(extra_build_dependencies.clone())
            .into_inner();
    let build_dispatch = BuildDispatch::new(
        &client,
        cache,
        &build_constraints,
        interpreter,
        index_locations,
        &flat_index,
        dependency_metadata,
        state.clone().into_inner(),
        index_strategy,
        config_setting,
        config_settings_package,
        build_isolation,
        &extra_build_requires,
        extra_build_variables,
        link_mode,
        build_options,
        &build_hasher,
        exclude_newer.clone(),
        sources,
        SourceTreeEditablePolicy::Project,
        workspace_cache,
        concurrency.clone(),
        preview,
    );
    pip::operations::install(
        resolution,
        site_packages,
        InstallationStrategy::Permissive,
        modifications,
        reinstall,
        build_options,
        link_mode,
        compile_bytecode,
        &hasher,
        tags,
        &client,
        state.in_flight(),
        concurrency,
        &build_dispatch,
        cache,
        &venv,
        logger,
        installer_metadata,
        dry_run,
        printer,
        preview,
    )
    .await?;
    // Surface any diagnostics attached to the resolution (e.g., yanked packages).
    pip::operations::diagnose_resolution(resolution.diagnostics(), printer)?;
    Ok(venv)
}
/// The result of updating an environment: the environment itself plus a record
/// of the changes made to it.
#[derive(Debug)]
pub(crate) struct EnvironmentUpdate {
    /// The (possibly unchanged) environment.
    pub(crate) environment: PythonEnvironment,
    /// The installs/removals performed during the update.
    pub(crate) changelog: Changelog,
}
impl EnvironmentUpdate {
    /// Consume the update, discarding the changelog and yielding the environment.
    pub(crate) fn into_environment(self) -> PythonEnvironment {
        let Self { environment, .. } = self;
        environment
    }
}
/// Resolve and install a set of requirements into an existing environment.
///
/// When the environment already satisfies the request (no reinstall, no
/// upgrade, no source trees, and `Modifications::Sufficient`), returns early
/// with an empty changelog. Otherwise performs a full resolve-and-install and
/// returns the environment together with the changes made.
pub(crate) async fn update_environment(
    venv: PythonEnvironment,
    spec: RequirementsSpecification,
    modifications: Modifications,
    python_platform: Option<&TargetTriple>,
    source_tree_editable_policy: SourceTreeEditablePolicy,
    build_constraints: Constraints,
    extra_build_requires: ExtraBuildRequires,
    settings: &ResolverInstallerSettings,
    client_builder: &BaseClientBuilder<'_>,
    state: &SharedState,
    resolve: Box<dyn ResolveLogger>,
    install: Box<dyn InstallLogger>,
    installer_metadata: bool,
    concurrency: &Concurrency,
    cache: &Cache,
    workspace_cache: &WorkspaceCache,
    dry_run: DryRun,
    printer: Printer,
    preview: Preview,
) -> Result<EnvironmentUpdate, ProjectError> {
    // Surface requirements-file settings (e.g., `--index-url` lines) that uv
    // will ignore in favor of its own configuration.
    warn_on_requirements_txt_setting(&spec, &settings.resolver);
    let ResolverInstallerSettings {
        resolver:
            ResolverSettings {
                build_options,
                config_setting,
                config_settings_package,
                dependency_metadata,
                exclude_newer,
                fork_strategy,
                index_locations,
                index_strategy,
                keyring_provider,
                link_mode,
                build_isolation,
                extra_build_dependencies: _,
                extra_build_variables,
                prerelease,
                resolution,
                sources,
                torch_backend,
                upgrade,
            },
        compile_bytecode,
        reinstall,
    } = settings;
    let client_builder = client_builder.clone().keyring(*keyring_provider);
    let RequirementsSpecification {
        project,
        requirements,
        constraints,
        overrides,
        excludes,
        source_trees,
        ..
    } = spec;
    let interpreter = venv.interpreter();
    // Markers and tags reflect the target platform when cross-installing,
    // otherwise the interpreter's own platform.
    let marker_env = pip::resolution_markers(None, python_platform, interpreter);
    let tags = pip::resolution_tags(None, python_platform, interpreter)?;
    let site_packages = SitePackages::from_environment(&venv)?;
    // Fast path: if nothing forces a re-resolution and the installed packages
    // already satisfy the spec, return without touching the environment.
    if reinstall.is_none()
        && upgrade.is_none()
        && source_trees.is_empty()
        && matches!(modifications, Modifications::Sufficient)
    {
        match site_packages.satisfies_spec(
            &requirements,
            &constraints,
            &overrides,
            InstallationStrategy::Permissive,
            &marker_env,
            &tags,
            config_setting,
            config_settings_package,
            &extra_build_requires,
            extra_build_variables,
        )? {
            SatisfiesResult::Fresh {
                recursive_requirements,
            } => {
                if recursive_requirements.is_empty() {
                    debug!("No requirements to install");
                } else {
                    debug!(
                        "All requirements satisfied: {}",
                        recursive_requirements
                            .iter()
                            .map(ToString::to_string)
                            .sorted()
                            .join(" | ")
                    );
                }
                return Ok(EnvironmentUpdate {
                    environment: venv,
                    changelog: Changelog::default(),
                });
            }
            SatisfiesResult::Unsatisfied(requirement) => {
                debug!("At least one requirement is not satisfied: {requirement}");
            }
        }
    }
    let torch_backend = torch_backend
        .map(|mode| {
            // Prefer the Pyx torch source when Pyx credentials are available.
            let source = if uv_auth::PyxTokenStore::from_settings()
                .is_ok_and(|store| store.has_credentials())
            {
                TorchSource::Pyx
            } else {
                TorchSource::default()
            };
            TorchStrategy::from_mode(
                mode,
                source,
                python_platform
                    .map(|t| t.platform())
                    .as_ref()
                    .unwrap_or(interpreter.platform())
                    .os(),
            )
        })
        .transpose()?;
    let client = RegistryClientBuilder::new(client_builder, cache.clone())
        .index_locations(index_locations.clone())
        .index_strategy(*index_strategy)
        .torch_backend(torch_backend.clone())
        .markers(interpreter.markers())
        .platform(interpreter.platform())
        .build()?;
    // Shared build isolation uses the target environment itself.
    let build_isolation = match build_isolation {
        uv_configuration::BuildIsolation::Isolate => BuildIsolation::Isolated,
        uv_configuration::BuildIsolation::Shared => BuildIsolation::Shared(&venv),
        uv_configuration::BuildIsolation::SharedPackage(packages) => {
            BuildIsolation::SharedPackage(&venv, packages)
        }
    };
    let options = OptionsBuilder::new()
        .resolution_mode(*resolution)
        .prerelease_mode(*prerelease)
        .fork_strategy(*fork_strategy)
        .exclude_newer(exclude_newer.clone())
        .index_strategy(*index_strategy)
        .build_options(build_options.clone())
        .build();
    let build_hasher = HashStrategy::default();
    let extras = ExtrasSpecification::default();
    let groups = BTreeMap::new();
    let hasher = HashStrategy::default();
    let preferences = Vec::default();
    let python_requirement = PythonRequirement::from_interpreter(interpreter);
    // Fetch distributions from any configured `--find-links` style indexes.
    let flat_index = {
        let client = FlatIndexClient::new(client.cached_client(), client.connectivity(), cache);
        let entries = client
            .fetch_all(index_locations.flat_indexes().map(Index::url))
            .await?;
        FlatIndex::from_entries(entries, Some(&tags), &hasher, build_options)
    };
    let build_dispatch = BuildDispatch::new(
        &client,
        cache,
        &build_constraints,
        interpreter,
        index_locations,
        &flat_index,
        dependency_metadata,
        state.clone(),
        *index_strategy,
        config_setting,
        config_settings_package,
        build_isolation,
        &extra_build_requires,
        extra_build_variables,
        *link_mode,
        build_options,
        &build_hasher,
        exclude_newer.clone(),
        sources.clone(),
        source_tree_editable_policy,
        workspace_cache.clone(),
        concurrency.clone(),
        preview,
    );
    // Resolve against the current site-packages so installed versions are
    // preferred where compatible.
    let (resolution, hasher) = match pip::operations::resolve(
        requirements,
        constraints,
        overrides,
        excludes,
        source_trees,
        project,
        BTreeSet::default(),
        &extras,
        &groups,
        preferences,
        site_packages.clone(),
        &hasher,
        reinstall,
        upgrade,
        Some(&tags),
        ResolverEnvironment::specific(marker_env.clone()),
        python_requirement,
        venv.interpreter().markers(),
        Conflicts::empty(),
        &client,
        &flat_index,
        state.index(),
        &build_dispatch,
        concurrency,
        options,
        resolve,
        printer,
    )
    .await
    {
        Ok((resolution, hasher)) => (Resolution::from(resolution), hasher),
        Err(err) => return Err(err.into()),
    };
    // Sync the environment to the new resolution, recording the changes.
    let changelog = pip::operations::install(
        &resolution,
        site_packages,
        InstallationStrategy::Permissive,
        modifications,
        reinstall,
        build_options,
        *link_mode,
        *compile_bytecode,
        &hasher,
        &tags,
        &client,
        state.in_flight(),
        concurrency,
        &build_dispatch,
        cache,
        &venv,
        install,
        installer_metadata,
        dry_run,
        printer,
        preview,
    )
    .await?;
    // Surface any diagnostics attached to the resolution (e.g., yanked packages).
    pip::operations::diagnose_resolution(resolution.diagnostics(), printer)?;
    Ok(EnvironmentUpdate {
        environment: venv,
        changelog,
    })
}
/// Determine the `requires-python` constraint to embed in a newly initialized script.
///
/// The Python request comes from an explicit `--python` value, or (unless
/// pinning is disabled) a discovered `.python-version` file. An interpreter is
/// then found or downloaded, and the constraint is `>=` its minor version.
pub(crate) async fn init_script_python_requirement(
    python: Option<&str>,
    install_mirrors: &PythonInstallMirrors,
    directory: &Path,
    no_pin_python: bool,
    python_preference: PythonPreference,
    python_downloads: PythonDownloads,
    no_config: bool,
    client_builder: &BaseClientBuilder<'_>,
    cache: &Cache,
    reporter: &PythonDownloadReporter,
    preview: Preview,
) -> anyhow::Result<RequiresPython> {
    let python_request = if let Some(request) = python {
        Some(PythonRequest::parse(request))
    } else if let (false, Some(request)) = (
        // Only consult `.python-version` files when pinning is enabled.
        no_pin_python,
        PythonVersionFile::discover(
            directory,
            &VersionFileDiscoveryOptions::default().with_no_config(no_config),
        )
        .await?
        .and_then(PythonVersionFile::into_version),
    ) {
        Some(request)
    } else {
        None
    };
    let interpreter = PythonInstallation::find_or_download(
        python_request.as_ref(),
        EnvironmentPreference::Any,
        python_preference,
        python_downloads,
        client_builder,
        cache,
        Some(reporter),
        install_mirrors.python_install_mirror.as_deref(),
        install_mirrors.pypy_install_mirror.as_deref(),
        install_mirrors.python_downloads_json_url.as_deref(),
        preview,
    )
    .await?
    .into_interpreter();
    // Constrain to the interpreter's minor version (e.g., `>=3.12`), not its
    // exact patch release.
    Ok(RequiresPython::greater_than_equal_version(
        &interpreter.python_minor_version(),
    ))
}
/// Determine the default dependency groups for a project.
///
/// Uses `tool.uv.default-groups` when configured, after validating that every
/// listed group is actually declared under `[dependency-groups]`; otherwise
/// falls back to the `dev` group.
pub(crate) fn default_dependency_groups(
    pyproject_toml: &PyProjectToml,
) -> Result<DefaultGroups, ProjectError> {
    let configured = pyproject_toml
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref().and_then(|uv| uv.default_groups.as_ref()));
    let Some(defaults) = configured else {
        // No explicit configuration: default to the `dev` group.
        return Ok(DefaultGroups::List(vec![DEV_DEPENDENCIES.clone()]));
    };
    // An explicit list must only reference declared dependency groups.
    if let DefaultGroups::List(groups) = defaults {
        let declared = pyproject_toml.dependency_groups.as_ref();
        if let Some(missing) = groups
            .iter()
            .find(|group| !declared.is_some_and(|declared| declared.contains_key(*group)))
        {
            return Err(ProjectError::MissingDefaultGroup(missing.clone()));
        }
    }
    Ok(defaults.clone())
}
/// Error if two or more members of any declared conflict set are simultaneously
/// enabled by the requested extras and groups.
pub(crate) fn detect_conflicts(
    target: &InstallTarget,
    extras: &ExtrasSpecification,
    groups: &DependencyGroupsWithDefaults,
) -> Result<(), ProjectError> {
    let lock = target.lock();
    let packages = target.packages(extras, groups);
    let conflicts = lock.conflicts();
    for set in conflicts.iter() {
        // Collect the members of this set that are both present in the install
        // target and activated by the current extras/groups selection.
        let active: Vec<ConflictItem> = set
            .iter()
            .filter(|item| packages.contains(item.package()))
            .filter(|item| match item.kind() {
                ConflictKind::Project => groups.prod(),
                ConflictKind::Extra(extra) => extras.contains(extra),
                ConflictKind::Group(group) => groups.contains(group),
            })
            .cloned()
            .collect();
        // A single active member is fine; two or more conflict.
        if active.len() >= 2 {
            return Err(ProjectError::Conflict(ConflictError {
                set: set.clone(),
                conflicts: active,
                groups: groups.clone(),
            }));
        }
    }
    Ok(())
}
/// Build a [`RequirementsSpecification`] from a PEP 723 script's inline metadata.
///
/// Returns `Ok(None)` when the script declares no dependencies. Otherwise,
/// lowers the script's dependencies, `tool.uv.constraint-dependencies`, and
/// `tool.uv.override-dependencies` against the script's sources and indexes,
/// and collects `tool.uv.exclude-dependencies` verbatim.
pub(crate) fn script_specification(
    script: Pep723ItemRef<'_>,
    settings: &ResolverSettings,
    credentials_cache: &CredentialsCache,
) -> Result<Option<RequirementsSpecification>, ProjectError> {
    let Some(dependencies) = script.metadata().dependencies.as_ref() else {
        return Ok(None);
    };
    let script_dir = script.directory()?;
    let script_indexes = script.indexes(&settings.sources);
    let script_sources = script.sources(&settings.sources);
    // Shared lowering step for dependencies, constraints, and overrides:
    // resolve each requirement against the script's sources and indexes.
    // (Previously this pipeline was duplicated verbatim three times.)
    let lower = |requirement| {
        LoweredRequirement::from_non_workspace_requirement(
            requirement,
            script_dir.as_ref(),
            script_sources,
            script_indexes,
            &settings.index_locations,
            credentials_cache,
        )
        .map_ok(LoweredRequirement::into_inner)
    };
    let requirements = dependencies
        .iter()
        .cloned()
        .flat_map(lower)
        .collect::<Result<_, _>>()?;
    let constraints = script
        .metadata()
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.constraint_dependencies.as_ref())
        .into_iter()
        .flatten()
        .cloned()
        .flat_map(lower)
        .collect::<Result<Vec<_>, _>>()?;
    let overrides = script
        .metadata()
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.override_dependencies.as_ref())
        .into_iter()
        .flatten()
        .cloned()
        .flat_map(lower)
        .collect::<Result<Vec<_>, _>>()?;
    // Excludes are taken as-is; they are not lowered.
    let excludes = script
        .metadata()
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.exclude_dependencies.as_ref())
        .into_iter()
        .flatten()
        .cloned()
        .collect::<Vec<_>>();
    Ok(Some(RequirementsSpecification::from_excludes(
        requirements,
        constraints,
        overrides,
        excludes,
    )))
}
/// Build the extra build dependencies for a PEP 723 script from its
/// `tool.uv.extra-build-dependencies` table.
///
/// Each package's requirements are lowered against the script's sources and
/// indexes, preserving the per-requirement `match_runtime` flag.
pub(crate) fn script_extra_build_requires(
    script: Pep723ItemRef<'_>,
    settings: &ResolverSettings,
    credentials_cache: &CredentialsCache,
) -> Result<LoweredExtraBuildDependencies, ProjectError> {
    let script_dir = script.directory()?;
    let script_indexes = script.indexes(&settings.sources);
    let script_sources = script.sources(&settings.sources);
    // Fallback map so absence of the table behaves like an empty table.
    let empty = BTreeMap::default();
    let script_extra_build_dependencies = script
        .metadata()
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.extra_build_dependencies.as_ref())
        .unwrap_or(&empty);
    let mut extra_build_requires = ExtraBuildRequires::default();
    for (name, requirements) in script_extra_build_dependencies {
        let lowered_requirements: Vec<_> = requirements
            .iter()
            .cloned()
            .flat_map(
                |ExtraBuildDependency {
                     requirement,
                     match_runtime,
                 }| {
                    LoweredRequirement::from_non_workspace_requirement(
                        requirement,
                        script_dir.as_ref(),
                        script_sources,
                        script_indexes,
                        &settings.index_locations,
                        credentials_cache,
                    )
                    // Re-attach the `match_runtime` flag to each lowered requirement.
                    .map_ok(move |requirement| ExtraBuildRequirement {
                        requirement: requirement.into_inner(),
                        match_runtime,
                    })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;
        extra_build_requires.insert(name.clone(), lowered_requirements);
    }
    Ok(LoweredExtraBuildDependencies::from_lowered(
        extra_build_requires,
    ))
}
/// Warn (once per message) about settings embedded in a requirements file that
/// uv ignores in favor of its own command-line arguments or configuration.
///
/// Index-related settings (`--no-index`, `--index-url`, `--extra-index-url`,
/// `--find-links`) are only reported when they differ from the effective
/// resolver settings; build-option settings (`--no-binary`, `--no-build`) are
/// reported when present and divergent.
fn warn_on_requirements_txt_setting(spec: &RequirementsSpecification, settings: &ResolverSettings) {
    let RequirementsSpecification {
        index_url,
        extra_index_urls,
        no_index,
        find_links,
        no_binary,
        no_build,
        ..
    } = spec;

    // If `--no-index` is in effect from uv's own settings, every index-related
    // setting in the requirements file is moot, so skip those warnings.
    if !settings.index_locations.no_index() {
        if *no_index {
            warn_user_once!(
                "Ignoring `--no-index` from requirements file. Instead, use the `--no-index` command-line argument, or set `no-index` in a `uv.toml` or `pyproject.toml` file."
            );
        } else {
            if let Some(index_url) = index_url {
                // Only warn when the file's index differs from the configured default.
                if settings.index_locations.default_index().map(Index::url) != Some(index_url) {
                    warn_user_once!(
                        "Ignoring `--index-url` from requirements file: `{index_url}`. Instead, use the `--index-url` command-line argument, or set `index-url` in a `uv.toml` or `pyproject.toml` file."
                    );
                }
            }
            for extra_index_url in extra_index_urls {
                if !settings
                    .index_locations
                    .implicit_indexes()
                    .any(|index| index.url() == extra_index_url)
                {
                    warn_user_once!(
                        "Ignoring `--extra-index-url` from requirements file: `{extra_index_url}`. Instead, use the `--extra-index-url` command-line argument, or set `extra-index-url` in a `uv.toml` or `pyproject.toml` file."
                    );
                }
            }
            for find_link in find_links {
                if !settings
                    .index_locations
                    .flat_indexes()
                    .any(|index| index.url() == find_link)
                {
                    warn_user_once!(
                        "Ignoring `--find-links` from requirements file: `{find_link}`. Instead, use the `--find-links` command-line argument, or set `find-links` in a `uv.toml` or `pyproject.toml` file."
                    );
                }
            }
        }
    }

    if !no_binary.is_none() && settings.build_options.no_binary() != no_binary {
        warn_user_once!(
            "Ignoring `--no-binary` setting from requirements file. Instead, use the `--no-binary` command-line argument, or set `no-binary` in a `uv.toml` or `pyproject.toml` file."
        );
    }
    if !no_build.is_none() && settings.build_options.no_build() != no_build {
        // Fixed copy-paste bug: this warning previously claimed `--no-binary`
        // was being ignored when it is actually `--no-build`.
        warn_user_once!(
            "Ignoring `--no-build` setting from requirements file. Instead, use the `--no-build` command-line argument, or set `no-build` in a `uv.toml` or `pyproject.toml` file."
        );
    }
}
/// Normalize an arbitrary name into a cache-friendly identifier.
///
/// Names composed entirely of lowercase hex digits (`0-9`, `a-f`) are returned
/// borrowed and unchanged. Otherwise, ASCII alphanumerics are lowercased and
/// each run of any other characters is collapsed into a single `-`; a trailing
/// `-` is dropped. Returns `None` when nothing remains.
fn cache_name(name: &str) -> Option<Cow<'_, str>> {
    // Fast path: a non-empty hex-digit-only name is already valid; borrow it.
    if !name.is_empty()
        && name
            .bytes()
            .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
    {
        return Some(Cow::Borrowed(name));
    }

    // Slow path: lowercase alphanumerics, collapse everything else to `-`.
    let mut normalized = String::with_capacity(name.len());
    for byte in name.bytes() {
        if byte.is_ascii_alphanumeric() {
            normalized.push(byte.to_ascii_lowercase() as char);
        } else if !normalized.ends_with('-') {
            // Also emits a leading `-` for names beginning with a separator,
            // matching the run-collapsing behavior of the original.
            normalized.push('-');
        }
    }
    if normalized.ends_with('-') {
        normalized.pop();
    }
    (!normalized.is_empty()).then(|| Cow::Owned(normalized))
}
/// Render each conflicting `requires-python` declaration as a `- ` bullet,
/// one per line, in the form `- package[:group]: specifiers`.
fn format_requires_python_sources(conflicts: &RequiresPythonSources) -> String {
    let mut lines = Vec::with_capacity(conflicts.len());
    for ((package, group), specifiers) in conflicts.iter() {
        let line = match group {
            // The declaration came from a dependency group.
            Some(group) => format!("- {package}:{group}: {specifiers}"),
            // The declaration came from the package itself.
            None => format!("- {package}: {specifiers}"),
        };
        lines.push(line);
    }
    lines.join("\n")
}
/// Format a suffix describing where conflicting `requires-python` declarations
/// came from, suitable for appending to an error message.
///
/// - Multiple conflicts: a multi-line list (via [`format_requires_python_sources`]).
/// - One conflict: a parenthesized `(from …)` attribution, qualified with the
///   workspace member's name when the workspace has more than one member.
/// - No conflicts: an empty string.
fn format_optional_requires_python_sources(
    conflicts: &RequiresPythonSources,
    workspace_non_trivial: bool,
) -> String {
    if conflicts.len() > 1 {
        return format!(
            ".\nThe following `requires-python` declarations do not permit this version:\n{}",
            format_requires_python_sources(conflicts)
        );
    }
    if conflicts.len() == 1 {
        let ((package, group), _) = conflicts.iter().next().unwrap();
        if let Some(group) = group {
            if workspace_non_trivial {
                return format!(
                    " (from workspace member `{package}`'s `tool.uv.dependency-groups.{group}.requires-python`)."
                );
            }
            return format!(" (from `tool.uv.dependency-groups.{group}.requires-python`).");
        }
        if workspace_non_trivial {
            return format!(" (from workspace member `{package}`'s `project.requires-python`).");
        }
        // Fixed: this branch was the only message missing its trailing period,
        // inconsistent with every sibling branch above.
        return " (from `project.requires-python`).".to_owned();
    }
    String::new()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `cache_name` lowercases ASCII alphanumerics, collapses runs of other
    /// characters into a single `-`, drops a trailing `-`, and returns `None`
    /// when nothing remains.
    #[test]
    fn test_cache_name() {
        let cases: &[(&str, Option<&str>)] = &[
            ("foo", Some("foo")),
            ("foo-bar", Some("foo-bar")),
            ("foo_bar", Some("foo-bar")),
            ("foo-bar_baz", Some("foo-bar-baz")),
            ("foo-bar_baz_", Some("foo-bar-baz")),
            ("foo-_bar_baz", Some("foo-bar-baz")),
            ("_+-_", None),
        ];
        for &(input, expected) in cases {
            assert_eq!(cache_name(input), expected.map(Cow::from), "input: {input:?}");
        }
    }
}