use crate::integrations::flox::{FloxEnvironment, FloxIntegration};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList, PyType};
use std::collections::HashMap;
use std::path::PathBuf;
/// Python class `Flox`: a handle on the Flox environment stored at `path`.
///
/// The underscore-prefixed fields track activation state so that `deactivate`
/// can undo what `activate` changed in the process environment.
#[pyclass(name = "Flox")]
pub struct Flox {
/// Environment directory; the constructor stores the `pathlib`-resolved path here.
pub path: PathBuf,
/// Set to `true` by a successful `activate` and back to `false` by `deactivate`.
#[pyo3(get)]
pub _is_activated: bool,
/// Previous values of the variables that already existed in `os.environ`
/// when `activate` ran; reset to `None` by `deactivate`.
pub _original_env: Option<HashMap<String, String>>,
/// Names of the variables that did not exist before `activate`; they are
/// removed again by `deactivate`, which also resets this to `None`.
pub _added_keys: Option<Vec<String>>,
/// Service names started by `__enter__` and stopped by `__exit__`, if any.
pub _services: Option<Vec<String>>,
}
#[pymethods]
impl Flox {
    /// Create a handle for the Flox environment located at `path`.
    ///
    /// `path` may be a `str` or any object whose `__str__` yields a path (for
    /// example `pathlib.Path`); when omitted, `"."` is used. A leading `~` is
    /// expanded with `Path.expanduser()` and the result is made absolute with
    /// `Path.resolve()`. `services` is the list of service names that
    /// `__enter__` starts and `__exit__` stops.
    #[new]
    #[pyo3(signature = (path=None, services=None))]
    fn __new__(path: Option<Py<PyAny>>, services: Option<Vec<String>>) -> PyResult<Self> {
        Python::attach(|py| {
            let path_str = match path {
                Some(path_obj) => match path_obj.extract::<String>(py) {
                    Ok(s) => s,
                    Err(_) => path_obj
                        .call_method0(py, "__str__")?
                        .extract::<String>(py)?,
                },
                None => ".".to_string(),
            };

            let mut py_path = py.import("pathlib")?.getattr("Path")?.call1((&path_str,))?;
            // Only expand when the raw input starts with `~`, so that inputs
            // such as "./~name" are still treated as literal file names.
            if path_str.starts_with('~') {
                py_path = py_path.call_method0("expanduser")?;
            }
            // `resolve()` anchors relative paths at the current working
            // directory itself, so no Rust-side join (and no lossy
            // `to_str().unwrap()` round trip) is needed.
            let resolved = py_path.call_method0("resolve")?;

            Ok(Flox {
                path: resolved.extract::<PathBuf>()?,
                _is_activated: false,
                _original_env: None,
                _added_keys: None,
                _services: services,
            })
        })
    }

    /// Whether `FloxEnvironment::exists` reports an environment at this path.
    #[getter]
    fn exists(&self) -> bool {
        FloxEnvironment::new(&self.path).exists()
    }

    /// Whether `FloxEnvironment::has_manifest` reports a manifest at this path.
    #[getter]
    fn has_manifest(&self) -> bool {
        FloxEnvironment::new(&self.path).has_manifest()
    }

    /// The environment directory as a `pathlib.Path`.
    #[getter]
    fn path(&self, py: Python) -> PyResult<Py<PyAny>> {
        let path_class = py.import("pathlib")?.getattr("Path")?;
        Ok(path_class.call1((self.path.as_path(),))?.unbind())
    }

