use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use futures::StreamExt;
use futures::TryStreamExt;
use crate::dag::MemDag;
use crate::errors::programming;
use crate::ops::DagAddHeads;
use crate::ops::IdConvert;
use crate::ops::IdDagAlgorithm;
use crate::ops::Parents;
use crate::ops::ToIdSet;
use crate::ops::ToSet;
use crate::set::hints::Hints;
use crate::utils;
use crate::DagAlgorithm;
use crate::Group;
use crate::Id;
use crate::IdSet;
use crate::Result;
use crate::Set;
use crate::Vertex;
use crate::VertexListWithOptions;
pub(crate) async fn beautify(
this: &(impl DagAlgorithm + ?Sized),
main_branch: Option<Set>,
) -> Result<MemDag> {
let all = this.all().await?;
let usize_to_vertex: Vec<Vertex> = all.iter_rev().await?.try_collect().await?;
let vertex_to_usize: HashMap<Vertex, usize> = usize_to_vertex
.iter()
.enumerate()
.map(|(i, v)| (v.clone(), i))
.collect();
let mut priorities = Vec::new();
let main_branch = main_branch.unwrap_or_else(Set::empty);
let mut parents_vec = Vec::with_capacity(usize_to_vertex.len());
for (i, vertex) in usize_to_vertex.iter().enumerate() {
if main_branch.contains(vertex).await? {
priorities.push(i);
}
let parent_vertexes = this.parent_names(vertex.clone()).await?;
let parent_usizes: Vec<usize> = parent_vertexes
.iter()
.filter_map(|p| vertex_to_usize.get(p))
.copied()
.collect();
parents_vec.push(parent_usizes);
}
let sorted = utils::beautify_graph(&parents_vec, &priorities);
let mut dag = MemDag::new();
let snapshot = this.dag_snapshot()?;
for i in sorted.into_iter().rev() {
let heads: Vec<Vertex> = vec![usize_to_vertex[i].clone()];
dag.add_heads(&snapshot, &heads.into()).await?;
}
Ok(dag)
}
pub(crate) async fn subdag(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<MemDag> {
let set = this.sort(&set).await?;
let parents = match set.to_parents().await? {
Some(p) => p,
None => return programming("Set returned by dag.sort() should support to_parents()"),
};
let mut dag = MemDag::new();
let heads = this.heads_ancestors(set).await?;
let heads: Vec<Vertex> = heads.iter_rev().await?.try_collect().await?;
let heads = VertexListWithOptions::from(heads).with_desired_group(Group::MASTER);
dag.add_heads(&parents, &heads).await?;
Ok(dag)
}
pub(crate) async fn set_to_parents(set: &Set) -> Result<Option<impl Parents>> {
let (id_set, id_map) = match set.to_id_set_and_id_map_in_o1() {
Some(v) => v,
None => return Ok(None),
};
let dag = match set.dag() {
None => return Ok(None),
Some(dag) => dag,
};
let id_dag = dag.id_dag_snapshot()?;
let ids: Vec<Id> = id_set.iter_desc().collect();
id_map.vertex_name_batch(&ids).await?;
struct IdParents {
id_set: IdSet,
id_dag: Arc<dyn IdDagAlgorithm + Send + Sync>,
id_map: Arc<dyn IdConvert + Send + Sync>,
}
#[async_trait::async_trait]
impl Parents for IdParents {
async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
tracing::debug!(
target: "dag::idparents",
"resolving parents for {:?}", &name,
);
let id = self.id_map.vertex_id(name).await?;
let direct_parent_ids = self.id_dag.parent_ids(id)?;
let parent_ids = if direct_parent_ids.iter().all(|&id| self.id_set.contains(id)) {
direct_parent_ids
} else {
let parent_id_set = IdSet::from_spans(direct_parent_ids);
let ancestors = self.id_dag.ancestors(parent_id_set)?;
let heads = ancestors.intersection(&self.id_set);
let heads = self.id_dag.heads_ancestors(heads)?;
heads.iter_desc().collect()
};
let vertexes = self.id_map.vertex_name_batch(&parent_ids).await?;
let parents = vertexes.into_iter().collect::<Result<Vec<_>>>()?;
Ok(parents)
}
async fn hint_subdag_for_insertion(&self, _heads: &[Vertex]) -> Result<MemDag> {
tracing::warn!(
target: "dag::idparents",
"IdParents does not implement hint_subdag_for_insertion() for efficient insertion"
);
Ok(MemDag::new())
}
}
let parents = IdParents {
id_set,
id_dag,
id_map,
};
Ok(Some(parents))
}
pub(crate) async fn parents(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
let mut result: Vec<Vertex> = Vec::new();
let mut iter = set.iter().await?;
while let Some(vertex) = iter.next().await {
let parents = this.parent_names(vertex?).await?;
result.extend(parents);
}
Ok(Set::from_static_names(result))
}
pub(crate) async fn first_ancestor_nth(
this: &(impl DagAlgorithm + ?Sized),
name: Vertex,
n: u64,
) -> Result<Option<Vertex>> {
let mut vertex = name.clone();
for _ in 0..n {
let parents = this.parent_names(vertex).await?;
if parents.is_empty() {
return Ok(None);
}
vertex = parents[0].clone();
}
Ok(Some(vertex))
}
pub(crate) async fn first_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
let mut to_visit: Vec<Vertex> = {
let mut list = Vec::with_capacity(set.count_slow().await?.try_into()?);
let mut iter = set.iter().await?;
while let Some(next) = iter.next().await {
let vertex = next?;
list.push(vertex);
}
list
};
let mut visited: HashSet<Vertex> = to_visit.clone().into_iter().collect();
while let Some(v) = to_visit.pop() {
#[allow(clippy::never_loop)]
if let Some(parent) = this.parent_names(v).await?.into_iter().next() {
if visited.insert(parent.clone()) {
to_visit.push(parent);
}
}
}
let hints = Hints::new_inherit_idmap_dag(set.hints());
let set = Set::from_iter(visited.into_iter().map(Ok), hints);
this.sort(&set).await
}
pub(crate) async fn heads(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
Ok(set.clone() - this.parents(set).await?)
}
pub(crate) async fn roots(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
Ok(set.clone() - this.children(set).await?)
}
pub(crate) async fn merges(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
let this = this.dag_snapshot()?;
Ok(set.filter(Box::new(move |v: &Vertex| {
let this = this.clone();
Box::pin(async move {
DagAlgorithm::parent_names(&this, v.clone())
.await
.map(|ps| ps.len() >= 2)
})
})))
}
pub(crate) async fn reachable_roots(
this: &(impl DagAlgorithm + ?Sized),
roots: Set,
heads: Set,
) -> Result<Set> {
let heads_ancestors = this.ancestors(heads.clone()).await?;
let roots = roots & heads_ancestors.clone(); let only = heads_ancestors - this.ancestors(roots.clone()).await?;
Ok(roots.clone() & (heads.clone() | this.parents(only).await?))
}
pub(crate) async fn heads_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
this.heads(this.ancestors(set).await?).await
}
pub(crate) async fn only(
this: &(impl DagAlgorithm + ?Sized),
reachable: Set,
unreachable: Set,
) -> Result<Set> {
let reachable = this.ancestors(reachable).await?;
let unreachable = this.ancestors(unreachable).await?;
Ok(reachable - unreachable)
}
pub(crate) async fn only_both(
this: &(impl DagAlgorithm + ?Sized),
reachable: Set,
unreachable: Set,
) -> Result<(Set, Set)> {
let reachable = this.ancestors(reachable).await?;
let unreachable = this.ancestors(unreachable).await?;
Ok((reachable - unreachable.clone(), unreachable))
}
pub(crate) async fn gca_one(
this: &(impl DagAlgorithm + ?Sized),
set: Set,
) -> Result<Option<Vertex>> {
this.gca_all(set)
.await?
.iter()
.await?
.next()
.await
.transpose()
}
pub(crate) async fn gca_all(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
this.heads_ancestors(this.common_ancestors(set).await?)
.await
}
pub(crate) async fn common_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
let result = match set.count_slow().await? {
0 => set,
1 => this.ancestors(set).await?,
_ => {
let set = this.roots(set).await?;
let mut iter = set.iter().await?;
let mut result = this
.ancestors(Set::from(iter.next().await.unwrap()?))
.await?;
while let Some(v) = iter.next().await {
result = result.intersection(&this.ancestors(Set::from(v?)).await?);
}
result
}
};
Ok(result)
}
pub(crate) async fn is_ancestor(
this: &(impl DagAlgorithm + ?Sized),
ancestor: Vertex,
descendant: Vertex,
) -> Result<bool> {
let mut to_visit = vec![descendant];
let mut visited: HashSet<_> = to_visit.clone().into_iter().collect();
while let Some(v) = to_visit.pop() {
if v == ancestor {
return Ok(true);
}
for parent in this.parent_names(v).await? {
if visited.insert(parent.clone()) {
to_visit.push(parent);
}
}
}
Ok(false)
}
pub async fn suggest_bisect(
this: &(impl DagAlgorithm + ToIdSet + ToSet + IdConvert + ?Sized),
roots: Set,
heads: Set,
skip: Set,
) -> Result<(Option<Vertex>, Set, Set)> {
let roots = this.to_id_set(&roots).await?;
let heads = this.to_id_set(&heads).await?;
let skip = this.to_id_set(&skip).await?;
let (maybe_id, untested, heads) = this
.id_dag_snapshot()?
.suggest_bisect(&roots, &heads, &skip)?;
let maybe_vertex = match maybe_id {
Some(id) => Some(this.vertex_name(id).await?),
None => None,
};
let untested = this.to_set(&untested)?;
let heads = this.to_set(&heads)?;
Ok((maybe_vertex, untested, heads))
}
#[tracing::instrument(skip(this), level=tracing::Level::DEBUG)]
pub(crate) async fn hint_subdag_for_insertion(
this: &(impl Parents + ?Sized),
scope: &Set,
heads: &[Vertex],
) -> Result<MemDag> {
let count = scope.count_slow().await?;
tracing::trace!("hint_subdag_for_insertion: pending vertexes: {}", count);
struct ScopedParents<'a, P: Parents + ?Sized> {
parents: &'a P,
scope: &'a Set,
}
#[async_trait::async_trait]
impl<'a, P: Parents + ?Sized> Parents for ScopedParents<'a, P> {
async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
let parents: Vec<Vertex> = self.parents.parent_names(name).await?;
let mut filtered_parents = Vec::with_capacity(parents.len());
for v in parents {
if self.scope.contains(&v).await? {
filtered_parents.push(v)
}
}
Ok(filtered_parents)
}
async fn hint_subdag_for_insertion(&self, _heads: &[Vertex]) -> Result<MemDag> {
Ok(MemDag::new())
}
}
let mut dag = MemDag::new();
assert!(!dag.is_vertex_lazy());
let scoped_parents = ScopedParents {
parents: this,
scope,
};
let heads_in_scope = {
let mut heads_in_scope = Vec::with_capacity(heads.len());
for head in heads {
if scope.contains(head).await? {
heads_in_scope.push(head.clone());
}
}
heads_in_scope
};
dag.add_heads(&scoped_parents, &heads_in_scope.into())
.await?;
Ok(dag)
}