use crate::backend::backend_type::BackendType;
use crate::backend::platform_target::PlatformTarget;
use crate::backend::{Backend, VersionInfo};
use crate::cache::{CacheManager, CacheManagerBuilder};
use crate::cli::args::BackendArg;
use crate::cmd::CmdLineRunner;
use crate::config::{Config, Settings};
use crate::env;
use crate::file;
use crate::github;
use crate::http::HTTP_FETCH;
use crate::install_context::InstallContext;
use crate::timeout;
use crate::toolset::{ToolRequest, ToolVersion, ToolVersionOptions, Toolset, ToolsetBuilder};
use crate::ui::multi_progress_report::MultiProgressReport;
use crate::ui::progress_report::SingleReport;
use async_trait::async_trait;
use eyre::{Result, eyre};
use indexmap::IndexMap;
use itertools::Itertools;
use regex::Regex;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::{fmt::Debug, sync::Arc};
use versions::Versioning;
use xx::regex;
/// Backend that installs Python command-line packages with `pipx`, or with
/// `uv tool` when `uv` is available and not disabled.
#[derive(Debug)]
pub struct PIPXBackend {
// Backend argument this backend was built from; handed out by `Backend::ba`.
ba: Arc<BackendArg>,
// Cache of the latest stable version, stored under the backend's cache path.
latest_version_cache: CacheManager<Option<String>>,
}
#[async_trait]
impl Backend for PIPXBackend {
    /// Reports this backend as [`BackendType::Pipx`].
    fn get_type(&self) -> BackendType {
        BackendType::Pipx
    }

    /// Returns the backend argument this backend was constructed from.
    fn ba(&self) -> &Arc<BackendArg> {
        &self.ba
    }

    /// Names the tools this backend depends on (`pipx`).
    fn get_dependencies(&self) -> eyre::Result<Vec<&str>> {
        Ok(vec!["pipx"])
    }

    /// Names the optional dependencies (`uv`). When `uv` is found and not
    /// disabled, installs are done with `uv tool install` instead of `pipx`.
    fn get_optional_dependencies(&self) -> eyre::Result<Vec<&str>> {
        Ok(vec!["uv"])
    }

    /// This backend does not support lockfile URLs.
    fn supports_lockfile_url(&self) -> bool {
        false
    }

    /// Lists the versions available for this tool.
    ///
    /// * Registry packages: when the configured registry URL contains `/json`
    ///   the JSON document is fetched and every release is returned (with the
    ///   earliest upload time of its files as `created_at`); otherwise the HTML
    ///   index is fetched and versions are taken from its `.tar.gz` links.
    ///   Both lists are sorted by parsed version.
    /// * `https://github.com/` repositories: the release tags, in the reverse
    ///   of the order returned by `github::list_releases`.
    /// * Any other git URL: the single pseudo-version `latest`.
    ///
    /// # Errors
    /// Fails when the tool name cannot be parsed, the registry URL is invalid,
    /// or a remote request fails.
    async fn _list_remote_versions(&self, _config: &Arc<Config>) -> eyre::Result<Vec<VersionInfo>> {
        match self.tool_name().parse::<PipxRequest>()? {
            PipxRequest::Pypi(package) => {
                let registry_url = Self::get_registry_url()?;
                let url = registry_url.replace("{}", &package);
                if registry_url.contains("/json") {
                    debug!("Fetching JSON for {}", package);
                    let data: PypiPackage = HTTP_FETCH.json(url).await?;
                    let versions = data
                        .releases
                        .into_iter()
                        .sorted_by_cached_key(|(v, _)| Versioning::new(v))
                        .map(|(version, files)| {
                            // A release may have several files; use the earliest upload.
                            let created_at = files
                                .iter()
                                .filter_map(|f| f.upload_time.as_ref())
                                .min()
                                .cloned();
                            VersionInfo {
                                version,
                                created_at,
                                ..Default::default()
                            }
                        })
                        .collect();
                    Ok(versions)
                } else {
                    debug!("Fetching HTML for {}", package);
                    let html = HTTP_FETCH.get_html(url).await?;
                    let versions: Vec<VersionInfo> = sdist_versions_from_html(&html, &package)
                        .into_iter()
                        .map(|version| VersionInfo {
                            version,
                            ..Default::default()
                        })
                        .sorted_by_cached_key(|v| Versioning::new(&v.version))
                        .collect();
                    Ok(versions)
                }
            }
            PipxRequest::Git(url) => match url.strip_prefix("https://github.com/") {
                Some(repo) => {
                    let releases = github::list_releases(repo).await?;
                    Ok(releases
                        .into_iter()
                        .rev()
                        .map(|r| VersionInfo {
                            version: r.tag_name,
                            created_at: Some(r.created_at),
                            ..Default::default()
                        })
                        .collect())
                }
                // Non-GitHub git sources have no release listing to query.
                None => Ok(vec![VersionInfo {
                    version: "latest".to_string(),
                    ..Default::default()
                }]),
            },
        }
    }

