use crate::integrations::uv::{UvIntegration, UvVirtualEnv};
use pyo3::prelude::*;
use pyo3::types::PyType;
use std::path::PathBuf;
/// Handle to a virtual environment directory, exposed to Python as `VirtualEnv`.
///
/// Besides the environment's location it keeps the interpreter state that
/// `activate` snapshots so that `deactivate` can put it back.
#[pyclass(name = "VirtualEnv")]
pub struct VirtualEnv {
/// Location of the environment directory; `__new__` stores the
/// `pathlib`-resolved path here.
pub path: PathBuf,
/// The path string given at construction (after `~` expansion, before it is
/// made absolute). Readable from Python.
#[pyo3(get)]
pub name: String,
/// Interpreter inside the environment: `Scripts/python.exe` on Windows,
/// `bin/python` elsewhere.
pub python_executable: PathBuf,
/// Optional Python version forwarded to `UvVirtualEnv::create`.
pub python_version: Option<String>,
/// Optional Python object describing what `install_requirements` should install.
pub requirements: Option<Py<PyAny>>,
/// `true` between a successful `activate` and the matching `deactivate`.
/// Readable from Python.
#[pyo3(get)]
pub _is_activated: bool,
/// `sys.prefix` as it was when `activate` ran.
pub _original_prefix: Option<String>,
/// `sys.exec_prefix` as it was when `activate` ran.
pub _original_exec_prefix: Option<String>,
/// Copy of `sys.path` taken when `activate` ran.
pub _original_path: Option<Vec<String>>,
/// Value of the `PATH` environment variable when `activate` ran.
pub _original_env_path: Option<String>,
/// Outer `Option`: whether a snapshot was taken. Inner `Option`: the previous
/// `VIRTUAL_ENV` value, or `None` when the lookup failed at activation time.
pub _original_virtual_env: Option<Option<String>>,
}
#[pymethods]
impl VirtualEnv {
/// Build a handle for the virtual environment located at `path`.
///
/// * `path` - a `str`, or any object whose `__str__` yields a path; defaults
///   to `".venv"`. A leading `~` is expanded with `pathlib.Path.expanduser`,
///   relative paths are anchored at the current working directory, and the
///   result is normalised with `pathlib.Path.resolve`.
/// * `python` - optional Python version, stored and later forwarded to
///   `UvVirtualEnv::create`.
/// * `requirements` - optional object stored for `install_requirements`.
/// * `now` - when true, the environment is created before returning.
///
/// Raises `IOError` when the current directory cannot be determined,
/// `ValueError` when the absolute path is not valid UTF-8, and whatever
/// `create` raises when `now` is true.
#[new]
#[pyo3(signature = (path=None, python=None, requirements=None, now=true))]
fn __new__(
    path: Option<Py<PyAny>>,
    python: Option<&str>,
    requirements: Option<Py<PyAny>>,
    now: bool,
) -> PyResult<Self> {
    Python::attach(|py| {
        // Plain strings are taken as-is; anything else is rendered via `__str__`.
        let path_str = match path {
            Some(path_obj) => match path_obj.extract::<String>(py) {
                Ok(s) => s,
                Err(_) => path_obj
                    .call_method0(py, "__str__")?
                    .extract::<String>(py)?,
            },
            None => ".venv".to_string(),
        };

        let path_class = py.import("pathlib")?.getattr("Path")?;

        let path_str = if path_str.starts_with('~') {
            path_class
                .call1((&path_str,))?
                .call_method0("expanduser")?
                .call_method0("__str__")?
                .extract::<String>()?
        } else {
            path_str
        };

        // `is_absolute` applies the platform's own rules instead of only
        // recognising a leading '/'.
        let path_buf = if std::path::Path::new(&path_str).is_absolute() {
            PathBuf::from(&path_str)
        } else {
            std::env::current_dir()
                .map_err(|e| {
                    PyErr::new::<pyo3::exceptions::PyIOError, _>(format!(
                        "Cannot get current directory: {}",
                        e
                    ))
                })?
                .join(&path_str)
        };

        // The working directory may not be valid UTF-8; report that as a
        // Python exception rather than panicking.
        let absolute = path_buf.to_str().ok_or_else(|| {
            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                "Path {} is not valid UTF-8",
                path_buf.display()
            ))
        })?;
        let resolved_str = path_class
            .call1((absolute,))?
            .call_method0("resolve")?
            .call_method0("__str__")?
            .extract::<String>()?;
        let resolved_path = PathBuf::from(resolved_str);

        let python_executable = if cfg!(windows) {
            resolved_path.join("Scripts").join("python.exe")
        } else {
            resolved_path.join("bin").join("python")
        };

        let venv = VirtualEnv {
            path: resolved_path,
            name: path_str,
            python_executable,
            python_version: python.map(|s| s.to_string()),
            requirements,
            _is_activated: false,
            _original_prefix: None,
            _original_exec_prefix: None,
            _original_path: None,
            _original_env_path: None,
            _original_virtual_env: None,
        };
        if now {
            venv.create()?;
        }
        Ok(venv)
    })
}
/// Create the environment on disk unless something already exists at `self.path`.
///
/// Delegates to `UvVirtualEnv::create`, forwarding the stored Python version.
/// A failure there is raised as `RuntimeError`.
fn create(&self) -> PyResult<()> {
    if !self.path.exists() {
        let requested_version = self.python_version.as_deref();
        if let Err(cause) = UvVirtualEnv::create(&self.path, requested_version) {
            let message = format!("Failed to create virtual environment: {}", cause);
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(message));
        }
    }
    Ok(())
}
/// Activate this environment inside the running interpreter.
///
/// Snapshots `sys.prefix`, `sys.exec_prefix`, `sys.path`, `PATH` and
/// `VIRTUAL_ENV`, then points the prefixes and `VIRTUAL_ENV` at the
/// environment, puts its `site-packages` first on `sys.path` and its
/// scripts directory first on `PATH`. Calling it while already active is a
/// no-op.
///
/// Raises `RuntimeError` when the environment has no `pyvenv.cfg`,
/// `ValueError` when a required path is not valid UTF-8, and propagates any
/// Python error raised while reading or updating `sys` / `os.environ`
/// (including the lookup error when `PATH` is not set).
fn activate(&mut self) -> PyResult<()> {
    // Borrow a path as `&str`, turning a non-UTF-8 path into a Python
    // exception instead of a panic.
    fn utf8(p: &std::path::Path) -> PyResult<&str> {
        p.to_str().ok_or_else(|| {
            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                "Path {} is not valid UTF-8",
                p.display()
            ))
        })
    }

    if self._is_activated {
        return Ok(());
    }
    Python::attach(|py| {
        if !self.exists(py)? {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                "Virtual environment at {} does not exist",
                self.path.display()
            )));
        }

        // Work out every path up front so that a conversion failure cannot
        // happen after the interpreter has already been partly modified.
        let venv_root = utf8(&self.path)?.to_owned();
        let site_packages = if cfg!(windows) {
            self.path.join("Lib").join("site-packages")
        } else {
            let lib_dir = self.path.join("lib");
            // Prefer an existing `lib/python*/site-packages`; otherwise fall
            // back to the directory named after the running interpreter.
            let discovered = std::fs::read_dir(&lib_dir).ok().and_then(|entries| {
                entries
                    .flatten()
                    .filter(|entry| entry.file_name().to_string_lossy().starts_with("python"))
                    .map(|entry| entry.path().join("site-packages"))
                    .find(|candidate| candidate.exists())
            });
            discovered.unwrap_or_else(|| {
                lib_dir
                    .join(format!(
                        "python{}.{}",
                        py.version_info().major,
                        py.version_info().minor
                    ))
                    .join("site-packages")
            })
        };
        let site_packages = utf8(&site_packages)?.to_owned();
        let bin_dir = if cfg!(windows) {
            self.path.join("Scripts")
        } else {
            self.path.join("bin")
        };
        let path_sep = if cfg!(windows) { ";" } else { ":" };

        // Read the current interpreter state.
        let sys = py.import("sys")?;
        let original_prefix = sys.getattr("prefix")?.extract::<String>()?;
        let original_exec_prefix = sys.getattr("exec_prefix")?.extract::<String>()?;
        let original_sys_path = sys.getattr("path")?.extract::<Vec<String>>()?;
        let os = py.import("os")?;
        let environ = os.getattr("environ")?;
        let original_env_path = environ.get_item("PATH")?.extract::<String>()?;
        let original_virtual_env = if let Ok(venv) = environ.get_item("VIRTUAL_ENV") {
            Some(venv.extract::<String>()?)
        } else {
            None
        };
        let new_path = format!("{}{}{}", utf8(&bin_dir)?, path_sep, original_env_path);

        // Remember it so `deactivate` can restore it.
        self._original_prefix = Some(original_prefix);
        self._original_exec_prefix = Some(original_exec_prefix);
        self._original_path = Some(original_sys_path);
        self._original_env_path = Some(original_env_path);
        self._original_virtual_env = Some(original_virtual_env);

        // Switch the interpreter over to the environment.
        sys.setattr("prefix", venv_root.as_str())?;
        sys.setattr("exec_prefix", venv_root.as_str())?;
        sys.getattr("path")?
            .call_method1("insert", (0, site_packages.as_str()))?;
        environ.set_item("PATH", new_path)?;
        environ.set_item("VIRTUAL_ENV", venv_root.as_str())?;

        self._is_activated = true;
        Ok(())
    })
}
/// Delete the environment's directory tree if it is present.
///
/// A missing directory is not an error; a failed removal is raised as `IOError`.
fn remove(&self) -> PyResult<()> {
    if !self.path.exists() {
        return Ok(());
    }
    match std::fs::remove_dir_all(&self.path) {
        Ok(()) => Ok(()),
        Err(cause) => Err(PyErr::new::<pyo3::exceptions::PyIOError, _>(format!(
            "Failed to remove virtual environment: {}",
            cause
        ))),
    }
}
/// Context-manager entry: create the environment, install its requirements,
/// activate it, and hand the same object back to the `with` statement.
fn __enter__(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
    {
        let env: &mut Self = &mut slf;
        env.create()?;
        env.install_requirements()?;
        env.activate()?;
    }
    Ok(slf)
}
/// Context-manager exit: deactivate the environment.
///
/// The exception arguments are ignored; only a failure of `deactivate`
/// itself is reported.
fn __exit__(
    &mut self,
    _exc_type: &Bound<'_, PyAny>,
    _exc_val: &Bound<'_, PyAny>,
    _exc_tb: &Bound<'_, PyAny>,
) -> PyResult<()> {
    self.deactivate()
}
/// Whether the environment looks created, judged by the presence of
/// `pyvenv.cfg` inside its directory.
#[getter]
fn exists(&self, _py: Python) -> PyResult<bool> {
    let marker = self.path.join("pyvenv.cfg");
    Ok(marker.exists())
}
#[getter]
fn path(&self, py: Python) -> PyResult<Py<PyAny>> {
let pathlib = py.import("pathlib")?;
let path_class = pathlib.getattr("Path")?;
let result = path_class.call1((self.path.to_str().unwrap(),))?;
Ok(result.into())
}
#[getter]
fn python_executable(&self, py: Python) -> PyResult<Py<PyAny>> {
let pathlib = py.import("pathlib")?;
let path_class = pathlib.getattr("Path")?;
let result = path_class.call1((self.python_executable.to_str().unwrap(),))?;
Ok(result.into())
}
/// Undo `activate`, restoring the interpreter state captured at activation.
///
/// Does nothing when the environment is not active. `sys.path` is emptied
/// and refilled in place from the snapshot; `VIRTUAL_ENV` is either set back
/// to its earlier value or removed (a failure of that removal is ignored).
/// The saved state is discarded only after every restore step succeeded.
fn deactivate(&mut self) -> PyResult<()> {
    if !self._is_activated {
        return Ok(());
    }
    Python::attach(|py| {
        // Interpreter-level state: prefixes and the module search path.
        if let (Some(saved_prefix), Some(saved_sys_path)) =
            (self._original_prefix.as_ref(), self._original_path.as_ref())
        {
            let sys = py.import("sys")?;
            sys.setattr("prefix", saved_prefix)?;
            if let Some(saved_exec_prefix) = self._original_exec_prefix.as_ref() {
                sys.setattr("exec_prefix", saved_exec_prefix)?;
            }
            let live_sys_path = sys.getattr("path")?;
            live_sys_path.call_method0("clear")?;
            for entry in saved_sys_path.iter() {
                live_sys_path.call_method1("append", (entry,))?;
            }
        }

        // Process environment: PATH and VIRTUAL_ENV.
        if let Some(saved_env_path) = self._original_env_path.as_ref() {
            let environ = py.import("os")?.getattr("environ")?;
            environ.set_item("PATH", saved_env_path)?;
            match &self._original_virtual_env {
                Some(Some(previous_venv)) => environ.set_item("VIRTUAL_ENV", previous_venv)?,
                _ => {
                    let _ = environ.call_method1("pop", ("VIRTUAL_ENV", py.None()));
                }
            }
        }

        // Everything restored: forget the snapshot.
        self._original_virtual_env = None;
        self._original_env_path = None;
        self._original_path = None;
        self._original_exec_prefix = None;
        self._original_prefix = None;
        self._is_activated = false;
        Ok(())
    })
}
/// Install the requirements stored on this object, if any.
///
/// A `str` or list of `str` is passed straight to `install`. A value that
/// is neither of those but converts to an `i32` is rejected with
/// `TypeError`. Anything else is also handed to `install`.
fn install_requirements(&self) -> PyResult<()> {
    let Some(reqs) = &self.requirements else {
        return Ok(());
    };
    Python::attach(|py| {
        let is_text_or_list =
            reqs.extract::<String>(py).is_ok() || reqs.extract::<Vec<String>>(py).is_ok();
        if !is_text_or_list && reqs.extract::<i32>(py).is_ok() {
            return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                "requirements should be a string, list of strings, or Path object, not int",
            ));
        }
        self.install(reqs.clone_ref(py))
    })
}
fn install(&self, packages: Py<PyAny>) -> PyResult<()> {
let uv_venv = UvVirtualEnv {
path: self.path.clone(),
};
Python::attach(|py| {
if let Ok(package_str) = packages.extract::<String>(py) {
if package_str.ends_with(".txt") {
uv_venv
.install_requirements(std::path::Path::new(&package_str))
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to install requirements: {}",
e
))
})?;
} else {
uv_venv.install_packages(&[package_str]).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to install package: {}",
e
))
})?;
}
} else if let Ok(package_list) = packages.extract::<Vec<String>>(py) {
uv_venv.install_packages(&package_list).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to install packages: {}",
e
))
})?;
} else {
let package_str = packages
.call_method0(py, "__str__")?
.extract::<String>(py)?;
uv_venv
.install_requirements(std::path::Path::new(&package_str))
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to install requirements: {}",
e
))
})?;
}
Ok(())
})
}
/// List the Python installations reported by `UvVirtualEnv::discover_pythons`
/// as `(version, path)` string pairs. Failures are raised as `RuntimeError`.
#[classmethod]
fn discover_available_pythons(_cls: &Bound<'_, PyType>) -> PyResult<Vec<(String, String)>> {
    let found = match UvVirtualEnv::discover_pythons() {
        Ok(found) => found,
        Err(cause) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                "Failed to discover Python installations: {}",
                cause
            )))
        }
    };
    let mut listing = Vec::new();
    for (version, location) in found {
        listing.push((version, location.display().to_string()));
    }
    Ok(listing)
}
/// Ask `UvVirtualEnv::install_python` for the given `version` and return the
/// resulting path as a string. Failures are raised as `RuntimeError`.
#[classmethod]
fn ensure_python(_cls: &Bound<'_, PyType>, version: &str) -> PyResult<String> {
    match UvVirtualEnv::install_python(version) {
        Ok(interpreter) => Ok(interpreter.display().to_string()),
        Err(cause) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
            "Failed to ensure Python {}: {}",
            version, cause
        ))),
    }
}
/// Return the version string reported by `UvIntegration::version`.
/// Failures are raised as `RuntimeError`.
#[classmethod]
fn version(_cls: &Bound<'_, PyType>) -> PyResult<String> {
    match UvIntegration::version() {
        Ok(reported) => Ok(reported),
        Err(cause) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
            "Failed to get UV version: {}",
            cause
        ))),
    }
}
}
/// Decorator factory: remember `path` and the optional `requirements` in a
/// `VenvRequiredDecorator`, which is then applied to the target function.
#[pyfunction]
#[pyo3(signature = (path, requirements = None))]
pub fn venv_required(
    path: &str,
    requirements: Option<Py<PyAny>>,
) -> PyResult<VenvRequiredDecorator> {
    let decorator = VenvRequiredDecorator {
        path: String::from(path),
        requirements,
    };
    Ok(decorator)
}
/// Callable returned by `venv_required`; calling it with a function produces
/// a `VenvRequiredWrapper` around that function.
#[pyclass]
pub struct VenvRequiredDecorator {
// Environment path handed on to every wrapper this decorator creates.
path: String,
// Optional requirements object handed on to every wrapper.
requirements: Option<Py<PyAny>>,
}
#[pymethods]
impl VenvRequiredDecorator {
    /// Wrap `func` in a `VenvRequiredWrapper` carrying this decorator's path
    /// and requirements; the wrapper's command and arguments start unset.
    fn __call__(&self, py: Python, func: Py<PyAny>) -> PyResult<Py<PyAny>> {
        let requirements = match &self.requirements {
            Some(reqs) => Some(reqs.clone_ref(py)),
            None => None,
        };
        let wrapped = Py::new(
            py,
            VenvRequiredWrapper {
                original_func: func,
                path: self.path.clone(),
                requirements,
                command: None,
                arguments: None,
            },
        )?;
        Ok(wrapped.into())
    }
}
/// Callable that runs a wrapped function with a virtual environment activated
/// for the duration of the call.
#[pyclass]
struct VenvRequiredWrapper {
// The decorated callable; also the target of attribute lookups the wrapper
// forwards.
original_func: Py<PyAny>,
// Path passed to the `VirtualEnv` constructor on each call.
path: String,
// Optional requirements passed to the `VirtualEnv` constructor on each call.
requirements: Option<Py<PyAny>>,
// Backing storage for the `__command` property.
command: Option<Py<PyAny>>,
// Backing storage for the `__arguments` property; when unset the getter
// falls back to the wrapped function's `__arguments` attribute.
arguments: Option<Py<PyAny>>,
}
impl Clone for VenvRequiredWrapper {
fn clone(&self) -> Self {
Python::attach(|py| Self {
original_func: self.original_func.clone_ref(py),
path: self.path.clone(),
requirements: self.requirements.as_ref().map(|r| r.clone_ref(py)),
command: self.command.as_ref().map(|c| c.clone_ref(py)),
arguments: self.arguments.as_ref().map(|a| a.clone_ref(py)),
})
}
}
#[pymethods]
impl VenvRequiredWrapper {
    /// Call the wrapped function with the environment active.
    ///
    /// Builds a `VirtualEnv` for the stored path (with `now=True` and the
    /// stored requirements, if any), installs requirements, activates it,
    /// forwards `args`/`kwargs` to the wrapped function, then deactivates.
    /// The function's result or exception is returned as-is; an error from
    /// deactivation is ignored.
    #[pyo3(signature = (*args, **kwargs))]
    fn __call__(
        &self,
        args: &Bound<'_, pyo3::types::PyTuple>,
        kwargs: Option<&Bound<'_, pyo3::types::PyDict>>,
    ) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            let ctor_kwargs = pyo3::types::PyDict::new(py);
            ctor_kwargs.set_item("now", true)?;
            if let Some(reqs) = self.requirements.as_ref() {
                ctor_kwargs.set_item("requirements", reqs)?;
            }

