use std::borrow::Cow;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use owo_colors::OwoColorize;
use tracing::debug;
use uv_cache::Cache;
use uv_fs::{LockedFile, LockedFileError, Simplified};
use uv_pep440::Version;
use uv_preview::Preview;
use crate::discovery::find_python_installation;
use crate::installation::PythonInstallation;
use crate::virtualenv::{PyVenvConfiguration, virtualenv_python_executable};
use crate::{
EnvironmentPreference, Error, Interpreter, Prefix, PythonNotFound, PythonPreference,
PythonRequest, Target,
};
#[derive(Debug, Clone)]
pub struct PythonEnvironment(Arc<PythonEnvironmentShared>);
#[derive(Debug, Clone)]
struct PythonEnvironmentShared {
root: PathBuf,
interpreter: Interpreter,
}
#[derive(Clone, Debug, Error)]
pub struct EnvironmentNotFound {
request: PythonRequest,
preference: EnvironmentPreference,
}
#[derive(Clone, Debug, Error)]
pub struct InvalidEnvironment {
path: PathBuf,
pub kind: InvalidEnvironmentKind,
}
#[derive(Debug, Clone)]
pub enum InvalidEnvironmentKind {
NotDirectory,
Empty,
MissingExecutable(PathBuf),
}
impl From<PythonNotFound> for EnvironmentNotFound {
fn from(value: PythonNotFound) -> Self {
Self {
request: value.request,
preference: value.environment_preference,
}
}
}
impl fmt::Display for EnvironmentNotFound {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
#[derive(Debug, Copy, Clone)]
enum SearchType {
Virtual,
System,
VirtualOrSystem,
}
impl fmt::Display for SearchType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Virtual => write!(f, "virtual environment"),
Self::System => write!(f, "system Python installation"),
Self::VirtualOrSystem => {
write!(f, "virtual environment or system Python installation")
}
}
}
}
let search_type = match self.preference {
EnvironmentPreference::Any => SearchType::VirtualOrSystem,
EnvironmentPreference::ExplicitSystem => {
if self.request.is_explicit_system() {
SearchType::VirtualOrSystem
} else {
SearchType::Virtual
}
}
EnvironmentPreference::OnlySystem => SearchType::System,
EnvironmentPreference::OnlyVirtual => SearchType::Virtual,
};
if matches!(self.request, PythonRequest::Default | PythonRequest::Any) {
write!(f, "No {search_type} found")?;
} else {
write!(f, "No {search_type} found for {}", self.request)?;
}
match search_type {
SearchType::Virtual => write!(
f,
"; run `{}` to create an environment, or pass `{}` to install into a non-virtual environment",
"uv venv".green(),
"--system".green()
)?,
SearchType::VirtualOrSystem => {
write!(f, "; run `{}` to create an environment", "uv venv".green())?;
}
SearchType::System => {}
}
Ok(())
}
}
impl fmt::Display for InvalidEnvironment {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Invalid environment at `{}`: {}",
self.path.user_display(),
self.kind
)
}
}
impl fmt::Display for InvalidEnvironmentKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::NotDirectory => write!(f, "expected directory but found a file"),
Self::MissingExecutable(path) => {
write!(f, "missing Python executable at `{}`", path.user_display())
}
Self::Empty => write!(f, "directory is empty"),
}
}
}
impl PythonEnvironment {
pub fn find(
request: &PythonRequest,
preference: EnvironmentPreference,
python_preference: PythonPreference,
cache: &Cache,
preview: Preview,
) -> Result<Self, Error> {
let installation =
match find_python_installation(request, preference, python_preference, cache, preview)?
{
Ok(installation) => installation,
Err(err) => return Err(EnvironmentNotFound::from(err).into()),
};
Ok(Self::from_installation(installation))
}
pub fn from_root(root: impl AsRef<Path>, cache: &Cache) -> Result<Self, Error> {
debug!(
"Checking for Python environment at: `{}`",
root.as_ref().user_display()
);
match root.as_ref().try_exists() {
Ok(true) => {}
Ok(false) => {
return Err(Error::MissingEnvironment(EnvironmentNotFound {
preference: EnvironmentPreference::Any,
request: PythonRequest::Directory(root.as_ref().to_owned()),
}));
}
Err(err) => return Err(Error::Discovery(err.into())),
}
if root.as_ref().is_file() {
return Err(InvalidEnvironment {
path: root.as_ref().to_path_buf(),
kind: InvalidEnvironmentKind::NotDirectory,
}
.into());
}
if root
.as_ref()
.read_dir()
.is_ok_and(|mut dir| dir.next().is_none())
{
return Err(InvalidEnvironment {
path: root.as_ref().to_path_buf(),
kind: InvalidEnvironmentKind::Empty,
}
.into());
}
let executable = virtualenv_python_executable(&root);
if !(executable.is_symlink() || executable.is_file()) {
return Err(InvalidEnvironment {
path: root.as_ref().to_path_buf(),
kind: InvalidEnvironmentKind::MissingExecutable(executable.clone()),
}
.into());
}
let interpreter = Interpreter::query(executable, cache)?;
Ok(Self(Arc::new(PythonEnvironmentShared {
root: interpreter.sys_prefix().to_path_buf(),
interpreter,
})))
}
pub fn from_installation(installation: PythonInstallation) -> Self {
Self::from_interpreter(installation.into_interpreter())
}
pub fn from_interpreter(interpreter: Interpreter) -> Self {
Self(Arc::new(PythonEnvironmentShared {
root: interpreter.sys_prefix().to_path_buf(),
interpreter,
}))
}
pub fn with_target(self, target: Target) -> std::io::Result<Self> {
let inner = Arc::unwrap_or_clone(self.0);
Ok(Self(Arc::new(PythonEnvironmentShared {
interpreter: inner.interpreter.with_target(target)?,
..inner
})))
}
pub fn with_prefix(self, prefix: Prefix) -> std::io::Result<Self> {
let inner = Arc::unwrap_or_clone(self.0);
Ok(Self(Arc::new(PythonEnvironmentShared {
interpreter: inner.interpreter.with_prefix(prefix)?,
..inner
})))
}
pub fn root(&self) -> &Path {
&self.0.root
}
pub fn interpreter(&self) -> &Interpreter {
&self.0.interpreter
}
pub fn cfg(&self) -> Result<PyVenvConfiguration, Error> {
Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?)
}
pub fn set_pyvenv_cfg(&self, key: &str, value: &str) -> Result<(), Error> {
let content = fs_err::read_to_string(self.0.root.join("pyvenv.cfg"))?;
fs_err::write(
self.0.root.join("pyvenv.cfg"),
PyVenvConfiguration::set(&content, key, value),
)?;
Ok(())
}
pub fn relocatable(&self) -> bool {
self.cfg().is_ok_and(|cfg| cfg.is_relocatable())
}
pub fn python_executable(&self) -> &Path {
self.0.interpreter.sys_executable()
}
pub fn site_packages(&self) -> impl Iterator<Item = Cow<'_, Path>> {
self.0.interpreter.site_packages()
}
pub fn scripts(&self) -> &Path {
self.0.interpreter.scripts()
}
pub async fn lock(&self) -> Result<LockedFile, LockedFileError> {
self.0.interpreter.lock().await
}
pub fn into_interpreter(self) -> Interpreter {
Arc::unwrap_or_clone(self.0).interpreter
}
pub fn uses(&self, interpreter: &Interpreter) -> bool {
if cfg!(windows) {
let old_base_prefix = self.interpreter().sys_base_prefix();
let selected_base_prefix = interpreter.sys_base_prefix();
old_base_prefix == selected_base_prefix
} else {
self.interpreter().sys_executable() == interpreter.sys_executable()
|| same_file::is_same_file(
self.interpreter().sys_executable(),
interpreter.sys_executable(),
)
.unwrap_or(false)
}
}
pub fn get_pyvenv_version_conflict(&self) -> Option<(Version, Version)> {
let cfg = self.cfg().ok()?;
let cfg_version = cfg.version?.into_version();
let exe_version = if cfg_version.release().get(2).is_none() {
self.interpreter().python_minor_version()
} else if cfg_version.pre().is_none() {
self.interpreter().python_patch_version()
} else {
self.interpreter().python_version().clone()
};
(cfg_version != exe_version).then_some((cfg_version, exe_version))
}
}