    /// Resolves the latest stable version, going through the latest-version
    /// cache and bounded by the `fetch_remote_versions_timeout` setting.
    ///
    /// For registry packages the JSON endpoint's `info.version` is used when
    /// the registry URL contains `/json`; otherwise the highest version found
    /// in the HTML index that does not look like a pre-release is used. Git
    /// requests defer to `latest_version` with the query `latest`.
    async fn latest_stable_version(&self, config: &Arc<Config>) -> eyre::Result<Option<String>> {
        // NOTE(review): presumably aliased so the async closures capture a
        // plain reference rather than `self` — confirm before removing.
        let this = self;
        timeout::run_with_timeout_async(
            async || {
                this.latest_version_cache
                    .get_or_try_init_async(async || match this.tool_name().parse()? {
                        PipxRequest::Pypi(package) => {
                            let registry_url = Self::get_registry_url()?;
                            let url = registry_url.replace("{}", &package);
                            if registry_url.contains("/json") {
                                debug!("Fetching JSON for {}", package);
                                let pkg: PypiPackage = HTTP_FETCH.json(url).await?;
                                Ok(Some(pkg.info.version))
                            } else {
                                debug!("Fetching HTML for {}", package);
                                let html = HTTP_FETCH.get_html(url).await?;
                                let version = sdist_versions_from_html(&html, &package)
                                    .into_iter()
                                    // Substring heuristic for pre-releases
                                    // (dev / alpha / beta / release candidate).
                                    .filter(|v| {
                                        !v.contains("dev")
                                            && !v.contains("a")
                                            && !v.contains("b")
                                            && !v.contains("rc")
                                    })
                                    .sorted_by_cached_key(|v| Versioning::new(v))
                                    .next_back();
                                Ok(version)
                            }
                        }
                        PipxRequest::Git(_) => {
                            this.latest_version(config, Some("latest".into())).await
                        }
                    })
                    .await
            },
            Settings::get().fetch_remote_versions_timeout(),
        )
        .await
        .cloned()
    }

