Skip to main content

dag/
default_impl.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::collections::HashMap;
9use std::collections::HashSet;
10use std::sync::Arc;
11
12use futures::StreamExt;
13use futures::TryStreamExt;
14
15use crate::dag::MemDag;
16use crate::errors::programming;
17use crate::ops::DagAddHeads;
18use crate::ops::IdConvert;
19use crate::ops::IdDagAlgorithm;
20use crate::ops::Parents;
21use crate::ops::ToIdSet;
22use crate::ops::ToSet;
23use crate::set::hints::Hints;
24use crate::utils;
25use crate::DagAlgorithm;
26use crate::Group;
27use crate::Id;
28use crate::IdSet;
29use crate::Result;
30use crate::Set;
31use crate::Vertex;
32use crate::VertexListWithOptions;
33
34/// Re-create the graph so it looks better when rendered.
35///
36/// See `utils::beautify_graph` for details.
37///
38/// For example, the left-side graph will be rewritten to the right-side:
39///
40/// 1. Linearize.
41///
42/// ```plain,ignore
43///   A             A      # Linearize is done by IdMap::assign_heads,
44///   |             |      # as long as the heads provided are the heads
45///   | C           B      # of the whole graph ("A", "C", not "B", "D").
46///   | |           |
47///   B |     ->    | C
48///   | |           | |
49///   | D           | D
50///   |/            |/
51///   E             E
52/// ```
53///
54/// 2. Reorder branches (at different branching points) to reduce columns.
55///
56/// ```plain,ignore
57///     D           B
58///     |           |      # Assuming the main branch is B-C-E.
///   B |           | A    # Branching point of the D branch is "E"
60///   | |           |/     # Branching point of the A branch is "C"
61///   | | A   ->    C      # The D branch should be moved to below
62///   | |/          |      # the A branch.
63///   | |           | D
64///   |/|           |/
65///   C /           E
66///   |/
67///   E
68/// ```
69///
70/// 3. Reorder branches (at a same branching point) to reduce length of
71///    edges.
72///
73/// ```plain,ignore
74///   D              A
75///   |              |     # This is done by picking the longest
76///   | A            B     # branch (A-B-C-E) as the "main branch"
77///   | |            |     # and work on the remaining branches
78///   | B     ->     C     # recursively.
79///   | |            |
80///   | C            | D
81///   |/             |/
82///   E              E
83/// ```
84///
85/// `main_branch` optionally defines how to sort the heads. A head `x` will
86/// be emitted first during iteration, if `x` is in `main_branch`.
87///
88/// This function is expensive. Only run on small graphs.
89pub(crate) async fn beautify(
90    this: &(impl DagAlgorithm + ?Sized),
91    main_branch: Option<Set>,
92) -> Result<MemDag> {
93    // Prepare input for utils::beautify_graph.
94    // Maintain usize <-> Vertex map. Also fetch the Vertex <-> Id mapping (via all.iter).
95    let all = this.all().await?;
96    let usize_to_vertex: Vec<Vertex> = all.iter_rev().await?.try_collect().await?;
97    let vertex_to_usize: HashMap<Vertex, usize> = usize_to_vertex
98        .iter()
99        .enumerate()
100        .map(|(i, v)| (v.clone(), i))
101        .collect();
102    let mut priorities = Vec::new();
103    let main_branch = main_branch.unwrap_or_else(Set::empty);
104
105    let mut parents_vec = Vec::with_capacity(usize_to_vertex.len());
106    for (i, vertex) in usize_to_vertex.iter().enumerate() {
107        if main_branch.contains(vertex).await? {
108            priorities.push(i);
109        }
110        let parent_vertexes = this.parent_names(vertex.clone()).await?;
111        let parent_usizes: Vec<usize> = parent_vertexes
112            .iter()
113            .filter_map(|p| vertex_to_usize.get(p))
114            .copied()
115            .collect();
116        parents_vec.push(parent_usizes);
117    }
118
119    // Call utils::beautify_graph.
120    let sorted = utils::beautify_graph(&parents_vec, &priorities);
121
122    // Recreate the graph using the given order.
123    let mut dag = MemDag::new();
124    let snapshot = this.dag_snapshot()?;
125    for i in sorted.into_iter().rev() {
126        let heads: Vec<Vertex> = vec![usize_to_vertex[i].clone()];
127        dag.add_heads(&snapshot, &heads.into()).await?;
128    }
129    Ok(dag)
130}
131
132/// Provide a sub-graph containing only the specified set.
133pub(crate) async fn subdag(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<MemDag> {
134    let set = this.sort(&set).await?;
135    let parents = match set.to_parents().await? {
136        Some(p) => p,
137        None => return programming("Set returned by dag.sort() should support to_parents()"),
138    };
139    let mut dag = MemDag::new();
140    let heads = this.heads_ancestors(set).await?;
141    // "heads" is in DESC order. Use reversed order for insertion so the
142    // resulting subdag might preserve the same order with the original dag.
143    let heads: Vec<Vertex> = heads.iter_rev().await?.try_collect().await?;
144    // MASTER group enables the ONLY_HEAD segment flag. It improves graph query performance.
145    let heads = VertexListWithOptions::from(heads).with_desired_group(Group::MASTER);
146    dag.add_heads(&parents, &heads).await?;
147    Ok(dag)
148}
149
150/// Convert `Set` to a `Parents` implementation that only returns vertexes in the set.
151pub(crate) async fn set_to_parents(set: &Set) -> Result<Option<impl Parents>> {
152    let (id_set, id_map) = match set.to_id_set_and_id_map_in_o1() {
153        Some(v) => v,
154        None => return Ok(None),
155    };
156    let dag = match set.dag() {
157        None => return Ok(None),
158        Some(dag) => dag,
159    };
160    let id_dag = dag.id_dag_snapshot()?;
161
162    // Pre-resolve ids to vertexes. Reduce remote lookup round-trips.
163    let ids: Vec<Id> = id_set.iter_desc().collect();
164    id_map.vertex_name_batch(&ids).await?;
165
166    struct IdParents {
167        id_set: IdSet,
168        id_dag: Arc<dyn IdDagAlgorithm + Send + Sync>,
169        id_map: Arc<dyn IdConvert + Send + Sync>,
170    }
171
172    #[async_trait::async_trait]
173    impl Parents for IdParents {
174        async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
175            tracing::debug!(
176                target: "dag::idparents",
177                "resolving parents for {:?}", &name,
178            );
179            let id = self.id_map.vertex_id(name).await?;
180            let direct_parent_ids = self.id_dag.parent_ids(id)?;
181            let parent_ids = if direct_parent_ids.iter().all(|&id| self.id_set.contains(id)) {
182                // Fast path. No "leaked" parents.
183                direct_parent_ids
184            } else {
185                // Slower path.
186                // PERF: There might be room to optimize (ex. dedicated API like
187                // reachable_roots).
188                let parent_id_set = IdSet::from_spans(direct_parent_ids);
189                let ancestors = self.id_dag.ancestors(parent_id_set)?;
190                let heads = ancestors.intersection(&self.id_set);
191                let heads = self.id_dag.heads_ancestors(heads)?;
192                heads.iter_desc().collect()
193            };
194
195            let vertexes = self.id_map.vertex_name_batch(&parent_ids).await?;
196            let parents = vertexes.into_iter().collect::<Result<Vec<_>>>()?;
197            Ok(parents)
198        }
199
200        async fn hint_subdag_for_insertion(&self, _heads: &[Vertex]) -> Result<MemDag> {
201            // The `IdParents` is not intended to be inserted to other graphs.
202            tracing::warn!(
203                target: "dag::idparents",
204                "IdParents does not implement hint_subdag_for_insertion() for efficient insertion"
205            );
206            Ok(MemDag::new())
207        }
208    }
209
210    let parents = IdParents {
211        id_set,
212        id_dag,
213        id_map,
214    };
215
216    Ok(Some(parents))
217}
218
219pub(crate) async fn parents(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
220    let mut result: Vec<Vertex> = Vec::new();
221    let mut iter = set.iter().await?;
222    // PERF: This is not an efficient async implementation.
223    while let Some(vertex) = iter.next().await {
224        let parents = this.parent_names(vertex?).await?;
225        result.extend(parents);
226    }
227    Ok(Set::from_static_names(result))
228}
229
230pub(crate) async fn first_ancestor_nth(
231    this: &(impl DagAlgorithm + ?Sized),
232    name: Vertex,
233    n: u64,
234) -> Result<Option<Vertex>> {
235    let mut vertex = name.clone();
236    for _ in 0..n {
237        let parents = this.parent_names(vertex).await?;
238        if parents.is_empty() {
239            return Ok(None);
240        }
241        vertex = parents[0].clone();
242    }
243    Ok(Some(vertex))
244}
245
246pub(crate) async fn first_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
247    let mut to_visit: Vec<Vertex> = {
248        let mut list = Vec::with_capacity(set.count_slow().await?.try_into()?);
249        let mut iter = set.iter().await?;
250        while let Some(next) = iter.next().await {
251            let vertex = next?;
252            list.push(vertex);
253        }
254        list
255    };
256    let mut visited: HashSet<Vertex> = to_visit.clone().into_iter().collect();
257    while let Some(v) = to_visit.pop() {
258        #[allow(clippy::never_loop)]
259        if let Some(parent) = this.parent_names(v).await?.into_iter().next() {
260            if visited.insert(parent.clone()) {
261                to_visit.push(parent);
262            }
263        }
264    }
265    let hints = Hints::new_inherit_idmap_dag(set.hints());
266    let set = Set::from_iter(visited.into_iter().map(Ok), hints);
267    this.sort(&set).await
268}
269
270pub(crate) async fn heads(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
271    Ok(set.clone() - this.parents(set).await?)
272}
273
274pub(crate) async fn roots(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
275    Ok(set.clone() - this.children(set).await?)
276}
277
278pub(crate) async fn merges(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
279    let this = this.dag_snapshot()?;
280    Ok(set.filter(Box::new(move |v: &Vertex| {
281        let this = this.clone();
282        Box::pin(async move {
283            DagAlgorithm::parent_names(&this, v.clone())
284                .await
285                .map(|ps| ps.len() >= 2)
286        })
287    })))
288}
289
290pub(crate) async fn reachable_roots(
291    this: &(impl DagAlgorithm + ?Sized),
292    roots: Set,
293    heads: Set,
294) -> Result<Set> {
295    let heads_ancestors = this.ancestors(heads.clone()).await?;
296    let roots = roots & heads_ancestors.clone(); // Filter out "bogus" roots.
297    let only = heads_ancestors - this.ancestors(roots.clone()).await?;
298    Ok(roots.clone() & (heads.clone() | this.parents(only).await?))
299}
300
301pub(crate) async fn heads_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
302    this.heads(this.ancestors(set).await?).await
303}
304
305pub(crate) async fn only(
306    this: &(impl DagAlgorithm + ?Sized),
307    reachable: Set,
308    unreachable: Set,
309) -> Result<Set> {
310    let reachable = this.ancestors(reachable).await?;
311    let unreachable = this.ancestors(unreachable).await?;
312    Ok(reachable - unreachable)
313}
314
315pub(crate) async fn only_both(
316    this: &(impl DagAlgorithm + ?Sized),
317    reachable: Set,
318    unreachable: Set,
319) -> Result<(Set, Set)> {
320    let reachable = this.ancestors(reachable).await?;
321    let unreachable = this.ancestors(unreachable).await?;
322    Ok((reachable - unreachable.clone(), unreachable))
323}
324
325pub(crate) async fn gca_one(
326    this: &(impl DagAlgorithm + ?Sized),
327    set: Set,
328) -> Result<Option<Vertex>> {
329    this.gca_all(set)
330        .await?
331        .iter()
332        .await?
333        .next()
334        .await
335        .transpose()
336}
337
338pub(crate) async fn gca_all(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
339    this.heads_ancestors(this.common_ancestors(set).await?)
340        .await
341}
342
343pub(crate) async fn common_ancestors(this: &(impl DagAlgorithm + ?Sized), set: Set) -> Result<Set> {
344    let result = match set.count_slow().await? {
345        0 => set,
346        1 => this.ancestors(set).await?,
347        _ => {
348            // Try to reduce the size of `set`.
349            // `common_ancestors(X)` = `common_ancestors(roots(X))`.
350            let set = this.roots(set).await?;
351            let mut iter = set.iter().await?;
352            let mut result = this
353                .ancestors(Set::from(iter.next().await.unwrap()?))
354                .await?;
355            while let Some(v) = iter.next().await {
356                result = result.intersection(&this.ancestors(Set::from(v?)).await?);
357            }
358            result
359        }
360    };
361    Ok(result)
362}
363
364pub(crate) async fn is_ancestor(
365    this: &(impl DagAlgorithm + ?Sized),
366    ancestor: Vertex,
367    descendant: Vertex,
368) -> Result<bool> {
369    let mut to_visit = vec![descendant];
370    let mut visited: HashSet<_> = to_visit.clone().into_iter().collect();
371    while let Some(v) = to_visit.pop() {
372        if v == ancestor {
373            return Ok(true);
374        }
375        for parent in this.parent_names(v).await? {
376            if visited.insert(parent.clone()) {
377                to_visit.push(parent);
378            }
379        }
380    }
381    Ok(false)
382}
383
384/// Implementation of `suggest_bisect`.
385///
386/// This is not the default trait implementation because the extra trait bounds
387/// (ToIdSet, ToSet).
388pub async fn suggest_bisect(
389    this: &(impl DagAlgorithm + ToIdSet + ToSet + IdConvert + ?Sized),
390    roots: Set,
391    heads: Set,
392    skip: Set,
393) -> Result<(Option<Vertex>, Set, Set)> {
394    let roots = this.to_id_set(&roots).await?;
395    let heads = this.to_id_set(&heads).await?;
396    let skip = this.to_id_set(&skip).await?;
397    let (maybe_id, untested, heads) = this
398        .id_dag_snapshot()?
399        .suggest_bisect(&roots, &heads, &skip)?;
400    let maybe_vertex = match maybe_id {
401        Some(id) => Some(this.vertex_name(id).await?),
402        None => None,
403    };
404    let untested = this.to_set(&untested)?;
405    let heads = this.to_set(&heads)?;
406    Ok((maybe_vertex, untested, heads))
407}
408
409// `scope` is usually the "dirty" set that might need to be inserted, or might
410// already exist in the existing dag, obtained by `dag.dirty()`. It is okay for
411// `scope` to be empty, which might lead to more network round-trips. See also
412// the docstring for `Parents::hint_subdag_for_insertion`.
413#[tracing::instrument(skip(this), level=tracing::Level::DEBUG)]
414pub(crate) async fn hint_subdag_for_insertion(
415    this: &(impl Parents + ?Sized),
416    scope: &Set,
417    heads: &[Vertex],
418) -> Result<MemDag> {
419    let count = scope.count_slow().await?;
420    tracing::trace!("hint_subdag_for_insertion: pending vertexes: {}", count);
421
422    // ScopedParents only contains parents within "scope".
423    struct ScopedParents<'a, P: Parents + ?Sized> {
424        parents: &'a P,
425        scope: &'a Set,
426    }
427
428    #[async_trait::async_trait]
429    impl<'a, P: Parents + ?Sized> Parents for ScopedParents<'a, P> {
430        async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
431            let parents: Vec<Vertex> = self.parents.parent_names(name).await?;
432            // Filter by scope. We don't need to provide a "correct" parents here.
433            // It is only used to optimize network fetches, not used to actually insert
434            // to the graph.
435            let mut filtered_parents = Vec::with_capacity(parents.len());
436            for v in parents {
437                if self.scope.contains(&v).await? {
438                    filtered_parents.push(v)
439                }
440            }
441            Ok(filtered_parents)
442        }
443
444        async fn hint_subdag_for_insertion(&self, _heads: &[Vertex]) -> Result<MemDag> {
445            // No need to use such a hint (to avoid infinite recursion).
446            // Pending names should exist in the graph without using remote fetching.
447            Ok(MemDag::new())
448        }
449    }
450
451    // Insert vertexes in `scope` to `dag`.
452    let mut dag = MemDag::new();
453    // The MemDag should not be lazy.
454    assert!(!dag.is_vertex_lazy());
455
456    let scoped_parents = ScopedParents {
457        parents: this,
458        scope,
459    };
460
461    // Exclude heads that are outside 'scope'. They might trigger remote fetches.
462    let heads_in_scope = {
463        let mut heads_in_scope = Vec::with_capacity(heads.len());
464        for head in heads {
465            if scope.contains(head).await? {
466                heads_in_scope.push(head.clone());
467            }
468        }
469        heads_in_scope
470    };
471    dag.add_heads(&scoped_parents, &heads_in_scope.into())
472        .await?;
473
474    Ok(dag)
475}