    /// Apply the environment's activation variables to this process.
    ///
    /// Every variable is written both to `os.environ` and to the Rust process
    /// environment. The previous values are remembered so that `deactivate`
    /// can restore them. Calling this on an already activated handle is a
    /// no-op.
    ///
    /// Raises `RuntimeError` if the environment does not exist or its
    /// activation variables cannot be obtained. If writing a variable fails,
    /// the variables written so far are rolled back before the error is
    /// raised.
    fn activate(&mut self) -> PyResult<()> {
        if self._is_activated {
            return Ok(());
        }
        Python::attach(|py| {
            if !self.exists() {
                return Err(pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Flox environment at {} does not exist",
                    self.path.display()
                )));
            }

            let activation_env = FloxEnvironment::new(&self.path)
                .get_activation_env()
                .map_err(|e| {
                    pyo3::exceptions::PyRuntimeError::new_err(format!(
                        "Failed to get Flox activation environment: {}",
                        e
                    ))
                })?;

            let environ = py.import("os")?.getattr("environ")?;

            // Snapshot the current state of every variable we are about to touch.
            let mut original_env = HashMap::new();
            let mut added_keys = Vec::new();
            for key in activation_env.keys() {
                match environ.get_item(key) {
                    Ok(value) => {
                        original_env.insert(key.clone(), value.extract::<String>()?);
                    }
                    Err(_) => added_keys.push(key.clone()),
                }
            }
            self._original_env = Some(original_env);
            self._added_keys = Some(added_keys);

            // Flag the handle as activated *before* mutating anything so that
            // `deactivate` is able to undo a partially applied environment.
            self._is_activated = true;
            for (key, value) in &activation_env {
                if let Err(err) = environ.set_item(key, value) {
                    let _ = self.deactivate();
                    return Err(err);
                }
                std::env::set_var(key, value);
            }
            Ok(())
        })
    }

    /// Undo `activate`: restore overwritten variables and remove added ones.
    ///
    /// Does nothing when the handle is not activated.
    fn deactivate(&mut self) -> PyResult<()> {
        if !self._is_activated {
            return Ok(());
        }
        Python::attach(|py| {
            let environ = py.import("os")?.getattr("environ")?;

            if let Some(original_env) = &self._original_env {
                for (key, value) in original_env {
                    environ.set_item(key, value)?;
                    std::env::set_var(key, value);
                }
            }
            if let Some(added_keys) = &self._added_keys {
                for key in added_keys {
                    // Best effort: a failure to pop one key must not prevent
                    // the remaining keys from being cleaned up.
                    let _ = environ.call_method1("pop", (key, py.None()));
                    std::env::remove_var(key);
                }
            }

            self._is_activated = false;
            self._original_env = None;
            self._added_keys = None;
            Ok(())
        })
    }

    /// Context-manager entry: activate the environment, then start the
    /// configured services (if any) and return `self`.
    ///
    /// Python does not call `__exit__` when `__enter__` raises, so a failure
    /// to start the services deactivates the environment again before the
    /// `RuntimeError` is raised.
    fn __enter__(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
        slf.activate()?;

        let start_failure: Option<String> = match slf._services.as_deref() {
            Some(services) if !services.is_empty() => {
                let names: Vec<&str> = services.iter().map(String::as_str).collect();
                FloxEnvironment::new(&slf.path)
                    .services_start(&names)
                    .err()
                    .map(|e| format!("Failed to start services: {}", e))
            }
            _ => None,
        };

        if let Some(message) = start_failure {
            // The start failure is the error worth reporting; a secondary
            // failure while rolling back is intentionally dropped.
            let _ = slf.deactivate();
            return Err(pyo3::exceptions::PyRuntimeError::new_err(message));
        }
        Ok(slf)
    }

    /// Context-manager exit: stop the configured services (best effort) and
    /// deactivate the environment.
    ///
    /// Returns `None`, so an exception raised inside the `with` body is not
    /// suppressed.
    fn __exit__(
        &mut self,
        _exc_type: &Bound<'_, PyAny>,
        _exc_val: &Bound<'_, PyAny>,
        _exc_tb: &Bound<'_, PyAny>,
    ) -> PyResult<()> {
        if let Some(services) = self._services.as_deref() {
            if !services.is_empty() {
                let names: Vec<&str> = services.iter().map(String::as_str).collect();
                // Stopping is best effort so that deactivation always runs.
                let _ = FloxEnvironment::new(&self.path).services_stop(&names);
            }
        }
        self.deactivate()
    }

    /// Whether `FloxIntegration::is_available` reports Flox as usable.
    #[classmethod]
    fn is_available(_cls: &Bound<'_, PyType>) -> bool {
        FloxIntegration::is_available()
    }

    /// The version string reported by `FloxIntegration::version`.
    ///
    /// Raises `RuntimeError` if the version cannot be determined.
    #[classmethod]
    fn version(_cls: &Bound<'_, PyType>) -> PyResult<String> {
        FloxIntegration::version().map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!(
                "Failed to get Flox version: {}",
                e
            ))
        })
    }

    /// Run `command` with optional `args` inside the environment.
    ///
    /// Returns `(exit_code, stdout, stderr)`. The exit code is `-1` when the
    /// process status carries no code, and both streams are decoded as UTF-8
    /// with invalid sequences replaced.
    ///
    /// Raises `RuntimeError` if the command cannot be run.
    #[pyo3(signature = (command, args = None))]
    fn run(&self, command: &str, args: Option<Vec<String>>) -> PyResult<(i32, String, String)> {
        let arg_refs: Vec<&str> = args.iter().flatten().map(String::as_str).collect();
        let output = FloxEnvironment::new(&self.path)
            .run_in_env(command, &arg_refs)
            .map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Failed to run command in Flox environment: {}",
                    e
                ))
            })?;

        Ok((
            output.status.code().unwrap_or(-1),
            String::from_utf8_lossy(&output.stdout).to_string(),
            String::from_utf8_lossy(&output.stderr).to_string(),
        ))
    }

    /// A `FloxServices` manager bound to the same environment path.
    #[getter]
    fn services(&self) -> FloxServices {
        FloxServices {
            path: self.path.clone(),
        }
    }
}
/// Python class `ServiceInfo`: a snapshot of one service entry.
///
/// All three fields are exposed to Python as read-only attributes.
#[pyclass(name = "ServiceInfo")]
#[derive(Clone)]
pub struct ServiceInfo {
/// Service name.
#[pyo3(get)]
pub name: String,
/// Status text, copied verbatim from the status entry it was built from.
#[pyo3(get)]
pub status: String,
/// Process id, when one was reported.
#[pyo3(get)]
pub pid: Option<u32>,
}
#[pymethods]
impl ServiceInfo {
#[new]
#[pyo3(signature = (name, status, pid=None))]
fn __new__(name: String, status: String, pid: Option<u32>) -> Self {
ServiceInfo { name, status, pid }
}
fn __repr__(&self) -> String {
match self.pid {
Some(pid) => format!(
"ServiceInfo(name='{}', status='{}', pid={})",
self.name, self.status, pid
),
None => format!(
"ServiceInfo(name='{}', status='{}')",
self.name, self.status
),
}
}
fn as_tuple(&self) -> (String, String, Option<u32>) {
(self.name.clone(), self.status.clone(), self.pid)
}
}
/// Python class `FloxServices`: service management for the Flox environment
/// stored at `path`.
#[pyclass(name = "FloxServices")]
pub struct FloxServices {
/// Environment directory that every service call is issued against.
pub path: PathBuf,
}
#[pymethods]
impl FloxServices {
    /// Create a service manager for the Flox environment located at `path`.
    ///
    /// `path` may be a `str` or any object whose `__str__` yields a path. A
    /// leading `~` is expanded with `Path.expanduser()` and the result is
    /// made absolute with `Path.resolve()`.
    #[new]
    fn __new__(path: Py<PyAny>) -> PyResult<Self> {
        Python::attach(|py| {
            let path_str = match path.extract::<String>(py) {
                Ok(s) => s,
                Err(_) => path.call_method0(py, "__str__")?.extract::<String>(py)?,
            };

            let mut py_path = py.import("pathlib")?.getattr("Path")?.call1((&path_str,))?;
            // Only expand when the raw input starts with `~`, so that inputs
            // such as "./~name" are still treated as literal file names.
            if path_str.starts_with('~') {
                py_path = py_path.call_method0("expanduser")?;
            }
            // `resolve()` anchors relative paths at the current working
            // directory itself, so no Rust-side join (and no lossy
            // `to_str().unwrap()` round trip) is needed.
            let resolved = py_path.call_method0("resolve")?;

            Ok(FloxServices {
                path: resolved.extract::<PathBuf>()?,
            })
        })
    }