    /// Installs `tv` with `uv tool install` or `pipx install`.
    ///
    /// `uv` is used when it is installed, the `pipx.uvx` setting is not
    /// `false`, and the tool's `uvx` option is not `"false"`. Extra command
    /// line arguments come from the `uvx_args` / `pipx_args` tool options.
    /// After installing, the venv's Python symlinks are adjusted by
    /// `fix_venv_python_symlink`.
    ///
    /// # Errors
    /// Fails when the tool name cannot be parsed, the extra arguments cannot
    /// be split, the install command fails, or the symlink fix-up fails.
    async fn install_version_(&self, ctx: &InstallContext, tv: ToolVersion) -> Result<ToolVersion> {
        let use_uvx = self.uv_is_installed(&ctx.config).await
            && Settings::get().pipx.uvx != Some(false)
            && tv.request.options().get("uvx") != Some("false");

        if !use_uvx {
            self.warn_if_dependency_missing(
                &ctx.config,
                "pipx",
                &["pipx"],
                "To use pipx packages with mise, you need to install pipx first:\n\
mise use pipx@latest\n\n\
Alternatively, you can use uv/uvx by installing uv:\n\
mise use uv@latest",
            )
            .await;
        }

        let pipx_request = self
            .tool_name()
            .parse::<PipxRequest>()?
            .pipx_request(&tv.version, &tv.request.options());

        if use_uvx {
            ctx.pr
                .set_message(format!("uv tool install {pipx_request}"));
            let mut cmd = Self::uvx_cmd(
                &ctx.config,
                &["tool", "install", &pipx_request],
                self,
                &tv,
                &ctx.ts,
                ctx.pr.as_ref(),
            )
            .await?;
            if let Some(args) = tv.request.options().get("uvx_args") {
                cmd = cmd.args(shell_words::split(args)?);
            }
            cmd.execute()?;
        } else {
            ctx.pr.set_message(format!("pipx install {pipx_request}"));
            let mut cmd = Self::pipx_cmd(
                &ctx.config,
                &["install", &pipx_request],
                self,
                &tv,
                &ctx.ts,
                ctx.pr.as_ref(),
            )
            .await?;
            if let Some(args) = tv.request.options().get("pipx_args") {
                cmd = cmd.args(shell_words::split(args)?);
            }
            cmd.execute()?;
        }

        let pkg_name = self.tool_name();
        fix_venv_python_symlink(&tv.install_path(), &pkg_name)?;
        Ok(tv)
    }

    /// Copies the install-affecting options (`extras`, `pipx_args`,
    /// `uvx_args`, `uvx`) that are set on `request` into a sorted map.
    fn resolve_lockfile_options(
        &self,
        request: &ToolRequest,
        _target: &PlatformTarget,
    ) -> BTreeMap<String, String> {
        let opts = request.options();
        let mut result = BTreeMap::new();
        for key in ["extras", "pipx_args", "uvx_args", "uvx"] {
            if let Some(value) = opts.get(key) {
                result.insert(key.to_string(), value.to_string());
            }
        }
        result
    }
}

