esl01-dag 0.3.0

An implementation of a DAG used for source control.
Documentation
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! DAG and Id operations (mostly traits)

use std::sync::Arc;

use futures::StreamExt;
use futures::TryStreamExt;

use crate::clone::CloneData;
use crate::default_impl;
use crate::errors::NotFoundError;
use crate::id::Group;
use crate::id::Id;
use crate::id::VertexName;
pub use crate::iddag::IdDagAlgorithm;
use crate::namedag::MemNameDag;
use crate::nameset::id_lazy::IdLazySet;
use crate::nameset::id_static::IdStaticSet;
use crate::nameset::NameSet;
use crate::IdSet;
use crate::Result;
use crate::VerLink;
use crate::VertexListWithOptions;

/// DAG related read-only algorithms.
#[async_trait::async_trait]
pub trait DagAlgorithm: Send + Sync {
    /// Sort a `NameSet` topologically.
    ///
    /// The returned set should have `dag` and `id_map` hints set to associated
    /// with this dag or its previous compatible version. For example, if a
    /// `set` is sorted on another dag but not in this dag, it should be resorted
    /// using this dag.  If a `set` is empty and not associated to the current
    /// `dag` in its hints, the return value should be a different empty `set`
    /// that has the `dag` and `id_map` hints set to this dag.
    async fn sort(&self, set: &NameSet) -> Result<NameSet>;

    /// Re-create the graph so it looks better when rendered.
    async fn beautify(&self, main_branch: Option<NameSet>) -> Result<MemNameDag> {
        default_impl::beautify(self, main_branch).await
    }

    /// Extract a sub graph containing only specified vertexes.
    async fn subdag(&self, set: NameSet) -> Result<MemNameDag> {
        default_impl::subdag(self, set).await
    }

    /// Get ordered parent vertexes.
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>>;

    /// Returns a set that covers all vertexes tracked by this DAG.
    async fn all(&self) -> Result<NameSet>;

    /// Returns a set that covers all vertexes in the master group.
    async fn master_group(&self) -> Result<NameSet>;

    /// Calculates all ancestors reachable from any name from the given set.
    async fn ancestors(&self, set: NameSet) -> Result<NameSet>;

    /// Calculates parents of the given set.
    ///
    /// Note: Parent order is not preserved. Use [`NameDag::parent_names`]
    /// to preserve order.
    async fn parents(&self, set: NameSet) -> Result<NameSet> {
        default_impl::parents(self, set).await
    }

    /// Calculates the n-th first ancestor.
    async fn first_ancestor_nth(&self, name: VertexName, n: u64) -> Result<Option<VertexName>> {
        default_impl::first_ancestor_nth(self, name, n).await
    }

    /// Calculates ancestors but only follows the first parent.
    async fn first_ancestors(&self, set: NameSet) -> Result<NameSet> {
        default_impl::first_ancestors(self, set).await
    }

    /// Calculates heads of the given set.
    async fn heads(&self, set: NameSet) -> Result<NameSet> {
        default_impl::heads(self, set).await
    }

    /// Calculates children of the given set.
    async fn children(&self, set: NameSet) -> Result<NameSet>;

    /// Calculates roots of the given set.
    async fn roots(&self, set: NameSet) -> Result<NameSet> {
        default_impl::roots(self, set).await
    }

    /// Calculates merges of the selected set (vertexes with >=2 parents).
    async fn merges(&self, set: NameSet) -> Result<NameSet> {
        default_impl::merges(self, set).await
    }

    /// Calculates one "greatest common ancestor" of the given set.
    ///
    /// If there are no common ancestors, return None.
    /// If there are multiple greatest common ancestors, pick one arbitrarily.
    /// Use `gca_all` to get all of them.
    async fn gca_one(&self, set: NameSet) -> Result<Option<VertexName>> {
        default_impl::gca_one(self, set).await
    }

    /// Calculates all "greatest common ancestor"s of the given set.
    /// `gca_one` is faster if an arbitrary answer is ok.
    async fn gca_all(&self, set: NameSet) -> Result<NameSet> {
        default_impl::gca_all(self, set).await
    }

    /// Calculates all common ancestors of the given set.
    async fn common_ancestors(&self, set: NameSet) -> Result<NameSet> {
        default_impl::common_ancestors(self, set).await
    }

    /// Tests if `ancestor` is an ancestor of `descendant`.
    async fn is_ancestor(&self, ancestor: VertexName, descendant: VertexName) -> Result<bool> {
        default_impl::is_ancestor(self, ancestor, descendant).await
    }

