use crate::controldir::{ControlDir, GenericControlDir, PyControlDir};
use crate::error::Error;
use crate::foreign::VcsType;
use crate::lock::Lock;
use crate::repository::{GenericRepository, PyRepository, Repository};
use crate::revisionid::RevisionId;
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::PyDict;
/// Handle to a Python branch-format object.
///
/// The wrapped value is whatever the branch exposes as its `_format`
/// attribute (see `Branch::format`).
#[derive(Debug)]
pub struct BranchFormat(Py<PyAny>);
impl Clone for BranchFormat {
    /// Produces a second handle to the same Python object by taking a new
    /// reference while attached to the interpreter.
    fn clone(&self) -> Self {
        let inner = Python::attach(|py| self.0.clone_ref(py));
        BranchFormat(inner)
    }
}
impl BranchFormat {
    /// Asks the Python format object whether it supports stacking.
    ///
    /// # Panics
    ///
    /// Panics if the Python `supports_stacking()` call raises or if its
    /// result cannot be converted to `bool`.
    pub fn supports_stacking(&self) -> bool {
        Python::attach(|py| {
            let answer = self.0.call_method0(py, "supports_stacking").unwrap();
            answer.extract::<bool>(py).unwrap()
        })
    }
}
pub trait Branch {
fn as_any(&self) -> &dyn std::any::Any;
fn format(&self) -> BranchFormat;
fn vcs_type(&self) -> VcsType;
fn revno(&self) -> u32;
fn lock_read(&self) -> Result<Lock, crate::error::Error>;
fn lock_write(&self) -> Result<Lock, crate::error::Error>;
fn tags(&self) -> Result<crate::tags::Tags, crate::error::Error>;
fn repository(&self) -> GenericRepository;
fn last_revision(&self) -> RevisionId;
fn name(&self) -> Option<String>;
fn basis_tree(&self) -> Result<crate::tree::RevisionTree, crate::error::Error>;
fn get_user_url(&self) -> url::Url;
fn controldir(
&self,
) -> Box<
dyn ControlDir<
Branch = GenericBranch,
Repository = crate::repository::GenericRepository,
WorkingTree = crate::workingtree::GenericWorkingTree,
>,
>;
fn push(
&self,
remote_branch: &dyn PyBranch,
overwrite: bool,
stop_revision: Option<&RevisionId>,
tag_selector: Option<Box<dyn Fn(String) -> bool>>,
) -> Result<(), crate::error::Error>;
fn pull(&self, source_branch: &dyn PyBranch, overwrite: Option<bool>) -> Result<(), Error>;
fn get_parent(&self) -> Option<String>;
fn set_parent(&mut self, parent: &str);
fn get_public_branch(&self) -> Option<String>;
fn get_push_location(&self) -> Option<String>;
fn get_submit_branch(&self) -> Option<String>;
fn user_transport(&self) -> crate::transport::Transport;
fn get_config(&self) -> crate::config::BranchConfig;
fn get_config_stack(&self) -> crate::config::ConfigStack;
fn sprout(&self, to_controldir: &dyn PyControlDir, to_branch_name: &str) -> Result<(), Error>;
fn create_checkout(
&self,
to_location: &std::path::Path,
) -> Result<crate::workingtree::GenericWorkingTree, Error>;
fn generate_revision_history(&self, last_revision: &RevisionId) -> Result<(), Error>;
fn bind(&self, other: &dyn Branch) -> Result<(), Error>;
fn unbind(&self) -> Result<(), Error>;
fn get_bound_location(&self) -> Option<String>;
fn get_old_bound_location(&self) -> Option<String>;
fn is_locked(&self) -> bool;
fn peek_lock_mode(&self) -> Option<char>;
fn get_rev_id(&self, revno: u32) -> Result<RevisionId, Error>;
fn revision_id_to_revno(&self, revision_id: &RevisionId) -> Result<u32, Error>;
fn check_real_revno(&self, revno: u32) -> bool;
fn last_revision_info(&self) -> (u32, RevisionId);
fn set_last_revision_info(&self, revno: u32, revision_id: &RevisionId) -> Result<(), Error>;
fn get_stacked_on_url(&self) -> Result<String, Error>;
fn set_stacked_on_url(&self, url: &str) -> Result<(), Error>;
fn fetch(
&self,
from_branch: &dyn Branch,
last_revision: Option<&RevisionId>,
) -> Result<(), Error>;
fn update(&self) -> Result<(), Error>;
fn set_push_location(&self, location: &str) -> Result<(), Error>;
fn set_public_branch(&self, location: &str) -> Result<(), Error>;
fn get_append_revisions_only(&self) -> bool;
fn set_append_revisions_only(&self, value: bool) -> Result<(), Error>;
}
/// A [`Branch`] that is backed by a Python object.
///
/// Implementors only supply [`PyBranch::to_object`]; the blanket
/// `impl<T: PyBranch> Branch for T` builds every `Branch` method on top of it.
pub trait PyBranch: Branch + Send + std::any::Any {
    /// Returns a reference to the underlying Python object.
    fn to_object(&self, py: Python<'_>) -> Py<PyAny>;
}
impl dyn PyBranch {
    /// Views this `PyBranch` trait object through its `Branch` supertrait.
    pub fn as_branch(&self) -> &dyn Branch {
        let upcast: &dyn Branch = self;
        upcast
    }
}
impl<T: PyBranch> Branch for T {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn format(&self) -> BranchFormat {
Python::attach(|py| BranchFormat(self.to_object(py).getattr(py, "_format").unwrap()))
}
fn vcs_type(&self) -> VcsType {
self.repository().vcs_type()
}
fn revno(&self) -> u32 {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "revno")
.unwrap()
.extract(py)
.unwrap()
})
}
fn lock_read(&self) -> Result<Lock, crate::error::Error> {
Python::attach(|py| {
Ok(Lock::from(
self.to_object(py)
.call_method0(py, intern!(py, "lock_read"))?,
))
})
}
fn lock_write(&self) -> Result<Lock, crate::error::Error> {
Python::attach(|py| {
Ok(Lock::from(
self.to_object(py)
.call_method0(py, intern!(py, "lock_write"))?,
))
})
}
fn tags(&self) -> Result<crate::tags::Tags, crate::error::Error> {
Python::attach(|py| {
Ok(crate::tags::Tags::from(
self.to_object(py).getattr(py, "tags")?,
))
})
}
fn repository(&self) -> GenericRepository {
Python::attach(|py| {
GenericRepository::new(self.to_object(py).getattr(py, "repository").unwrap())
})
}
fn last_revision(&self) -> RevisionId {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, intern!(py, "last_revision"))
.unwrap()
.extract(py)
.unwrap()
})
}
fn name(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.getattr(py, "name")
.unwrap()
.extract::<Option<String>>(py)
.unwrap()
})
}
fn basis_tree(&self) -> Result<crate::tree::RevisionTree, crate::error::Error> {
Python::attach(|py| {
Ok(crate::tree::RevisionTree(
self.to_object(py).call_method0(py, "basis_tree")?,
))
})
}
fn get_user_url(&self) -> url::Url {
Python::attach(|py| {
let url = self
.to_object(py)
.getattr(py, "user_url")
.unwrap()
.extract::<String>(py)
.unwrap();
url.parse::<url::Url>().unwrap()
})
}
fn controldir(
&self,
) -> Box<
dyn ControlDir<
Branch = GenericBranch,
Repository = crate::repository::GenericRepository,
WorkingTree = crate::workingtree::GenericWorkingTree,
>,
> {
Python::attach(|py| {
Box::new(GenericControlDir::new(
self.to_object(py).getattr(py, "controldir").unwrap(),
))
as Box<
dyn ControlDir<
Branch = GenericBranch,
Repository = crate::repository::GenericRepository,
WorkingTree = crate::workingtree::GenericWorkingTree,
>,
>
})
}
fn push(
&self,
remote_branch: &dyn PyBranch,
overwrite: bool,
stop_revision: Option<&RevisionId>,
tag_selector: Option<Box<dyn Fn(String) -> bool>>,
) -> Result<(), crate::error::Error> {
Python::attach(|py| {
let kwargs = PyDict::new(py);
kwargs.set_item("overwrite", overwrite)?;
if let Some(stop_revision) = stop_revision {
kwargs.set_item("stop_revision", stop_revision.clone())?;
}
if let Some(tag_selector) = tag_selector {
kwargs.set_item("tag_selector", py_tag_selector(py, tag_selector)?)?;
}
self.to_object(py).call_method(
py,
"push",
(&remote_branch.to_object(py),),
Some(&kwargs),
)?;
Ok(())
})
}
fn pull(&self, source_branch: &dyn PyBranch, overwrite: Option<bool>) -> Result<(), Error> {
Python::attach(|py| {
let kwargs = PyDict::new(py);
if let Some(overwrite) = overwrite {
kwargs.set_item("overwrite", overwrite)?;
}
self.to_object(py).call_method(
py,
"pull",
(&source_branch.to_object(py),),
Some(&kwargs),
)?;
Ok(())
})
}
fn get_parent(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_parent")
.unwrap()
.extract(py)
.unwrap()
})
}
fn set_parent(&mut self, parent: &str) {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "set_parent", (parent,))
.unwrap();
})
}
fn get_public_branch(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_public_branch")
.unwrap()
.extract(py)
.unwrap()
})
}
fn get_push_location(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_push_location")
.unwrap()
.extract(py)
.unwrap()
})
}
fn get_submit_branch(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_submit_branch")
.unwrap()
.extract(py)
.unwrap()
})
}
fn user_transport(&self) -> crate::transport::Transport {
Python::attach(|py| {
crate::transport::Transport::new(
self.to_object(py).getattr(py, "user_transport").unwrap(),
)
})
}
fn get_config(&self) -> crate::config::BranchConfig {
Python::attach(|py| {
crate::config::BranchConfig::new(
self.to_object(py).call_method0(py, "get_config").unwrap(),
)
})
}
fn get_config_stack(&self) -> crate::config::ConfigStack {
Python::attach(|py| {
crate::config::ConfigStack::new(
self.to_object(py)
.call_method0(py, "get_config_stack")
.unwrap(),
)
})
}
fn sprout(&self, to_controldir: &dyn PyControlDir, to_branch_name: &str) -> Result<(), Error> {
Python::attach(|py| {
let kwargs = PyDict::new(py);
kwargs.set_item("name", to_branch_name)?;
self.to_object(py).call_method(
py,
"sprout",
(to_controldir.to_object(py),),
Some(&kwargs),
)?;
Ok(())
})
}
fn create_checkout(
&self,
to_location: &std::path::Path,
) -> Result<crate::workingtree::GenericWorkingTree, Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(
py,
"create_checkout",
(to_location.to_string_lossy().to_string(),),
)
.map(crate::workingtree::GenericWorkingTree)
.map_err(|e| e.into())
})
}
fn generate_revision_history(&self, last_revision: &RevisionId) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py).call_method1(
py,
"generate_revision_history",
(last_revision.clone().into_pyobject(py).unwrap(),),
)?;
Ok(())
})
}
fn bind(&self, other: &dyn Branch) -> Result<(), Error> {
Python::attach(|py| {
if let Some(gb) = other.as_any().downcast_ref::<GenericBranch>() {
self.to_object(py)
.call_method1(py, "bind", (gb.to_object(py),))?;
} else if let Some(mb) = other.as_any().downcast_ref::<MemoryBranch>() {
self.to_object(py)
.call_method1(py, "bind", (mb.to_object(py),))?;
} else {
return Err(Error::Other(pyo3::exceptions::PyTypeError::new_err(
"Branch must be a PyBranch",
)));
}
Ok(())
})
}
fn unbind(&self) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py).call_method0(py, "unbind")?;
Ok(())
})
}
fn get_bound_location(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_bound_location")
.unwrap()
.extract(py)
.unwrap()
})
}
fn get_old_bound_location(&self) -> Option<String> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_old_bound_location")
.unwrap()
.extract(py)
.unwrap()
})
}
fn is_locked(&self) -> bool {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "is_locked")
.unwrap()
.extract(py)
.unwrap()
})
}
fn peek_lock_mode(&self) -> Option<char> {
Python::attach(|py| {
let result = self
.to_object(py)
.call_method0(py, "peek_lock_mode")
.unwrap();
if result.is_none(py) {
None
} else {
let mode: String = result.extract(py).unwrap();
mode.chars().next()
}
})
}
fn get_rev_id(&self, revno: u32) -> Result<RevisionId, Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "get_rev_id", (revno,))?
.extract(py)
.map_err(Into::into)
})
}
fn revision_id_to_revno(&self, revision_id: &RevisionId) -> Result<u32, Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "revision_id_to_revno", (revision_id.clone(),))?
.extract(py)
.map_err(Into::into)
})
}
fn check_real_revno(&self, revno: u32) -> bool {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "check_real_revno", (revno,))
.unwrap()
.extract(py)
.unwrap()
})
}
fn last_revision_info(&self) -> (u32, RevisionId) {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "last_revision_info")
.unwrap()
.extract(py)
.unwrap()
})
}
fn set_last_revision_info(&self, revno: u32, revision_id: &RevisionId) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py).call_method1(
py,
"set_last_revision_info",
(revno, revision_id.clone()),
)?;
Ok(())
})
}
fn get_stacked_on_url(&self) -> Result<String, Error> {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_stacked_on_url")?
.extract(py)
.map_err(Into::into)
})
}
fn set_stacked_on_url(&self, url: &str) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "set_stacked_on_url", (url,))?;
Ok(())
})
}
fn fetch(
&self,
from_branch: &dyn Branch,
last_revision: Option<&RevisionId>,
) -> Result<(), Error> {
Python::attach(|py| {
let kwargs = PyDict::new(py);
if let Some(rev) = last_revision {
kwargs.set_item("last_revision", rev.clone())?;
}
if let Some(gb) = from_branch.as_any().downcast_ref::<GenericBranch>() {
self.to_object(py)
.call_method(py, "fetch", (gb.to_object(py),), Some(&kwargs))?;
} else if let Some(mb) = from_branch.as_any().downcast_ref::<MemoryBranch>() {
self.to_object(py)
.call_method(py, "fetch", (mb.to_object(py),), Some(&kwargs))?;
} else {
return Err(Error::Other(pyo3::exceptions::PyTypeError::new_err(
"Branch must be a PyBranch",
)));
}
Ok(())
})
}
fn update(&self) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py).call_method0(py, "update")?;
Ok(())
})
}
fn set_push_location(&self, location: &str) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "set_push_location", (location,))?;
Ok(())
})
}
fn set_public_branch(&self, location: &str) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "set_public_branch", (location,))?;
Ok(())
})
}
fn get_append_revisions_only(&self) -> bool {
Python::attach(|py| {
self.to_object(py)
.call_method0(py, "get_append_revisions_only")
.unwrap()
.extract(py)
.unwrap()
})
}
fn set_append_revisions_only(&self, value: bool) -> Result<(), Error> {
Python::attach(|py| {
self.to_object(py)
.call_method1(py, "set_append_revisions_only", (value,))?;
Ok(())
})
}
}
/// A branch backed by an arbitrary Python object.
///
/// Construction (via `From` or `FromPyObject`) stores the object as-is;
/// no type check is performed in this file.
pub struct GenericBranch(Py<PyAny>);
impl Clone for GenericBranch {
    /// Produces a second handle to the same Python object.
    fn clone(&self) -> Self {
        let inner = Python::attach(|py| self.0.clone_ref(py));
        GenericBranch(inner)
    }
}
impl PyBranch for GenericBranch {
fn to_object(&self, py: Python<'_>) -> Py<PyAny> {
self.0.clone_ref(py)
}
}
impl<'py> IntoPyObject<'py> for GenericBranch {
type Target = PyAny;
type Output = Bound<'py, Self::Target>;
type Error = std::convert::Infallible;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
Ok(self.0.into_bound(py))
}
}
impl<'a, 'py> FromPyObject<'a, 'py> for GenericBranch {
    type Error = PyErr;

