use itertools::Either;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use tracing::info_span;
use uv_auth::CredentialsCache;
use uv_configuration::{DependencyGroupsWithDefaults, NoSources};
use uv_distribution::LoweredRequirement;
use uv_distribution_types::{Index, IndexLocations, Requirement, RequiresPython};
use uv_normalize::{GroupName, PackageName};
use uv_pep508::RequirementOrigin;
use uv_pypi_types::{Conflicts, SupportedEnvironments, VerbatimParsedUrl};
use uv_resolver::{Lock, LockVersion, VERSION};
use uv_scripts::Pep723Script;
use uv_workspace::dependency_groups::{DependencyGroupError, FlatDependencyGroup};
use uv_workspace::{Editability, Workspace, WorkspaceMember};
use crate::commands::project::{ProjectError, find_requires_python};
/// A target whose dependencies can be resolved into a lockfile: either a workspace or a script.
///
/// The enum only holds shared references, so it is `Copy`; the inherent methods take `self` by
/// value for that reason.
#[derive(Debug, Copy, Clone)]
pub(crate) enum LockTarget<'lock> {
/// A borrowed [`Workspace`].
Workspace(&'lock Workspace),
/// A borrowed [`Pep723Script`].
Script(&'lock Pep723Script),
}
impl<'lock> From<&'lock Workspace> for LockTarget<'lock> {
    /// Wraps a borrowed [`Workspace`] as [`LockTarget::Workspace`].
    fn from(workspace: &'lock Workspace) -> Self {
        Self::Workspace(workspace)
    }
}
impl<'lock> From<&'lock Pep723Script> for LockTarget<'lock> {
fn from(script: &'lock Pep723Script) -> Self {
LockTarget::Script(script)
}
}
impl<'lock> LockTarget<'lock> {
/// Returns the requirements attached directly to the target.
///
/// A workspace delegates to `Workspace::requirements`. A script yields a copy of
/// `metadata.dependencies`, or an empty list when that field is `None`.
pub(crate) fn requirements(self) -> Vec<uv_pep508::Requirement<VerbatimParsedUrl>> {
    let script = match self {
        Self::Workspace(workspace) => return workspace.requirements(),
        Self::Script(script) => script,
    };
    match &script.metadata.dependencies {
        Some(dependencies) => dependencies.clone(),
        None => Vec::new(),
    }
}
/// Returns the override requirements for the target.
///
/// A workspace delegates to `Workspace::overrides`. A script clones the entries of
/// `metadata.tool.uv.override_dependencies`; any missing level in that chain yields an empty
/// list.
pub(crate) fn overrides(self) -> Vec<uv_pep508::Requirement<VerbatimParsedUrl>> {
    let script = match self {
        Self::Workspace(workspace) => return workspace.overrides(),
        Self::Script(script) => script,
    };
    let Some(overrides) = script
        .metadata
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.override_dependencies.as_ref())
    else {
        return Vec::new();
    };
    overrides.iter().cloned().collect()
}
/// Returns the package names listed as excluded dependencies for the target.
///
/// A workspace delegates to `Workspace::exclude_dependencies`. A script clones the entries of
/// `metadata.tool.uv.exclude_dependencies`; any missing level in that chain yields an empty
/// list.
pub(crate) fn exclude_dependencies(self) -> Vec<uv_normalize::PackageName> {
    let script = match self {
        Self::Workspace(workspace) => return workspace.exclude_dependencies(),
        Self::Script(script) => script,
    };
    let Some(excluded) = script
        .metadata
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.exclude_dependencies.as_ref())
    else {
        return Vec::new();
    };
    excluded.iter().cloned().collect()
}
/// Returns the constraint requirements for the target.
///
/// A workspace delegates to `Workspace::constraints`. A script clones the entries of
/// `metadata.tool.uv.constraint_dependencies`; any missing level in that chain yields an empty
/// list.
pub(crate) fn constraints(self) -> Vec<uv_pep508::Requirement<VerbatimParsedUrl>> {
    let script = match self {
        Self::Workspace(workspace) => return workspace.constraints(),
        Self::Script(script) => script,
    };
    let Some(constraints) = script
        .metadata
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.constraint_dependencies.as_ref())
    else {
        return Vec::new();
    };
    constraints.iter().cloned().collect()
}
/// Returns the build-constraint requirements for the target.
///
/// A workspace delegates to `Workspace::build_constraints`. A script clones the entries of
/// `metadata.tool.uv.build_constraint_dependencies`; any missing level in that chain yields an
/// empty list.
pub(crate) fn build_constraints(self) -> Vec<uv_pep508::Requirement<VerbatimParsedUrl>> {
    let script = match self {
        Self::Workspace(workspace) => return workspace.build_constraints(),
        Self::Script(script) => script,
    };
    let Some(build_constraints) = script
        .metadata
        .tool
        .as_ref()
        .and_then(|tool| tool.uv.as_ref())
        .and_then(|uv| uv.build_constraint_dependencies.as_ref())
    else {
        return Vec::new();
    };
    build_constraints.iter().cloned().collect()
}
/// Returns the dependency groups of the target, keyed by group name.
///
/// A workspace delegates to `Workspace::workspace_dependency_groups`, which may fail with a
/// [`DependencyGroupError`]. A script always yields an empty map.
pub(crate) fn dependency_groups(
    self,
) -> Result<BTreeMap<GroupName, FlatDependencyGroup>, DependencyGroupError> {
    match self {
        Self::Script(_) => Ok(BTreeMap::new()),
        Self::Workspace(workspace) => workspace.workspace_dependency_groups(),
    }
}
/// Returns an iterator over the requirements produced by `Workspace::members_requirements`.
///
/// A script yields an empty iterator.
pub(crate) fn members_requirements(self) -> impl Iterator<Item = Requirement> + 'lock {
    match self {
        Self::Workspace(workspace) => {
            let from_members = workspace.members_requirements();
            Either::Left(from_members)
        }
        Self::Script(_) => Either::Right(std::iter::empty()),
    }
}
/// Returns an iterator over the requirements produced by `Workspace::group_requirements`.
///
/// A script yields an empty iterator.
pub(crate) fn group_requirements(self) -> impl Iterator<Item = Requirement> + 'lock {
    match self {
        Self::Workspace(workspace) => {
            let from_groups = workspace.group_requirements();
            Either::Left(from_groups)
        }
        Self::Script(_) => Either::Right(std::iter::empty()),
    }
}
/// Returns the sorted names of the packages in the target.
///
/// For a workspace, the names come from the keys of `Workspace::packages`. When there is
/// exactly one package and `Workspace::is_non_project` is false, the list is returned empty
/// instead. A script has no members.
pub(crate) fn members(self) -> Vec<PackageName> {
    let workspace = match self {
        Self::Workspace(workspace) => workspace,
        Self::Script(_) => return Vec::new(),
    };

    let mut names: Vec<PackageName> = workspace.packages().keys().cloned().collect();
    names.sort();

    // A lone member of a project workspace is dropped from the list.
    let sole_project_member = names.len() == 1 && !workspace.is_non_project();
    if sole_project_member {
        names.clear();
    }
    names
}
/// Returns the map of packages in the target.
///
/// A workspace delegates to `Workspace::packages`. A script has no packages, so a reference to
/// a function-local static empty map is returned instead.
pub(crate) fn packages(self) -> &'lock BTreeMap<PackageName, WorkspaceMember> {
    // `BTreeMap::new` is `const`, so the empty map can live in a `static` and be borrowed for
    // any lifetime.
    static NO_PACKAGES: BTreeMap<PackageName, WorkspaceMember> = BTreeMap::new();

    match self {
        Self::Workspace(workspace) => workspace.packages(),
        Self::Script(_) => &NO_PACKAGES,
    }
}
/// Returns the required members of the target along with their [`Editability`].
///
/// A workspace delegates to `Workspace::required_members`. A script has none, so a reference
/// to a function-local static empty map is returned instead.
pub(crate) fn required_members(self) -> &'lock BTreeMap<PackageName, Editability> {
    // `BTreeMap::new` is `const`, so the empty map can live in a `static` and be borrowed for
    // any lifetime.
    static NO_REQUIRED_MEMBERS: BTreeMap<PackageName, Editability> = BTreeMap::new();

    match self {
        Self::Workspace(workspace) => workspace.required_members(),
        Self::Script(_) => &NO_REQUIRED_MEMBERS,
    }
}
/// Returns the supported environments declared for the target, if any.
///
/// A workspace delegates to `Workspace::environments`; a script always yields `None`.
pub(crate) fn environments(self) -> Option<&'lock SupportedEnvironments> {
    match self {
        Self::Script(_) => None,
        Self::Workspace(workspace) => workspace.environments(),
    }
}
/// Returns the required environments declared for the target, if any.
///
/// A workspace delegates to `Workspace::required_environments`; a script always yields `None`.
pub(crate) fn required_environments(self) -> Option<&'lock SupportedEnvironments> {
    match self {
        Self::Script(_) => None,
        Self::Workspace(workspace) => workspace.required_environments(),
    }
}
pub(crate) fn conflicts(self) -> Result<Conflicts, ProjectError> {
match self {
Self::Workspace(workspace) => Ok(workspace.conflicts()?),
Self::Script(_) => Ok(Conflicts::empty()),
}
}
/// Returns an iterator over the [`Index`] definitions of the target.
///
/// For a workspace, this yields the entries of `Workspace::indexes` first, followed by the
/// `tool.uv.index` entries of every package's `pyproject.toml`, in `Workspace::packages` order.
/// For a script, it yields the entries of `metadata.tool.uv.top_level.index`. Missing tables
/// contribute nothing.
pub(crate) fn indexes(self) -> impl Iterator<Item = &'lock Index> {
    match self {
        Self::Workspace(workspace) => {
            let workspace_level = workspace.indexes().iter();
            let member_level = workspace.packages().values().flat_map(|member| {
                let declared = member
                    .pyproject_toml()
                    .tool
                    .as_ref()
                    .and_then(|tool| tool.uv.as_ref())
                    .and_then(|uv| uv.index.as_ref());
                declared.into_iter().flatten()
            });
            Either::Left(workspace_level.chain(member_level))
        }
        Self::Script(script) => {
            let declared = script
                .metadata
                .tool
                .as_ref()
                .and_then(|tool| tool.uv.as_ref())
                .and_then(|uv| uv.top_level.index.as_deref());
            Either::Right(declared.into_iter().flatten())
        }
    }
}
/// Returns the `requires-python` bound for the target, if one is declared.
///
/// A workspace delegates to `find_requires_python`, passing
/// `DependencyGroupsWithDefaults::none()` as the group selection. A script converts
/// `metadata.requires_python` with `RequiresPython::from_specifiers` when it is present.
pub(crate) fn requires_python(self) -> Result<Option<RequiresPython>, ProjectError> {
    match self {
        Self::Workspace(workspace) => {
            let no_groups = DependencyGroupsWithDefaults::none();
            find_requires_python(workspace, &no_groups)
        }
        Self::Script(script) => {
            let declared = script.metadata.requires_python.as_ref();
            Ok(declared.map(RequiresPython::from_specifiers))
        }
    }
}
/// Returns the root directory of the target.
///
/// A workspace delegates to `Workspace::install_path`; a script uses the parent directory of
/// the script path.
///
/// # Panics
///
/// Panics if the script path has no parent.
pub(crate) fn install_path(self) -> &'lock Path {
    match self {
        Self::Script(script) => script.path.parent().unwrap(),
        Self::Workspace(workspace) => workspace.install_path(),
    }
}
/// Returns the final path component of [`Self::lock_path`], i.e. the lockfile's file name.
///
/// # Panics
///
/// Panics if the lock path has no file name, or if [`Self::lock_path`] itself panics.
pub(crate) fn lock_filename(self) -> PathBuf {
    let lock_path = self.lock_path();
    let file_name = lock_path.file_name().unwrap();
    PathBuf::from(file_name)
}
/// Returns the path of the lockfile for the target.
///
/// For a workspace this is `uv.lock` inside `Workspace::install_path`. For a script it is the
/// script path with `.lock` appended to its file name (e.g. `script.py` -> `script.py.lock`),
/// in the same directory as the script.
///
/// # Panics
///
/// Panics if the script path has no file name.
pub(crate) fn lock_path(self) -> PathBuf {
    match self {
        Self::Workspace(workspace) => workspace.install_path().join("uv.lock"),
        Self::Script(script) => {
            let Some(script_name) = script.path.file_name() else {
                panic!("Script path has no file name");
            };
            let mut lock_name = script_name.to_os_string();
            lock_name.push(".lock");
            script.path.with_file_name(lock_name)
        }
    }
}
/// Reads and parses the lockfile at [`Self::lock_path`].
///
/// Returns `Ok(None)` if the lockfile does not exist.
///
/// # Errors
///
/// - [`ProjectError::UnsupportedLockVersion`] if the lockfile parses but its version differs
///   from [`VERSION`].
/// - [`ProjectError::UnparsableLockVersion`] if the lockfile fails to parse as a [`Lock`], but
///   its version can still be read via [`LockVersion`] and differs from [`VERSION`].
/// - [`ProjectError::UvLockParse`] for any other parse failure.
/// - Any other I/O error from reading the file, converted into a [`ProjectError`].
pub(crate) async fn read(self) -> Result<Option<Lock>, ProjectError> {
    let lock_path = self.lock_path();

    let encoded = match fs_err::tokio::read_to_string(&lock_path).await {
        Ok(encoded) => encoded,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err.into()),
    };

    let parsed = info_span!("toml::from_str lock", path = %lock_path.display())
        .in_scope(|| toml::from_str::<Lock>(&encoded));

    let parse_err = match parsed {
        Ok(lock) if lock.version() == VERSION => return Ok(Some(lock)),
        Ok(lock) => {
            return Err(ProjectError::UnsupportedLockVersion(
                VERSION,
                lock.version(),
            ));
        }
        Err(err) => err,
    };

    // The full parse failed: probe just the version so that a version mismatch can be reported
    // instead of a bare parse error.
    match toml::from_str::<LockVersion>(&encoded) {
        Ok(probe) if probe.version() != VERSION => Err(ProjectError::UnparsableLockVersion(
            VERSION,
            probe.version(),
            parse_err,
        )),
        Ok(_) | Err(_) => Err(ProjectError::UvLockParse(parse_err)),
    }
}
/// Reads the raw bytes of the lockfile at [`Self::lock_path`].
///
/// Returns `Ok(None)` if the file does not exist; any other I/O error is returned unchanged.
pub(crate) async fn read_bytes(self) -> Result<Option<Vec<u8>>, std::io::Error> {
    let outcome = fs_err::tokio::read(self.lock_path()).await;
    match outcome {
        Ok(bytes) => Ok(Some(bytes)),
        Err(err) => {
            if err.kind() == std::io::ErrorKind::NotFound {
                Ok(None)
            } else {
                Err(err)
            }
        }
    }
}
/// Serializes `lock` to TOML and writes it to [`Self::lock_path`].
///
/// # Errors
///
/// Returns a [`ProjectError`] if serialization or the write fails.
pub(crate) async fn commit(self, lock: &Lock) -> Result<(), ProjectError> {
    // Serialize first so that a serialization failure is reported before the path is computed.
    let encoded = lock.to_toml()?;
    let destination = self.lock_path();
    fs_err::tokio::write(&destination, encoded).await?;
    Ok(())
}
/// Lowers `requirements` into [`Requirement`]s in the context of the target.
///
/// For a workspace, the requirements are wrapped in a `uv_pypi_types::BuildRequires` (named
/// after the workspace's `project.name`, when present) and lowered through
/// `uv_distribution::BuildRequires::from_workspace`. Each result is tagged with
/// [`RequirementOrigin::Workspace`].
///
/// For a script, each requirement is handled independently. If `sources.for_package` returns
/// `true` for its name (presumably meaning sources are disabled for that package), it is
/// converted directly with `Requirement::from`. Otherwise it is lowered with
/// `LoweredRequirement::from_non_workspace_requirement`, relative to the script's parent
/// directory, using the script's `tool.uv.sources` and `tool.uv.top_level.index` tables (empty
/// when absent).
///
/// # Errors
///
/// Returns the error from `from_workspace` for a workspace. For a script, returns
/// `MetadataError::LoweringError` carrying the name of the first requirement that fails to
/// lower.
///
/// # Panics
///
/// Panics if a script requirement has to be lowered and the script path has no parent.
pub(crate) fn lower(
    self,
    requirements: Vec<uv_pep508::Requirement<VerbatimParsedUrl>>,
    locations: &IndexLocations,
    sources: &NoSources,
    credentials_cache: &CredentialsCache,
) -> Result<Vec<Requirement>, uv_distribution::MetadataError> {
    match self {
        Self::Workspace(workspace) => {
            let name = workspace
                .pyproject_toml()
                .project
                .as_ref()
                .map(|project| project.name.clone());
            let build_requires = uv_pypi_types::BuildRequires {
                name,
                requires_dist: requirements,
            };
            let lowered = uv_distribution::BuildRequires::from_workspace(
                build_requires,
                workspace,
                locations,
                sources,
                credentials_cache,
            )?;
            Ok(lowered
                .requires_dist
                .into_iter()
                .map(|requirement| requirement.with_origin(RequirementOrigin::Workspace))
                .collect::<Vec<_>>())
        }
        Self::Script(script) => {
            let script_uv = script
                .metadata
                .tool
                .as_ref()
                .and_then(|tool| tool.uv.as_ref());

            // Fall back to empty tables when the script declares no indexes or sources.
            let no_indexes = Vec::default();
            let indexes = script_uv
                .and_then(|uv| uv.top_level.index.as_deref())
                .unwrap_or(&no_indexes);
            let no_sources = BTreeMap::default();
            let sources_map = script_uv
                .and_then(|uv| uv.sources.as_ref())
                .unwrap_or(&no_sources);

            requirements
                .into_iter()
                .flat_map(|requirement| {
                    let outcomes: Vec<Result<Requirement, uv_distribution::MetadataError>> =
                        if sources.for_package(&requirement.name) {
                            vec![Ok(Requirement::from(requirement))]
                        } else {
                            let package = requirement.name.clone();
                            // Collected eagerly per requirement, before anything is yielded.
                            LoweredRequirement::from_non_workspace_requirement(
                                requirement,
                                script.path.parent().unwrap(),
                                sources_map,
                                indexes,
                                locations,
                                credentials_cache,
                            )
                            .map(move |outcome| {
                                outcome
                                    .map(|lowered| lowered.into_inner())
                                    .map_err(|err| {
                                        uv_distribution::MetadataError::LoweringError(
                                            package.clone(),
                                            Box::new(err),
                                        )
                                    })
                            })
                            .collect()
                        };
                    outcomes
                })
                .collect::<Result<Vec<_>, _>>()
        }
    }
}
}