    /// Calculates "heads" of the ancestors of the given set. That is,
    /// Find Y, which is the smallest subset of set X, where `ancestors(Y)` is
    /// `ancestors(X)`.
    ///
    /// This is faster than calculating `heads(ancestors(set))` in certain
    /// implementations like segmented changelog.
    ///
    /// This is different from `heads`. In case set contains X and Y, and Y is
    /// an ancestor of X, but not the immediate ancestor, `heads` will include
    /// Y while this function won't.
    async fn heads_ancestors(&self, set: NameSet) -> Result<NameSet> {
        default_impl::heads_ancestors(self, set).await
    }

    /// Calculates the "dag range" - vertexes reachable from both sides.
    async fn range(&self, roots: NameSet, heads: NameSet) -> Result<NameSet>;

    /// Calculates `ancestors(reachable) - ancestors(unreachable)`.
    async fn only(&self, reachable: NameSet, unreachable: NameSet) -> Result<NameSet> {
        default_impl::only(self, reachable, unreachable).await
    }

    /// Calculates `ancestors(reachable) - ancestors(unreachable)`, and
    /// `ancestors(unreachable)`.
    /// This might be faster in some implementations than calculating `only` and
    /// `ancestors` separately.
    async fn only_both(
        &self,
        reachable: NameSet,
        unreachable: NameSet,
    ) -> Result<(NameSet, NameSet)> {
        default_impl::only_both(self, reachable, unreachable).await
    }

    /// Calculates the descendants of the given set.
    async fn descendants(&self, set: NameSet) -> Result<NameSet>;

    /// Calculates `roots` that are reachable from `heads` without going
    /// through other `roots`. For example, given the following graph:
    ///
    /// ```plain,ignore
    ///   F
    ///   |\
    ///   C E
    ///   | |
    ///   B D
    ///   |/
    ///   A
    /// ```
    ///
    /// `reachable_roots(roots=[A, B, C], heads=[F])` returns `[A, C]`.
    /// `B` is not included because it cannot be reached without going
    /// through another root `C` from `F`. `A` is included because it
    /// can be reached via `F -> E -> D -> A` that does not go through
    /// other roots.
    ///
    /// The can be calculated as
    /// `roots & (heads | parents(only(heads, roots & ancestors(heads))))`.
    /// Actual implementation might have faster paths.
    ///
    /// The `roots & ancestors(heads)` portion filters out bogus roots for
    /// compatibility, if the callsite does not provide bogus roots, it
    /// could be simplified to just `roots`.
    async fn reachable_roots(&self, roots: NameSet, heads: NameSet) -> Result<NameSet> {
        default_impl::reachable_roots(self, roots, heads).await
    }

    /// Vertexes buffered in memory, not yet written to disk.
    async fn dirty(&self) -> Result<NameSet>;

    /// Returns true if the vertex names might need to be resolved remotely.
    fn is_vertex_lazy(&self) -> bool;

    /// Get a snapshot of the current graph that can operate separately.
    ///
    /// This makes it easier to fight with borrowck.
    fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>>;

    /// Get a snapshot of the `IdDag` that can operate separately.
    ///
    /// This is for advanced use-cases. For example, if callsite wants to
    /// do some graph calculation without network, and control how to
    /// batch the vertex name lookups precisely.
    fn id_dag_snapshot(&self) -> Result<Arc<dyn IdDagAlgorithm + Send + Sync>> {
        Err(crate::errors::BackendError::Generic(format!(
            "id_dag_snapshot() is not supported for {}",
            std::any::type_name::<Self>()
        ))
        .into())
    }

    /// Identity of the dag.
    fn dag_id(&self) -> &str;

    /// Version of the dag. Useful to figure out compatibility between two dags.
    fn dag_version(&self) -> &VerLink;
}

#[async_trait::async_trait]
pub trait Parents: Send + Sync {
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>>;

    /// A hint of a sub-graph for inserting `heads`.
    ///
    /// This is used to reduce remote fetches in a lazy graph.
    /// The roots will be checked first, if a root is unknown locally then
    /// all its descendants will be considered unknown locally.
    ///
    /// The returned graph is only used to optimize network fetches in
    /// `assign_head`. It is not used to be actually inserted to the graph. So
    /// returning an empty or "incorrect" graph does not hurt correctness. But
    /// might hurt performance.
    async fn hint_subdag_for_insertion(&self, _heads: &[VertexName]) -> Result<MemNameDag>;
}

#[async_trait::async_trait]
impl Parents for Arc<dyn DagAlgorithm + Send + Sync> {
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
        DagAlgorithm::parent_names(self, name).await
    }

    async fn hint_subdag_for_insertion(&self, heads: &[VertexName]) -> Result<MemNameDag> {
        let scope = self.dirty().await?;
        default_impl::hint_subdag_for_insertion(self, &scope, heads).await
    }
}