    /// The environment directory as a `pathlib.Path`.
    #[getter]
    fn path(&self, py: Python) -> PyResult<Py<PyAny>> {
        let path_class = py.import("pathlib")?.getattr("Path")?;
        Ok(path_class.call1((self.path.as_path(),))?.unbind())
    }

    /// Start the named services and return a handle describing them.
    ///
    /// The names given as positional arguments are passed to
    /// `FloxEnvironment::services_start`. The returned handle lists the
    /// status entries whose name was requested, or every entry when no names
    /// were given.
    ///
    /// Raises `RuntimeError` if starting or the status query fails.
    #[pyo3(signature = (*services))]
    fn start(&self, services: &Bound<'_, PyAny>) -> PyResult<FloxServiceHandle> {
        let flox_env = FloxEnvironment::new(&self.path);
        // An empty `*services` tuple extracts to an empty vector.
        let service_names: Vec<String> = services.extract()?;
        let service_refs: Vec<&str> = service_names.iter().map(String::as_str).collect();

        flox_env.services_start(&service_refs).map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!(
                "Failed to start services: {}",
                e
            ))
        })?;

        let statuses = flox_env.services_status().map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!(
                "Failed to get service status: {}",
                e
            ))
        })?;

        let service_infos: Vec<ServiceInfo> = statuses
            .into_iter()
            .filter(|s| service_names.is_empty() || service_names.contains(&s.name))
            .map(|s| ServiceInfo {
                name: s.name,
                status: s.status,
                pid: s.pid,
            })
            .collect();

        Ok(FloxServiceHandle {
            flox_env_path: self.path.clone(),
            services: service_infos,
            started_at: chrono_now(),
        })
    }

    /// Stop the services named by the positional arguments.
    ///
    /// Raises `RuntimeError` if stopping fails.
    #[pyo3(signature = (*services))]
    fn stop(&self, services: &Bound<'_, PyAny>) -> PyResult<()> {
        let service_names: Vec<String> = services.extract()?;
        let service_refs: Vec<&str> = service_names.iter().map(String::as_str).collect();

        FloxEnvironment::new(&self.path)
            .services_stop(&service_refs)
            .map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Failed to stop services: {}",
                    e
                ))
            })
    }

    /// Return one `ServiceInfo` per entry reported by the status query.
    ///
    /// Raises `RuntimeError` if the status query fails.
    fn status(&self) -> PyResult<Vec<ServiceInfo>> {
        let statuses = FloxEnvironment::new(&self.path)
            .services_status()
            .map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Failed to get service status: {}",
                    e
                ))
            })?;

        Ok(statuses
            .into_iter()
            .map(|s| ServiceInfo {
                name: s.name,
                status: s.status,
                pid: s.pid,
            })
            .collect())
    }

    /// Return the logs of `service`, optionally limited via `tail`.
    ///
    /// `follow=True` is rejected with `ValueError` because it would block
    /// indefinitely. Raises `RuntimeError` if the logs cannot be fetched.
    #[pyo3(signature = (service, follow=false, tail=None))]
    fn logs(&self, service: &str, follow: bool, tail: Option<u32>) -> PyResult<String> {
        if follow {
            return Err(pyo3::exceptions::PyValueError::new_err(
                "follow=True is not supported: it would block \
indefinitely. Use tail=N to get recent logs.",
            ));
        }

        FloxEnvironment::new(&self.path)
            .services_logs(service, false, tail)
            .map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Failed to get logs for service '{}': {}",
                    service, e
                ))
            })
    }

    /// Restart the services named by the positional arguments.
    ///
    /// Raises `RuntimeError` if restarting fails.
    #[pyo3(signature = (*services))]
    fn restart(&self, services: &Bound<'_, PyAny>) -> PyResult<()> {
        let service_names: Vec<String> = services.extract()?;
        let service_refs: Vec<&str> = service_names.iter().map(String::as_str).collect();

        FloxEnvironment::new(&self.path)
            .services_restart(&service_refs)
            .map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!(
                    "Failed to restart services: {}",
                    e
                ))
            })
    }
}
/// Python class `FloxServiceHandle`: a record of services started in one
/// Flox environment, which can be saved to and loaded from a JSON file.
#[pyclass(name = "FloxServiceHandle")]
#[derive(Clone)]
pub struct FloxServiceHandle {
/// Directory of the environment the services belong to.
pub flox_env_path: PathBuf,
/// Service entries captured when the handle was created.
pub services: Vec<ServiceInfo>,
/// Creation timestamp as text; `FloxServices::start` fills it from `chrono_now()`.
pub started_at: String,
}
/// Return the current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// A system clock set before the Unix epoch is clamped to the epoch instead
/// of panicking.
fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format_unix_timestamp(secs)
}