/// Extracts the version part of every `<package>-<version>.tar.gz` link found
/// in `html`, in document order.
///
/// A `-` in `package` also matches `_` and `.` in the file name. The name
/// pattern is compiled once per call instead of once per link; if it fails to
/// compile, no versions are returned.
fn sdist_versions_from_html(html: &str, package: &str) -> Vec<String> {
    let link_re = regex!(
        r#"href=["'][^"']*/([^/]+)\.tar\.gz(?:#(md5|sha1|sha224|sha256|sha384|sha512)=[0-9A-Fa-f]+)?["']"#
    );
    // `regex::escape` turns `-` into `\-`; widen that to the separator class.
    let name_pattern = regex::escape(package).replace(r"\-", r"[\-_.]");
    let Ok(name_re) = regex::Regex::new(&format!("^{name_pattern}-(.+)$")) else {
        return Vec::new();
    };
    link_re
        .captures_iter(html)
        .filter_map(|cap| {
            let filename = cap.get(1)?.as_str();
            let version = name_re.captures(filename)?.get(1)?.as_str();
            Some(version.to_string())
        })
        .collect()
}
/// Returns the names of the tool options that influence how a package is
/// installed: `extras`, `pipx_args`, `uvx_args` and `uvx`, in that order.
pub fn install_time_option_keys() -> Vec<String> {
    const KEYS: [&str; 4] = ["extras", "pipx_args", "uvx_args", "uvx"];
    KEYS.iter().map(|key| key.to_string()).collect()
}
impl PIPXBackend {
pub fn from_arg(ba: BackendArg) -> Self {
Self {
latest_version_cache: CacheManagerBuilder::new(
ba.cache_path.join("latest_version.msgpack.z"),
)
.with_fresh_duration(Settings::get().fetch_remote_versions_cache())
.build(),
ba: Arc::new(ba),
}
}
fn get_index_url() -> eyre::Result<String> {
let registry_url = Settings::get().pipx.registry_url.clone();
let mut url = registry_url
.replace("{}", "")
.trim_end_matches('/')
.to_string();
if url.contains("pypi.org") {
if url.contains("/pypi/") {
let re = Regex::new(r"/pypi/[^/]*/(?:json|simple)$").unwrap();
url = re.replace(&url, "/simple").to_string();
} else if !url.ends_with("/simple") {
let base_url = url.split("/simple").next().unwrap_or(&url);
url = format!("{}/simple", base_url.trim_end_matches('/'));
}
} else {
if url.ends_with("/json") {
url = url.replace("/json", "/simple");
} else if !url.ends_with("/simple") {
url = format!("{url}/simple");
}
}
debug!("Converted registry URL to index URL: {}", url);
Ok(url)
}
fn get_registry_url() -> eyre::Result<String> {
let registry_url = Settings::get().pipx.registry_url.clone();
debug!("Pipx registry URL: {}", registry_url);
let re = Regex::new(r"^(http|https)://.*\{\}.*$").unwrap();
if !re.is_match(®istry_url) {
return Err(eyre!(
"Registry URL must be a valid URL and contain a {{}} placeholder"
));
}
Ok(registry_url)
}
pub async fn reinstall_all(config: &Arc<Config>) -> Result<()> {
let ts = ToolsetBuilder::new().build(config).await?;
let pipx_tools = ts
.list_installed_versions(config)
.await?
.into_iter()
.filter(|(b, _tv)| b.ba().backend_type() == BackendType::Pipx)
.collect_vec();
if Settings::get().pipx.uvx != Some(false) {
let pr = MultiProgressReport::get().add("reinstalling pipx tools with uvx");
for (b, tv) in pipx_tools {
for (cmd, tool) in &[
("uninstall", tv.ba().tool_name.to_string()),
("install", format!("{}=={}", tv.ba().tool_name, tv.version)),
] {
let args = &["tool", cmd, tool];
Self::uvx_cmd(config, args, &*b, &tv, &ts, pr.as_ref())
.await?
.execute()?;
}
}
} else {
let pr = MultiProgressReport::get().add("reinstalling pipx tools");
for (b, tv) in pipx_tools {
let args = &["reinstall", &tv.ba().tool_name];
Self::pipx_cmd(config, args, &*b, &tv, &ts, pr.as_ref())
.await?
.execute()?;
}
}
Ok(())
}
async fn uvx_cmd<'a>(
config: &Arc<Config>,
args: &[&str],
b: &dyn Backend,
tv: &ToolVersion,
ts: &Toolset,
pr: &'a dyn SingleReport,
) -> Result<CmdLineRunner<'a>> {
let mut cmd = CmdLineRunner::new("uv");
for arg in args {
cmd = cmd.arg(arg);
}
cmd.with_pr(pr)
.env("UV_TOOL_DIR", tv.install_path())
.env("UV_TOOL_BIN_DIR", tv.install_path().join("bin"))
.env("UV_INDEX", Self::get_index_url()?)
.envs(ts.env_with_path_without_tools(config).await?)
.prepend_path(ts.list_paths(config).await)?
.prepend_path(vec![tv.install_path().join("bin")])?
.prepend_path(b.dependency_toolset(config).await?.list_paths(config).await)
}
async fn pipx_cmd<'a>(
config: &Arc<Config>,
args: &[&str],
b: &dyn Backend,
tv: &ToolVersion,
ts: &Toolset,
pr: &'a dyn SingleReport,
) -> Result<CmdLineRunner<'a>> {
let mut cmd = CmdLineRunner::new("pipx");
for arg in args {
cmd = cmd.arg(arg);
}
cmd.with_pr(pr)
.env("PIPX_HOME", tv.install_path())
.env("PIPX_BIN_DIR", tv.install_path().join("bin"))
.env("PIP_INDEX_URL", Self::get_index_url()?)
.envs(ts.env_with_path_without_tools(config).await?)
.prepend_path(ts.list_paths(config).await)?
.prepend_path(vec![tv.install_path().join("bin")])?
.prepend_path(b.dependency_toolset(config).await?.list_paths(config).await)
}
async fn uv_is_installed(&self, config: &Arc<Config>) -> bool {
self.dependency_which(config, "uv").await.is_some()
}
}
/// Parsed form of a pipx tool name (see the `FromStr` impl for the rules).
enum PipxRequest {
/// Git repository URL, held without the `git+` prefix and `.git` suffix.
Git(String),
/// Package name to look up on the configured registry.
Pypi(String),
}
impl PipxRequest {
    /// Renders the `extras` option as `[<extras>]`, or an empty string when
    /// the option is not set.
    fn extras_from_opts(&self, opts: &ToolVersionOptions) -> String {
        opts.get("extras")
            .map(|extras| format!("[{extras}]"))
            .unwrap_or_default()
    }