#[async_trait::async_trait]
impl Parents for &(dyn DagAlgorithm + Send + Sync) {
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
        DagAlgorithm::parent_names(*self, name).await
    }

    async fn hint_subdag_for_insertion(&self, heads: &[VertexName]) -> Result<MemNameDag> {
        let scope = self.dirty().await?;
        default_impl::hint_subdag_for_insertion(self, &scope, heads).await
    }
}

#[async_trait::async_trait]
impl<'a> Parents for Box<dyn Fn(VertexName) -> Result<Vec<VertexName>> + Send + Sync + 'a> {
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
        (self)(name)
    }

    async fn hint_subdag_for_insertion(&self, _heads: &[VertexName]) -> Result<MemNameDag> {
        // No clear way to detect the "dirty" scope.
        Ok(MemNameDag::new())
    }
}

#[async_trait::async_trait]
impl Parents for std::collections::HashMap<VertexName, Vec<VertexName>> {
    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
        match self.get(&name) {
            Some(v) => Ok(v.clone()),
            None => name.not_found(),
        }
    }

    async fn hint_subdag_for_insertion(&self, heads: &[VertexName]) -> Result<MemNameDag> {
        let mut keys: Vec<VertexName> = self.keys().cloned().collect();
        keys.sort_unstable();
        let scope = NameSet::from_static_names(keys);
        default_impl::hint_subdag_for_insertion(self, &scope, heads).await
    }
}

/// Add vertexes recursively to the DAG.
#[async_trait::async_trait]
pub trait DagAddHeads {
    /// Add vertexes and their ancestors to the DAG. This does not persistent
    /// changes to disk.
    async fn add_heads(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<bool>;
}

/// Remove vertexes and their descendants from the DAG.
#[async_trait::async_trait]
pub trait DagStrip {
    /// Remove the given `set` and their descendants.
    ///
    /// This will reload the DAG from its source (ex. filesystem) and writes
    /// changes back with a lock so there are no other processes adding
    /// new descendants of the stripped set.
    ///
    /// After strip, the `self` graph might contain new vertexes because of
    /// the reload.
    async fn strip(&mut self, set: &NameSet) -> Result<()>;
}

/// Import a generated `CloneData` object into an empty DAG.
#[async_trait::async_trait]
pub trait DagImportCloneData {
    /// Updates the DAG using a `CloneData` object.
    async fn import_clone_data(&mut self, clone_data: CloneData<VertexName>) -> Result<()>;
}

/// Import a generated incremental `CloneData` object into an existing DAG.
/// Ids in the passed CloneData might not match ids in existing DAG.
#[async_trait::async_trait]
pub trait DagImportPullData {
    /// Updates the DAG using a `CloneData` object.
    ///
    /// Only import the given `heads`.
    async fn import_pull_data(
        &mut self,
        clone_data: CloneData<VertexName>,
        heads: &VertexListWithOptions,
    ) -> Result<()>;
}

#[async_trait::async_trait]
pub trait DagExportCloneData {
    /// Export `CloneData` for vertexes in the master group.
    async fn export_clone_data(&self) -> Result<CloneData<VertexName>>;
}

#[async_trait::async_trait]
pub trait DagExportPullData {
    /// Export `CloneData` for vertexes in the given set.
    /// The set is typcially calculated by `only(heads, common)`.
    async fn export_pull_data(&self, set: &NameSet) -> Result<CloneData<VertexName>>;
}

/// Persistent the DAG on disk.
#[async_trait::async_trait]
pub trait DagPersistent {
    /// Write in-memory DAG to disk. This might also pick up changes to
    /// the DAG by other processes.
    async fn flush(&mut self, master_heads: &VertexListWithOptions) -> Result<()>;

    /// Write in-memory IdMap that caches Id <-> Vertex translation from
    /// remote service to disk.
    async fn flush_cached_idmap(&self) -> Result<()>;