/// Format `secs` (seconds since the Unix epoch, UTC) as `YYYY-MM-DDTHH:MM:SSZ`.
fn format_unix_timestamp(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let time_of_day = secs % SECS_PER_DAY;
    // Days remaining after whole years (and later whole months) are peeled off.
    let mut days_left = secs / SECS_PER_DAY;

    let mut year = 1970i64;
    loop {
        let year_len = if is_leap_year(year) { 366 } else { 365 };
        if days_left < year_len {
            break;
        }
        days_left -= year_len;
        year += 1;
    }

    let february = if is_leap_year(year) { 29 } else { 28 };
    let month_lengths: [u64; 12] = [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    // `days_left` is smaller than the length of `year`, so the loop always
    // breaks before running past December.
    let mut month = 1u32;
    for &len in month_lengths.iter() {
        if days_left < len {
            break;
        }
        days_left -= len;
        month += 1;
    }

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year,
        month,
        days_left + 1,
        time_of_day / 3600,
        (time_of_day % 3600) / 60,
        time_of_day % 60
    )
}

/// Gregorian leap-year rule: divisible by 4, except centuries not divisible by 400.
fn is_leap_year(y: i64) -> bool {
    (y % 4 == 0 && y % 100 != 0) || (y % 400 == 0)
}
#[pymethods]
impl FloxServiceHandle {
#[getter]
fn flox_env_path(&self, py: Python) -> PyResult<Py<PyAny>> {
let pathlib = py.import("pathlib")?;
let path_class = pathlib.getattr("Path")?;
let result = path_class.call1((self.flox_env_path.to_str().unwrap(),))?;
Ok(result.into())
}
#[getter]
fn services(&self) -> Vec<ServiceInfo> {
self.services.clone()
}
#[getter]
fn started_at(&self) -> String {
self.started_at.clone()
}
fn stop(&self) -> PyResult<()> {
let flox_env = FloxEnvironment::new(&self.flox_env_path);
let service_names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
flox_env.services_stop(&service_names).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to stop services: {}",
e
))
})
}
#[pyo3(signature = (path=None))]
fn save(&self, path: Option<&str>) -> PyResult<()> {
let file_path = path.unwrap_or(".flox-services.json");
Python::attach(|py| {
let json = py.import("json")?;
let data = PyDict::new(py);
data.set_item("flox_env_path", self.flox_env_path.to_str().unwrap())?;
data.set_item("started_at", &self.started_at)?;
let services_list = PyList::empty(py);
for svc in &self.services {
let svc_dict = PyDict::new(py);
svc_dict.set_item("name", &svc.name)?;
svc_dict.set_item("status", &svc.status)?;
svc_dict.set_item("pid", svc.pid)?;
services_list.append(svc_dict)?;
}
data.set_item("services", services_list)?;
let builtins = py.import("builtins")?;
let file = builtins.call_method1("open", (file_path, "w"))?;
json.call_method1("dump", (data, &file))?;
file.call_method0("close")?;
Ok(())
})
}
#[classmethod]
#[pyo3(signature = (path=None))]
fn load(_cls: &Bound<'_, PyType>, path: Option<&str>) -> PyResult<FloxServiceHandle> {
let file_path = path.unwrap_or(".flox-services.json");
Python::attach(|py| {
let json = py.import("json")?;
let builtins = py.import("builtins")?;
let file = builtins.call_method1("open", (file_path, "r"))?;
let data: Bound<PyDict> = json.call_method1("load", (&file,))?.cast_into()?;
file.call_method0("close")?;
let flox_env_path = PathBuf::from(
data.get_item("flox_env_path")?
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyKeyError, _>("missing flox_env_path")
})?
.extract::<String>()?,
);
let started_at = data
.get_item("started_at")?
.ok_or_else(|| PyErr::new::<pyo3::exceptions::PyKeyError, _>("missing started_at"))?
.extract::<String>()?;
let services_list: Bound<PyList> = data
.get_item("services")?
.ok_or_else(|| PyErr::new::<pyo3::exceptions::PyKeyError, _>("missing services"))?
.cast_into()?;
let mut services = Vec::new();
for item in services_list.iter() {
let svc_dict: Bound<PyDict> = item.cast_into()?;
let name = svc_dict
.get_item("name")?
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyKeyError, _>("missing service name")
})?
.extract::<String>()?;
let status = svc_dict
.get_item("status")?
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyKeyError, _>("missing service status")
})?
.extract::<String>()?;
let pid = svc_dict
.get_item("pid")?
.map(|p| p.extract::<Option<u32>>())
.transpose()?
.flatten();
services.push(ServiceInfo { name, status, pid });
}
Ok(FloxServiceHandle {
flox_env_path,
services,
started_at,
})
})
}
fn __repr__(&self) -> String {
let service_names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
format!(
"FloxServiceHandle(services={:?}, started_at='{}')",
service_names, self.started_at
)
}
}
/// Build a decorator that runs the decorated callable inside a Flox environment.
///
/// `path` and `services` are stored unchanged on the returned
/// `FloxRequiredDecorator`, which passes them on to every wrapper it creates.
#[pyfunction]
#[pyo3(signature = (path=None, services=None))]
pub fn flox_required(
    path: Option<Py<PyAny>>,
    services: Option<Vec<String>>,
) -> PyResult<FloxRequiredDecorator> {
    // `path` is already an owned reference, so it can be moved into the
    // decorator directly; no interpreter access or extra clone is required.
    Ok(FloxRequiredDecorator { path, services })
}
/// Decorator object returned by `flox_required`; calling it with a function
/// produces a `FloxRequiredWrapper` around that function.
#[pyclass]
pub struct FloxRequiredDecorator {
/// Environment path argument given to `flox_required`, if any.
path: Option<Py<PyAny>>,
/// Service names given to `flox_required`, if any.
services: Option<Vec<String>>,
}
#[pymethods]
impl FloxRequiredDecorator {
    /// Wrap `func` in a `FloxRequiredWrapper` that carries this decorator's
    /// path and service list.
    fn __call__(&self, py: Python, func: Py<PyAny>) -> PyResult<Py<PyAny>> {
        let path = self.path.as_ref().map(|p| p.clone_ref(py));
        let services = self.services.clone();
        let wrapper = Py::new(
            py,
            FloxRequiredWrapper {
                original_func: func,
                path,
                services,
            },
        )?;
        Ok(wrapper.into())
    }
}
/// Callable produced by `FloxRequiredDecorator`: it invokes `original_func`
/// with a Flox environment activated around the call.
#[pyclass]
struct FloxRequiredWrapper {
/// The decorated Python callable.
original_func: Py<PyAny>,
/// Environment path passed to the `Flox` constructor; `None` uses its default.
path: Option<Py<PyAny>>,
/// Services to start before the call, if any.
services: Option<Vec<String>>,
}
impl Clone for FloxRequiredWrapper {
    /// Clone the wrapper, taking new references to the held Python objects.
    fn clone(&self) -> Self {
        Python::attach(|py| {
            let original_func = self.original_func.clone_ref(py);
            let path = self.path.as_ref().map(|p| p.clone_ref(py));
            let services = self.services.clone();
            Self {
                original_func,
                path,
                services,
            }
        })
    }
}
#[pymethods]
impl FloxRequiredWrapper {
    /// Call the wrapped function inside the Flox environment.
    ///
    /// The environment is activated and the configured services are started
    /// before the call. Afterwards those same services are stopped and the
    /// environment is deactivated, both best effort, whether the wrapped
    /// function returned or raised. The wrapped function's result or
    /// exception is what the caller sees.
    ///
    /// Raises `RuntimeError` if the environment does not exist. If the
    /// services cannot be started, the environment is deactivated again
    /// before that error is raised.
    #[pyo3(signature = (*args, **kwargs))]
    fn __call__(
        &self,
        args: &Bound<'_, pyo3::types::PyTuple>,
        kwargs: Option<&Bound<'_, pyo3::types::PyDict>>,
    ) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            let flox_class = py.get_type::<Flox>();
            let flox = match &self.path {
                Some(path) => flox_class.call1((path,))?,
                None => flox_class.call0()?,
            };