            let venv = py
                .get_type::<VirtualEnv>()
                .call((&self.path,), Some(&ctor_kwargs))?;
            venv.call_method0("install_requirements")?;
            venv.call_method0("activate")?;

            let outcome = self.original_func.call(py, args, kwargs);
            // Always attempt to deactivate, whatever the call produced.
            let _ = venv.call_method0("deactivate");
            outcome
        })
    }

    /// Value of the `__command` property; `AttributeError` when never set.
    #[getter(__command)]
    fn get_command(&self) -> PyResult<Py<PyAny>> {
        Python::attach(|py| match &self.command {
            Some(cmd) => Ok(cmd.clone_ref(py)),
            None => Err(PyErr::new::<pyo3::exceptions::PyAttributeError, _>(
                "__command not set",
            )),
        })
    }

    /// Store a value for the `__command` property.
    #[setter(__command)]
    fn set_command(&mut self, value: Py<PyAny>) {
        let stored = Some(value);
        self.command = stored;
    }

    /// Value of the `__arguments` property: the stored value if one was set,
    /// otherwise the wrapped function's `__arguments`, otherwise `None`.
    #[getter(__arguments)]
    fn get_arguments(&self) -> PyResult<Py<PyAny>> {
        Python::attach(|py| match self.arguments.as_ref() {
            Some(stored) => Ok(stored.clone_ref(py)),
            None => match self.original_func.getattr(py, "__arguments") {
                Ok(inherited) => Ok(inherited),
                Err(_) => Ok(py.None()),
            },
        })
    }

    /// Store a value for the `__arguments` property.
    #[setter(__arguments)]
    fn set_arguments(&mut self, value: Py<PyAny>) {
        let stored = Some(value);
        self.arguments = stored;
    }

    /// The wrapped function's `__name__`, or `None` when it has none.
    #[getter]
    fn __name__(&self) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            let looked_up = self.original_func.getattr(py, "__name__");
            Ok(looked_up.unwrap_or_else(|_| py.None()))
        })
    }

    /// The wrapped function's `__doc__`, or `None` when it has none.
    #[getter]
    fn __doc__(&self) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            let looked_up = self.original_func.getattr(py, "__doc__");
            Ok(looked_up.unwrap_or_else(|_| py.None()))
        })
    }

    /// Forward any other attribute lookup to the wrapped function.
    fn __getattr__(&self, name: &str) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            let target = &self.original_func;
            target.getattr(py, name)
        })
    }
}
/// Add the virtual-environment classes and the `venv_required` function to
/// the Python module `m`.
pub fn register_venv(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<VirtualEnv>()?;
    m.add_class::<VenvRequiredDecorator>()?;
    m.add_class::<VenvRequiredWrapper>()?;
    let decorator_factory = pyo3::wrap_pyfunction!(venv_required, m)?;
    m.add_function(decorator_factory)
}