    /// A faster path for add_heads, followed by flush.
    async fn add_heads_and_flush(
        &mut self,
        parent_names_func: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<()>;

    /// Import from another (potentially large) DAG. Write to disk immediately.
    async fn import_and_flush(
        &mut self,
        dag: &dyn DagAlgorithm,
        master_heads: NameSet,
    ) -> Result<()> {
        let heads = dag.heads(dag.all().await?).await?;
        let non_master_heads = heads - master_heads.clone();
        let master_heads: Vec<VertexName> =
            master_heads.iter().await?.try_collect::<Vec<_>>().await?;
        let non_master_heads: Vec<VertexName> = non_master_heads
            .iter()
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        let heads = VertexListWithOptions::from(master_heads)
            .with_highest_group(Group::MASTER)
            .chain(non_master_heads);
        self.add_heads_and_flush(&dag.dag_snapshot()?, &heads).await
    }
}

/// Import ASCII graph to DAG.
pub trait ImportAscii {
    /// Import vertexes described in an ASCII graph.
    /// `heads` optionally specifies the order of heads to insert.
    /// Useful for testing. Panic if the input is invalid.
    fn import_ascii_with_heads(
        &mut self,
        text: &str,
        heads: Option<&[impl AsRef<str>]>,
    ) -> Result<()>;

    /// Import vertexes described in an ASCII graph.
    fn import_ascii(&mut self, text: &str) -> Result<()> {
        self.import_ascii_with_heads(text, <Option<&[&str]>>::None)
    }
}

/// Lookup vertexes by prefixes.
#[async_trait::async_trait]
pub trait PrefixLookup {
    /// Lookup vertexes by hex prefix.
    async fn vertexes_by_hex_prefix(
        &self,
        hex_prefix: &[u8],
        limit: usize,
    ) -> Result<Vec<VertexName>>;
}

/// Convert between `Vertex` and `Id`.
#[async_trait::async_trait]
pub trait IdConvert: PrefixLookup + Sync {
    async fn vertex_id(&self, name: VertexName) -> Result<Id>;
    async fn vertex_id_with_max_group(
        &self,
        name: &VertexName,
        max_group: Group,
    ) -> Result<Option<Id>>;
    async fn vertex_name(&self, id: Id) -> Result<VertexName>;
    async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool>;

    /// Test if an `id` is present locally. Do not trigger remote fetching.
    async fn contains_vertex_id_locally(&self, id: &[Id]) -> Result<Vec<bool>>;

    /// Test if an `name` is present locally. Do not trigger remote fetching.
    async fn contains_vertex_name_locally(&self, name: &[VertexName]) -> Result<Vec<bool>>;

    async fn vertex_id_optional(&self, name: &VertexName) -> Result<Option<Id>> {
        self.vertex_id_with_max_group(name, Group::NON_MASTER).await
    }

    /// Convert [`Id`]s to [`VertexName`]s in batch.
    async fn vertex_name_batch(&self, ids: &[Id]) -> Result<Vec<Result<VertexName>>> {
        // This is not an efficient implementation in an async context.
        let mut names = Vec::with_capacity(ids.len());
        for &id in ids {
            names.push(self.vertex_name(id).await);
        }
        Ok(names)
    }

    /// Convert [`VertexName`]s to [`Id`]s in batch.
    async fn vertex_id_batch(&self, names: &[VertexName]) -> Result<Vec<Result<Id>>> {
        // This is not an efficient implementation in an async context.
        let mut ids = Vec::with_capacity(names.len());
        for name in names {
            ids.push(self.vertex_id(name.clone()).await);
        }
        Ok(ids)
    }

    /// Identity of the map.
    fn map_id(&self) -> &str;

    /// Version of the map. Useful to figure out compatibility between two maps.
    fn map_version(&self) -> &VerLink;
}

/// Integrity check functions.
#[async_trait::async_trait]
pub trait CheckIntegrity {
    /// Verify that universally known `Id`s (parents of merges) are actually
    /// known locally.
    ///
    /// Returns set of `Id`s that should be universally known but missing.
    /// An empty set means all universally known `Id`s are known locally.
    ///
    /// Check `FirstAncestorConstraint::KnownUniversally` for concepts of
    /// "universally known".
    async fn check_universal_ids(&self) -> Result<Vec<Id>>;

    /// Check segment properties: no cycles, no overlaps, no gaps etc.
    /// This is only about the `Id`s, not about the vertex names.
    ///
    /// Returns human readable messages about problems.
    /// No messages indicates there are no problems detected.
    async fn check_segments(&self) -> Result<Vec<String>>;