            if !flox.getattr("exists")?.extract::<bool>()? {
                return Err(pyo3::exceptions::PyRuntimeError::new_err(
                    "Flox environment does not exist. Run 'flox init' first.",
                ));
            }
            flox.call_method0("activate")?;

            // `Some` only when there is at least one service to manage.
            let service_names = match &self.services {
                Some(list) if !list.is_empty() => Some(pyo3::types::PyTuple::new(py, list)?),
                _ => None,
            };

            if let Some(names) = &service_names {
                let started = flox
                    .getattr("services")
                    .and_then(|services| services.call_method1("start", names));
                if let Err(err) = started {
                    // Do not leave the process environment modified when the
                    // wrapped function never gets to run.
                    let _ = flox.call_method0("deactivate");
                    return Err(err);
                }
            }

            let call_result = self.original_func.call(py, args, kwargs);

            // Cleanup is best effort and must never replace the wrapped
            // function's own result or exception. Only the services this
            // wrapper started are stopped, as in `Flox::__exit__`.
            if let Some(names) = &service_names {
                let _ = flox
                    .getattr("services")
                    .and_then(|services| services.call_method1("stop", names));
            }
            let _ = flox.call_method0("deactivate");

            call_result
        })
    }

    /// `__arguments` of the wrapped function, or `None` if it has none.
    #[getter(__arguments)]
    fn get_arguments(&self) -> PyResult<Py<PyAny>> {
        Ok(self.wrapped_attr_or_none("__arguments"))
    }

    /// `__name__` of the wrapped function, or `None` if it has none.
    #[getter]
    fn __name__(&self) -> PyResult<Py<PyAny>> {
        Ok(self.wrapped_attr_or_none("__name__"))
    }

    /// `__doc__` of the wrapped function, or `None` if it has none.
    #[getter]
    fn __doc__(&self) -> PyResult<Py<PyAny>> {
        Ok(self.wrapped_attr_or_none("__doc__"))
    }

    /// Forward any other attribute lookup to the wrapped function.
    fn __getattr__(&self, name: &str) -> PyResult<Py<PyAny>> {
        Python::attach(|py| self.original_func.getattr(py, name))
    }
}