    /// Builds the requirement string handed to `pipx install` /
    /// `uv tool install`.
    ///
    /// Git requests become `git+<url>.git`, with `@<v>` appended unless `v`
    /// is `latest`. Registry requests become `<package><extras>`, with
    /// `==<v>` appended unless `v` is `latest`.
    fn pipx_request(&self, v: &str, opts: &ToolVersionOptions) -> String {
        let pinned = v != "latest";
        match self {
            PipxRequest::Git(url) if pinned => format!("git+{url}.git@{v}"),
            PipxRequest::Git(url) => format!("git+{url}.git"),
            PipxRequest::Pypi(package) => {
                let extras = self.extras_from_opts(opts);
                if pinned {
                    format!("{package}{extras}=={v}")
                } else {
                    format!("{package}{extras}")
                }
            }
        }
    }
}
/// Subset of the registry's JSON package document that this backend reads.
#[derive(serde::Deserialize)]
struct PypiPackage {
// Release files keyed by version string, in the order they were deserialized.
releases: IndexMap<String, Vec<PypiRelease>>,
// Package metadata; its `version` is used as the latest stable version.
info: PypiInfo,
}
/// The `info` object of the registry's JSON package document.
#[derive(serde::Deserialize)]
struct PypiInfo {
// Version string reported by the registry; returned by `latest_stable_version`.
version: String,
}
/// One file entry of a release in the registry's JSON package document.
#[derive(serde::Deserialize)]
struct PypiRelease {
// Upload timestamp as sent by the registry, if any; the smallest value among
// a release's files becomes that version's `created_at`.
upload_time: Option<String>,
}
impl FromStr for PipxRequest {
    type Err = eyre::Error;