    /// Check that the subset of the current graph (ancestors of `heads`)
    /// is isomorphic with the subset in the `other` graph.
    ///
    /// Returns messages about where two graphs are different.
    /// No messages indicates two graphs are likely isomorphic.
    ///
    /// Note: For performance, this function only checks the "shape"
    /// of the graph, without checking the (potentially lazy) vertex
    /// names.
    async fn check_isomorphic_graph(
        &self,
        other: &dyn DagAlgorithm,
        heads: NameSet,
    ) -> Result<Vec<String>>;
}

impl<T> ImportAscii for T
where
    T: DagAddHeads,
{
    fn import_ascii_with_heads(
        &mut self,
        text: &str,
        heads: Option<&[impl AsRef<str>]>,
    ) -> Result<()> {
        let parents = drawdag::parse(&text);
        let heads: Vec<_> = match heads {
            Some(heads) => heads
                .iter()
                .map(|s| VertexName::copy_from(s.as_ref().as_bytes()))
                .collect(),
            None => {
                let mut heads: Vec<_> = parents
                    .keys()
                    .map(|s| VertexName::copy_from(s.as_bytes()))
                    .collect();
                heads.sort();
                heads
            }
        };

        let v = |s: String| VertexName::copy_from(s.as_bytes());
        let parents: std::collections::HashMap<VertexName, Vec<VertexName>> = parents
            .into_iter()
            .map(|(k, vs)| (v(k), vs.into_iter().map(v).collect()))
            .collect();
        nonblocking::non_blocking_result(self.add_heads(&parents, &heads[..].into()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
pub trait ToIdSet {
    /// Converts [`NameSet`] to [`IdSet`].
    async fn to_id_set(&self, set: &NameSet) -> Result<IdSet>;
}

pub trait ToSet {
    /// Converts [`IdSet`] to [`NameSet`].
    fn to_set(&self, set: &IdSet) -> Result<NameSet>;
}

pub trait IdMapSnapshot {
    /// Get a snapshot of IdMap.
    fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>>;
}

/// Describes how to persist state to disk.
pub trait Persist {
    /// Return type of `lock()`.
    type Lock: Send + Sync;

    /// Obtain an exclusive lock for writing.
    /// This should prevent other writers.
    fn lock(&mut self) -> Result<Self::Lock>;

    /// Reload from the source of truth. Drop pending changes.
    ///
    /// This requires a lock and is usually called before `persist()`.
    fn reload(&mut self, _lock: &Self::Lock) -> Result<()>;

    /// Write pending changes to the source of truth.
    ///
    /// This requires a lock.
    fn persist(&mut self, _lock: &Self::Lock) -> Result<()>;
}

/// Address that can be used to open things.
///
/// The address type decides the return type of `open`.
pub trait Open: Clone {
    type OpenTarget;

    fn open(&self) -> Result<Self::OpenTarget>;
}

/// Has an integer tuple version that can be used to test if the data was
/// changed. If the first number changes, it means incompatible changes.
/// If only the second number increases, it means append-only changes.
pub trait IntVersion {
    fn int_version(&self) -> (u64, u64);
}

/// Fallible clone.
pub trait TryClone {
    fn try_clone(&self) -> Result<Self>
    where
        Self: Sized;
}

impl<T: Clone> TryClone for T {
    fn try_clone(&self) -> Result<Self> {
        Ok(self.clone())
    }
}

#[async_trait::async_trait]
impl<T: IdConvert + IdMapSnapshot> ToIdSet for T {
    /// Converts [`NameSet`] to [`IdSet`].
    async fn to_id_set(&self, set: &NameSet) -> Result<IdSet> {
        let version = set.hints().id_map_version();

        // Fast path: extract IdSet from IdStaticSet.
        if let Some(set) = set.as_any().downcast_ref::<IdStaticSet>() {
            if None < version && version <= Some(self.map_version()) {
                return Ok(set.spans.clone());
            }
        }

        // Convert IdLazySet to IdStaticSet. Bypass hash lookups.
        if let Some(set) = set.as_any().downcast_ref::<IdLazySet>() {
            if None < version && version <= Some(self.map_version()) {
                let set: IdStaticSet = set.to_static()?;
                return Ok(set.spans);
            }
        }

        // Slow path: iterate through the set and convert it to a non-lazy
        // IdSet. Does not bypass hash lookups.
        let mut spans = IdSet::empty();
        let mut iter = set.iter().await?.chunks(1 << 17);
        while let Some(names) = iter.next().await {
            let names = names.into_iter().collect::<Result<Vec<_>>>()?;
            let ids = self.vertex_id_batch(&names).await?;
            for id in ids {
                spans.push(id?);
            }
        }
        Ok(spans)
    }
}

impl IdMapSnapshot for Arc<dyn IdConvert + Send + Sync> {
    fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>> {
        Ok(self.clone())
    }
}

impl<T: IdMapSnapshot + DagAlgorithm> ToSet for T {
    /// Converts [`IdSet`] to [`NameSet`].
    fn to_set(&self, set: &IdSet) -> Result<NameSet> {
        NameSet::from_spans_dag(set.clone(), self)
    }
}