impl FloxRequiredWrapper {
    /// Look up `name` on the wrapped function, yielding `None` when the lookup fails.
    fn wrapped_attr_or_none(&self, name: &str) -> Py<PyAny> {
        Python::attach(|py| {
            self.original_func
                .getattr(py, name)
                .unwrap_or_else(|_| py.None())
        })
    }
}
#[pymodule]
pub fn flox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Flox>()?;
m.add_class::<FloxServices>()?;
m.add_class::<FloxServiceHandle>()?;
m.add_class::<ServiceInfo>()?;
m.add_class::<FloxRequiredDecorator>()?;
m.add_class::<FloxRequiredWrapper>()?;
m.add_function(pyo3::wrap_pyfunction!(flox_required, m)?)?;
Ok(())
}
/// Add every Flox class and the `flox_required` function to module `m`.
pub fn register_flox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Classes exported to Python.
    m.add_class::<Flox>()?;
    m.add_class::<FloxServices>()?;
    m.add_class::<FloxServiceHandle>()?;
    m.add_class::<ServiceInfo>()?;
    m.add_class::<FloxRequiredDecorator>()?;
    m.add_class::<FloxRequiredWrapper>()?;

    // Decorator factory.
    let decorator_factory = pyo3::wrap_pyfunction!(flox_required, m)?;
    m.add_function(decorator_factory)
}