use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use fs_err as fs;
use fs_err::File;
use owo_colors::OwoColorize;
use thiserror::Error;
use tracing::{debug, warn};
use uv_cache::Cache;
use uv_dirs::user_executable_directory;
use uv_fs::{LockedFile, LockedFileError, LockedFileMode, Simplified};
use uv_install_wheel::read_record_file;
use uv_installer::SitePackages;
use uv_normalize::{InvalidNameError, PackageName};
use uv_pep440::Version;
use uv_python::{BrokenLink, Interpreter, PythonEnvironment};
use uv_state::{StateBucket, StateStore};
use uv_static::EnvVars;
use uv_virtualenv::remove_virtualenv;
pub use receipt::ToolReceipt;
pub use tool::{Tool, ToolEntrypoint};
mod receipt;
mod tool;
/// A [`PythonEnvironment`] paired with the name of the tool package it belongs to.
#[derive(Clone, Debug)]
pub struct ToolEnvironment {
    /// The Python environment backing the tool.
    environment: PythonEnvironment,
    /// The name of the tool package; used to look the package up inside `environment`.
    name: PackageName,
}
impl ToolEnvironment {
pub fn new(environment: PythonEnvironment, name: PackageName) -> Self {
Self { environment, name }
}
pub fn version(&self) -> Result<Version, Error> {
let site_packages = SitePackages::from_environment(&self.environment).map_err(|err| {
Error::EnvironmentRead(self.environment.root().to_path_buf(), err.to_string())
})?;
let packages = site_packages.get_packages(&self.name);
let package = packages
.first()
.ok_or_else(|| Error::MissingToolPackage(self.name.clone()))?;
Ok(package.version().clone())
}
pub fn into_environment(self) -> PythonEnvironment {
self.environment
}
pub fn environment(&self) -> &PythonEnvironment {
&self.environment
}
}
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error("Failed to update `uv-receipt.toml` at {0}")]
ReceiptWrite(PathBuf, #[source] Box<toml_edit::ser::Error>),
#[error("Failed to read `uv-receipt.toml` at {0}")]
ReceiptRead(PathBuf, #[source] Box<toml::de::Error>),
#[error(transparent)]
VirtualEnvError(#[from] uv_virtualenv::Error),
#[error("Failed to read package entry points {0}")]
EntrypointRead(#[from] uv_install_wheel::Error),
#[error("Failed to find a directory to install executables into")]
NoExecutableDirectory,
#[error(transparent)]
ToolName(#[from] InvalidNameError),
#[error(transparent)]
EnvironmentError(#[from] uv_python::Error),
#[error("Failed to find a receipt for tool `{0}` at {1}")]
MissingToolReceipt(String, PathBuf),
#[error("Failed to read tool environment packages at `{0}`: {1}")]
EnvironmentRead(PathBuf, String),
#[error("Failed find package `{0}` in tool environment")]
MissingToolPackage(PackageName),
#[error("Tool `{0}` environment not found at `{1}`")]
ToolEnvironmentNotFound(PackageName, PathBuf),
}
impl Error {
pub fn as_io_error(&self) -> Option<&io::Error> {
match self {
Self::Io(err) => Some(err),
Self::LockedFile(err) => err.as_io_error(),
Self::VirtualEnvError(uv_virtualenv::Error::Io(err)) => Some(err),
Self::ReceiptWrite(_, _)
| Self::ReceiptRead(_, _)
| Self::VirtualEnvError(_)
| Self::EntrypointRead(_)
| Self::NoExecutableDirectory
| Self::ToolName(_)
| Self::EnvironmentError(_)
| Self::MissingToolReceipt(_, _)
| Self::EnvironmentRead(_, _)
| Self::MissingToolPackage(_)
| Self::ToolEnvironmentNotFound(_, _) => None,
}
}
}
/// A handle to the directory that holds one sub-directory per installed tool.
#[derive(Clone, Debug)]
pub struct InstalledTools {
    /// The top-level tools directory.
    root: PathBuf,
}
impl InstalledTools {
fn from_path(root: impl Into<PathBuf>) -> Self {
Self { root: root.into() }
}
pub fn from_settings() -> Result<Self, Error> {
if let Some(tool_dir) = std::env::var_os(EnvVars::UV_TOOL_DIR).filter(|s| !s.is_empty()) {
Ok(Self::from_path(std::path::absolute(tool_dir)?))
} else {
Ok(Self::from_path(
StateStore::from_settings(None)?.bucket(StateBucket::Tools),
))
}
}
pub fn tool_dir(&self, name: &PackageName) -> PathBuf {
self.root.join(name.to_string())
}
#[expect(clippy::type_complexity)]
pub fn tools(&self) -> Result<Vec<(PackageName, Result<Tool, Error>)>, Error> {
let mut tools = Vec::new();
for directory in uv_fs::directories(self.root())? {
let Some(name) = directory
.file_name()
.and_then(|file_name| file_name.to_str())
else {
continue;
};
let name = PackageName::from_str(name)?;
let path = directory.join("uv-receipt.toml");
let contents = match fs_err::read_to_string(&path) {
Ok(contents) => contents,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
let err = Error::MissingToolReceipt(name.to_string(), path);
tools.push((name, Err(err)));
continue;
}
Err(err) => return Err(err.into()),
};
match ToolReceipt::from_string(contents) {
Ok(tool_receipt) => tools.push((name, Ok(tool_receipt.tool))),
Err(err) => {
let err = Error::ReceiptRead(path, Box::new(err));
tools.push((name, Err(err)));
}
}
}
Ok(tools)
}
pub fn get_tool_receipt(&self, name: &PackageName) -> Result<Option<Tool>, Error> {
let path = self.tool_dir(name).join("uv-receipt.toml");
match ToolReceipt::from_path(&path) {
Ok(tool_receipt) => Ok(Some(tool_receipt.tool)),
Err(Error::Io(err)) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err),
}
}
pub async fn lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(
self.root.join(".lock"),
LockedFileMode::Exclusive,
self.root.user_display(),
)
.await?)
}
pub fn add_tool_receipt(&self, name: &PackageName, tool: Tool) -> Result<(), Error> {
let tool_receipt = ToolReceipt::from(tool);
let path = self.tool_dir(name).join("uv-receipt.toml");
debug!(
"Adding metadata entry for tool `{name}` at {}",
path.user_display()
);
let doc = tool_receipt
.to_toml()
.map_err(|err| Error::ReceiptWrite(path.clone(), Box::new(err)))?;
fs_err::write(&path, doc)?;
Ok(())
}
pub fn remove_environment(&self, name: &PackageName) -> Result<(), Error> {
let environment_path = self.tool_dir(name);
debug!(
"Deleting environment for tool `{name}` at {}",
environment_path.user_display()
);
remove_virtualenv(environment_path.as_path())?;
Ok(())
}
pub fn get_environment(
&self,
name: &PackageName,
cache: &Cache,
) -> Result<Option<ToolEnvironment>, Error> {
let environment_path = self.tool_dir(name);
match PythonEnvironment::from_root(&environment_path, cache) {
Ok(venv) => {
debug!(
"Found existing environment for tool `{name}`: {}",
environment_path.user_display()
);
Ok(Some(ToolEnvironment::new(venv, name.clone())))
}
Err(uv_python::Error::MissingEnvironment(_)) => Ok(None),
Err(uv_python::Error::Query(uv_python::InterpreterError::NotFound(
interpreter_path,
))) => {
warn!(
"Ignoring existing virtual environment with missing Python interpreter: {}",
interpreter_path.user_display()
);
Ok(None)
}
Err(uv_python::Error::Query(uv_python::InterpreterError::BrokenLink(BrokenLink {
path,
unix,
venv: _,
}))) => {
if unix {
let target_path = fs_err::read_link(&path)?;
warn!(
"Ignoring existing virtual environment linked to non-existent Python interpreter: {} -> {}",
path.user_display().cyan(),
target_path.user_display().cyan(),
);
} else {
warn!(
"Ignoring existing virtual environment linked to non-existent Python interpreter: {}",
path.user_display().cyan(),
);
}
Ok(None)
}
Err(err) => Err(err.into()),
}
}
pub fn create_environment(
&self,
name: &PackageName,
interpreter: Interpreter,
) -> Result<PythonEnvironment, Error> {
let environment_path = self.tool_dir(name);
match remove_virtualenv(&environment_path) {
Ok(()) => {
debug!(
"Removed existing environment for tool `{name}`: {}",
environment_path.user_display()
);
}
Err(uv_virtualenv::Error::Io(err)) if err.kind() == io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
debug!(
"Creating environment for tool `{name}`: {}",
environment_path.user_display()
);
let venv = uv_virtualenv::create_venv(
&environment_path,
interpreter,
uv_virtualenv::Prompt::None,
false,
uv_virtualenv::OnExisting::Remove(uv_virtualenv::RemovalReason::ManagedEnvironment),
false,
false,
false,
)?;
Ok(venv)
}
pub fn temp() -> Result<Self, Error> {
Ok(Self::from_path(
StateStore::temp()?.bucket(StateBucket::Tools),
))
}
pub fn init(self) -> Result<Self, Error> {
let root = &self.root;
fs::create_dir_all(root)?;
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(root.join(".gitignore"))
{
Ok(mut file) => file.write_all(b"*")?,
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => (),
Err(err) => return Err(err.into()),
}
Ok(self)
}
pub fn root(&self) -> &Path {
&self.root
}
}
/// A single installed tool, identified by a filesystem path.
#[derive(Clone, Debug)]
pub struct InstalledTool {
    /// The path associated with the tool; its final component is used as the display name.
    path: PathBuf,
}
impl InstalledTool {
    /// Create an [`InstalledTool`] for the given path.
    ///
    /// The path is stored as-is; this constructor currently always returns `Ok`.
    pub fn new(path: PathBuf) -> Result<Self, Error> {
        let tool = Self { path };
        Ok(tool)
    }

    /// Return the path this tool was created with.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
impl std::fmt::Display for InstalledTool {
    /// Write the final component of the tool's path, or the whole path when it has no final
    /// component; non-UTF-8 sequences are rendered lossily.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let shown = match self.path.file_name() {
            Some(file_name) => file_name,
            None => self.path.as_os_str(),
        };
        f.write_str(&shown.to_string_lossy())
    }
}
pub fn tool_executable_dir() -> Result<PathBuf, Error> {
user_executable_directory(Some(EnvVars::UV_TOOL_BIN_DIR)).ok_or(Error::NoExecutableDirectory)
}
fn find_dist_info<'a>(
site_packages: &'a SitePackages,
package_name: &PackageName,
package_version: &Version,
) -> Result<&'a Path, Error> {
site_packages
.get_packages(package_name)
.iter()
.find(|package| package.version() == package_version)
.map(|dist| dist.install_path())
.ok_or_else(|| Error::MissingToolPackage(package_name.clone()))
}
/// Find the files a package installed into the environment's scripts directory.
///
/// Reads the `RECORD` file of the package's install path and keeps every entry whose path
/// starts with the relative path from the `purelib` directory to the `scripts` directory.
///
/// Returns a list of `(name, path)` tuples, where `name` is the entry's file name (or the raw
/// `RECORD` path if the file name is unavailable or not valid UTF-8) and `path` is the entry's
/// location under the scripts directory.
///
/// # Errors
///
/// Returns an error if the package is not installed at `package_version`, if `RECORD` cannot be
/// opened or parsed, or if no relative path from `purelib` to `scripts` can be computed.
pub fn entrypoint_paths(
    site_packages: &SitePackages,
    package_name: &PackageName,
    package_version: &Version,
) -> Result<Vec<(String, PathBuf)>, Error> {
    let dist_info_path = find_dist_info(site_packages, package_name, package_version)?;
    debug!(
        "Looking at `.dist-info` at: {}",
        dist_info_path.user_display()
    );

    let mut record_file = File::open(dist_info_path.join("RECORD"))?;
    let record = read_record_file(&mut record_file)?;

    // `RECORD` entries are matched against this relative prefix rather than an absolute path.
    let layout = site_packages.interpreter().layout();
    let scripts_dir = &layout.scheme.scripts;
    let Some(script_relative) = pathdiff::diff_paths(scripts_dir, &layout.scheme.purelib) else {
        return Err(io::Error::other(format!(
            "Could not find relative path for: {}",
            layout.scheme.scripts.simplified_display()
        ))
        .into());
    };

    let entrypoints = record
        .into_iter()
        .filter_map(|entry| {
            let relative_path = PathBuf::from(&entry.path);

            // Skip anything that does not live under the scripts directory.
            let path_in_scripts = relative_path.strip_prefix(&script_relative).ok()?;
            let absolute_path = scripts_dir.join(path_in_scripts);

            let script_name = match relative_path
                .file_name()
                .and_then(|filename| filename.to_str())
            {
                Some(filename) => filename.to_string(),
                None => entry.path,
            };

            Some((script_name, absolute_path))
        })
        .collect();

    Ok(entrypoints)
}