    /// Classifies a tool name.
    ///
    /// * Contains `git+…​.git`: a git request for the text between the `git+`
    ///   prefix and the `.git` suffix.
    /// * Otherwise contains `/`: a git request for `https://github.com/<s>`.
    /// * Anything else: a registry package name.
    ///
    /// This conversion never returns an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let request = match regex!(r"(git\+)(.*)(\.git)").captures(s) {
            // Group 2 always participates in a match, so indexing cannot panic.
            Some(cap) => PipxRequest::Git(cap[2].to_string()),
            None if s.contains('/') => PipxRequest::Git(format!("https://github.com/{s}")),
            None => PipxRequest::Pypi(s.to_string()),
        };
        Ok(request)
    }
}
/// Returns `true` when `path` lies under the `python` directory of mise's
/// installs directory.
#[cfg(unix)]
fn is_mise_managed_python(path: &Path) -> bool {
    let python_installs = env::MISE_INSTALLS_DIR.join("python");
    path.starts_with(python_installs)
}
/// Rewrites the first `/python/<major>.<minor>.<patch>/` component of `path`
/// to `/python/<major>.<minor>/`.
///
/// Returns `None` when the path is not valid UTF-8 or has no such component.
#[cfg(unix)]
fn path_with_minor_version(path: &Path) -> Option<PathBuf> {
    let raw = path.to_str()?;
    let patch_re = regex!(r"/python/(\d+)\.(\d+)\.\d+/");
    if !patch_re.is_match(raw) {
        return None;
    }
    let rewritten = patch_re.replace(raw, "/python/$1.$2/");
    Some(PathBuf::from(rewritten.into_owned()))
}
/// Makes sure `<installs>/python/<major>.<minor>` exists by creating it as a
/// relative symlink to `./<major>.<minor>.<patch>`.
///
/// The version is read from the `/python/<major>.<minor>.<patch>/` component
/// of `full_version_path`. Nothing happens when the path is not valid UTF-8,
/// has no such component, the minor-version entry already exists, or the
/// full-version directory is missing.
///
/// # Errors
/// Fails when the symlink cannot be created.
#[cfg(unix)]
fn ensure_minor_version_symlink(full_version_path: &Path) -> Result<()> {
    let Some(path_str) = full_version_path.to_str() else {
        return Ok(());
    };
    let Some(caps) = regex!(r"/python/(\d+)\.(\d+)\.(\d+)/").captures(path_str) else {
        return Ok(());
    };

    let (major, minor, patch) = (&caps[1], &caps[2], &caps[3]);
    let minor_version = format!("{major}.{minor}");
    let full_version = format!("{major}.{minor}.{patch}");

    let python_installs = env::MISE_INSTALLS_DIR.join("python");
    let minor_version_dir = python_installs.join(&minor_version);
    let full_version_dir = python_installs.join(&full_version);

    if minor_version_dir.exists() || !full_version_dir.exists() {
        return Ok(());
    }

    trace!(
        "Creating early minor version symlink: {:?} -> ./{:?}",
        minor_version_dir, full_version
    );
    file::make_symlink(&PathBuf::from(".").join(&full_version), &minor_version_dir)?;
    Ok(())
}
/// Repoints a venv's `python` / `python3` symlinks at the minor-version path
/// of a mise-managed Python (`…/python/<major>.<minor>/…`) instead of the
/// full-version path.
///
/// The venv is looked for at `<install_path>/<pkg>` and
/// `<install_path>/venvs/<pkg>`, where `<pkg>` is the last `/`-separated
/// segment of `pkg_name`. A symlink is only rewritten when it resolves to an
/// absolute, existing path under mise's Python installs that contains a full
/// version component.
///
/// # Errors
/// Fails when a symlink cannot be resolved or created.
#[cfg(unix)]
fn fix_venv_python_symlink(install_path: &Path, pkg_name: &str) -> Result<()> {
    let actual_pkg_name = pkg_name.rsplit('/').next().unwrap_or(pkg_name);
    let venv_dirs = [
        install_path.join(actual_pkg_name),
        install_path.join("venvs").join(actual_pkg_name),
    ];
    trace!(
        "fix_venv_python_symlink: checking venv dirs: {:?}",
        venv_dirs
    );

    // Lazily evaluated, so each candidate is probed only when the loop reaches it.
    let bin_dirs = venv_dirs
        .iter()
        .map(|venv_dir| venv_dir.join("bin"))
        .filter(|bin_dir| bin_dir.exists());

    for bin_dir in bin_dirs {
        for name in &["python", "python3"] {
            let symlink_path = bin_dir.join(name);
            if !symlink_path.is_symlink() {
                continue;
            }
            let Some(target) = file::resolve_symlink(&symlink_path)? else {
                continue;
            };
            // Only absolute links into mise's own Python installs are touched.
            if !(target.is_absolute() && is_mise_managed_python(&target)) {
                continue;
            }
            let Some(minor_path) = path_with_minor_version(&target) else {
                continue;
            };
            if !target.exists() {
                continue;
            }
            ensure_minor_version_symlink(&target)?;
            trace!(
                "Updating venv Python symlink {:?} to use minor version: {:?}",
                symlink_path, minor_path
            );
            file::make_symlink(&minor_path, &symlink_path)?;
        }
    }
    Ok(())
}
/// Non-Unix counterpart of the venv symlink fix-up: does nothing and always
/// returns `Ok(())`.
#[cfg(not(unix))]
fn fix_venv_python_symlink(_install_path: &Path, _pkg_name: &str) -> Result<()> {
Ok(())
}