jj_lib/
graph.rs

1// Copyright 2021-2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::collections::VecDeque;
20use std::hash::Hash;
21
22/// Node and edges pair of type `N` and `ID` respectively.
23///
24/// `ID` uniquely identifies a node within the graph. It's usually cheap to
25/// clone. There should be a pure `(&N) -> &ID` function.
26pub type GraphNode<N, ID = N> = (N, Vec<GraphEdge<ID>>);
27
28#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
29pub struct GraphEdge<N> {
30    pub target: N,
31    pub edge_type: GraphEdgeType,
32}
33
34impl<N> GraphEdge<N> {
35    pub fn missing(target: N) -> Self {
36        Self {
37            target,
38            edge_type: GraphEdgeType::Missing,
39        }
40    }
41
42    pub fn direct(target: N) -> Self {
43        Self {
44            target,
45            edge_type: GraphEdgeType::Direct,
46        }
47    }
48
49    pub fn indirect(target: N) -> Self {
50        Self {
51            target,
52            edge_type: GraphEdgeType::Indirect,
53        }
54    }
55
56    pub fn map<M>(self, f: impl FnOnce(N) -> M) -> GraphEdge<M> {
57        GraphEdge {
58            target: f(self.target),
59            edge_type: self.edge_type,
60        }
61    }
62}
63
64#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
65pub enum GraphEdgeType {
66    Missing,
67    Direct,
68    Indirect,
69}
70
71fn reachable_targets<N>(edges: &[GraphEdge<N>]) -> impl DoubleEndedIterator<Item = &N> {
72    edges
73        .iter()
74        .filter(|edge| edge.edge_type != GraphEdgeType::Missing)
75        .map(|edge| &edge.target)
76}
77
78/// Creates new graph in which nodes and edges are reversed.
79pub fn reverse_graph<N, ID: Clone + Eq + Hash, E>(
80    input: impl Iterator<Item = Result<GraphNode<N, ID>, E>>,
81    as_id: impl Fn(&N) -> &ID,
82) -> Result<Vec<GraphNode<N, ID>>, E> {
83    let mut entries = vec![];
84    let mut reverse_edges: HashMap<ID, Vec<GraphEdge<ID>>> = HashMap::new();
85    for item in input {
86        let (node, edges) = item?;
87        for GraphEdge { target, edge_type } in edges {
88            reverse_edges.entry(target).or_default().push(GraphEdge {
89                target: as_id(&node).clone(),
90                edge_type,
91            });
92        }
93        entries.push(node);
94    }
95
96    let mut items = vec![];
97    for node in entries.into_iter().rev() {
98        let edges = reverse_edges.remove(as_id(&node)).unwrap_or_default();
99        items.push((node, edges));
100    }
101    Ok(items)
102}
103
104/// Graph iterator adapter to group topological branches.
105///
106/// Basic idea is DFS from the heads. At fork point, the other descendant
107/// branches will be visited. At merge point, the second (or the last) ancestor
108/// branch will be visited first. This is practically [the same as Git][Git].
109///
110/// If no branches are prioritized, the branch containing the first commit in
111/// the input iterator will be emitted first. It is often the working-copy
112/// ancestor branch. The other head branches won't be enqueued eagerly, and will
113/// be emitted as late as possible.
114///
115/// [Git]: https://github.blog/2022-08-30-gits-database-internals-ii-commit-history-queries/#topological-sorting
116#[derive(Clone, Debug)]
117pub struct TopoGroupedGraphIterator<N, I> {
118    input_iter: I,
119    /// Graph nodes read from the input iterator but not yet emitted.
120    nodes: HashMap<N, TopoGroupedGraphNode<N>>,
121    /// Stack of graph nodes to be emitted.
122    emittable_ids: Vec<N>,
123    /// List of new head nodes found while processing unpopulated nodes, or
124    /// prioritized branch nodes added by caller.
125    new_head_ids: VecDeque<N>,
126    /// Set of nodes which may be ancestors of `new_head_ids`.
127    blocked_ids: HashSet<N>,
128}
129
130#[derive(Clone, Debug)]
131struct TopoGroupedGraphNode<N> {
132    /// Graph nodes which must be emitted before.
133    child_ids: HashSet<N>,
134    /// Graph edges to parent nodes. `None` until this node is populated.
135    edges: Option<Vec<GraphEdge<N>>>,
136}
137
138impl<N> Default for TopoGroupedGraphNode<N> {
139    fn default() -> Self {
140        Self {
141            child_ids: Default::default(),
142            edges: None,
143        }
144    }
145}
146
147impl<N, E, I> TopoGroupedGraphIterator<N, I>
148where
149    N: Clone + Hash + Eq,
150    I: Iterator<Item = Result<GraphNode<N>, E>>,
151{
152    /// Wraps the given iterator to group topological branches. The input
153    /// iterator must be topologically ordered.
154    pub fn new(input_iter: I) -> Self {
155        TopoGroupedGraphIterator {
156            input_iter,
157            nodes: HashMap::new(),
158            emittable_ids: Vec::new(),
159            new_head_ids: VecDeque::new(),
160            blocked_ids: HashSet::new(),
161        }
162    }
163
164    /// Makes the branch containing the specified node be emitted earlier than
165    /// the others.
166    ///
167    /// The `id` usually points to a head node, but this isn't a requirement.
168    /// If the specified node isn't a head, all preceding nodes will be queued.
169    ///
170    /// The specified node must exist in the input iterator. If it didn't, the
171    /// iterator would panic.
172    pub fn prioritize_branch(&mut self, id: N) {
173        // Mark existence of unpopulated node
174        self.nodes.entry(id.clone()).or_default();
175        // Push to non-emitting list so the prioritized heads wouldn't be
176        // interleaved
177        self.new_head_ids.push_back(id);
178    }
179
180    fn populate_one(&mut self) -> Result<Option<()>, E> {
181        let (current_id, edges) = match self.input_iter.next() {
182            Some(Ok(data)) => data,
183            Some(Err(err)) => {
184                return Err(err);
185            }
186            None => {
187                return Ok(None);
188            }
189        };
190
191        // Set up reverse reference
192        for parent_id in reachable_targets(&edges) {
193            let parent_node = self.nodes.entry(parent_id.clone()).or_default();
194            parent_node.child_ids.insert(current_id.clone());
195        }
196
197        // Populate the current node
198        if let Some(current_node) = self.nodes.get_mut(&current_id) {
199            assert!(current_node.edges.is_none());
200            current_node.edges = Some(edges);
201        } else {
202            let current_node = TopoGroupedGraphNode {
203                edges: Some(edges),
204                ..Default::default()
205            };
206            self.nodes.insert(current_id.clone(), current_node);
207            // Push to non-emitting list so the new head wouldn't be interleaved
208            self.new_head_ids.push_back(current_id);
209        }
210
211        Ok(Some(()))
212    }
213
214    /// Enqueues the first new head which will unblock the waiting ancestors.
215    ///
216    /// This does not move multiple head nodes to the queue at once because
217    /// heads may be connected to the fork points in arbitrary order.
218    fn flush_new_head(&mut self) {
219        assert!(!self.new_head_ids.is_empty());
220        if self.blocked_ids.is_empty() || self.new_head_ids.len() <= 1 {
221            // Fast path: orphaned or no choice
222            let new_head_id = self.new_head_ids.pop_front().unwrap();
223            self.emittable_ids.push(new_head_id);
224            self.blocked_ids.clear();
225            return;
226        }
227
228        // Mark descendant nodes reachable from the blocking nodes
229        let mut to_visit: Vec<&N> = self
230            .blocked_ids
231            .iter()
232            .map(|id| {
233                // Borrow from self.nodes so self.blocked_ids can be mutated later
234                let (id, _) = self.nodes.get_key_value(id).unwrap();
235                id
236            })
237            .collect();
238        let mut visited: HashSet<&N> = to_visit.iter().copied().collect();
239        while let Some(id) = to_visit.pop() {
240            let node = self.nodes.get(id).unwrap();
241            to_visit.extend(node.child_ids.iter().filter(|id| visited.insert(id)));
242        }
243
244        // Pick the first reachable head
245        let index = self
246            .new_head_ids
247            .iter()
248            .position(|id| visited.contains(id))
249            .expect("blocking head should exist");
250        let new_head_id = self.new_head_ids.remove(index).unwrap();
251
252        // Unmark ancestors of the selected head so they won't contribute to future
253        // new-head resolution within the newly-unblocked sub graph. The sub graph
254        // can have many fork points, and the corresponding heads should be picked in
255        // the fork-point order, not in the head appearance order.
256        to_visit.push(&new_head_id);
257        visited.remove(&new_head_id);
258        while let Some(id) = to_visit.pop() {
259            let node = self.nodes.get(id).unwrap();
260            if let Some(edges) = &node.edges {
261                to_visit.extend(reachable_targets(edges).filter(|id| visited.remove(id)));
262            }
263        }
264        self.blocked_ids.retain(|id| visited.contains(id));
265        self.emittable_ids.push(new_head_id);
266    }
267
268    fn next_node(&mut self) -> Result<Option<GraphNode<N>>, E> {
269        // Based on Kahn's algorithm
270        loop {
271            if let Some(current_id) = self.emittable_ids.last() {
272                let Some(current_node) = self.nodes.get_mut(current_id) else {
273                    // Queued twice because new children populated and emitted
274                    self.emittable_ids.pop().unwrap();
275                    continue;
276                };
277                if !current_node.child_ids.is_empty() {
278                    // New children populated after emitting the other
279                    let current_id = self.emittable_ids.pop().unwrap();
280                    self.blocked_ids.insert(current_id);
281                    continue;
282                }
283                let Some(edges) = current_node.edges.take() else {
284                    // Not yet populated
285                    self.populate_one()?
286                        .expect("parent or prioritized node should exist");
287                    continue;
288                };
289                // The second (or the last) parent will be visited first
290                let current_id = self.emittable_ids.pop().unwrap();
291                self.nodes.remove(&current_id).unwrap();
292                for parent_id in reachable_targets(&edges) {
293                    let parent_node = self.nodes.get_mut(parent_id).unwrap();
294                    parent_node.child_ids.remove(&current_id);
295                    if parent_node.child_ids.is_empty() {
296                        let reusable_id = self.blocked_ids.take(parent_id);
297                        let parent_id = reusable_id.unwrap_or_else(|| parent_id.clone());
298                        self.emittable_ids.push(parent_id);
299                    } else {
300                        self.blocked_ids.insert(parent_id.clone());
301                    }
302                }
303                return Ok(Some((current_id, edges)));
304            } else if !self.new_head_ids.is_empty() {
305                self.flush_new_head();
306            } else {
307                // Populate the first or orphan head
308                if self.populate_one()?.is_none() {
309                    return Ok(None);
310                }
311            }
312        }
313    }
314}
315
316impl<N, E, I> Iterator for TopoGroupedGraphIterator<N, I>
317where
318    N: Clone + Hash + Eq,
319    I: Iterator<Item = Result<GraphNode<N>, E>>,
320{
321    type Item = Result<GraphNode<N>, E>;
322
323    fn next(&mut self) -> Option<Self::Item> {
324        match self.next_node() {
325            Ok(Some(node)) => Some(Ok(node)),
326            Ok(None) => {
327                assert!(self.nodes.is_empty(), "all nodes should have been emitted");
328                None
329            }
330            Err(err) => Some(Err(err)),
331        }
332    }
333}
334
335#[cfg(test)]
336mod tests {
337    use std::convert::Infallible;
338
339    use itertools::Itertools as _;
340    use renderdag::Ancestor;
341    use renderdag::GraphRowRenderer;
342    use renderdag::Renderer as _;
343
344    use super::*;
345
346    fn missing(c: char) -> GraphEdge<char> {
347        GraphEdge::missing(c)
348    }
349
350    fn direct(c: char) -> GraphEdge<char> {
351        GraphEdge::direct(c)
352    }
353
354    fn indirect(c: char) -> GraphEdge<char> {
355        GraphEdge::indirect(c)
356    }
357
358    fn format_edge(edge: &GraphEdge<char>) -> String {
359        let c = edge.target;
360        match edge.edge_type {
361            GraphEdgeType::Missing => format!("missing({c})"),
362            GraphEdgeType::Direct => format!("direct({c})"),
363            GraphEdgeType::Indirect => format!("indirect({c})"),
364        }
365    }
366
367    fn format_graph(
368        graph_iter: impl IntoIterator<Item = Result<GraphNode<char>, Infallible>>,
369    ) -> String {
370        let mut renderer = GraphRowRenderer::new()
371            .output()
372            .with_min_row_height(2)
373            .build_box_drawing();
374        graph_iter
375            .into_iter()
376            .map(|item| match item {
377                Ok(node) => node,
378                Err(err) => match err {},
379            })
380            .map(|(id, edges)| {
381                let glyph = id.to_string();
382                let message = edges.iter().map(format_edge).join(", ");
383                let parents = edges
384                    .into_iter()
385                    .map(|edge| match edge.edge_type {
386                        GraphEdgeType::Missing => Ancestor::Anonymous,
387                        GraphEdgeType::Direct => Ancestor::Parent(edge.target),
388                        GraphEdgeType::Indirect => Ancestor::Ancestor(edge.target),
389                    })
390                    .collect();
391                renderer.next_row(id, parents, glyph, message)
392            })
393            .collect()
394    }
395
396    #[test]
397    fn test_format_graph() {
398        let graph = [
399            ('D', vec![direct('C'), indirect('B')]),
400            ('C', vec![direct('A')]),
401            ('B', vec![missing('X')]),
402            ('A', vec![]),
403        ]
404        .map(Ok);
405        insta::assert_snapshot!(format_graph(graph), @r###"
406        D    direct(C), indirect(B)
407        ├─╮
408        C ╷  direct(A)
409        │ ╷
410        │ B  missing(X)
411        │ │
412        │ ~
413414        A
415
416        "###);
417    }
418
419    fn topo_grouped<I, E>(graph_iter: I) -> TopoGroupedGraphIterator<char, I::IntoIter>
420    where
421        I: IntoIterator<Item = Result<GraphNode<char>, E>>,
422    {
423        TopoGroupedGraphIterator::new(graph_iter.into_iter())
424    }
425
426    #[test]
427    fn test_topo_grouped_multiple_roots() {
428        let graph = [
429            ('C', vec![missing('Y')]),
430            ('B', vec![missing('X')]),
431            ('A', vec![]),
432        ]
433        .map(Ok);
434        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
435        C  missing(Y)
436437        ~
438
439        B  missing(X)
440441        ~
442
443        A
444        "###);
445        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
446        C  missing(Y)
447448        ~
449
450        B  missing(X)
451452        ~
453
454        A
455        "###);
456
457        // All nodes can be lazily emitted.
458        let mut iter = topo_grouped(graph.iter().cloned().peekable());
459        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
460        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
461        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
462        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
463    }
464
465    #[test]
466    fn test_topo_grouped_trivial_fork() {
467        let graph = [
468            ('E', vec![direct('B')]),
469            ('D', vec![direct('A')]),
470            ('C', vec![direct('B')]),
471            ('B', vec![direct('A')]),
472            ('A', vec![]),
473        ]
474        .map(Ok);
475        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
476        E  direct(B)
477478        │ D  direct(A)
479        │ │
480        │ │ C  direct(B)
481        ├───╯
482        B │  direct(A)
483        ├─╯
484        A
485
486        "###);
487        // D-A is found earlier than B-A, but B is emitted first because it belongs to
488        // the emitting branch.
489        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
490        E  direct(B)
491492        │ C  direct(B)
493        ├─╯
494        B  direct(A)
495496        │ D  direct(A)
497        ├─╯
498        A
499
500        "###);
501
502        // E can be lazy, then D and C will be queued.
503        let mut iter = topo_grouped(graph.iter().cloned().peekable());
504        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
505        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
506        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
507        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
508        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
509        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
510    }
511
512    #[test]
513    fn test_topo_grouped_fork_interleaved() {
514        let graph = [
515            ('F', vec![direct('D')]),
516            ('E', vec![direct('C')]),
517            ('D', vec![direct('B')]),
518            ('C', vec![direct('B')]),
519            ('B', vec![direct('A')]),
520            ('A', vec![]),
521        ]
522        .map(Ok);
523        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
524        F  direct(D)
525526        │ E  direct(C)
527        │ │
528        D │  direct(B)
529        │ │
530        │ C  direct(B)
531        ├─╯
532        B  direct(A)
533534        A
535
536        "###);
537        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
538        F  direct(D)
539540        D  direct(B)
541542        │ E  direct(C)
543        │ │
544        │ C  direct(B)
545        ├─╯
546        B  direct(A)
547548        A
549
550        "###);
551
552        // F can be lazy, then E will be queued, then C.
553        let mut iter = topo_grouped(graph.iter().cloned().peekable());
554        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
555        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
556        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
557        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
558        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
559        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
560    }
561
562    #[test]
563    fn test_topo_grouped_fork_multiple_heads() {
564        let graph = [
565            ('I', vec![direct('E')]),
566            ('H', vec![direct('C')]),
567            ('G', vec![direct('A')]),
568            ('F', vec![direct('E')]),
569            ('E', vec![direct('C')]),
570            ('D', vec![direct('C')]),
571            ('C', vec![direct('A')]),
572            ('B', vec![direct('A')]),
573            ('A', vec![]),
574        ]
575        .map(Ok);
576        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
577        I  direct(E)
578579        │ H  direct(C)
580        │ │
581        │ │ G  direct(A)
582        │ │ │
583        │ │ │ F  direct(E)
584        ├─────╯
585        E │ │  direct(C)
586        ├─╯ │
587        │ D │  direct(C)
588        ├─╯ │
589        C   │  direct(A)
590        ├───╯
591        │ B  direct(A)
592        ├─╯
593        A
594
595        "###);
596        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
597        I  direct(E)
598599        │ F  direct(E)
600        ├─╯
601        E  direct(C)
602603        │ H  direct(C)
604        ├─╯
605        │ D  direct(C)
606        ├─╯
607        C  direct(A)
608609        │ G  direct(A)
610        ├─╯
611        │ B  direct(A)
612        ├─╯
613        A
614
615        "###);
616
617        // I can be lazy, then H, G, and F will be queued.
618        let mut iter = topo_grouped(graph.iter().cloned().peekable());
619        assert_eq!(iter.next().unwrap().unwrap().0, 'I');
620        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'H');
621        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
622        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
623    }
624
625    #[test]
626    fn test_topo_grouped_fork_parallel() {
627        let graph = [
628            // Pull all sub graphs in reverse order:
629            ('I', vec![direct('A')]),
630            ('H', vec![direct('C')]),
631            ('G', vec![direct('E')]),
632            // Orphan sub graph G,F-E:
633            ('F', vec![direct('E')]),
634            ('E', vec![missing('Y')]),
635            // Orphan sub graph H,D-C:
636            ('D', vec![direct('C')]),
637            ('C', vec![missing('X')]),
638            // Orphan sub graph I,B-A:
639            ('B', vec![direct('A')]),
640            ('A', vec![]),
641        ]
642        .map(Ok);
643        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
644        I  direct(A)
645646        │ H  direct(C)
647        │ │
648        │ │ G  direct(E)
649        │ │ │
650        │ │ │ F  direct(E)
651        │ │ ├─╯
652        │ │ E  missing(Y)
653        │ │ │
654        │ │ ~
655        │ │
656        │ │ D  direct(C)
657        │ ├─╯
658        │ C  missing(X)
659        │ │
660        │ ~
661662        │ B  direct(A)
663        ├─╯
664        A
665
666        "###);
667        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
668        I  direct(A)
669670        │ B  direct(A)
671        ├─╯
672        A
673
674        H  direct(C)
675676        │ D  direct(C)
677        ├─╯
678        C  missing(X)
679680        ~
681
682        G  direct(E)
683684        │ F  direct(E)
685        ├─╯
686        E  missing(Y)
687688        ~
689        "###);
690    }
691
692    #[test]
693    fn test_topo_grouped_fork_nested() {
694        fn sub_graph(
695            chars: impl IntoIterator<Item = char>,
696            root_edges: Vec<GraphEdge<char>>,
697        ) -> Vec<GraphNode<char>> {
698            let [b, c, d, e, f]: [char; 5] = chars.into_iter().collect_vec().try_into().unwrap();
699            vec![
700                (f, vec![direct(c)]),
701                (e, vec![direct(b)]),
702                (d, vec![direct(c)]),
703                (c, vec![direct(b)]),
704                (b, root_edges),
705            ]
706        }
707
708        // One nested fork sub graph from A
709        let graph = itertools::chain!(
710            vec![('G', vec![direct('A')])],
711            sub_graph('B'..='F', vec![direct('A')]),
712            vec![('A', vec![])],
713        )
714        .map(Ok)
715        .collect_vec();
716        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
717        G  direct(A)
718719        │ F  direct(C)
720        │ │
721        │ │ E  direct(B)
722        │ │ │
723        │ │ │ D  direct(C)
724        │ ├───╯
725        │ C │  direct(B)
726        │ ├─╯
727        │ B  direct(A)
728        ├─╯
729        A
730
731        "###);
732        // A::F is picked at A, and A will be unblocked. Then, C::D at C, ...
733        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
734        G  direct(A)
735736        │ F  direct(C)
737        │ │
738        │ │ D  direct(C)
739        │ ├─╯
740        │ C  direct(B)
741        │ │
742        │ │ E  direct(B)
743        │ ├─╯
744        │ B  direct(A)
745        ├─╯
746        A
747
748        "###);
749
750        // Two nested fork sub graphs from A
751        let graph = itertools::chain!(
752            vec![('L', vec![direct('A')])],
753            sub_graph('G'..='K', vec![direct('A')]),
754            sub_graph('B'..='F', vec![direct('A')]),
755            vec![('A', vec![])],
756        )
757        .map(Ok)
758        .collect_vec();
759        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
760        L  direct(A)
761762        │ K  direct(H)
763        │ │
764        │ │ J  direct(G)
765        │ │ │
766        │ │ │ I  direct(H)
767        │ ├───╯
768        │ H │  direct(G)
769        │ ├─╯
770        │ G  direct(A)
771        ├─╯
772        │ F  direct(C)
773        │ │
774        │ │ E  direct(B)
775        │ │ │
776        │ │ │ D  direct(C)
777        │ ├───╯
778        │ C │  direct(B)
779        │ ├─╯
780        │ B  direct(A)
781        ├─╯
782        A
783
784        "###);
785        // A::K is picked at A, and A will be unblocked. Then, H::I at H, ...
786        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
787        L  direct(A)
788789        │ K  direct(H)
790        │ │
791        │ │ I  direct(H)
792        │ ├─╯
793        │ H  direct(G)
794        │ │
795        │ │ J  direct(G)
796        │ ├─╯
797        │ G  direct(A)
798        ├─╯
799        │ F  direct(C)
800        │ │
801        │ │ D  direct(C)
802        │ ├─╯
803        │ C  direct(B)
804        │ │
805        │ │ E  direct(B)
806        │ ├─╯
807        │ B  direct(A)
808        ├─╯
809        A
810
811        "###);
812
813        // Two nested fork sub graphs from A, interleaved
814        let graph = itertools::chain!(
815            vec![('L', vec![direct('A')])],
816            sub_graph(['C', 'E', 'G', 'I', 'K'], vec![direct('A')]),
817            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![direct('A')]),
818            vec![('A', vec![])],
819        )
820        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
821        .map(Ok)
822        .collect_vec();
823        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
824        L  direct(A)
825826        │ K  direct(E)
827        │ │
828        │ │ J  direct(D)
829        │ │ │
830        │ │ │ I  direct(C)
831        │ │ │ │
832        │ │ │ │ H  direct(B)
833        │ │ │ │ │
834        │ │ │ │ │ G  direct(E)
835        │ ├───────╯
836        │ │ │ │ │ F  direct(D)
837        │ │ ├─────╯
838        │ E │ │ │  direct(C)
839        │ ├───╯ │
840        │ │ D   │  direct(B)
841        │ │ ├───╯
842        │ C │  direct(A)
843        ├─╯ │
844        │   B  direct(A)
845        ├───╯
846        A
847
848        "###);
849        // A::K is picked at A, and A will be unblocked. Then, E::G at E, ...
850        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
851        L  direct(A)
852853        │ K  direct(E)
854        │ │
855        │ │ G  direct(E)
856        │ ├─╯
857        │ E  direct(C)
858        │ │
859        │ │ I  direct(C)
860        │ ├─╯
861        │ C  direct(A)
862        ├─╯
863        │ J  direct(D)
864        │ │
865        │ │ F  direct(D)
866        │ ├─╯
867        │ D  direct(B)
868        │ │
869        │ │ H  direct(B)
870        │ ├─╯
871        │ B  direct(A)
872        ├─╯
873        A
874
875        "###);
876
877        // Merged fork sub graphs at K
878        let graph = itertools::chain!(
879            vec![('K', vec![direct('E'), direct('J')])],
880            sub_graph('F'..='J', vec![missing('Y')]),
881            sub_graph('A'..='E', vec![missing('X')]),
882        )
883        .map(Ok)
884        .collect_vec();
885        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
886        K    direct(E), direct(J)
887        ├─╮
888        │ J  direct(G)
889        │ │
890        │ │ I  direct(F)
891        │ │ │
892        │ │ │ H  direct(G)
893        │ ├───╯
894        │ G │  direct(F)
895        │ ├─╯
896        │ F  missing(Y)
897        │ │
898        │ ~
899900        E  direct(B)
901902        │ D  direct(A)
903        │ │
904        │ │ C  direct(B)
905        ├───╯
906        B │  direct(A)
907        ├─╯
908        A  missing(X)
909910        ~
911        "###);
912        // K-E,J is resolved without queuing new heads. Then, G::H, F::I, B::C, and
913        // A::D.
914        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
915        K    direct(E), direct(J)
916        ├─╮
917        │ J  direct(G)
918        │ │
919        E │  direct(B)
920        │ │
921        │ │ H  direct(G)
922        │ ├─╯
923        │ G  direct(F)
924        │ │
925        │ │ I  direct(F)
926        │ ├─╯
927        │ F  missing(Y)
928        │ │
929        │ ~
930931        │ C  direct(B)
932        ├─╯
933        B  direct(A)
934935        │ D  direct(A)
936        ├─╯
937        A  missing(X)
938939        ~
940        "###);
941
942        // Merged fork sub graphs at K, interleaved
943        let graph = itertools::chain!(
944            vec![('K', vec![direct('I'), direct('J')])],
945            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![missing('Y')]),
946            sub_graph(['A', 'C', 'E', 'G', 'I'], vec![missing('X')]),
947        )
948        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
949        .map(Ok)
950        .collect_vec();
951        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
952        K    direct(I), direct(J)
953        ├─╮
954        │ J  direct(D)
955        │ │
956        I │  direct(C)
957        │ │
958        │ │ H  direct(B)
959        │ │ │
960        │ │ │ G  direct(A)
961        │ │ │ │
962        │ │ │ │ F  direct(D)
963        │ ├─────╯
964        │ │ │ │ E  direct(C)
965        ├───────╯
966        │ D │ │  direct(B)
967        │ ├─╯ │
968        C │   │  direct(A)
969        ├─────╯
970        │ B  missing(Y)
971        │ │
972        │ ~
973974        A  missing(X)
975976        ~
977        "###);
978        // K-I,J is resolved without queuing new heads. Then, D::F, B::H, C::E, and
979        // A::G.
980        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
981        K    direct(I), direct(J)
982        ├─╮
983        │ J  direct(D)
984        │ │
985        I │  direct(C)
986        │ │
987        │ │ F  direct(D)
988        │ ├─╯
989        │ D  direct(B)
990        │ │
991        │ │ H  direct(B)
992        │ ├─╯
993        │ B  missing(Y)
994        │ │
995        │ ~
996997        │ E  direct(C)
998        ├─╯
999        C  direct(A)
10001001        │ G  direct(A)
1002        ├─╯
1003        A  missing(X)
10041005        ~
1006        "###);
1007    }
1008
1009    #[test]
1010    fn test_topo_grouped_merge_interleaved() {
1011        let graph = [
1012            ('F', vec![direct('E')]),
1013            ('E', vec![direct('C'), direct('D')]),
1014            ('D', vec![direct('B')]),
1015            ('C', vec![direct('A')]),
1016            ('B', vec![direct('A')]),
1017            ('A', vec![]),
1018        ]
1019        .map(Ok);
1020        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1021        F  direct(E)
10221023        E    direct(C), direct(D)
1024        ├─╮
1025        │ D  direct(B)
1026        │ │
1027        C │  direct(A)
1028        │ │
1029        │ B  direct(A)
1030        ├─╯
1031        A
1032
1033        "###);
1034        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1035        F  direct(E)
10361037        E    direct(C), direct(D)
1038        ├─╮
1039        │ D  direct(B)
1040        │ │
1041        │ B  direct(A)
1042        │ │
1043        C │  direct(A)
1044        ├─╯
1045        A
1046
1047        "###);
1048
1049        // F, E, and D can be lazy, then C will be queued, then B.
1050        let mut iter = topo_grouped(graph.iter().cloned().peekable());
1051        assert_eq!(iter.next().unwrap().unwrap().0, 'F');
1052        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'E');
1053        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
1054        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
1055        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
1056        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
1057        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1058        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
1059    }
1060
1061    #[test]
1062    fn test_topo_grouped_merge_but_missing() {
1063        let graph = [
1064            ('E', vec![direct('D')]),
1065            ('D', vec![missing('Y'), direct('C')]),
1066            ('C', vec![direct('B'), missing('X')]),
1067            ('B', vec![direct('A')]),
1068            ('A', vec![]),
1069        ]
1070        .map(Ok);
1071        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1072        E  direct(D)
10731074        D    missing(Y), direct(C)
1075        ├─╮
1076        │ │
1077        ~ │
10781079          C  direct(B), missing(X)
1080        ╭─┤
1081        │ │
1082        ~ │
10831084          B  direct(A)
10851086          A
1087
1088        "###);
1089        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1090        E  direct(D)
10911092        D    missing(Y), direct(C)
1093        ├─╮
1094        │ │
1095        ~ │
10961097          C  direct(B), missing(X)
1098        ╭─┤
1099        │ │
1100        ~ │
11011102          B  direct(A)
11031104          A
1105
1106        "###);
1107
1108        // All nodes can be lazily emitted.
1109        let mut iter = topo_grouped(graph.iter().cloned().peekable());
1110        assert_eq!(iter.next().unwrap().unwrap().0, 'E');
1111        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'D');
1112        assert_eq!(iter.next().unwrap().unwrap().0, 'D');
1113        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'C');
1114        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
1115        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'B');
1116        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1117        assert_eq!(iter.input_iter.peek().unwrap().as_ref().unwrap().0, 'A');
1118    }
1119
1120    #[test]
1121    fn test_topo_grouped_merge_criss_cross() {
1122        let graph = [
1123            ('G', vec![direct('E')]),
1124            ('F', vec![direct('D')]),
1125            ('E', vec![direct('B'), direct('C')]),
1126            ('D', vec![direct('B'), direct('C')]),
1127            ('C', vec![direct('A')]),
1128            ('B', vec![direct('A')]),
1129            ('A', vec![]),
1130        ]
1131        .map(Ok);
1132        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1133        G  direct(E)
11341135        │ F  direct(D)
1136        │ │
1137        E │    direct(B), direct(C)
1138        ├───╮
1139        │ D │  direct(B), direct(C)
1140        ╭─┴─╮
1141        │   C  direct(A)
1142        │   │
1143        B   │  direct(A)
1144        ├───╯
1145        A
1146
1147        "###);
1148        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1149        G  direct(E)
11501151        E    direct(B), direct(C)
1152        ├─╮
1153        │ │ F  direct(D)
1154        │ │ │
1155        │ │ D  direct(B), direct(C)
1156        ╭─┬─╯
1157        │ C  direct(A)
1158        │ │
1159        B │  direct(A)
1160        ├─╯
1161        A
1162
1163        "###);
1164    }
1165
1166    #[test]
1167    fn test_topo_grouped_merge_descendants_interleaved() {
1168        let graph = [
1169            ('H', vec![direct('F')]),
1170            ('G', vec![direct('E')]),
1171            ('F', vec![direct('D')]),
1172            ('E', vec![direct('C')]),
1173            ('D', vec![direct('C'), direct('B')]),
1174            ('C', vec![direct('A')]),
1175            ('B', vec![direct('A')]),
1176            ('A', vec![]),
1177        ]
1178        .map(Ok);
1179        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1180        H  direct(F)
11811182        │ G  direct(E)
1183        │ │
1184        F │  direct(D)
1185        │ │
1186        │ E  direct(C)
1187        │ │
1188        D │  direct(C), direct(B)
1189        ├─╮
1190        │ C  direct(A)
1191        │ │
1192        B │  direct(A)
1193        ├─╯
1194        A
1195
1196        "###);
1197        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1198        H  direct(F)
11991200        F  direct(D)
12011202        D    direct(C), direct(B)
1203        ├─╮
1204        │ B  direct(A)
1205        │ │
1206        │ │ G  direct(E)
1207        │ │ │
1208        │ │ E  direct(C)
1209        ├───╯
1210        C │  direct(A)
1211        ├─╯
1212        A
1213
1214        "###);
1215    }
1216
1217    #[test]
1218    fn test_topo_grouped_merge_multiple_roots() {
1219        let graph = [
1220            ('D', vec![direct('C')]),
1221            ('C', vec![direct('B'), direct('A')]),
1222            ('B', vec![missing('X')]),
1223            ('A', vec![]),
1224        ]
1225        .map(Ok);
1226        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1227        D  direct(C)
12281229        C    direct(B), direct(A)
1230        ├─╮
1231        B │  missing(X)
1232        │ │
1233        ~ │
12341235          A
1236
1237        "###);
1238        // A is emitted first because it's the second parent.
1239        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1240        D  direct(C)
12411242        C    direct(B), direct(A)
1243        ├─╮
1244        │ A
12451246        B  missing(X)
12471248        ~
1249        "###);
1250    }
1251
1252    #[test]
1253    fn test_topo_grouped_merge_stairs() {
1254        let graph = [
1255            // Merge topic branches one by one:
1256            ('J', vec![direct('I'), direct('G')]),
1257            ('I', vec![direct('H'), direct('E')]),
1258            ('H', vec![direct('D'), direct('F')]),
1259            // Topic branches:
1260            ('G', vec![direct('D')]),
1261            ('F', vec![direct('C')]),
1262            ('E', vec![direct('B')]),
1263            // Base nodes:
1264            ('D', vec![direct('C')]),
1265            ('C', vec![direct('B')]),
1266            ('B', vec![direct('A')]),
1267            ('A', vec![]),
1268        ]
1269        .map(Ok);
1270        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1271        J    direct(I), direct(G)
1272        ├─╮
1273        I │    direct(H), direct(E)
1274        ├───╮
1275        H │ │    direct(D), direct(F)
1276        ├─────╮
1277        │ G │ │  direct(D)
1278        ├─╯ │ │
1279        │   │ F  direct(C)
1280        │   │ │
1281        │   E │  direct(B)
1282        │   │ │
1283        D   │ │  direct(C)
1284        ├─────╯
1285        C   │  direct(B)
1286        ├───╯
1287        B  direct(A)
12881289        A
1290
1291        "###);
1292        // Second branches are visited first.
1293        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1294        J    direct(I), direct(G)
1295        ├─╮
1296        │ G  direct(D)
1297        │ │
1298        I │    direct(H), direct(E)
1299        ├───╮
1300        │ │ E  direct(B)
1301        │ │ │
1302        H │ │  direct(D), direct(F)
1303        ├─╮ │
1304        F │ │  direct(C)
1305        │ │ │
1306        │ D │  direct(C)
1307        ├─╯ │
1308        C   │  direct(B)
1309        ├───╯
1310        B  direct(A)
13111312        A
1313
1314        "###);
1315    }
1316
1317    #[test]
1318    fn test_topo_grouped_merge_and_fork() {
1319        let graph = [
1320            ('J', vec![direct('F')]),
1321            ('I', vec![direct('E')]),
1322            ('H', vec![direct('G')]),
1323            ('G', vec![direct('D'), direct('E')]),
1324            ('F', vec![direct('C')]),
1325            ('E', vec![direct('B')]),
1326            ('D', vec![direct('B')]),
1327            ('C', vec![direct('A')]),
1328            ('B', vec![direct('A')]),
1329            ('A', vec![]),
1330        ]
1331        .map(Ok);
1332        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1333        J  direct(F)
13341335        │ I  direct(E)
1336        │ │
1337        │ │ H  direct(G)
1338        │ │ │
1339        │ │ G  direct(D), direct(E)
1340        │ ╭─┤
1341        F │ │  direct(C)
1342        │ │ │
1343        │ E │  direct(B)
1344        │ │ │
1345        │ │ D  direct(B)
1346        │ ├─╯
1347        C │  direct(A)
1348        │ │
1349        │ B  direct(A)
1350        ├─╯
1351        A
1352
1353        "###);
1354        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1355        J  direct(F)
13561357        F  direct(C)
13581359        C  direct(A)
13601361        │ I  direct(E)
1362        │ │
1363        │ │ H  direct(G)
1364        │ │ │
1365        │ │ G  direct(D), direct(E)
1366        │ ╭─┤
1367        │ E │  direct(B)
1368        │ │ │
1369        │ │ D  direct(B)
1370        │ ├─╯
1371        │ B  direct(A)
1372        ├─╯
1373        A
1374
1375        "###);
1376    }
1377
1378    #[test]
1379    fn test_topo_grouped_merge_and_fork_multiple_roots() {
1380        let graph = [
1381            ('J', vec![direct('F')]),
1382            ('I', vec![direct('G')]),
1383            ('H', vec![direct('E')]),
1384            ('G', vec![direct('E'), direct('B')]),
1385            ('F', vec![direct('D')]),
1386            ('E', vec![direct('C')]),
1387            ('D', vec![direct('A')]),
1388            ('C', vec![direct('A')]),
1389            ('B', vec![missing('X')]),
1390            ('A', vec![]),
1391        ]
1392        .map(Ok);
1393        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1394        J  direct(F)
13951396        │ I  direct(G)
1397        │ │
1398        │ │ H  direct(E)
1399        │ │ │
1400        │ G │  direct(E), direct(B)
1401        │ ├─╮
1402        F │ │  direct(D)
1403        │ │ │
1404        │ │ E  direct(C)
1405        │ │ │
1406        D │ │  direct(A)
1407        │ │ │
1408        │ │ C  direct(A)
1409        ├───╯
1410        │ B  missing(X)
1411        │ │
1412        │ ~
14131414        A
1415
1416        "###);
1417        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1418        J  direct(F)
14191420        F  direct(D)
14211422        D  direct(A)
14231424        │ I  direct(G)
1425        │ │
1426        │ G    direct(E), direct(B)
1427        │ ├─╮
1428        │ │ B  missing(X)
1429        │ │ │
1430        │ │ ~
1431        │ │
1432        │ │ H  direct(E)
1433        │ ├─╯
1434        │ E  direct(C)
1435        │ │
1436        │ C  direct(A)
1437        ├─╯
1438        A
1439
1440        "###);
1441    }
1442
1443    #[test]
1444    fn test_topo_grouped_parallel_interleaved() {
1445        let graph = [
1446            ('E', vec![direct('C')]),
1447            ('D', vec![direct('B')]),
1448            ('C', vec![direct('A')]),
1449            ('B', vec![missing('X')]),
1450            ('A', vec![]),
1451        ]
1452        .map(Ok);
1453        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1454        E  direct(C)
14551456        │ D  direct(B)
1457        │ │
1458        C │  direct(A)
1459        │ │
1460        │ B  missing(X)
1461        │ │
1462        │ ~
14631464        A
1465
1466        "###);
1467        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1468        E  direct(C)
14691470        C  direct(A)
14711472        A
1473
1474        D  direct(B)
14751476        B  missing(X)
14771478        ~
1479        "###);
1480    }
1481
1482    #[test]
1483    fn test_topo_grouped_multiple_child_dependencies() {
1484        let graph = [
1485            ('I', vec![direct('H'), direct('G')]),
1486            ('H', vec![direct('D')]),
1487            ('G', vec![direct('B')]),
1488            ('F', vec![direct('E'), direct('C')]),
1489            ('E', vec![direct('D')]),
1490            ('D', vec![direct('B')]),
1491            ('C', vec![direct('B')]),
1492            ('B', vec![direct('A')]),
1493            ('A', vec![]),
1494        ]
1495        .map(Ok);
1496        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1497        I    direct(H), direct(G)
1498        ├─╮
1499        H │  direct(D)
1500        │ │
1501        │ G  direct(B)
1502        │ │
1503        │ │ F    direct(E), direct(C)
1504        │ │ ├─╮
1505        │ │ E │  direct(D)
1506        ├───╯ │
1507        D │   │  direct(B)
1508        ├─╯   │
1509        │     C  direct(B)
1510        ├─────╯
1511        B  direct(A)
15121513        A
1514
1515        "###);
1516        // Topological order must be preserved. Depending on the implementation,
1517        // E might be requested more than once by paths D->E and B->D->E.
1518        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1519        I    direct(H), direct(G)
1520        ├─╮
1521        │ G  direct(B)
1522        │ │
1523        H │  direct(D)
1524        │ │
1525        │ │ F    direct(E), direct(C)
1526        │ │ ├─╮
1527        │ │ │ C  direct(B)
1528        │ ├───╯
1529        │ │ E  direct(D)
1530        ├───╯
1531        D │  direct(B)
1532        ├─╯
1533        B  direct(A)
15341535        A
1536
1537        "###);
1538    }
1539
1540    #[test]
1541    fn test_topo_grouped_prioritized_branches_trivial_fork() {
1542        // The same setup as test_topo_grouped_trivial_fork()
1543        let graph = [
1544            ('E', vec![direct('B')]),
1545            ('D', vec![direct('A')]),
1546            ('C', vec![direct('B')]),
1547            ('B', vec![direct('A')]),
1548            ('A', vec![]),
1549        ]
1550        .map(Ok);
1551        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1552        E  direct(B)
15531554        │ D  direct(A)
1555        │ │
1556        │ │ C  direct(B)
1557        ├───╯
1558        B │  direct(A)
1559        ├─╯
1560        A
1561        ");
1562
1563        // Emit the branch C first
1564        let mut iter = topo_grouped(graph.iter().cloned());
1565        iter.prioritize_branch('C');
1566        insta::assert_snapshot!(format_graph(iter), @r"
1567        C  direct(B)
15681569        │ E  direct(B)
1570        ├─╯
1571        B  direct(A)
15721573        │ D  direct(A)
1574        ├─╯
1575        A
1576        ");
1577
1578        // Emit the branch D first
1579        let mut iter = topo_grouped(graph.iter().cloned());
1580        iter.prioritize_branch('D');
1581        insta::assert_snapshot!(format_graph(iter), @r"
1582        D  direct(A)
15831584        │ E  direct(B)
1585        │ │
1586        │ │ C  direct(B)
1587        │ ├─╯
1588        │ B  direct(A)
1589        ├─╯
1590        A
1591        ");
1592
1593        // Emit the branch C first, then D. E is emitted earlier than D because
1594        // E belongs to the branch C compared to the branch D.
1595        let mut iter = topo_grouped(graph.iter().cloned());
1596        iter.prioritize_branch('C');
1597        iter.prioritize_branch('D');
1598        insta::assert_snapshot!(format_graph(iter), @r"
1599        C  direct(B)
16001601        │ E  direct(B)
1602        ├─╯
1603        B  direct(A)
16041605        │ D  direct(A)
1606        ├─╯
1607        A
1608        ");
1609
1610        // Non-head node can be prioritized
1611        let mut iter = topo_grouped(graph.iter().cloned());
1612        iter.prioritize_branch('B');
1613        insta::assert_snapshot!(format_graph(iter), @r"
1614        E  direct(B)
16151616        │ C  direct(B)
1617        ├─╯
1618        B  direct(A)
16191620        │ D  direct(A)
1621        ├─╯
1622        A
1623        ");
1624
1625        // Root node can be prioritized
1626        let mut iter = topo_grouped(graph.iter().cloned());
1627        iter.prioritize_branch('A');
1628        insta::assert_snapshot!(format_graph(iter), @r"
1629        D  direct(A)
16301631        │ E  direct(B)
1632        │ │
1633        │ │ C  direct(B)
1634        │ ├─╯
1635        │ B  direct(A)
1636        ├─╯
1637        A
1638        ");
1639    }
1640
1641    #[test]
1642    fn test_topo_grouped_prioritized_branches_fork_multiple_heads() {
1643        // The same setup as test_topo_grouped_fork_multiple_heads()
1644        let graph = [
1645            ('I', vec![direct('E')]),
1646            ('H', vec![direct('C')]),
1647            ('G', vec![direct('A')]),
1648            ('F', vec![direct('E')]),
1649            ('E', vec![direct('C')]),
1650            ('D', vec![direct('C')]),
1651            ('C', vec![direct('A')]),
1652            ('B', vec![direct('A')]),
1653            ('A', vec![]),
1654        ]
1655        .map(Ok);
1656        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1657        I  direct(E)
16581659        │ H  direct(C)
1660        │ │
1661        │ │ G  direct(A)
1662        │ │ │
1663        │ │ │ F  direct(E)
1664        ├─────╯
1665        E │ │  direct(C)
1666        ├─╯ │
1667        │ D │  direct(C)
1668        ├─╯ │
1669        C   │  direct(A)
1670        ├───╯
1671        │ B  direct(A)
1672        ├─╯
1673        A
1674        ");
1675
1676        // Emit B, G, then remainders
1677        let mut iter = topo_grouped(graph.iter().cloned());
1678        iter.prioritize_branch('B');
1679        iter.prioritize_branch('G');
1680        insta::assert_snapshot!(format_graph(iter), @r"
1681        B  direct(A)
16821683        │ G  direct(A)
1684        ├─╯
1685        │ I  direct(E)
1686        │ │
1687        │ │ F  direct(E)
1688        │ ├─╯
1689        │ E  direct(C)
1690        │ │
1691        │ │ H  direct(C)
1692        │ ├─╯
1693        │ │ D  direct(C)
1694        │ ├─╯
1695        │ C  direct(A)
1696        ├─╯
1697        A
1698        ");
1699
1700        // Emit D, H, then descendants of C. The order of B and G is not
1701        // respected because G can be found earlier through C->A->G. At this
1702        // point, B is not populated yet, so A is blocked only by {G}. This is
1703        // a limitation of the current node reordering logic.
1704        let mut iter = topo_grouped(graph.iter().cloned());
1705        iter.prioritize_branch('D');
1706        iter.prioritize_branch('H');
1707        iter.prioritize_branch('B');
1708        iter.prioritize_branch('G');
1709        insta::assert_snapshot!(format_graph(iter), @r"
1710        D  direct(C)
17111712        │ H  direct(C)
1713        ├─╯
1714        │ I  direct(E)
1715        │ │
1716        │ │ F  direct(E)
1717        │ ├─╯
1718        │ E  direct(C)
1719        ├─╯
1720        C  direct(A)
17211722        │ G  direct(A)
1723        ├─╯
1724        │ B  direct(A)
1725        ├─╯
1726        A
1727        ");
1728    }
1729
1730    #[test]
1731    fn test_topo_grouped_prioritized_branches_fork_parallel() {
1732        // The same setup as test_topo_grouped_fork_parallel()
1733        let graph = [
1734            // Pull all sub graphs in reverse order:
1735            ('I', vec![direct('A')]),
1736            ('H', vec![direct('C')]),
1737            ('G', vec![direct('E')]),
1738            // Orphan sub graph G,F-E:
1739            ('F', vec![direct('E')]),
1740            ('E', vec![missing('Y')]),
1741            // Orphan sub graph H,D-C:
1742            ('D', vec![direct('C')]),
1743            ('C', vec![missing('X')]),
1744            // Orphan sub graph I,B-A:
1745            ('B', vec![direct('A')]),
1746            ('A', vec![]),
1747        ]
1748        .map(Ok);
1749        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r"
1750        I  direct(A)
17511752        │ H  direct(C)
1753        │ │
1754        │ │ G  direct(E)
1755        │ │ │
1756        │ │ │ F  direct(E)
1757        │ │ ├─╯
1758        │ │ E  missing(Y)
1759        │ │ │
1760        │ │ ~
1761        │ │
1762        │ │ D  direct(C)
1763        │ ├─╯
1764        │ C  missing(X)
1765        │ │
1766        │ ~
17671768        │ B  direct(A)
1769        ├─╯
1770        A
1771        ");
1772
1773        // Emit the sub graph G first
1774        let mut iter = topo_grouped(graph.iter().cloned());
1775        iter.prioritize_branch('G');
1776        insta::assert_snapshot!(format_graph(iter), @r"
1777        G  direct(E)
17781779        │ F  direct(E)
1780        ├─╯
1781        E  missing(Y)
17821783        ~
1784
1785        I  direct(A)
17861787        │ B  direct(A)
1788        ├─╯
1789        A
1790
1791        H  direct(C)
17921793        │ D  direct(C)
1794        ├─╯
1795        C  missing(X)
17961797        ~
1798        ");
1799
1800        // Emit sub graphs in reverse order by selecting roots
1801        let mut iter = topo_grouped(graph.iter().cloned());
1802        iter.prioritize_branch('E');
1803        iter.prioritize_branch('C');
1804        iter.prioritize_branch('A');
1805        insta::assert_snapshot!(format_graph(iter), @r"
1806        G  direct(E)
18071808        │ F  direct(E)
1809        ├─╯
1810        E  missing(Y)
18111812        ~
1813
1814        H  direct(C)
18151816        │ D  direct(C)
1817        ├─╯
1818        C  missing(X)
18191820        ~
1821
1822        I  direct(A)
18231824        │ B  direct(A)
1825        ├─╯
1826        A
1827        ");
1828    }
1829
1830    #[test]
1831    fn test_topo_grouped_requeue_unpopulated() {
1832        let graph = [
1833            ('C', vec![direct('A'), direct('B')]),
1834            ('B', vec![direct('A')]),
1835            ('A', vec![]),
1836        ]
1837        .map(Ok);
1838        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1839        C    direct(A), direct(B)
1840        ├─╮
1841        │ B  direct(A)
1842        ├─╯
1843        A
1844
1845        "###);
1846        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1847        C    direct(A), direct(B)
1848        ├─╮
1849        │ B  direct(A)
1850        ├─╯
1851        A
1852
1853        "###);
1854
1855        // A is queued once by C-A because B isn't populated at this point. Since
1856        // B is the second parent, B-A is processed next and A is queued again. So
1857        // one of them in the queue has to be ignored.
1858        let mut iter = topo_grouped(graph.iter().cloned());
1859        assert_eq!(iter.next().unwrap().unwrap().0, 'C');
1860        assert_eq!(iter.emittable_ids, vec!['A', 'B']);
1861        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1862        assert_eq!(iter.emittable_ids, vec!['A', 'A']);
1863        assert_eq!(iter.next().unwrap().unwrap().0, 'A');
1864        assert!(iter.next().is_none());
1865        assert!(iter.emittable_ids.is_empty());
1866    }
1867
1868    #[test]
1869    fn test_topo_grouped_duplicated_edges() {
1870        // The graph shouldn't have duplicated parent->child edges, but topo-grouped
1871        // iterator can handle it anyway.
1872        let graph = [('B', vec![direct('A'), direct('A')]), ('A', vec![])].map(Ok);
1873        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @r###"
1874        B  direct(A), direct(A)
18751876        A
1877
1878        "###);
1879        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @r###"
1880        B  direct(A), direct(A)
18811882        A
1883
1884        "###);
1885
1886        let mut iter = topo_grouped(graph.iter().cloned());
1887        assert_eq!(iter.next().unwrap().unwrap().0, 'B');
1888        assert_eq!(iter.emittable_ids, vec!['A', 'A']);
1889        assert_eq!(iter.next().unwrap().unwrap().0, 'A');
1890        assert!(iter.next().is_none());
1891        assert!(iter.emittable_ids.is_empty());
1892    }
1893}