    /// Wraps any Python object as a `GenericBranch`; always succeeds.
    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
        let owned = ob.to_owned();
        Ok(GenericBranch(owned.unbind()))
    }
}
impl<'py> From<Bound<'py, PyAny>> for GenericBranch {
fn from(ob: Bound<PyAny>) -> Self {
GenericBranch(ob.unbind())
}
}
impl From<Py<PyAny>> for GenericBranch {
fn from(gb: Py<PyAny>) -> Self {
GenericBranch(gb)
}
}
/// A branch backed by an instance of Python's `breezy.memorybranch.MemoryBranch`
/// (see `MemoryBranch::new`).
pub struct MemoryBranch(Py<PyAny>);
impl Clone for MemoryBranch {
    /// Produces a second handle to the same Python object.
    fn clone(&self) -> Self {
        let inner = Python::attach(|py| self.0.clone_ref(py));
        MemoryBranch(inner)
    }
}
impl PyBranch for MemoryBranch {
fn to_object(&self, py: Python<'_>) -> Py<PyAny> {
self.0.clone_ref(py)
}
}
impl MemoryBranch {
    /// Instantiates `breezy.memorybranch.MemoryBranch(repository, (revno, revid))`.
    ///
    /// # Panics
    ///
    /// Panics if the module cannot be imported, the class cannot be looked
    /// up, or the constructor raises.
    pub fn new<R: PyRepository>(repository: &R, revno: Option<u32>, revid: &RevisionId) -> Self {
        Python::attach(|py| {
            let module = py.import("breezy.memorybranch").unwrap();
            let class = module.getattr("MemoryBranch").unwrap();
            let repo_obj = repository.to_object(py);
            // The second positional argument is a (revno, revision id) pair.
            let last_revision_info = (revno, revid.clone());
            let instance = class.call1((repo_obj, last_revision_info)).unwrap();
            MemoryBranch(instance.unbind())
        })
    }
}
/// Wraps a Rust tag predicate in a Python callable.
///
/// The returned object is an instance of a private `#[pyclass]` whose
/// `__call__(tag)` invokes `tag_selector(tag)` and returns its result.
/// The class is `unsendable` because the boxed closure carries no `Send` bound.
///
/// # Errors
///
/// Returns the `PyErr` raised while allocating the Python object, instead
/// of panicking.
pub(crate) fn py_tag_selector(
    py: Python,
    tag_selector: Box<dyn Fn(String) -> bool>,
) -> PyResult<Py<PyAny>> {
    #[pyclass(unsendable)]
    struct PyTagSelector(Box<dyn Fn(String) -> bool>);

    #[pymethods]
    impl PyTagSelector {
        fn __call__(&self, tag: String) -> bool {
            (self.0)(tag)
        }
    }

    let selector = PyTagSelector(tag_selector).into_pyobject(py)?;
    Ok(selector.into_any().unbind())
}
/// Opens the branch at `url` and returns it boxed as a `dyn Branch`.
///
/// Thin wrapper around [`open_as_generic`].
#[deprecated(
    since = "0.7.7",
    note = "Use `open_as_generic` instead to avoid unnecessary boxing"
)]
pub fn open(url: &url::Url) -> Result<Box<dyn Branch>, Error> {
    let branch = open_as_generic(url)?;
    let boxed: Box<dyn Branch> = Box::new(branch);
    Ok(boxed)
}
/// Opens the branch at `url` by calling Python's `breezy.branch.Branch.open`.
///
/// # Errors
///
/// Returns an error if `breezy.branch` cannot be imported, if `Branch`
/// cannot be looked up on it, or if `Branch.open` raises. Import and lookup
/// failures are reported through the `Result` rather than by panicking.
pub fn open_as_generic(url: &url::Url) -> Result<GenericBranch, Error> {
    Python::attach(|py| {
        let branch_cls = py.import("breezy.branch")?.getattr("Branch")?;
        let opened = branch_cls.call_method1("open", (url.to_string(),))?;
        Ok(GenericBranch::from(opened))
    })
}
/// Opens the branch containing `url`, returning it boxed as a `dyn Branch`
/// together with the string that `open_containing_as_generic` reports.
///
/// Thin wrapper around [`open_containing_as_generic`].
#[deprecated(
    since = "0.7.7",
    note = "Use `open_containing_as_generic` instead to avoid unnecessary boxing"
)]
pub fn open_containing(url: &url::Url) -> Result<(Box<dyn Branch>, String), Error> {
    let (branch, rest) = open_containing_as_generic(url)?;
    let boxed: Box<dyn Branch> = Box::new(branch);
    Ok((boxed, rest))
}
/// Calls Python's `breezy.branch.Branch.open_containing(url)`.
///
/// Returns the opened branch together with the string that forms the second
/// element of the tuple returned by Python.
///
/// # Errors
///
/// Returns an error if `breezy.branch` cannot be imported, if `Branch`
/// cannot be looked up on it, if the call raises, or if the result is not a
/// `(object, str)` pair. Import and lookup failures are reported through the
/// `Result` rather than by panicking.
pub fn open_containing_as_generic(url: &url::Url) -> Result<(GenericBranch, String), Error> {
    Python::attach(|py| {
        let branch_cls = py.import("breezy.branch")?.getattr("Branch")?;
        let (b, p): (Bound<PyAny>, String) = branch_cls
            .call_method1("open_containing", (url.to_string(),))?
            .extract()?;
        Ok((GenericBranch(b.unbind()), p))
    })
}
/// Opens the branch reachable through `transport`, boxed as a `dyn Branch`.
///
/// Thin wrapper around [`open_from_transport_as_generic`].
#[deprecated(
    since = "0.7.7",
    note = "Use `open_from_transport_as_generic` instead to avoid unnecessary boxing"
)]
pub fn open_from_transport(
    transport: &crate::transport::Transport,
) -> Result<Box<dyn Branch>, Error> {
    let branch = open_from_transport_as_generic(transport)?;
    let boxed: Box<dyn Branch> = Box::new(branch);
    Ok(boxed)
}
/// Calls Python's `breezy.branch.Branch.open_from_transport(transport)`.
///
/// # Errors
///
/// Returns an error if `breezy.branch` cannot be imported, if `Branch`
/// cannot be looked up on it, or if the call raises. Import and lookup
/// failures are reported through the `Result` rather than by panicking.
pub fn open_from_transport_as_generic(
    transport: &crate::transport::Transport,
) -> Result<GenericBranch, Error> {
    Python::attach(|py| {
        let branch_cls = py.import("breezy.branch")?.getattr("Branch")?;
        let opened = branch_cls.call_method1("open_from_transport", (transport.as_pyobject(),))?;
        Ok(GenericBranch(opened.unbind()))
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly created on-disk branch reports revno 0 and the null revision.
    #[test]
    fn test_create_and_clone() {
        crate::init();
        let tmp = tempfile::tempdir().unwrap();
        let location = url::Url::from_directory_path(tmp.path()).unwrap();
        let format = crate::controldir::ControlDirFormat::default();
        let created =
            crate::controldir::create_branch_convenience_as_generic(&location, None, &format)
                .unwrap();
        assert_eq!(created.revno(), 0);
        assert_eq!(created.last_revision(), RevisionId::null());
    }

    // A MemoryBranch built on a fresh repository at the null revision
    // reports the null revision as its last revision.
    #[test]
    fn test_create_and_clone_memory() {
        crate::init();
        let tmp = tempfile::tempdir().unwrap();
        let location = url::Url::from_directory_path(tmp.path()).unwrap();
        let format = crate::controldir::ControlDirFormat::default();
        let on_disk =
            crate::controldir::create_branch_convenience_as_generic(&location, None, &format)
                .unwrap();
        let in_memory = MemoryBranch::new(&on_disk.repository(), None, &RevisionId::null());
        assert_eq!(in_memory.last_revision(), RevisionId::null());
    }
}