#![allow(non_local_definitions)]
use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3::wrap_pyfunction;
use std::collections::HashMap;
use std::path::Path;
use crate::integrations::git::Git;
#[pyclass(extends=pyo3::exceptions::PyException)]
pub struct GitException {
message: String,
}
#[pymethods]
impl GitException {
#[new]
fn new(message: String) -> Self {
Self { message }
}
fn __str__(&self) -> String {
self.message.clone()
}
}
#[pyclass(name = "Git")]
pub struct PyGit {
inner: Git,
}
#[pymethods]
impl PyGit {
#[new]
#[pyo3(signature = (working_dir=None))]
fn new(working_dir: Option<&str>) -> PyResult<Self> {
let git = Git::new(working_dir.map(Path::new))
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(Self { inner: git })
}
fn execute(
&self,
subcommand: &str,
args: Vec<String>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
let output = self
.inner
.execute(subcommand, &arg_refs)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (bare=None))]
fn init(&self, bare: Option<bool>) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let output = if bare.unwrap_or(false) {
self.inner.execute("init", &["--bare"])
} else {
self.inner.execute("init", &[])
}
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (*paths))]
fn add(
&self,
paths: &Bound<'_, pyo3::types::PyTuple>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let path_strs: Vec<String> = paths
.iter()
.map(|p| p.extract::<String>())
.collect::<Result<Vec<_>, _>>()?;
let path_refs: Vec<&str> = path_strs.iter().map(|s| s.as_str()).collect();
let output = self
.inner
.execute("add", &path_refs)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (message, all=None))]
fn commit(&self, message: &str, all: Option<bool>) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let args = if all.unwrap_or(false) {
vec!["-m", message, "-a"]
} else {
vec!["-m", message]
};
let output = self
.inner
.execute("commit", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (remote=None, branch=None))]
fn push(
&self,
remote: Option<&str>,
branch: Option<&str>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let mut args = vec![];
if let Some(r) = remote {
args.push(r);
}
if let Some(b) = branch {
args.push(b);
}
let output = self
.inner
.execute("push", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (remote=None, branch=None))]
fn pull(
&self,
remote: Option<&str>,
branch: Option<&str>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let mut args = vec![];
if let Some(r) = remote {
args.push(r);
}
if let Some(b) = branch {
args.push(b);
}
let output = self
.inner
.execute("pull", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (short=None))]
fn status(&self, short: Option<bool>) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let args = if short.unwrap_or(false) {
vec!["--short"]
} else {
vec![]
};
let output = self
.inner
.execute("status", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (name=None, delete=None))]
fn branch(
&self,
name: Option<&str>,
delete: Option<bool>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let mut args = vec![];
if delete.unwrap_or(false) {
args.push("-d");
}
if let Some(n) = name {
args.push(n);
}
let output = self
.inner
.execute("branch", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (branch, create=None))]
fn checkout(
&self,
branch: &str,
create: Option<bool>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let args = if create.unwrap_or(false) {
vec!["-b", branch]
} else {
vec![branch]
};
let output = self
.inner
.execute("checkout", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (name, message=None))]
fn tag(&self, name: &str, message: Option<&str>) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let args = if let Some(msg) = message {
vec!["-m", msg, name]
} else {
vec![name]
};
let output = self
.inner
.execute("tag", &args)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[pyo3(signature = (command, *args, **kwargs))]
fn __call__(
&self,
command: &str,
args: &Bound<'_, pyo3::types::PyTuple>,
kwargs: Option<&Bound<'_, PyDict>>,
) -> PyResult<(i32, Py<PyAny>, Py<PyAny>)> {
Python::attach(|py| {
let arg_strs: Vec<String> = args
.iter()
.map(|p| p.extract::<String>())
.collect::<Result<Vec<_>, _>>()?;
let arg_refs: Vec<&str> = arg_strs.iter().map(|s| s.as_str()).collect();
let output = if let Some(dict) = kwargs {
let mut options_owned = HashMap::new();
for (key, value) in dict.iter() {
let key_str = key.extract::<String>()?;
let value_str = if value.is_truthy()? {
"".to_string() } else {
value.extract::<String>()?
};
options_owned.insert(key_str, value_str);
}
let options: HashMap<&str, &str> = options_owned
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
self.inner.execute_with_options(command, options, &arg_refs)
} else {
self.inner.execute(command, &arg_refs)
}
.map_err(|e| {
if e.to_string().contains("not supported") {
PyErr::new::<GitException, _>(e.to_string())
} else {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
}
})?;
Ok((
output.exit_code,
pyo3::types::PyBytes::new(py, output.stderr.as_bytes()).into(),
pyo3::types::PyBytes::new(py, output.stdout.as_bytes()).into(),
))
})
}
#[getter]
fn working_dir(&self) -> String {
self.inner.working_dir().display().to_string()
}
fn __getattr__(&self, _py: Python, name: &str) -> PyResult<Py<PyAny>> {
Err(PyErr::new::<GitException, _>(format!(
"Git command '{}' not found",
name
)))
}
}
#[pyfunction]
#[pyo3(signature = (remote, destination=None))]
pub fn git_clone(remote: &str, destination: Option<&str>) -> PyResult<String> {
let dest = Git::clone(remote, destination.map(Path::new))
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(dest.display().to_string())
}
#[pymodule]
pub fn git_integration(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<GitException>()?;
m.add("Git", _py.get_type::<PyGit>())?;
m.add("clone", wrap_pyfunction!(git_clone, m)?)?;
Ok(())
}