jj_lib/
graph.rs

1// Copyright 2021-2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::collections::VecDeque;
20use std::hash::Hash;
21
22/// Node and edges pair of type `N` and `ID` respectively.
23///
24/// `ID` uniquely identifies a node within the graph. It's usually cheap to
25/// clone. There should be a pure `(&N) -> &ID` function.
26pub type GraphNode<N, ID = N> = (N, Vec<GraphEdge<ID>>);
27
28#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
29pub struct GraphEdge<N> {
30    pub target: N,
31    pub edge_type: GraphEdgeType,
32}
33
34impl<N> GraphEdge<N> {
35    pub fn missing(target: N) -> Self {
36        Self {
37            target,
38            edge_type: GraphEdgeType::Missing,
39        }
40    }
41
42    pub fn direct(target: N) -> Self {
43        Self {
44            target,
45            edge_type: GraphEdgeType::Direct,
46        }
47    }
48
49    pub fn indirect(target: N) -> Self {
50        Self {
51            target,
52            edge_type: GraphEdgeType::Indirect,
53        }
54    }
55
56    pub fn map<M>(self, f: impl FnOnce(N) -> M) -> GraphEdge<M> {
57        GraphEdge {
58            target: f(self.target),
59            edge_type: self.edge_type,
60        }
61    }
62}
63
/// Kind of connection a [`GraphEdge`] represents.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum GraphEdgeType {
    /// Edge whose target is not treated as reachable (such edges are skipped
    /// when parents are collected within this module).
    Missing,
    /// Edge created by [`GraphEdge::direct`].
    Direct,
    /// Edge created by [`GraphEdge::indirect`].
    Indirect,
}
70
71fn reachable_targets<N>(edges: &[GraphEdge<N>]) -> impl DoubleEndedIterator<Item = &N> {
72    edges
73        .iter()
74        .filter(|edge| edge.edge_type != GraphEdgeType::Missing)
75        .map(|edge| &edge.target)
76}
77
78/// Creates new graph in which nodes and edges are reversed.
79pub fn reverse_graph<N, ID: Clone + Eq + Hash, E>(
80    input: impl Iterator<Item = Result<GraphNode<N, ID>, E>>,
81    as_id: impl Fn(&N) -> &ID,
82) -> Result<Vec<GraphNode<N, ID>>, E> {
83    let mut entries = vec![];
84    let mut reverse_edges: HashMap<ID, Vec<GraphEdge<ID>>> = HashMap::new();
85    for item in input {
86        let (node, edges) = item?;
87        for GraphEdge { target, edge_type } in edges {
88            reverse_edges.entry(target).or_default().push(GraphEdge {
89                target: as_id(&node).clone(),
90                edge_type,
91            });
92        }
93        entries.push(node);
94    }
95
96    let mut items = vec![];
97    for node in entries.into_iter().rev() {
98        let edges = reverse_edges.remove(as_id(&node)).unwrap_or_default();
99        items.push((node, edges));
100    }
101    Ok(items)
102}
103
104/// Graph iterator adapter to group topological branches.
105///
106/// Basic idea is DFS from the heads. At fork point, the other descendant
107/// branches will be visited. At merge point, the second (or the last) ancestor
108/// branch will be visited first. This is practically [the same as Git][Git].
109///
110/// If no branches are prioritized, the branch containing the first commit in
111/// the input iterator will be emitted first. It is often the working-copy
112/// ancestor branch. The other head branches won't be enqueued eagerly, and will
113/// be emitted as late as possible.
114///
115/// [Git]: https://github.blog/2022-08-30-gits-database-internals-ii-commit-history-queries/#topological-sorting
116#[derive(Clone, Debug)]
117pub struct TopoGroupedGraphIterator<N, I> {
118    input_iter: I,
119    /// Graph nodes read from the input iterator but not yet emitted.
120    nodes: HashMap<N, TopoGroupedGraphNode<N>>,
121    /// Stack of graph nodes to be emitted.
122    emittable_ids: Vec<N>,
123    /// List of new head nodes found while processing unpopulated nodes, or
124    /// prioritized branch nodes added by caller.
125    new_head_ids: VecDeque<N>,
126    /// Set of nodes which may be ancestors of `new_head_ids`.
127    blocked_ids: HashSet<N>,
128}
129
130#[derive(Clone, Debug)]
131struct TopoGroupedGraphNode<N> {
132    /// Graph nodes which must be emitted before.
133    child_ids: HashSet<N>,
134    /// Graph edges to parent nodes. `None` until this node is populated.
135    edges: Option<Vec<GraphEdge<N>>>,
136}
137
138impl<N> Default for TopoGroupedGraphNode<N> {
139    fn default() -> Self {
140        Self {
141            child_ids: Default::default(),
142            edges: None,
143        }
144    }
145}
146
147impl<N, E, I> TopoGroupedGraphIterator<N, I>
148where
149    N: Clone + Hash + Eq,
150    I: Iterator<Item = Result<GraphNode<N>, E>>,
151{
152    /// Wraps the given iterator to group topological branches. The input
153    /// iterator must be topologically ordered.
154    pub fn new(input_iter: I) -> Self {
155        TopoGroupedGraphIterator {
156            input_iter,
157            nodes: HashMap::new(),
158            emittable_ids: Vec::new(),
159            new_head_ids: VecDeque::new(),
160            blocked_ids: HashSet::new(),
161        }
162    }
163
164    /// Makes the branch containing the specified node be emitted earlier than
165    /// the others.
166    ///
167    /// The `id` usually points to a head node, but this isn't a requirement.
168    /// If the specified node isn't a head, all preceding nodes will be queued.
169    ///
170    /// The specified node must exist in the input iterator. If it didn't, the
171    /// iterator would panic.
172    pub fn prioritize_branch(&mut self, id: N) {
173        // Mark existence of unpopulated node
174        self.nodes.entry(id.clone()).or_default();
175        // Push to non-emitting list so the prioritized heads wouldn't be
176        // interleaved
177        self.new_head_ids.push_back(id);
178    }
179
180    fn populate_one(&mut self) -> Result<Option<()>, E> {
181        let (current_id, edges) = match self.input_iter.next() {
182            Some(Ok(data)) => data,
183            Some(Err(err)) => {
184                return Err(err);
185            }
186            None => {
187                return Ok(None);
188            }
189        };
190
191        // Set up reverse reference
192        for parent_id in reachable_targets(&edges) {
193            let parent_node = self.nodes.entry(parent_id.clone()).or_default();
194            parent_node.child_ids.insert(current_id.clone());
195        }
196
197        // Populate the current node
198        if let Some(current_node) = self.nodes.get_mut(&current_id) {
199            assert!(current_node.edges.is_none());
200            current_node.edges = Some(edges);
201        } else {
202            let current_node = TopoGroupedGraphNode {
203                edges: Some(edges),
204                ..Default::default()
205            };
206            self.nodes.insert(current_id.clone(), current_node);
207            // Push to non-emitting list so the new head wouldn't be interleaved
208            self.new_head_ids.push_back(current_id);
209        }
210
211        Ok(Some(()))
212    }
213
214    /// Enqueues the first new head which will unblock the waiting ancestors.
215    ///
216    /// This does not move multiple head nodes to the queue at once because
217    /// heads may be connected to the fork points in arbitrary order.
218    fn flush_new_head(&mut self) {
219        assert!(!self.new_head_ids.is_empty());
220        if self.blocked_ids.is_empty() || self.new_head_ids.len() <= 1 {
221            // Fast path: orphaned or no choice
222            let new_head_id = self.new_head_ids.pop_front().unwrap();
223            self.emittable_ids.push(new_head_id);
224            self.blocked_ids.clear();
225            return;
226        }
227
228        // Mark descendant nodes reachable from the blocking nodes
229        let mut to_visit: Vec<&N> = self
230            .blocked_ids
231            .iter()
232            .map(|id| {
233                // Borrow from self.nodes so self.blocked_ids can be mutated later
234                let (id, _) = self.nodes.get_key_value(id).unwrap();
235                id
236            })
237            .collect();
238        let mut visited: HashSet<&N> = to_visit.iter().copied().collect();
239        while let Some(id) = to_visit.pop() {
240            let node = self.nodes.get(id).unwrap();
241            to_visit.extend(node.child_ids.iter().filter(|id| visited.insert(id)));
242        }
243
244        // Pick the first reachable head
245        let index = self
246            .new_head_ids
247            .iter()
248            .position(|id| visited.contains(id))
249            .expect("blocking head should exist");
250        let new_head_id = self.new_head_ids.remove(index).unwrap();
251
252        // Unmark ancestors of the selected head so they won't contribute to future
253        // new-head resolution within the newly-unblocked sub graph. The sub graph
254        // can have many fork points, and the corresponding heads should be picked in
255        // the fork-point order, not in the head appearance order.
256        to_visit.push(&new_head_id);
257        visited.remove(&new_head_id);
258        while let Some(id) = to_visit.pop() {
259            let node = self.nodes.get(id).unwrap();
260            if let Some(edges) = &node.edges {
261                to_visit.extend(reachable_targets(edges).filter(|id| visited.remove(id)));
262            }
263        }
264        self.blocked_ids.retain(|id| visited.contains(id));
265        self.emittable_ids.push(new_head_id);
266    }
267
268    fn next_node(&mut self) -> Result<Option<GraphNode<N>>, E> {
269        // Based on Kahn's algorithm
270        loop {
271            if let Some(current_id) = self.emittable_ids.last() {
272                let Some(current_node) = self.nodes.get_mut(current_id) else {
273                    // Queued twice because new children populated and emitted
274                    self.emittable_ids.pop().unwrap();
275                    continue;
276                };
277                if !current_node.child_ids.is_empty() {
278                    // New children populated after emitting the other
279                    let current_id = self.emittable_ids.pop().unwrap();
280                    self.blocked_ids.insert(current_id);
281                    continue;
282                }
283                let Some(edges) = current_node.edges.take() else {
284                    // Not yet populated
285                    self.populate_one()?
286                        .expect("parent or prioritized node should exist");
287                    continue;
288                };
289                // The second (or the last) parent will be visited first
290                let current_id = self.emittable_ids.pop().unwrap();
291                self.nodes.remove(&current_id).unwrap();
292                for parent_id in reachable_targets(&edges) {
293                    let parent_node = self.nodes.get_mut(parent_id).unwrap();
294                    parent_node.child_ids.remove(&current_id);
295                    if parent_node.child_ids.is_empty() {
296                        let reusable_id = self.blocked_ids.take(parent_id);
297                        let parent_id = reusable_id.unwrap_or_else(|| parent_id.clone());
298                        self.emittable_ids.push(parent_id);
299                    } else {
300                        self.blocked_ids.insert(parent_id.clone());
301                    }
302                }
303                return Ok(Some((current_id, edges)));
304            } else if !self.new_head_ids.is_empty() {
305                self.flush_new_head();
306            } else {
307                // Populate the first or orphan head
308                if self.populate_one()?.is_none() {
309                    return Ok(None);
310                }
311            }
312        }
313    }
314}
315
316impl<N, E, I> Iterator for TopoGroupedGraphIterator<N, I>
317where
318    N: Clone + Hash + Eq,
319    I: Iterator<Item = Result<GraphNode<N>, E>>,
320{
321    type Item = Result<GraphNode<N>, E>;
322
323    fn next(&mut self) -> Option<Self::Item> {
324        match self.next_node() {
325            Ok(Some(node)) => Some(Ok(node)),
326            Ok(None) => {
327                assert!(self.nodes.is_empty(), "all nodes should have been emitted");
328                None
329            }
330            Err(err) => Some(Err(err)),
331        }
332    }
333}
334
335#[cfg(test)]
336mod tests {
337    use std::convert::Infallible;
338
339    use itertools::Itertools as _;
340    use renderdag::Ancestor;
341    use renderdag::GraphRowRenderer;
342    use renderdag::Renderer as _;
343
344    use super::*;
345
346    fn missing(c: char) -> GraphEdge<char> {
347        GraphEdge::missing(c)
348    }
349
350    fn direct(c: char) -> GraphEdge<char> {
351        GraphEdge::direct(c)
352    }
353
354    fn indirect(c: char) -> GraphEdge<char> {
355        GraphEdge::indirect(c)
356    }
357
358    fn format_edge(edge: &GraphEdge<char>) -> String {
359        let c = edge.target;
360        match edge.edge_type {
361            GraphEdgeType::Missing => format!("missing({c})"),
362            GraphEdgeType::Direct => format!("direct({c})"),
363            GraphEdgeType::Indirect => format!("indirect({c})"),
364        }
365    }
366
367    fn format_graph(
368        graph_iter: impl IntoIterator<Item = Result<GraphNode<char>, Infallible>>,
369    ) -> String {
370        let mut renderer = GraphRowRenderer::new()
371            .output()
372            .with_min_row_height(2)
373            .build_box_drawing();
374        graph_iter
375            .into_iter()
376            .map(|item| match item {
377                Ok(node) => node,
378                Err(err) => match err {},
379            })
380            .map(|(id, edges)| {
381                let glyph = id.to_string();
382                let message = edges.iter().map(format_edge).join(", ");
383                let parents = edges
384                    .into_iter()
385                    .map(|edge| match edge.edge_type {
386                        GraphEdgeType::Missing => Ancestor::Anonymous,
387                        GraphEdgeType::Direct => Ancestor::Parent(edge.target),
388                        GraphEdgeType::Indirect => Ancestor::Ancestor(edge.target),
389                    })
390                    .collect();
391                renderer.next_row(id, parents, glyph, message)
392            })
393            .collect()
394    }
395
396    #[test]
397    fn test_format_graph() {
398        let graph = [
399            ('D', vec![direct('C'), indirect('B')]),
400            ('C', vec![direct('A')]),
401            ('B', vec![missing('X')]),
402            ('A', vec![]),
403        ]
404        .map(Ok);
405        insta::assert_snapshot!(format_graph(graph), @r"
406        D    direct(C), indirect(B)
407        ├─╮
408        C ╷  direct(A)
409        │ ╷
410        │ B  missing(X)
411        │ │
412        │ ~
413414        A
415        ");
416    }
417
418    fn topo_grouped<I, E>(graph_iter: I) -> TopoGroupedGraphIterator<char, I::IntoIter>
419    where
420        I: IntoIterator<Item = Result<GraphNode<char>, E>>,
421    {
422        TopoGroupedGraphIterator::new(graph_iter.into_iter())
423    }
424
425    #[test]
426    fn test_topo_grouped_multiple_roots() {
427        let graph = [
428            ('C', vec![missing('Y')]),
429            ('B', vec![missing('X')]),
430            ('A', vec![]),
431        ]
432        .map(Ok);
433        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
434        C  missing(Y)
435436        ~
437
438        B  missing(X)
439440        ~
441
442        A
443        ");
444        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
445        C  missing(Y)
446447        ~
448
449        B  missing(X)
450451        ~
452
453        A
454        ");
455
456        // All nodes can be lazily emitted.
457        let mut iter = topo_grouped(graph.iter().cloned().peekable());
458        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
459        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
460        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
461        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
462    }
463
464    #[test]
465    fn test_topo_grouped_trivial_fork() {
466        let graph = [
467            ('E', vec![direct('B')]),
468            ('D', vec![direct('A')]),
469            ('C', vec![direct('B')]),
470            ('B', vec![direct('A')]),
471            ('A', vec![]),
472        ]
473        .map(Ok);
474        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
475        E  direct(B)
476477        │ D  direct(A)
478        │ │
479        │ │ C  direct(B)
480        ├───╯
481        B │  direct(A)
482        ├─╯
483        A
484        ");
485        // D-A is found earlier than B-A, but B is emitted first because it belongs to
486        // the emitting branch.
487        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
488        E  direct(B)
489490        │ C  direct(B)
491        ├─╯
492        B  direct(A)
493494        │ D  direct(A)
495        ├─╯
496        A
497        ");
498
499        // E can be lazy, then D and C will be queued.
500        let mut iter = topo_grouped(graph.iter().cloned().peekable());
501        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
502        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
503        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
504        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
505        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
506        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
507    }
508
509    #[test]
510    fn test_topo_grouped_fork_interleaved() {
511        let graph = [
512            ('F', vec![direct('D')]),
513            ('E', vec![direct('C')]),
514            ('D', vec![direct('B')]),
515            ('C', vec![direct('B')]),
516            ('B', vec![direct('A')]),
517            ('A', vec![]),
518        ]
519        .map(Ok);
520        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
521        F  direct(D)
522523        │ E  direct(C)
524        │ │
525        D │  direct(B)
526        │ │
527        │ C  direct(B)
528        ├─╯
529        B  direct(A)
530531        A
532        ");
533        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
534        F  direct(D)
535536        D  direct(B)
537538        │ E  direct(C)
539        │ │
540        │ C  direct(B)
541        ├─╯
542        B  direct(A)
543544        A
545        ");
546
547        // F can be lazy, then E will be queued, then C.
548        let mut iter = topo_grouped(graph.iter().cloned().peekable());
549        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
550        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
551        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
552        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
553        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
554        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
555    }
556
557    #[test]
558    fn test_topo_grouped_fork_multiple_heads() {
559        let graph = [
560            ('I', vec![direct('E')]),
561            ('H', vec![direct('C')]),
562            ('G', vec![direct('A')]),
563            ('F', vec![direct('E')]),
564            ('E', vec![direct('C')]),
565            ('D', vec![direct('C')]),
566            ('C', vec![direct('A')]),
567            ('B', vec![direct('A')]),
568            ('A', vec![]),
569        ]
570        .map(Ok);
571        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
572        I  direct(E)
573574        │ H  direct(C)
575        │ │
576        │ │ G  direct(A)
577        │ │ │
578        │ │ │ F  direct(E)
579        ├─────╯
580        E │ │  direct(C)
581        ├─╯ │
582        │ D │  direct(C)
583        ├─╯ │
584        C   │  direct(A)
585        ├───╯
586        │ B  direct(A)
587        ├─╯
588        A
589        ");
590        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
591        I  direct(E)
592593        │ F  direct(E)
594        ├─╯
595        E  direct(C)
596597        │ H  direct(C)
598        ├─╯
599        │ D  direct(C)
600        ├─╯
601        C  direct(A)
602603        │ G  direct(A)
604        ├─╯
605        │ B  direct(A)
606        ├─╯
607        A
608        ");
609
610        // I can be lazy, then H, G, and F will be queued.
611        let mut iter = topo_grouped(graph.iter().cloned().peekable());
612        assert_eq!(iter.next().unwrap().unwrap().0, 'I');
613        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'H');
614        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
615        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
616    }
617
618    #[test]
619    fn test_topo_grouped_fork_parallel() {
620        let graph = [
621            // Pull all sub graphs in reverse order:
622            ('I', vec![direct('A')]),
623            ('H', vec![direct('C')]),
624            ('G', vec![direct('E')]),
625            // Orphan sub graph G,F-E:
626            ('F', vec![direct('E')]),
627            ('E', vec![missing('Y')]),
628            // Orphan sub graph H,D-C:
629            ('D', vec![direct('C')]),
630            ('C', vec![missing('X')]),
631            // Orphan sub graph I,B-A:
632            ('B', vec![direct('A')]),
633            ('A', vec![]),
634        ]
635        .map(Ok);
636        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
637        I  direct(A)
638639        │ H  direct(C)
640        │ │
641        │ │ G  direct(E)
642        │ │ │
643        │ │ │ F  direct(E)
644        │ │ ├─╯
645        │ │ E  missing(Y)
646        │ │ │
647        │ │ ~
648        │ │
649        │ │ D  direct(C)
650        │ ├─╯
651        │ C  missing(X)
652        │ │
653        │ ~
654655        │ B  direct(A)
656        ├─╯
657        A
658        ");
659        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
660        I  direct(A)
661662        │ B  direct(A)
663        ├─╯
664        A
665
666        H  direct(C)
667668        │ D  direct(C)
669        ├─╯
670        C  missing(X)
671672        ~
673
674        G  direct(E)
675676        │ F  direct(E)
677        ├─╯
678        E  missing(Y)
679680        ~
681        ");
682    }
683
684    #[test]
685    fn test_topo_grouped_fork_nested() {
686        fn sub_graph(
687            chars: impl IntoIterator<Item = char>,
688            root_edges: Vec<GraphEdge<char>>,
689        ) -> Vec<GraphNode<char>> {
690            let [b, c, d, e, f]: [char; 5] = chars.into_iter().collect_vec().try_into().unwrap();
691            vec![
692                (f, vec![direct(c)]),
693                (e, vec![direct(b)]),
694                (d, vec![direct(c)]),
695                (c, vec![direct(b)]),
696                (b, root_edges),
697            ]
698        }
699
700        // One nested fork sub graph from A
701        let graph = itertools::chain!(
702            vec![('G', vec![direct('A')])],
703            sub_graph('B'..='F', vec![direct('A')]),
704            vec![('A', vec![])],
705        )
706        .map(Ok)
707        .collect_vec();
708        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
709        G  direct(A)
710711        │ F  direct(C)
712        │ │
713        │ │ E  direct(B)
714        │ │ │
715        │ │ │ D  direct(C)
716        │ ├───╯
717        │ C │  direct(B)
718        │ ├─╯
719        │ B  direct(A)
720        ├─╯
721        A
722        ");
723        // A::F is picked at A, and A will be unblocked. Then, C::D at C, ...
724        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
725        G  direct(A)
726727        │ F  direct(C)
728        │ │
729        │ │ D  direct(C)
730        │ ├─╯
731        │ C  direct(B)
732        │ │
733        │ │ E  direct(B)
734        │ ├─╯
735        │ B  direct(A)
736        ├─╯
737        A
738        ");
739
740        // Two nested fork sub graphs from A
741        let graph = itertools::chain!(
742            vec![('L', vec![direct('A')])],
743            sub_graph('G'..='K', vec![direct('A')]),
744            sub_graph('B'..='F', vec![direct('A')]),
745            vec![('A', vec![])],
746        )
747        .map(Ok)
748        .collect_vec();
749        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
750        L  direct(A)
751752        │ K  direct(H)
753        │ │
754        │ │ J  direct(G)
755        │ │ │
756        │ │ │ I  direct(H)
757        │ ├───╯
758        │ H │  direct(G)
759        │ ├─╯
760        │ G  direct(A)
761        ├─╯
762        │ F  direct(C)
763        │ │
764        │ │ E  direct(B)
765        │ │ │
766        │ │ │ D  direct(C)
767        │ ├───╯
768        │ C │  direct(B)
769        │ ├─╯
770        │ B  direct(A)
771        ├─╯
772        A
773        ");
774        // A::K is picked at A, and A will be unblocked. Then, H::I at H, ...
775        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
776        L  direct(A)
777778        │ K  direct(H)
779        │ │
780        │ │ I  direct(H)
781        │ ├─╯
782        │ H  direct(G)
783        │ │
784        │ │ J  direct(G)
785        │ ├─╯
786        │ G  direct(A)
787        ├─╯
788        │ F  direct(C)
789        │ │
790        │ │ D  direct(C)
791        │ ├─╯
792        │ C  direct(B)
793        │ │
794        │ │ E  direct(B)
795        │ ├─╯
796        │ B  direct(A)
797        ├─╯
798        A
799        ");
800
801        // Two nested fork sub graphs from A, interleaved
802        let graph = itertools::chain!(
803            vec![('L', vec![direct('A')])],
804            sub_graph(['C', 'E', 'G', 'I', 'K'], vec![direct('A')]),
805            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![direct('A')]),
806            vec![('A', vec![])],
807        )
808        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
809        .map(Ok)
810        .collect_vec();
811        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
812        L  direct(A)
813814        │ K  direct(E)
815        │ │
816        │ │ J  direct(D)
817        │ │ │
818        │ │ │ I  direct(C)
819        │ │ │ │
820        │ │ │ │ H  direct(B)
821        │ │ │ │ │
822        │ │ │ │ │ G  direct(E)
823        │ ├───────╯
824        │ │ │ │ │ F  direct(D)
825        │ │ ├─────╯
826        │ E │ │ │  direct(C)
827        │ ├───╯ │
828        │ │ D   │  direct(B)
829        │ │ ├───╯
830        │ C │  direct(A)
831        ├─╯ │
832        │   B  direct(A)
833        ├───╯
834        A
835        ");
836        // A::K is picked at A, and A will be unblocked. Then, E::G at E, ...
837        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
838        L  direct(A)
839840        │ K  direct(E)
841        │ │
842        │ │ G  direct(E)
843        │ ├─╯
844        │ E  direct(C)
845        │ │
846        │ │ I  direct(C)
847        │ ├─╯
848        │ C  direct(A)
849        ├─╯
850        │ J  direct(D)
851        │ │
852        │ │ F  direct(D)
853        │ ├─╯
854        │ D  direct(B)
855        │ │
856        │ │ H  direct(B)
857        │ ├─╯
858        │ B  direct(A)
859        ├─╯
860        A
861        ");
862
863        // Merged fork sub graphs at K
864        let graph = itertools::chain!(
865            vec![('K', vec![direct('E'), direct('J')])],
866            sub_graph('F'..='J', vec![missing('Y')]),
867            sub_graph('A'..='E', vec![missing('X')]),
868        )
869        .map(Ok)
870        .collect_vec();
871        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
872        K    direct(E), direct(J)
873        ├─╮
874        │ J  direct(G)
875        │ │
876        │ │ I  direct(F)
877        │ │ │
878        │ │ │ H  direct(G)
879        │ ├───╯
880        │ G │  direct(F)
881        │ ├─╯
882        │ F  missing(Y)
883        │ │
884        │ ~
885886        E  direct(B)
887888        │ D  direct(A)
889        │ │
890        │ │ C  direct(B)
891        ├───╯
892        B │  direct(A)
893        ├─╯
894        A  missing(X)
895896        ~
897        ");
898        // K-E,J is resolved without queuing new heads. Then, G::H, F::I, B::C, and
899        // A::D.
900        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
901        K    direct(E), direct(J)
902        ├─╮
903        │ J  direct(G)
904        │ │
905        E │  direct(B)
906        │ │
907        │ │ H  direct(G)
908        │ ├─╯
909        │ G  direct(F)
910        │ │
911        │ │ I  direct(F)
912        │ ├─╯
913        │ F  missing(Y)
914        │ │
915        │ ~
916917        │ C  direct(B)
918        ├─╯
919        B  direct(A)
920921        │ D  direct(A)
922        ├─╯
923        A  missing(X)
924925        ~
926        ");
927
928        // Merged fork sub graphs at K, interleaved
929        let graph = itertools::chain!(
930            vec![('K', vec![direct('I'), direct('J')])],
931            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![missing('Y')]),
932            sub_graph(['A', 'C', 'E', 'G', 'I'], vec![missing('X')]),
933        )
934        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
935        .map(Ok)
936        .collect_vec();
937        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
938        K    direct(I), direct(J)
939        ├─╮
940        │ J  direct(D)
941        │ │
942        I │  direct(C)
943        │ │
944        │ │ H  direct(B)
945        │ │ │
946        │ │ │ G  direct(A)
947        │ │ │ │
948        │ │ │ │ F  direct(D)
949        │ ├─────╯
950        │ │ │ │ E  direct(C)
951        ├───────╯
952        │ D │ │  direct(B)
953        │ ├─╯ │
954        C │   │  direct(A)
955        ├─────╯
956        │ B  missing(Y)
957        │ │
958        │ ~
959960        A  missing(X)
961962        ~
963        ");
964        // K-I,J is resolved without queuing new heads. Then, D::F, B::H, C::E, and
965        // A::G.
966        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
967        K    direct(I), direct(J)
968        ├─╮
969        │ J  direct(D)
970        │ │
971        I │  direct(C)
972        │ │
973        │ │ F  direct(D)
974        │ ├─╯
975        │ D  direct(B)
976        │ │
977        │ │ H  direct(B)
978        │ ├─╯
979        │ B  missing(Y)
980        │ │
981        │ ~
982983        │ E  direct(C)
984        ├─╯
985        C  direct(A)
986987        │ G  direct(A)
988        ├─╯
989        A  missing(X)
990991        ~
992        ");
993    }
994
995    #[test]
996    fn test_topo_grouped_merge_interleaved() {
        // E merges C and D, and the input lists the two parent chains interleaved
        // (D, C, B). The grouped output keeps D-B together before emitting C.
997        let graph = [
998            ('F', vec![direct('E')]),
999            ('E', vec![direct('C'), direct('D')]),
1000            ('D', vec![direct('B')]),
1001            ('C', vec![direct('A')]),
1002            ('B', vec![direct('A')]),
1003            ('A', vec![]),
1004        ]
1005        .map(Ok);
1006        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1007        F  direct(E)
10081009        E    direct(C), direct(D)
1010        ├─╮
1011        │ D  direct(B)
1012        │ │
1013        C │  direct(A)
1014        │ │
1015        │ B  direct(A)
1016        ├─╯
1017        A
1018        ");
1019        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1020        F  direct(E)
10211022        E    direct(C), direct(D)
1023        ├─╮
1024        │ D  direct(B)
1025        │ │
1026        │ B  direct(A)
1027        │ │
1028        C │  direct(A)
1029        ├─╯
1030        A
1031        ");
1032
1033        // F, E, and D can be emitted lazily. To emit B next, C is pulled from the
        // input and queued, which leaves A as the next input node.
1034        let mut iter = topo_grouped(graph.iter().cloned().peekable());
1035        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
1036        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
1037        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
1038        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
1039        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
1040        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
1041        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1042        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
1043    }
1044
1045    #[test]
1046    fn test_topo_grouped_merge_but_missing() {
        // D and C are merges, but one parent of each (Y and X) is missing from the
        // graph, and the grouped order is the same as the input order.
1047        let graph = [
1048            ('E', vec![direct('D')]),
1049            ('D', vec![missing('Y'), direct('C')]),
1050            ('C', vec![direct('B'), missing('X')]),
1051            ('B', vec![direct('A')]),
1052            ('A', vec![]),
1053        ]
1054        .map(Ok);
1055        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1056        E  direct(D)
10571058        D    missing(Y), direct(C)
1059        ├─╮
1060        │ │
1061        ~ │
10621063          C  direct(B), missing(X)
1064        ╭─┤
1065        │ │
1066        ~ │
10671068          B  direct(A)
10691070          A
1071        ");
1072        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1073        E  direct(D)
10741075        D    missing(Y), direct(C)
1076        ├─╮
1077        │ │
1078        ~ │
10791080          C  direct(B), missing(X)
1081        ╭─┤
1082        │ │
1083        ~ │
10841085          B  direct(A)
10861087          A
1088        ");
1089
1090        // All nodes can be lazily emitted: after each emitted node, the input
        // iterator is still positioned at the very next node.
1091        let mut iter = topo_grouped(graph.iter().cloned().peekable());
1092        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
1093        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
1094        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
1095        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
1096        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
1097        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
1098        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1099        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
1100    }
1101
1102    #[test]
1103    fn test_topo_grouped_merge_criss_cross() {
        // E and D both merge B and C (criss-cross). The grouped output emits the
        // G-E chain first, then F-D, before the shared parents C and B.
1104        let graph = [
1105            ('G', vec![direct('E')]),
1106            ('F', vec![direct('D')]),
1107            ('E', vec![direct('B'), direct('C')]),
1108            ('D', vec![direct('B'), direct('C')]),
1109            ('C', vec![direct('A')]),
1110            ('B', vec![direct('A')]),
1111            ('A', vec![]),
1112        ]
1113        .map(Ok);
1114        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1115        G  direct(E)
11161117        │ F  direct(D)
1118        │ │
1119        E │    direct(B), direct(C)
1120        ├───╮
1121        │ D │  direct(B), direct(C)
1122        ╭─┴─╮
1123        │   C  direct(A)
1124        │   │
1125        B   │  direct(A)
1126        ├───╯
1127        A
1128        ");
1129        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1130        G  direct(E)
11311132        E    direct(B), direct(C)
1133        ├─╮
1134        │ │ F  direct(D)
1135        │ │ │
1136        │ │ D  direct(B), direct(C)
1137        ╭─┬─╯
1138        │ C  direct(A)
1139        │ │
1140        B │  direct(A)
1141        ├─╯
1142        A
1143        ");
1144    }
1145
1146    #[test]
1147    fn test_topo_grouped_merge_descendants_interleaved() {
        // D merges C and B, while the chains H-F-D and G-E-C alternate in the
        // input. The grouped output emits H, F, D, B first, then G-E ahead of C.
1148        let graph = [
1149            ('H', vec![direct('F')]),
1150            ('G', vec![direct('E')]),
1151            ('F', vec![direct('D')]),
1152            ('E', vec![direct('C')]),
1153            ('D', vec![direct('C'), direct('B')]),
1154            ('C', vec![direct('A')]),
1155            ('B', vec![direct('A')]),
1156            ('A', vec![]),
1157        ]
1158        .map(Ok);
1159        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1160        H  direct(F)
11611162        │ G  direct(E)
1163        │ │
1164        F │  direct(D)
1165        │ │
1166        │ E  direct(C)
1167        │ │
1168        D │  direct(C), direct(B)
1169        ├─╮
1170        │ C  direct(A)
1171        │ │
1172        B │  direct(A)
1173        ├─╯
1174        A
1175        ");
1176        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1177        H  direct(F)
11781179        F  direct(D)
11801181        D    direct(C), direct(B)
1182        ├─╮
1183        │ B  direct(A)
1184        │ │
1185        │ │ G  direct(E)
1186        │ │ │
1187        │ │ E  direct(C)
1188        ├───╯
1189        C │  direct(A)
1190        ├─╯
1191        A
1192        ");
1193    }
1194
1195    #[test]
1196    fn test_topo_grouped_merge_multiple_roots() {
        // C merges B, whose only parent X is missing, and A, which has no parents.
1197        let graph = [
1198            ('D', vec![direct('C')]),
1199            ('C', vec![direct('B'), direct('A')]),
1200            ('B', vec![missing('X')]),
1201            ('A', vec![]),
1202        ]
1203        .map(Ok);
1204        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1205        D  direct(C)
12061207        C    direct(B), direct(A)
1208        ├─╮
1209        B │  missing(X)
1210        │ │
1211        ~ │
12121213          A
1214        ");
1215        // A is emitted before B because A is the second parent of C.
1216        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1217        D  direct(C)
12181219        C    direct(B), direct(A)
1220        ├─╮
1221        │ A
12221223        B  missing(X)
12241225        ~
1226        ");
1227    }
1228
1229    #[test]
1230    fn test_topo_grouped_merge_stairs() {
1231        let graph = [
1232            // Merge topic branches one by one:
1233            ('J', vec![direct('I'), direct('G')]),
1234            ('I', vec![direct('H'), direct('E')]),
1235            ('H', vec![direct('D'), direct('F')]),
1236            // Topic branches:
1237            ('G', vec![direct('D')]),
1238            ('F', vec![direct('C')]),
1239            ('E', vec![direct('B')]),
1240            // Base nodes:
1241            ('D', vec![direct('C')]),
1242            ('C', vec![direct('B')]),
1243            ('B', vec![direct('A')]),
1244            ('A', vec![]),
1245        ]
1246        .map(Ok);
1247        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1248        J    direct(I), direct(G)
1249        ├─╮
1250        I │    direct(H), direct(E)
1251        ├───╮
1252        H │ │    direct(D), direct(F)
1253        ├─────╮
1254        │ G │ │  direct(D)
1255        ├─╯ │ │
1256        │   │ F  direct(C)
1257        │   │ │
1258        │   E │  direct(B)
1259        │   │ │
1260        D   │ │  direct(C)
1261        ├─────╯
1262        C   │  direct(B)
1263        ├───╯
1264        B  direct(A)
12651266        A
1267        ");
1268        // Second branches are visited first: G before I, E before H, and F before D.
1269        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1270        J    direct(I), direct(G)
1271        ├─╮
1272        │ G  direct(D)
1273        │ │
1274        I │    direct(H), direct(E)
1275        ├───╮
1276        │ │ E  direct(B)
1277        │ │ │
1278        H │ │  direct(D), direct(F)
1279        ├─╮ │
1280        F │ │  direct(C)
1281        │ │ │
1282        │ D │  direct(C)
1283        ├─╯ │
1284        C   │  direct(B)
1285        ├───╯
1286        B  direct(A)
12871288        A
1289        ");
1290    }
1291
1292    #[test]
1293    fn test_topo_grouped_merge_and_fork() {
        // G merges D and E, which both fork from B. E also has the child I.
        // The grouped output emits the J-F-C chain first, then I, H, and G ahead
        // of E, D, and B.
1294        let graph = [
1295            ('J', vec![direct('F')]),
1296            ('I', vec![direct('E')]),
1297            ('H', vec![direct('G')]),
1298            ('G', vec![direct('D'), direct('E')]),
1299            ('F', vec![direct('C')]),
1300            ('E', vec![direct('B')]),
1301            ('D', vec![direct('B')]),
1302            ('C', vec![direct('A')]),
1303            ('B', vec![direct('A')]),
1304            ('A', vec![]),
1305        ]
1306        .map(Ok);
1307        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1308        J  direct(F)
13091310        │ I  direct(E)
1311        │ │
1312        │ │ H  direct(G)
1313        │ │ │
1314        │ │ G  direct(D), direct(E)
1315        │ ╭─┤
1316        F │ │  direct(C)
1317        │ │ │
1318        │ E │  direct(B)
1319        │ │ │
1320        │ │ D  direct(B)
1321        │ ├─╯
1322        C │  direct(A)
1323        │ │
1324        │ B  direct(A)
1325        ├─╯
1326        A
1327        ");
1328        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1329        J  direct(F)
13301331        F  direct(C)
13321333        C  direct(A)
13341335        │ I  direct(E)
1336        │ │
1337        │ │ H  direct(G)
1338        │ │ │
1339        │ │ G  direct(D), direct(E)
1340        │ ╭─┤
1341        │ E │  direct(B)
1342        │ │ │
1343        │ │ D  direct(B)
1344        │ ├─╯
1345        │ B  direct(A)
1346        ├─╯
1347        A
1348        ");
1349    }
1350
1351    #[test]
1352    fn test_topo_grouped_merge_and_fork_multiple_roots() {
        // G merges E and B, where B's only parent X is missing. E also has the
        // child H. In the grouped output, B and H are both emitted after G and
        // before E.
1353        let graph = [
1354            ('J', vec![direct('F')]),
1355            ('I', vec![direct('G')]),
1356            ('H', vec![direct('E')]),
1357            ('G', vec![direct('E'), direct('B')]),
1358            ('F', vec![direct('D')]),
1359            ('E', vec![direct('C')]),
1360            ('D', vec![direct('A')]),
1361            ('C', vec![direct('A')]),
1362            ('B', vec![missing('X')]),
1363            ('A', vec![]),
1364        ]
1365        .map(Ok);
1366        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1367        J  direct(F)
13681369        │ I  direct(G)
1370        │ │
1371        │ │ H  direct(E)
1372        │ │ │
1373        │ G │  direct(E), direct(B)
1374        │ ├─╮
1375        F │ │  direct(D)
1376        │ │ │
1377        │ │ E  direct(C)
1378        │ │ │
1379        D │ │  direct(A)
1380        │ │ │
1381        │ │ C  direct(A)
1382        ├───╯
1383        │ B  missing(X)
1384        │ │
1385        │ ~
13861387        A
1388        ");
1389        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1390        J  direct(F)
13911392        F  direct(D)
13931394        D  direct(A)
13951396        │ I  direct(G)
1397        │ │
1398        │ G    direct(E), direct(B)
1399        │ ├─╮
1400        │ │ B  missing(X)
1401        │ │ │
1402        │ │ ~
1403        │ │
1404        │ │ H  direct(E)
1405        │ ├─╯
1406        │ E  direct(C)
1407        │ │
1408        │ C  direct(A)
1409        ├─╯
1410        A
1411        ");
1412    }
1413
1414    #[test]
1415    fn test_topo_grouped_parallel_interleaved() {
        // Two disconnected chains, E-C-A and D-B (B's only parent X is missing),
        // are interleaved in the input. The grouped output emits each chain in
        // one contiguous run.
1416        let graph = [
1417            ('E', vec![direct('C')]),
1418            ('D', vec![direct('B')]),
1419            ('C', vec![direct('A')]),
1420            ('B', vec![missing('X')]),
1421            ('A', vec![]),
1422        ]
1423        .map(Ok);
1424        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1425        E  direct(C)
14261427        │ D  direct(B)
1428        │ │
1429        C │  direct(A)
1430        │ │
1431        │ B  missing(X)
1432        │ │
1433        │ ~
14341435        A
1436        ");
1437        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1438        E  direct(C)
14391440        C  direct(A)
14411442        A
1443
1444        D  direct(B)
14451446        B  missing(X)
14471448        ~
1449        ");
1450    }
1451
1452    #[test]
1453    fn test_topo_grouped_multiple_child_dependencies() {
        // Both I and F merge a child of D (H or E) with another child of B (G or
        // C), and D itself is a child of B.
1454        let graph = [
1455            ('I', vec![direct('H'), direct('G')]),
1456            ('H', vec![direct('D')]),
1457            ('G', vec![direct('B')]),
1458            ('F', vec![direct('E'), direct('C')]),
1459            ('E', vec![direct('D')]),
1460            ('D', vec![direct('B')]),
1461            ('C', vec![direct('B')]),
1462            ('B', vec![direct('A')]),
1463            ('A', vec![]),
1464        ]
1465        .map(Ok);
1466        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1467        I    direct(H), direct(G)
1468        ├─╮
1469        H │  direct(D)
1470        │ │
1471        │ G  direct(B)
1472        │ │
1473        │ │ F    direct(E), direct(C)
1474        │ │ ├─╮
1475        │ │ E │  direct(D)
1476        ├───╯ │
1477        D │   │  direct(B)
1478        ├─╯   │
1479        │     C  direct(B)
1480        ├─────╯
1481        B  direct(A)
14821483        A
1484        ");
1485        // Topological order must be preserved. Depending on the implementation,
1486        // E might be requested more than once by paths D->E and B->D->E.
1487        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1488        I    direct(H), direct(G)
1489        ├─╮
1490        │ G  direct(B)
1491        │ │
1492        H │  direct(D)
1493        │ │
1494        │ │ F    direct(E), direct(C)
1495        │ │ ├─╮
1496        │ │ │ C  direct(B)
1497        │ ├───╯
1498        │ │ E  direct(D)
1499        ├───╯
1500        D │  direct(B)
1501        ├─╯
1502        B  direct(A)
15031504        A
1505        ");
1506    }
1507
1508    #[test]
1509    fn test_topo_grouped_prioritized_branches_trivial_fork() {
1510        // The same setup as test_topo_grouped_trivial_fork()
1511        let graph = [
1512            ('E', vec![direct('B')]),
1513            ('D', vec![direct('A')]),
1514            ('C', vec![direct('B')]),
1515            ('B', vec![direct('A')]),
1516            ('A', vec![]),
1517        ]
1518        .map(Ok);
1519        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1520        E  direct(B)
15211522        │ D  direct(A)
1523        │ │
1524        │ │ C  direct(B)
1525        ├───╯
1526        B │  direct(A)
1527        ├─╯
1528        A
1529        ");
1530
1531        // Emit the branch C first
1532        let mut iter = topo_grouped(graph.iter().cloned());
1533        iter.prioritize_branch('C');
1534        insta::assert_snapshot!(format_graph(iter), @r"
1535        C  direct(B)
15361537        │ E  direct(B)
1538        ├─╯
1539        B  direct(A)
15401541        │ D  direct(A)
1542        ├─╯
1543        A
1544        ");
1545
1546        // Emit the branch D first
1547        let mut iter = topo_grouped(graph.iter().cloned());
1548        iter.prioritize_branch('D');
1549        insta::assert_snapshot!(format_graph(iter), @r"
1550        D  direct(A)
15511552        │ E  direct(B)
1553        │ │
1554        │ │ C  direct(B)
1555        │ ├─╯
1556        │ B  direct(A)
1557        ├─╯
1558        A
1559        ");
1560
1561        // Emit the branch C first, then D. E is emitted earlier than D because
1562        // E shares its parent B with C, so it belongs to the branch C rather than D.
1563        let mut iter = topo_grouped(graph.iter().cloned());
1564        iter.prioritize_branch('C');
1565        iter.prioritize_branch('D');
1566        insta::assert_snapshot!(format_graph(iter), @r"
1567        C  direct(B)
15681569        │ E  direct(B)
1570        ├─╯
1571        B  direct(A)
15721573        │ D  direct(A)
1574        ├─╯
1575        A
1576        ");
1577
1578        // Non-head node can be prioritized
1579        let mut iter = topo_grouped(graph.iter().cloned());
1580        iter.prioritize_branch('B');
1581        insta::assert_snapshot!(format_graph(iter), @r"
1582        E  direct(B)
15831584        │ C  direct(B)
1585        ├─╯
1586        B  direct(A)
15871588        │ D  direct(A)
1589        ├─╯
1590        A
1591        ");
1592
1593        // Root node can be prioritized
1594        let mut iter = topo_grouped(graph.iter().cloned());
1595        iter.prioritize_branch('A');
1596        insta::assert_snapshot!(format_graph(iter), @r"
1597        D  direct(A)
15981599        │ E  direct(B)
1600        │ │
1601        │ │ C  direct(B)
1602        │ ├─╯
1603        │ B  direct(A)
1604        ├─╯
1605        A
1606        ");
1607    }
1608
1609    #[test]
1610    fn test_topo_grouped_prioritized_branches_fork_multiple_heads() {
1611        // The same setup as test_topo_grouped_fork_multiple_heads()
1612        let graph = [
1613            ('I', vec![direct('E')]),
1614            ('H', vec![direct('C')]),
1615            ('G', vec![direct('A')]),
1616            ('F', vec![direct('E')]),
1617            ('E', vec![direct('C')]),
1618            ('D', vec![direct('C')]),
1619            ('C', vec![direct('A')]),
1620            ('B', vec![direct('A')]),
1621            ('A', vec![]),
1622        ]
1623        .map(Ok);
1624        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1625        I  direct(E)
16261627        │ H  direct(C)
1628        │ │
1629        │ │ G  direct(A)
1630        │ │ │
1631        │ │ │ F  direct(E)
1632        ├─────╯
1633        E │ │  direct(C)
1634        ├─╯ │
1635        │ D │  direct(C)
1636        ├─╯ │
1637        C   │  direct(A)
1638        ├───╯
1639        │ B  direct(A)
1640        ├─╯
1641        A
1642        ");
1643
1644        // Emit the branch B first, then G, then the remaining nodes
1645        let mut iter = topo_grouped(graph.iter().cloned());
1646        iter.prioritize_branch('B');
1647        iter.prioritize_branch('G');
1648        insta::assert_snapshot!(format_graph(iter), @r"
1649        B  direct(A)
16501651        │ G  direct(A)
1652        ├─╯
1653        │ I  direct(E)
1654        │ │
1655        │ │ F  direct(E)
1656        │ ├─╯
1657        │ E  direct(C)
1658        │ │
1659        │ │ H  direct(C)
1660        │ ├─╯
1661        │ │ D  direct(C)
1662        │ ├─╯
1663        │ C  direct(A)
1664        ├─╯
1665        A
1666        ");
1667
1668        // Emit D, H, then descendants of C. The order of B and G is not
1669        // respected because G can be found earlier through C->A->G. At this
1670        // point, B is not populated yet, so A is blocked only by {G}. This is
1671        // a limitation of the current node reordering logic.
1672        let mut iter = topo_grouped(graph.iter().cloned());
1673        iter.prioritize_branch('D');
1674        iter.prioritize_branch('H');
1675        iter.prioritize_branch('B');
1676        iter.prioritize_branch('G');
1677        insta::assert_snapshot!(format_graph(iter), @r"
1678        D  direct(C)
16791680        │ H  direct(C)
1681        ├─╯
1682        │ I  direct(E)
1683        │ │
1684        │ │ F  direct(E)
1685        │ ├─╯
1686        │ E  direct(C)
1687        ├─╯
1688        C  direct(A)
16891690        │ G  direct(A)
1691        ├─╯
1692        │ B  direct(A)
1693        ├─╯
1694        A
1695        ");
1696    }
1697
1698    #[test]
1699    fn test_topo_grouped_prioritized_branches_fork_parallel() {
1700        // The same setup as test_topo_grouped_fork_parallel()
1701        let graph = [
1702            // Pull all sub graphs in reverse order:
1703            ('I', vec![direct('A')]),
1704            ('H', vec![direct('C')]),
1705            ('G', vec![direct('E')]),
1706            // Orphan sub graph G,F-E:
1707            ('F', vec![direct('E')]),
1708            ('E', vec![missing('Y')]),
1709            // Orphan sub graph H,D-C:
1710            ('D', vec![direct('C')]),
1711            ('C', vec![missing('X')]),
1712            // Orphan sub graph I,B-A:
1713            ('B', vec![direct('A')]),
1714            ('A', vec![]),
1715        ]
1716        .map(Ok);
1717        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1718        I  direct(A)
17191720        │ H  direct(C)
1721        │ │
1722        │ │ G  direct(E)
1723        │ │ │
1724        │ │ │ F  direct(E)
1725        │ │ ├─╯
1726        │ │ E  missing(Y)
1727        │ │ │
1728        │ │ ~
1729        │ │
1730        │ │ D  direct(C)
1731        │ ├─╯
1732        │ C  missing(X)
1733        │ │
1734        │ ~
17351736        │ B  direct(A)
1737        ├─╯
1738        A
1739        ");
1740
1741        // Emit the sub graph G first; the other sub graphs follow in input order
1742        let mut iter = topo_grouped(graph.iter().cloned());
1743        iter.prioritize_branch('G');
1744        insta::assert_snapshot!(format_graph(iter), @r"
1745        G  direct(E)
17461747        │ F  direct(E)
1748        ├─╯
1749        E  missing(Y)
17501751        ~
1752
1753        I  direct(A)
17541755        │ B  direct(A)
1756        ├─╯
1757        A
1758
1759        H  direct(C)
17601761        │ D  direct(C)
1762        ├─╯
1763        C  missing(X)
17641765        ~
1766        ");
1767
1768        // Emit sub graphs in reverse order by selecting roots (E, then C, then A)
1769        let mut iter = topo_grouped(graph.iter().cloned());
1770        iter.prioritize_branch('E');
1771        iter.prioritize_branch('C');
1772        iter.prioritize_branch('A');
1773        insta::assert_snapshot!(format_graph(iter), @r"
1774        G  direct(E)
17751776        │ F  direct(E)
1777        ├─╯
1778        E  missing(Y)
17791780        ~
1781
1782        H  direct(C)
17831784        │ D  direct(C)
1785        ├─╯
1786        C  missing(X)
17871788        ~
1789
1790        I  direct(A)
17911792        │ B  direct(A)
1793        ├─╯
1794        A
1795        ");
1796    }
1797
1798    #[test]
1799    fn test_topo_grouped_requeue_unpopulated() {
        // C has the parents A and B, and B's parent is also A.
1800        let graph = [
1801            ('C', vec![direct('A'), direct('B')]),
1802            ('B', vec![direct('A')]),
1803            ('A', vec![]),
1804        ]
1805        .map(Ok);
1806        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1807        C    direct(A), direct(B)
1808        ├─╮
1809        │ B  direct(A)
1810        ├─╯
1811        A
1812        ");
1813        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1814        C    direct(A), direct(B)
1815        ├─╮
1816        │ B  direct(A)
1817        ├─╯
1818        A
1819        ");
1820
1821        // A is queued once by C-A because B isn't populated at this point. Since
1822        // B is the second parent, B-A is processed next and A is queued again. So
1823        // one of the two A entries in the queue has to be ignored.
1824        let mut iter = topo_grouped(graph.iter().cloned());
1825        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
1826        assert_eq!(iter.emittable_ids, vec!['A', 'B']);
1827        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1828        assert_eq!(iter.emittable_ids, vec!['A', 'A']);
1829        assert_eq!(iter.next().unwrap().unwrap().0, 'A');
1830        assert!(iter.next().is_none());
1831        assert!(iter.emittable_ids.is_empty());
1832    }
1833
1834    #[test]
1835    fn test_topo_grouped_duplicated_edges() {
1836        // The graph shouldn't have duplicated parent->child edges, but the
1837        // topo-grouped iterator can handle them anyway.
1838        let graph = [('B', vec![direct('A'), direct('A')]), ('A', vec![])].map(Ok);
1839        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1840        B  direct(A), direct(A)
18411842        A
1843        ");
1844        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r"
1845        B  direct(A), direct(A)
18461847        A
1848        ");
1849
        // A is queued once per duplicated edge, but it is emitted only once.
1850        let mut iter = topo_grouped(graph.iter().cloned());
1851        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1852        assert_eq!(iter.emittable_ids, vec!['A', 'A']);
1853        assert_eq!(iter.next().unwrap().unwrap().0, 'A');
1854        assert!(iter.next().is_none());
1855        assert!(iter.emittable_ids.is_empty());
1856    }
1857}