use crate::id::Group;
use crate::id::VertexName;
use crate::iddag::IdDag;
use crate::iddag::SyncableIdDag;
use crate::iddagstore::IndexedLogStore;
use crate::idmap::IdMap;
use crate::idmap::IdMapLike;
use crate::idmap::SyncableIdMap;
use crate::nameset::dag::DagSet;
use crate::nameset::NameSet;
use crate::spanset::SpanSet;
use anyhow::{anyhow, bail, ensure, Result};
use indexedlog::multi;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
/// A DAG whose vertices are addressed by `VertexName`.
///
/// Names are translated to ids by an `IdMap`, and graph structure is kept as
/// segments over ids in an `IdDag`. Both are backed by logs inside a single
/// `MultiLog` directory (see `NameDag::open`).
// NOTE(review): field declaration order is also drop order; do not reorder
// without checking that `mlog` may be dropped before/after the logs it owns.
pub struct NameDag {
    /// Segments over ids. `add_heads` extends it in memory
    /// (`build_segments_volatile`); `add_heads_and_flush` writes it to disk.
    pub(crate) dag: IdDag<IndexedLogStore>,
    /// Mapping between vertex names and ids.
    pub(crate) map: IdMap,
    /// Shared clone of `map` given to the `NameSet`s returned by query methods.
    /// `to_span_set` compares a set's map against it with `Arc::ptr_eq` to
    /// reuse that set's id spans without a name -> id round trip.
    pub(crate) snapshot_map: Arc<IdMap>,
    /// Container for the "idmap" and "iddag" logs; supplies the lock and
    /// `write_meta` used by `add_heads_and_flush`.
    mlog: multi::MultiLog,
    /// Heads inserted by `add_heads` that have not been written by `flush`.
    pending_heads: Vec<VertexName>,
}
impl NameDag {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let opts = multi::OpenOptions::from_name_opts(vec![
("idmap", IdMap::log_open_options()),
("iddag", IndexedLogStore::log_open_options()),
]);
let mut mlog = opts.open(path)?;
let mut logs = mlog.detach_logs();
let dag_log = logs.pop().unwrap();
let map_log = logs.pop().unwrap();
let map = IdMap::open_from_log(map_log)?;
let dag = IdDag::open_from_store(IndexedLogStore::open_from_log(dag_log))?;
let snapshot_map = Arc::new(map.try_clone()?);
Ok(Self {
dag,
map,
snapshot_map,
mlog,
pending_heads: Default::default(),
})
}
pub fn add_heads_and_flush<F>(
&mut self,
parent_names_func: F,
master_names: &[VertexName],
non_master_names: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
ensure!(
self.pending_heads.is_empty(),
"ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
&self.pending_heads,
);
if master_names.iter().all(|n| {
is_ok_some(
self.map
.find_id_by_name_with_max_group(n.as_ref(), Group::MASTER),
)
}) && non_master_names
.iter()
.all(|n| is_ok_some(self.map.find_id_by_name(n.as_ref())))
{
return Ok(());
}
let lock = self.mlog.lock()?;
let mut map = self.map.prepare_filesystem_sync()?;
let mut dag = self.dag.prepare_filesystem_sync()?;
build(
&mut map,
&mut dag,
parent_names_func,
master_names,
non_master_names,
)?;
map.sync()?;
dag.sync(std::iter::once(&mut self.dag))?;
self.mlog.write_meta(&lock)?;
self.snapshot_map = Arc::new(self.map.try_clone()?);
Ok(())
}
pub fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
let group = Group::NON_MASTER;
for head in heads.iter() {
if self.map.find_id_by_name(head.as_ref())?.is_none() {
self.map.assign_head(head.clone(), &parents, group)?;
self.pending_heads.push(head.clone());
}
}
let parent_ids_func = self.map.build_get_parents_by_id(&parents);
let id = self.map.next_free_id(group)?;
if id > group.min_id() {
self.dag.build_segments_volatile(id - 1, &parent_ids_func)?;
}
Ok(())
}
pub fn flush(&mut self, master_heads: &[VertexName]) -> Result<()> {
for head in master_heads.iter() {
ensure!(
self.map.find_id_by_name(head.as_ref())?.is_some(),
"head {:?} does not exist in DAG",
head
);
}
let parents_map = self.pending_graph()?;
let mut non_master_heads = Vec::new();
std::mem::swap(&mut self.pending_heads, &mut non_master_heads);
self.reload()?;
let parents = |name| {
parents_map.get(&name).cloned().ok_or_else(|| {
anyhow!(
"{:?} not found in parent map ({:?}, {:?})",
&name,
&parents_map,
&non_master_heads,
)
})
};
let flush_result = self.add_heads_and_flush(&parents, master_heads, &non_master_heads);
if let Err(flush_err) = flush_result {
return match self.add_heads(&parents, &non_master_heads) {
Ok(_) => Err(flush_err),
Err(err) => Err(flush_err.context(err)),
};
}
Ok(())
}
pub fn reload(&mut self) -> Result<()> {
self.map.reload()?;
self.dag.reload()?;
self.pending_heads.clear();
Ok(())
}
pub fn sort(&self, set: &NameSet) -> Result<NameSet> {
if set.is_topo_sorted() {
Ok(set.clone())
} else {
let mut spans = SpanSet::empty();
for name in set.iter()? {
let id = self.snapshot_map.vertex_id(name?)?;
spans.push(id);
}
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
}
pub fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
let id = match self.map.find_id_by_name(name.as_ref())? {
Some(id) => id,
None => bail!("{:?} does not exist in DAG", name),
};
self.dag
.parent_ids(id)?
.into_iter()
.map(|id| match self.map.find_vertex_name_by_id(id)? {
Some(name) => Ok(name),
None => bail!("cannot resolve parent id {} to name", id),
})
.collect()
}
fn pending_graph(&self) -> Result<HashMap<VertexName, Vec<VertexName>>> {
let mut parents_map: HashMap<VertexName, Vec<VertexName>> = Default::default();
let mut to_visit: Vec<VertexName> = self.pending_heads.clone();
while let Some(name) = to_visit.pop() {
let group = self.map.find_id_by_name(name.as_ref())?.map(|i| i.group());
if group == Some(Group::MASTER) {
continue;
}
let parents = self.parent_names(name.clone())?;
for parent in parents.iter() {
to_visit.push(parent.clone());
}
parents_map.insert(name, parents);
}
Ok(parents_map)
}
}
impl NameDag {
pub fn all(&self) -> Result<NameSet> {
let spans = self.dag.all()?;
let query = DagSet::from_spans_idmap(spans, self.snapshot_map.clone()).mark_as_all();
Ok(NameSet::from_query(query))
}
pub fn ancestors(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.ancestors(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn parents(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.parents(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn first_ancestor_nth(&self, name: VertexName, n: u64) -> Result<VertexName> {
let id = self.map.vertex_id(name)?;
let id = self.dag.first_ancestor_nth(id, n)?;
self.map.vertex_name(id)
}
pub fn heads(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.heads(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn children(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.children(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn roots(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.roots(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn gca_one(&self, set: NameSet) -> Result<Option<VertexName>> {
match self.dag.gca_one(self.to_span_set(set)?)? {
None => Ok(None),
Some(id) => Ok(Some(self.map.vertex_name(id)?)),
}
}
pub fn gca_all(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.gca_all(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn common_ancestors(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.common_ancestors(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn is_ancestor(&self, ancestor: VertexName, descendant: VertexName) -> Result<bool> {
let ancestor_id = self.map.vertex_id(ancestor)?;
let descendant_id = self.map.vertex_id(descendant)?;
self.dag.is_ancestor(ancestor_id, descendant_id)
}
pub fn heads_ancestors(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.heads_ancestors(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn range(&self, roots: NameSet, heads: NameSet) -> Result<NameSet> {
let roots = self.to_span_set(roots)?;
let heads = self.to_span_set(heads)?;
let spans = self.dag.range(roots, heads)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
pub fn descendants(&self, set: NameSet) -> Result<NameSet> {
let spans = self.dag.descendants(self.to_span_set(set)?)?;
Ok(NameSet::from_spans_idmap(spans, self.snapshot_map.clone()))
}
fn to_span_set(&self, set: NameSet) -> Result<SpanSet> {
if let Some(set) = set.as_any().downcast_ref::<DagSet>() {
if Arc::ptr_eq(&set.map, &self.snapshot_map) {
return Ok(set.spans.clone());
}
}
let mut spans = SpanSet::empty();
for name in set.iter()? {
let name = name?;
let id = self.map.vertex_id(name)?;
spans.push(id);
}
Ok(spans)
}
}
/// Build a name-keyed copy of the NON_MASTER parent graph stored in `dag`.
///
/// Each child id and each of its parent ids is translated to a name through
/// `map`. The first id that cannot be translated aborts with its error.
fn non_master_parent_names(
    map: &SyncableIdMap,
    dag: &SyncableIdDag<IndexedLogStore>,
) -> Result<HashMap<VertexName, Vec<VertexName>>> {
    let id_graph = dag.non_master_parent_ids()?;
    let mut name_graph = HashMap::new();
    for (child_id, parent_id_list) in id_graph.iter() {
        let child_name = map.vertex_name(*child_id)?;
        let mut parent_name_list = Vec::new();
        for parent_id in parent_id_list.iter() {
            parent_name_list.push(map.vertex_name(*parent_id)?);
        }
        name_graph.insert(child_name, parent_name_list);
    }
    Ok(name_graph)
}
/// Drop and re-insert every NON_MASTER vertex.
///
/// The current NON_MASTER parent graph is captured by name first. Its heads
/// (vertices that are nobody's parent) are sorted for a deterministic
/// insertion order. After removing NON_MASTER data from `dag` and `map`, those
/// heads are rebuilt via `build` with no master heads.
pub fn rebuild_non_master(
    map: &mut SyncableIdMap,
    dag: &mut SyncableIdDag<IndexedLogStore>,
) -> Result<()> {
    let parents = non_master_parent_names(map, dag)?;

    // Any vertex that appears in some parent list is not a head.
    let referenced: HashSet<&VertexName> = parents.values().flatten().collect();
    let mut heads: Vec<VertexName> = parents
        .keys()
        .filter(|name| !referenced.contains(name))
        .cloned()
        .collect();
    heads.sort_unstable();

    dag.remove_non_master()?;
    map.remove_non_master()?;

    let parent_func = |name: VertexName| {
        if let Some(names) = parents.get(&name) {
            return Ok(names.clone());
        }
        bail!(
            "bug: parents of {:?} is missing (in rebuild_non_master)",
            name
        )
    };
    build(map, dag, parent_func, &[], &heads[..])?;
    Ok(())
}
pub fn build<F>(
map: &mut SyncableIdMap,
dag: &mut SyncableIdDag<IndexedLogStore>,
parent_names_func: F,
master_heads: &[VertexName],
non_master_heads: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
for (nodes, group) in [
(master_heads, Group::MASTER),
(non_master_heads, Group::NON_MASTER),
]
.iter()
{
for node in nodes.iter() {
map.assign_head(node.clone(), &parent_names_func, *group)?;
}
}
{
let parent_ids_func = map.build_get_parents_by_id(&parent_names_func);
for &group in Group::ALL.iter() {
let id = map.next_free_id(group)?;
if id > group.min_id() {
dag.build_segments_persistent(id - 1, &parent_ids_func)?;
}
}
}
if map.need_rebuild_non_master {
rebuild_non_master(map, dag)?;
}
Ok(())
}
/// Direct access to the `IdMap` and `IdDag` behind a name-based DAG.
///
/// # Safety
///
/// NOTE(review): the source does not state why this trait is `unsafe`.
/// Presumably it marks the accessors as an escape hatch below the name-based
/// API; confirm the contract implementors and callers must uphold.
pub unsafe trait LowLevelAccess {
    /// The mapping between vertex names and ids.
    fn map(&self) -> &IdMap;

    /// The segment DAG over ids.
    fn dag(&self) -> &IdDag<IndexedLogStore>;
}
// SAFETY: both accessors only hand out shared references to fields owned by
// `self`, so they cannot mutate the DAG. NOTE(review): confirm this meets the
// (undocumented) contract `LowLevelAccess` is marked `unsafe` for.
unsafe impl LowLevelAccess for NameDag {
    fn map(&self) -> &IdMap {
        &self.map
    }

    fn dag(&self) -> &IdDag<IndexedLogStore> {
        &self.dag
    }
}
/// Whether `value` is `Ok(Some(_))`.
///
/// Errors are treated the same as `Ok(None)` (both return `false`). Generic
/// over the error type, so it accepts `anyhow::Result` as well as any other
/// `std::result::Result`.
fn is_ok_some<T, E>(value: std::result::Result<Option<T>, E>) -> bool {
    matches!(value, Ok(Some(_)))
}