Skip to main content

jj_lib/
graph.rs

1// Copyright 2021-2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::collections::VecDeque;
20use std::hash::Hash;
21
22use futures::Stream;
23use futures::TryStreamExt as _;
24use futures::stream;
25
26/// Node and edges pair of type `N` and `ID` respectively.
27///
28/// `ID` uniquely identifies a node within the graph. It's usually cheap to
29/// clone. There should be a pure `(&N) -> &ID` function.
30pub type GraphNode<N, ID = N> = (N, Vec<GraphEdge<ID>>);
31
32#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
33pub struct GraphEdge<N> {
34    pub target: N,
35    pub edge_type: GraphEdgeType,
36}
37
38impl<N> GraphEdge<N> {
39    pub fn missing(target: N) -> Self {
40        Self {
41            target,
42            edge_type: GraphEdgeType::Missing,
43        }
44    }
45
46    pub fn direct(target: N) -> Self {
47        Self {
48            target,
49            edge_type: GraphEdgeType::Direct,
50        }
51    }
52
53    pub fn indirect(target: N) -> Self {
54        Self {
55            target,
56            edge_type: GraphEdgeType::Indirect,
57        }
58    }
59
60    pub fn map<M>(self, f: impl FnOnce(N) -> M) -> GraphEdge<M> {
61        GraphEdge {
62            target: f(self.target),
63            edge_type: self.edge_type,
64        }
65    }
66
67    pub fn is_missing(&self) -> bool {
68        self.edge_type == GraphEdgeType::Missing
69    }
70
71    pub fn is_direct(&self) -> bool {
72        self.edge_type == GraphEdgeType::Direct
73    }
74
75    pub fn is_indirect(&self) -> bool {
76        self.edge_type == GraphEdgeType::Indirect
77    }
78}
79
80#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
81pub enum GraphEdgeType {
82    Missing,
83    Direct,
84    Indirect,
85}
86
87fn reachable_targets<N>(edges: &[GraphEdge<N>]) -> impl DoubleEndedIterator<Item = &N> {
88    edges
89        .iter()
90        .filter(|edge| !edge.is_missing())
91        .map(|edge| &edge.target)
92}
93
94/// Creates new graph in which nodes and edges are reversed.
95pub fn reverse_graph<N, ID: Clone + Eq + Hash, E>(
96    input: impl Iterator<Item = Result<GraphNode<N, ID>, E>>,
97    as_id: impl Fn(&N) -> &ID,
98) -> Result<Vec<GraphNode<N, ID>>, E> {
99    let mut entries = vec![];
100    let mut reverse_edges: HashMap<ID, Vec<GraphEdge<ID>>> = HashMap::new();
101    for item in input {
102        let (node, edges) = item?;
103        for GraphEdge { target, edge_type } in edges {
104            reverse_edges.entry(target).or_default().push(GraphEdge {
105                target: as_id(&node).clone(),
106                edge_type,
107            });
108        }
109        entries.push(node);
110    }
111
112    let mut items: Vec<(N, Vec<GraphEdge<ID>>)> = vec![];
113    for node in entries.into_iter().rev() {
114        let edges = reverse_edges.remove(as_id(&node)).unwrap_or_default();
115        items.push((node, edges));
116    }
117    Ok(items)
118}
119
120/// Graph iterator adapter builder to group topological branches.
121///
122/// Basic idea is DFS from the heads. At fork point, the other descendant
123/// branches will be visited. At merge point, the second (or the last) ancestor
124/// branch will be visited first. This is practically [the same as Git][Git].
125///
126/// If no branches are prioritized, the branch containing the first commit in
127/// the input iterator will be emitted first. It is often the working-copy
128/// ancestor branch. The other head branches won't be enqueued eagerly, and will
129/// be emitted as late as possible.
130///
131/// [Git]: https://github.blog/2022-08-30-gits-database-internals-ii-commit-history-queries/#topological-sorting
132#[derive(Clone, Debug)]
133pub struct TopoGroupedGraph<N, ID, S, F> {
134    input_stream: S,
135    as_id: F,
136    /// Graph nodes read from the input iterator but not yet emitted.
137    nodes: HashMap<ID, TopoGroupedGraphNode<N, ID>>,
138    /// Stack of graph nodes to be emitted.
139    emittable_ids: Vec<ID>,
140    /// List of new head nodes found while processing unpopulated nodes, or
141    /// prioritized branch nodes added by caller.
142    new_head_ids: VecDeque<ID>,
143    /// Set of nodes which may be ancestors of `new_head_ids`.
144    blocked_ids: HashSet<ID>,
145}
146
147#[derive(Clone, Debug)]
148struct TopoGroupedGraphNode<N, ID> {
149    /// Graph nodes which must be emitted before.
150    child_ids: HashSet<ID>,
151    /// Graph node data and edges to parent nodes. `None` until this node is
152    /// populated.
153    item: Option<GraphNode<N, ID>>,
154}
155
156impl<N, ID> Default for TopoGroupedGraphNode<N, ID> {
157    fn default() -> Self {
158        Self {
159            child_ids: Default::default(),
160            item: None,
161        }
162    }
163}
164
165impl<N, ID, E, S, F> TopoGroupedGraph<N, ID, S, F>
166where
167    ID: Clone + Hash + Eq + Unpin,
168    S: Stream<Item = Result<GraphNode<N, ID>, E>> + Unpin,
169    F: Fn(&N) -> &ID + Unpin,
170{
171    /// Wraps the given iterator to group topological branches. The input
172    /// iterator must be topologically ordered.
173    pub fn new(input_stream: S, as_id: F) -> Self {
174        Self {
175            input_stream,
176            as_id,
177            nodes: HashMap::new(),
178            emittable_ids: Vec::new(),
179            new_head_ids: VecDeque::new(),
180            blocked_ids: HashSet::new(),
181        }
182    }
183
184    /// Makes the branch containing the specified node be emitted earlier than
185    /// the others.
186    ///
187    /// The `id` usually points to a head node, but this isn't a requirement.
188    /// If the specified node isn't a head, all preceding nodes will be queued.
189    ///
190    /// The specified node must exist in the input iterator. If it didn't, the
191    /// iterator would panic.
192    pub fn prioritize_branch(&mut self, id: ID) {
193        // Mark existence of unpopulated node
194        self.nodes.entry(id.clone()).or_default();
195        // Push to non-emitting list so the prioritized heads wouldn't be
196        // interleaved
197        self.new_head_ids.push_back(id);
198    }
199
200    async fn populate_one(&mut self) -> Result<Option<()>, E> {
201        let Some(item) = self.input_stream.try_next().await? else {
202            return Ok(None);
203        };
204        let (data, edges) = &item;
205        let current_id = (self.as_id)(data);
206
207        // Set up reverse reference
208        for parent_id in reachable_targets(edges) {
209            let parent_node = self.nodes.entry(parent_id.clone()).or_default();
210            parent_node.child_ids.insert(current_id.clone());
211        }
212
213        // Populate the current node
214        if let Some(current_node) = self.nodes.get_mut(current_id) {
215            assert!(current_node.item.is_none());
216            current_node.item = Some(item);
217        } else {
218            let current_id = current_id.clone();
219            let current_node = TopoGroupedGraphNode {
220                item: Some(item),
221                ..Default::default()
222            };
223            self.nodes.insert(current_id.clone(), current_node);
224            // Push to non-emitting list so the new head wouldn't be interleaved
225            self.new_head_ids.push_back(current_id);
226        }
227
228        Ok(Some(()))
229    }
230
231    /// Enqueues the first new head which will unblock the waiting ancestors.
232    ///
233    /// This does not move multiple head nodes to the queue at once because
234    /// heads may be connected to the fork points in arbitrary order.
235    fn flush_new_head(&mut self) {
236        assert!(!self.new_head_ids.is_empty());
237        if self.blocked_ids.is_empty() || self.new_head_ids.len() <= 1 {
238            // Fast path: orphaned or no choice
239            let new_head_id = self.new_head_ids.pop_front().unwrap();
240            self.emittable_ids.push(new_head_id);
241            self.blocked_ids.clear();
242            return;
243        }
244
245        // Mark descendant nodes reachable from the blocking nodes
246        let mut to_visit: Vec<&ID> = self
247            .blocked_ids
248            .iter()
249            .map(|id| {
250                // Borrow from self.nodes so self.blocked_ids can be mutated later
251                let (id, _) = self.nodes.get_key_value(id).unwrap();
252                id
253            })
254            .collect();
255        let mut visited: HashSet<&ID> = to_visit.iter().copied().collect();
256        while let Some(id) = to_visit.pop() {
257            let node = self.nodes.get(id).unwrap();
258            to_visit.extend(node.child_ids.iter().filter(|id| visited.insert(id)));
259        }
260
261        // Pick the first reachable head
262        let index = self
263            .new_head_ids
264            .iter()
265            .position(|id| visited.contains(id))
266            .expect("blocking head should exist");
267        let new_head_id = self.new_head_ids.remove(index).unwrap();
268
269        // Unmark ancestors of the selected head so they won't contribute to future
270        // new-head resolution within the newly-unblocked sub graph. The sub graph
271        // can have many fork points, and the corresponding heads should be picked in
272        // the fork-point order, not in the head appearance order.
273        to_visit.push(&new_head_id);
274        visited.remove(&new_head_id);
275        while let Some(id) = to_visit.pop() {
276            let node = self.nodes.get(id).unwrap();
277            if let Some((_, edges)) = &node.item {
278                to_visit.extend(reachable_targets(edges).filter(|id| visited.remove(id)));
279            }
280        }
281        self.blocked_ids.retain(|id| visited.contains(id));
282        self.emittable_ids.push(new_head_id);
283    }
284
285    async fn next_node(&mut self) -> Result<Option<GraphNode<N, ID>>, E> {
286        // Based on Kahn's algorithm
287        loop {
288            if let Some(current_id) = self.emittable_ids.last() {
289                let Some(current_node) = self.nodes.get_mut(current_id) else {
290                    // Queued twice because new children populated and emitted
291                    self.emittable_ids.pop().unwrap();
292                    continue;
293                };
294                if !current_node.child_ids.is_empty() {
295                    // New children populated after emitting the other
296                    let current_id = self.emittable_ids.pop().unwrap();
297                    self.blocked_ids.insert(current_id);
298                    continue;
299                }
300                let Some(item) = current_node.item.take() else {
301                    // Not yet populated
302                    self.populate_one()
303                        .await?
304                        .expect("parent or prioritized node should exist");
305                    continue;
306                };
307                // The second (or the last) parent will be visited first
308                let current_id = self.emittable_ids.pop().unwrap();
309                self.nodes.remove(&current_id).unwrap();
310                let (_, edges) = &item;
311                for parent_id in reachable_targets(edges) {
312                    let parent_node = self.nodes.get_mut(parent_id).unwrap();
313                    parent_node.child_ids.remove(&current_id);
314                    if parent_node.child_ids.is_empty() {
315                        let reusable_id = self.blocked_ids.take(parent_id);
316                        let parent_id = reusable_id.unwrap_or_else(|| parent_id.clone());
317                        self.emittable_ids.push(parent_id);
318                    } else {
319                        self.blocked_ids.insert(parent_id.clone());
320                    }
321                }
322                return Ok(Some(item));
323            } else if !self.new_head_ids.is_empty() {
324                self.flush_new_head();
325            } else {
326                // Populate the first or orphan head
327                if self.populate_one().await?.is_none() {
328                    return Ok(None);
329                }
330            }
331        }
332    }
333
334    /// Make a stream based on the input stream and the optional prioritized
335    /// branches.
336    pub fn stream(self) -> impl Stream<Item = Result<GraphNode<N, ID>, E>> {
337        stream::unfold(self, async |mut state| match state.next_node().await {
338            Ok(Some(node)) => Some((Ok(node), state)),
339            Ok(None) => {
340                assert!(state.nodes.is_empty(), "all nodes should have been emitted");
341                None
342            }
343            Err(err) => Some((Err(err), state)),
344        })
345    }
346}
347
348#[cfg(test)]
349mod tests {
350    use std::cell::RefCell;
351    use std::convert::Infallible;
352    use std::iter::Peekable;
353    use std::rc::Rc;
354
355    use futures::StreamExt as _;
356    use futures::executor::block_on_stream;
357    use itertools::Itertools as _;
358    use renderdag::Ancestor;
359    use renderdag::GraphRowRenderer;
360    use renderdag::Renderer as _;
361
362    use super::*;
363    use crate::tests::TestResult;
364
365    fn missing(c: char) -> GraphEdge<char> {
366        GraphEdge::missing(c)
367    }
368
369    fn direct(c: char) -> GraphEdge<char> {
370        GraphEdge::direct(c)
371    }
372
373    fn indirect(c: char) -> GraphEdge<char> {
374        GraphEdge::indirect(c)
375    }
376
377    fn format_edge(edge: &GraphEdge<char>) -> String {
378        let c = edge.target;
379        match edge.edge_type {
380            GraphEdgeType::Missing => format!("missing({c})"),
381            GraphEdgeType::Direct => format!("direct({c})"),
382            GraphEdgeType::Indirect => format!("indirect({c})"),
383        }
384    }
385
386    /// Wraps the iterator and returns the wrapped iterator and a spy for
387    /// peeking the next item the iterator will return.
388    fn spy_on<I: Iterator>(iter: I) -> (IteratorSpy<I>, SpiedIterator<I>) {
389        let iter = Rc::new(RefCell::new(iter.peekable()));
390        let spy = IteratorSpy { iter: iter.clone() };
391        let iter = SpiedIterator { iter };
392        (spy, iter)
393    }
394
395    struct IteratorSpy<I: Iterator> {
396        iter: Rc<RefCell<Peekable<I>>>,
397    }
398
399    struct SpiedIterator<I: Iterator> {
400        iter: Rc<RefCell<Peekable<I>>>,
401    }
402
403    impl<N, I> IteratorSpy<I>
404    where
405        I: Iterator<Item = N>,
406        N: Clone,
407    {
408        fn peek(&self) -> Option<N> {
409            self.iter.borrow_mut().peek().cloned()
410        }
411    }
412
413    impl<N, I> Iterator for SpiedIterator<I>
414    where
415        I: Iterator<Item = N>,
416    {
417        type Item = N;
418
419        fn next(&mut self) -> Option<Self::Item> {
420            self.iter.borrow_mut().next()
421        }
422    }
423
424    fn format_graph(
425        graph_iter: impl IntoIterator<Item = Result<GraphNode<char>, Infallible>>,
426    ) -> String {
427        let mut renderer = GraphRowRenderer::new()
428            .output()
429            .with_min_row_height(2)
430            .build_box_drawing();
431        graph_iter
432            .into_iter()
433            .map(|item| match item {
434                Ok(node) => node,
435                Err(err) => match err {},
436            })
437            .map(|(id, edges)| {
438                let glyph = id.to_string();
439                let message = edges.iter().map(format_edge).join(", ");
440                let parents = edges
441                    .into_iter()
442                    .map(|edge| match edge.edge_type {
443                        GraphEdgeType::Missing => Ancestor::Anonymous,
444                        GraphEdgeType::Direct => Ancestor::Parent(edge.target),
445                        GraphEdgeType::Indirect => Ancestor::Ancestor(edge.target),
446                    })
447                    .collect();
448                renderer.next_row(id, parents, glyph, message)
449            })
450            .collect()
451    }
452
453    #[test]
454    fn test_format_graph() {
455        let graph = [
456            ('D', vec![direct('C'), indirect('B')]),
457            ('C', vec![direct('A')]),
458            ('B', vec![missing('X')]),
459            ('A', vec![]),
460        ]
461        .map(Ok);
462        insta::assert_snapshot!(format_graph(graph), @"
463        D    direct(C), indirect(B)
464        ├─╮
465        C ╷  direct(A)
466        │ ╷
467        │ B  missing(X)
468        │ │
469        │ ~
470471        A
472        ");
473    }
474
475    fn topo_grouped<'a, I, E: 'a>(
476        graph_iter: I,
477    ) -> impl Iterator<Item = Result<GraphNode<char>, E>> + 'a
478    where
479        I: IntoIterator<Item = Result<GraphNode<char>, E>> + 'a,
480    {
481        block_on_stream(
482            TopoGroupedGraph::new(stream::iter(graph_iter), |c| c)
483                .stream()
484                .boxed_local(),
485        )
486    }
487
488    fn topo_grouped_with_prioritization<'a, I, E: 'a>(
489        graph_iter: I,
490        prioritized_ids: &'a [char],
491    ) -> impl Iterator<Item = Result<GraphNode<char>, E>> + 'a
492    where
493        I: IntoIterator<Item = Result<GraphNode<char>, E>> + 'a,
494    {
495        let mut topo_order = TopoGroupedGraph::new(stream::iter(graph_iter), |c| c);
496        for id in prioritized_ids {
497            topo_order.prioritize_branch(*id);
498        }
499        block_on_stream(topo_order.stream().boxed_local())
500    }
501
502    #[test]
503    fn test_topo_grouped_multiple_roots() -> TestResult {
504        let graph = [
505            ('C', vec![missing('Y')]),
506            ('B', vec![missing('X')]),
507            ('A', vec![]),
508        ]
509        .map(Ok);
510        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
511        C  missing(Y)
512513        ~
514
515        B  missing(X)
516517        ~
518
519        A
520        ");
521        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
522        C  missing(Y)
523524        ~
525
526        B  missing(X)
527528        ~
529
530        A
531        ");
532
533        // All nodes can be lazily emitted.
534        let (spy, iter) = spy_on(graph.iter().cloned());
535        let mut iter = topo_grouped(iter);
536        assert_eq!(iter.next().unwrap()?.0, 'C');
537        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'B');
538        assert_eq!(iter.next().unwrap()?.0, 'B');
539        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'A');
540        Ok(())
541    }
542
543    #[test]
544    fn test_topo_grouped_trivial_fork() -> TestResult {
545        let graph = [
546            ('E', vec![direct('B')]),
547            ('D', vec![direct('A')]),
548            ('C', vec![direct('B')]),
549            ('B', vec![direct('A')]),
550            ('A', vec![]),
551        ]
552        .map(Ok);
553        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
554        E  direct(B)
555556        │ D  direct(A)
557        │ │
558        │ │ C  direct(B)
559        ├───╯
560        B │  direct(A)
561        ├─╯
562        A
563        ");
564        // D-A is found earlier than B-A, but B is emitted first because it belongs to
565        // the emitting branch.
566        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
567        E  direct(B)
568569        │ C  direct(B)
570        ├─╯
571        B  direct(A)
572573        │ D  direct(A)
574        ├─╯
575        A
576        ");
577
578        // E can be lazy, then D and C will be queued.
579        let (spy, iter) = spy_on(graph.iter().cloned());
580        let mut iter = topo_grouped(iter);
581        assert_eq!(iter.next().unwrap()?.0, 'E');
582        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'D');
583        assert_eq!(iter.next().unwrap()?.0, 'C');
584        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'B');
585        assert_eq!(iter.next().unwrap()?.0, 'B');
586        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'A');
587        Ok(())
588    }
589
590    #[test]
591    fn test_topo_grouped_fork_interleaved() -> TestResult {
592        let graph = [
593            ('F', vec![direct('D')]),
594            ('E', vec![direct('C')]),
595            ('D', vec![direct('B')]),
596            ('C', vec![direct('B')]),
597            ('B', vec![direct('A')]),
598            ('A', vec![]),
599        ]
600        .map(Ok);
601        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
602        F  direct(D)
603604        │ E  direct(C)
605        │ │
606        D │  direct(B)
607        │ │
608        │ C  direct(B)
609        ├─╯
610        B  direct(A)
611612        A
613        ");
614        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
615        F  direct(D)
616617        D  direct(B)
618619        │ E  direct(C)
620        │ │
621        │ C  direct(B)
622        ├─╯
623        B  direct(A)
624625        A
626        ");
627
628        // F can be lazy, then E will be queued, then C.
629        let (spy, iter) = spy_on(graph.iter().cloned());
630        let mut iter = topo_grouped(iter);
631        assert_eq!(iter.next().unwrap()?.0, 'F');
632        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'E');
633        assert_eq!(iter.next().unwrap()?.0, 'D');
634        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'C');
635        assert_eq!(iter.next().unwrap()?.0, 'E');
636        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'B');
637        Ok(())
638    }
639
640    #[test]
641    fn test_topo_grouped_fork_multiple_heads() -> TestResult {
642        let graph = [
643            ('I', vec![direct('E')]),
644            ('H', vec![direct('C')]),
645            ('G', vec![direct('A')]),
646            ('F', vec![direct('E')]),
647            ('E', vec![direct('C')]),
648            ('D', vec![direct('C')]),
649            ('C', vec![direct('A')]),
650            ('B', vec![direct('A')]),
651            ('A', vec![]),
652        ]
653        .map(Ok);
654        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
655        I  direct(E)
656657        │ H  direct(C)
658        │ │
659        │ │ G  direct(A)
660        │ │ │
661        │ │ │ F  direct(E)
662        ├─────╯
663        E │ │  direct(C)
664        ├─╯ │
665        │ D │  direct(C)
666        ├─╯ │
667        C   │  direct(A)
668        ├───╯
669        │ B  direct(A)
670        ├─╯
671        A
672        ");
673        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
674        I  direct(E)
675676        │ F  direct(E)
677        ├─╯
678        E  direct(C)
679680        │ H  direct(C)
681        ├─╯
682        │ D  direct(C)
683        ├─╯
684        C  direct(A)
685686        │ G  direct(A)
687        ├─╯
688        │ B  direct(A)
689        ├─╯
690        A
691        ");
692
693        // I can be lazy, then H, G, and F will be queued.
694        let (spy, iter) = spy_on(graph.iter().cloned());
695        let mut iter = topo_grouped(iter);
696        assert_eq!(iter.next().unwrap()?.0, 'I');
697        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'H');
698        assert_eq!(iter.next().unwrap()?.0, 'F');
699        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'E');
700        Ok(())
701    }
702
703    #[test]
704    fn test_topo_grouped_fork_parallel() {
705        let graph = [
706            // Pull all sub graphs in reverse order:
707            ('I', vec![direct('A')]),
708            ('H', vec![direct('C')]),
709            ('G', vec![direct('E')]),
710            // Orphan sub graph G,F-E:
711            ('F', vec![direct('E')]),
712            ('E', vec![missing('Y')]),
713            // Orphan sub graph H,D-C:
714            ('D', vec![direct('C')]),
715            ('C', vec![missing('X')]),
716            // Orphan sub graph I,B-A:
717            ('B', vec![direct('A')]),
718            ('A', vec![]),
719        ]
720        .map(Ok);
721        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
722        I  direct(A)
723724        │ H  direct(C)
725        │ │
726        │ │ G  direct(E)
727        │ │ │
728        │ │ │ F  direct(E)
729        │ │ ├─╯
730        │ │ E  missing(Y)
731        │ │ │
732        │ │ ~
733        │ │
734        │ │ D  direct(C)
735        │ ├─╯
736        │ C  missing(X)
737        │ │
738        │ ~
739740        │ B  direct(A)
741        ├─╯
742        A
743        ");
744        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
745        I  direct(A)
746747        │ B  direct(A)
748        ├─╯
749        A
750
751        H  direct(C)
752753        │ D  direct(C)
754        ├─╯
755        C  missing(X)
756757        ~
758
759        G  direct(E)
760761        │ F  direct(E)
762        ├─╯
763        E  missing(Y)
764765        ~
766        ");
767    }
768
769    #[test]
770    fn test_topo_grouped_fork_nested() {
771        fn sub_graph(
772            chars: impl IntoIterator<Item = char>,
773            root_edges: Vec<GraphEdge<char>>,
774        ) -> Vec<GraphNode<char>> {
775            let [b, c, d, e, f]: [char; 5] = chars.into_iter().collect_vec().try_into().unwrap();
776            vec![
777                (f, vec![direct(c)]),
778                (e, vec![direct(b)]),
779                (d, vec![direct(c)]),
780                (c, vec![direct(b)]),
781                (b, root_edges),
782            ]
783        }
784
785        // One nested fork sub graph from A
786        let graph = itertools::chain!(
787            vec![('G', vec![direct('A')])],
788            sub_graph('B'..='F', vec![direct('A')]),
789            vec![('A', vec![])],
790        )
791        .map(Ok)
792        .collect_vec();
793        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
794        G  direct(A)
795796        │ F  direct(C)
797        │ │
798        │ │ E  direct(B)
799        │ │ │
800        │ │ │ D  direct(C)
801        │ ├───╯
802        │ C │  direct(B)
803        │ ├─╯
804        │ B  direct(A)
805        ├─╯
806        A
807        ");
808        // A::F is picked at A, and A will be unblocked. Then, C::D at C, ...
809        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
810        G  direct(A)
811812        │ F  direct(C)
813        │ │
814        │ │ D  direct(C)
815        │ ├─╯
816        │ C  direct(B)
817        │ │
818        │ │ E  direct(B)
819        │ ├─╯
820        │ B  direct(A)
821        ├─╯
822        A
823        ");
824
825        // Two nested fork sub graphs from A
826        let graph = itertools::chain!(
827            vec![('L', vec![direct('A')])],
828            sub_graph('G'..='K', vec![direct('A')]),
829            sub_graph('B'..='F', vec![direct('A')]),
830            vec![('A', vec![])],
831        )
832        .map(Ok)
833        .collect_vec();
834        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
835        L  direct(A)
836837        │ K  direct(H)
838        │ │
839        │ │ J  direct(G)
840        │ │ │
841        │ │ │ I  direct(H)
842        │ ├───╯
843        │ H │  direct(G)
844        │ ├─╯
845        │ G  direct(A)
846        ├─╯
847        │ F  direct(C)
848        │ │
849        │ │ E  direct(B)
850        │ │ │
851        │ │ │ D  direct(C)
852        │ ├───╯
853        │ C │  direct(B)
854        │ ├─╯
855        │ B  direct(A)
856        ├─╯
857        A
858        ");
859        // A::K is picked at A, and A will be unblocked. Then, H::I at H, ...
860        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
861        L  direct(A)
862863        │ K  direct(H)
864        │ │
865        │ │ I  direct(H)
866        │ ├─╯
867        │ H  direct(G)
868        │ │
869        │ │ J  direct(G)
870        │ ├─╯
871        │ G  direct(A)
872        ├─╯
873        │ F  direct(C)
874        │ │
875        │ │ D  direct(C)
876        │ ├─╯
877        │ C  direct(B)
878        │ │
879        │ │ E  direct(B)
880        │ ├─╯
881        │ B  direct(A)
882        ├─╯
883        A
884        ");
885
886        // Two nested fork sub graphs from A, interleaved
887        let graph = itertools::chain!(
888            vec![('L', vec![direct('A')])],
889            sub_graph(['C', 'E', 'G', 'I', 'K'], vec![direct('A')]),
890            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![direct('A')]),
891            vec![('A', vec![])],
892        )
893        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
894        .map(Ok)
895        .collect_vec();
896        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
897        L  direct(A)
898899        │ K  direct(E)
900        │ │
901        │ │ J  direct(D)
902        │ │ │
903        │ │ │ I  direct(C)
904        │ │ │ │
905        │ │ │ │ H  direct(B)
906        │ │ │ │ │
907        │ │ │ │ │ G  direct(E)
908        │ ├───────╯
909        │ │ │ │ │ F  direct(D)
910        │ │ ├─────╯
911        │ E │ │ │  direct(C)
912        │ ├───╯ │
913        │ │ D   │  direct(B)
914        │ │ ├───╯
915        │ C │  direct(A)
916        ├─╯ │
917        │   B  direct(A)
918        ├───╯
919        A
920        ");
921        // A::K is picked at A, and A will be unblocked. Then, E::G at E, ...
922        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
923        L  direct(A)
924925        │ K  direct(E)
926        │ │
927        │ │ G  direct(E)
928        │ ├─╯
929        │ E  direct(C)
930        │ │
931        │ │ I  direct(C)
932        │ ├─╯
933        │ C  direct(A)
934        ├─╯
935        │ J  direct(D)
936        │ │
937        │ │ F  direct(D)
938        │ ├─╯
939        │ D  direct(B)
940        │ │
941        │ │ H  direct(B)
942        │ ├─╯
943        │ B  direct(A)
944        ├─╯
945        A
946        ");
947
948        // Merged fork sub graphs at K
949        let graph = itertools::chain!(
950            vec![('K', vec![direct('E'), direct('J')])],
951            sub_graph('F'..='J', vec![missing('Y')]),
952            sub_graph('A'..='E', vec![missing('X')]),
953        )
954        .map(Ok)
955        .collect_vec();
956        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
957        K    direct(E), direct(J)
958        ├─╮
959        │ J  direct(G)
960        │ │
961        │ │ I  direct(F)
962        │ │ │
963        │ │ │ H  direct(G)
964        │ ├───╯
965        │ G │  direct(F)
966        │ ├─╯
967        │ F  missing(Y)
968        │ │
969        │ ~
970971        E  direct(B)
972973        │ D  direct(A)
974        │ │
975        │ │ C  direct(B)
976        ├───╯
977        B │  direct(A)
978        ├─╯
979        A  missing(X)
980981        ~
982        ");
983        // K-E,J is resolved without queuing new heads. Then, G::H, F::I, B::C, and
984        // A::D.
985        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
986        K    direct(E), direct(J)
987        ├─╮
988        │ J  direct(G)
989        │ │
990        E │  direct(B)
991        │ │
992        │ │ H  direct(G)
993        │ ├─╯
994        │ G  direct(F)
995        │ │
996        │ │ I  direct(F)
997        │ ├─╯
998        │ F  missing(Y)
999        │ │
1000        │ ~
10011002        │ C  direct(B)
1003        ├─╯
1004        B  direct(A)
10051006        │ D  direct(A)
1007        ├─╯
1008        A  missing(X)
10091010        ~
1011        ");
1012
1013        // Merged fork sub graphs at K, interleaved
1014        let graph = itertools::chain!(
1015            vec![('K', vec![direct('I'), direct('J')])],
1016            sub_graph(['B', 'D', 'F', 'H', 'J'], vec![missing('Y')]),
1017            sub_graph(['A', 'C', 'E', 'G', 'I'], vec![missing('X')]),
1018        )
1019        .sorted_by(|(id1, _), (id2, _)| id2.cmp(id1))
1020        .map(Ok)
1021        .collect_vec();
1022        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1023        K    direct(I), direct(J)
1024        ├─╮
1025        │ J  direct(D)
1026        │ │
1027        I │  direct(C)
1028        │ │
1029        │ │ H  direct(B)
1030        │ │ │
1031        │ │ │ G  direct(A)
1032        │ │ │ │
1033        │ │ │ │ F  direct(D)
1034        │ ├─────╯
1035        │ │ │ │ E  direct(C)
1036        ├───────╯
1037        │ D │ │  direct(B)
1038        │ ├─╯ │
1039        C │   │  direct(A)
1040        ├─────╯
1041        │ B  missing(Y)
1042        │ │
1043        │ ~
10441045        A  missing(X)
10461047        ~
1048        ");
1049        // K-I,J is resolved without queuing new heads. Then, D::F, B::H, C::E, and
1050        // A::G.
1051        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1052        K    direct(I), direct(J)
1053        ├─╮
1054        │ J  direct(D)
1055        │ │
1056        I │  direct(C)
1057        │ │
1058        │ │ F  direct(D)
1059        │ ├─╯
1060        │ D  direct(B)
1061        │ │
1062        │ │ H  direct(B)
1063        │ ├─╯
1064        │ B  missing(Y)
1065        │ │
1066        │ ~
10671068        │ E  direct(C)
1069        ├─╯
1070        C  direct(A)
10711072        │ G  direct(A)
1073        ├─╯
1074        A  missing(X)
10751076        ~
1077        ");
1078    }
1079
1080    #[test]
1081    fn test_topo_grouped_merge_interleaved() -> TestResult {
1082        let graph = [
1083            ('F', vec![direct('E')]),
1084            ('E', vec![direct('C'), direct('D')]),
1085            ('D', vec![direct('B')]),
1086            ('C', vec![direct('A')]),
1087            ('B', vec![direct('A')]),
1088            ('A', vec![]),
1089        ]
1090        .map(Ok);
1091        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1092        F  direct(E)
10931094        E    direct(C), direct(D)
1095        ├─╮
1096        │ D  direct(B)
1097        │ │
1098        C │  direct(A)
1099        │ │
1100        │ B  direct(A)
1101        ├─╯
1102        A
1103        ");
1104        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1105        F  direct(E)
11061107        E    direct(C), direct(D)
1108        ├─╮
1109        │ D  direct(B)
1110        │ │
1111        │ B  direct(A)
1112        │ │
1113        C │  direct(A)
1114        ├─╯
1115        A
1116        ");
1117
1118        // F, E, and D can be lazy, then C will be queued, then B.
1119        let (spy, iter) = spy_on(graph.iter().cloned());
1120        let mut iter = topo_grouped(iter);
1121        assert_eq!(iter.next().unwrap()?.0, 'F');
1122        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'E');
1123        assert_eq!(iter.next().unwrap()?.0, 'E');
1124        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'D');
1125        assert_eq!(iter.next().unwrap()?.0, 'D');
1126        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'C');
1127        assert_eq!(iter.next().unwrap()?.0, 'B');
1128        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'A');
1129        Ok(())
1130    }
1131
1132    #[test]
1133    fn test_topo_grouped_merge_but_missing() -> TestResult {
1134        let graph = [
1135            ('E', vec![direct('D')]),
1136            ('D', vec![missing('Y'), direct('C')]),
1137            ('C', vec![direct('B'), missing('X')]),
1138            ('B', vec![direct('A')]),
1139            ('A', vec![]),
1140        ]
1141        .map(Ok);
1142        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1143        E  direct(D)
11441145        D    missing(Y), direct(C)
1146        ├─╮
1147        │ │
1148        ~ │
11491150          C  direct(B), missing(X)
1151        ╭─┤
1152        │ │
1153        ~ │
11541155          B  direct(A)
11561157          A
1158        ");
1159        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1160        E  direct(D)
11611162        D    missing(Y), direct(C)
1163        ├─╮
1164        │ │
1165        ~ │
11661167          C  direct(B), missing(X)
1168        ╭─┤
1169        │ │
1170        ~ │
11711172          B  direct(A)
11731174          A
1175        ");
1176
1177        // All nodes can be lazily emitted.
1178        let (spy, iter) = spy_on(graph.iter().cloned());
1179        let mut iter = topo_grouped(iter);
1180        assert_eq!(iter.next().unwrap()?.0, 'E');
1181        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'D');
1182        assert_eq!(iter.next().unwrap()?.0, 'D');
1183        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'C');
1184        assert_eq!(iter.next().unwrap()?.0, 'C');
1185        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'B');
1186        assert_eq!(iter.next().unwrap()?.0, 'B');
1187        assert_eq!(spy.peek().unwrap().as_ref().unwrap().0, 'A');
1188        Ok(())
1189    }
1190
1191    #[test]
1192    fn test_topo_grouped_merge_criss_cross() -> TestResult {
1193        let graph = [
1194            ('G', vec![direct('E')]),
1195            ('F', vec![direct('D')]),
1196            ('E', vec![direct('B'), direct('C')]),
1197            ('D', vec![direct('B'), direct('C')]),
1198            ('C', vec![direct('A')]),
1199            ('B', vec![direct('A')]),
1200            ('A', vec![]),
1201        ]
1202        .map(Ok);
1203        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1204        G  direct(E)
12051206        │ F  direct(D)
1207        │ │
1208        E │    direct(B), direct(C)
1209        ├───╮
1210        │ D │  direct(B), direct(C)
1211        ╭─┴─╮
1212        │   C  direct(A)
1213        │   │
1214        B   │  direct(A)
1215        ├───╯
1216        A
1217        ");
1218        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1219        G  direct(E)
12201221        E    direct(B), direct(C)
1222        ├─╮
1223        │ │ F  direct(D)
1224        │ │ │
1225        │ │ D  direct(B), direct(C)
1226        ╭─┬─╯
1227        │ C  direct(A)
1228        │ │
1229        B │  direct(A)
1230        ├─╯
1231        A
1232        ");
1233        Ok(())
1234    }
1235
1236    #[test]
1237    fn test_topo_grouped_merge_descendants_interleaved() -> TestResult {
1238        let graph = [
1239            ('H', vec![direct('F')]),
1240            ('G', vec![direct('E')]),
1241            ('F', vec![direct('D')]),
1242            ('E', vec![direct('C')]),
1243            ('D', vec![direct('C'), direct('B')]),
1244            ('C', vec![direct('A')]),
1245            ('B', vec![direct('A')]),
1246            ('A', vec![]),
1247        ]
1248        .map(Ok);
1249        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1250        H  direct(F)
12511252        │ G  direct(E)
1253        │ │
1254        F │  direct(D)
1255        │ │
1256        │ E  direct(C)
1257        │ │
1258        D │  direct(C), direct(B)
1259        ├─╮
1260        │ C  direct(A)
1261        │ │
1262        B │  direct(A)
1263        ├─╯
1264        A
1265        ");
1266        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1267        H  direct(F)
12681269        F  direct(D)
12701271        D    direct(C), direct(B)
1272        ├─╮
1273        │ B  direct(A)
1274        │ │
1275        │ │ G  direct(E)
1276        │ │ │
1277        │ │ E  direct(C)
1278        ├───╯
1279        C │  direct(A)
1280        ├─╯
1281        A
1282        ");
1283        Ok(())
1284    }
1285
1286    #[test]
1287    fn test_topo_grouped_merge_multiple_roots() -> TestResult {
1288        let graph = [
1289            ('D', vec![direct('C')]),
1290            ('C', vec![direct('B'), direct('A')]),
1291            ('B', vec![missing('X')]),
1292            ('A', vec![]),
1293        ]
1294        .map(Ok);
1295        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1296        D  direct(C)
12971298        C    direct(B), direct(A)
1299        ├─╮
1300        B │  missing(X)
1301        │ │
1302        ~ │
13031304          A
1305        ");
1306        // A is emitted first because it's the second parent.
1307        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1308        D  direct(C)
13091310        C    direct(B), direct(A)
1311        ├─╮
1312        │ A
13131314        B  missing(X)
13151316        ~
1317        ");
1318        Ok(())
1319    }
1320
1321    #[test]
1322    fn test_topo_grouped_merge_stairs() -> TestResult {
1323        let graph = [
1324            // Merge topic branches one by one:
1325            ('J', vec![direct('I'), direct('G')]),
1326            ('I', vec![direct('H'), direct('E')]),
1327            ('H', vec![direct('D'), direct('F')]),
1328            // Topic branches:
1329            ('G', vec![direct('D')]),
1330            ('F', vec![direct('C')]),
1331            ('E', vec![direct('B')]),
1332            // Base nodes:
1333            ('D', vec![direct('C')]),
1334            ('C', vec![direct('B')]),
1335            ('B', vec![direct('A')]),
1336            ('A', vec![]),
1337        ]
1338        .map(Ok);
1339        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1340        J    direct(I), direct(G)
1341        ├─╮
1342        I │    direct(H), direct(E)
1343        ├───╮
1344        H │ │    direct(D), direct(F)
1345        ├─────╮
1346        │ G │ │  direct(D)
1347        ├─╯ │ │
1348        │   │ F  direct(C)
1349        │   │ │
1350        │   E │  direct(B)
1351        │   │ │
1352        D   │ │  direct(C)
1353        ├─────╯
1354        C   │  direct(B)
1355        ├───╯
1356        B  direct(A)
13571358        A
1359        ");
1360        // Second branches are visited first.
1361        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1362        J    direct(I), direct(G)
1363        ├─╮
1364        │ G  direct(D)
1365        │ │
1366        I │    direct(H), direct(E)
1367        ├───╮
1368        │ │ E  direct(B)
1369        │ │ │
1370        H │ │  direct(D), direct(F)
1371        ├─╮ │
1372        F │ │  direct(C)
1373        │ │ │
1374        │ D │  direct(C)
1375        ├─╯ │
1376        C   │  direct(B)
1377        ├───╯
1378        B  direct(A)
13791380        A
1381        ");
1382        Ok(())
1383    }
1384
1385    #[test]
1386    fn test_topo_grouped_merge_and_fork() -> TestResult {
1387        let graph = [
1388            ('J', vec![direct('F')]),
1389            ('I', vec![direct('E')]),
1390            ('H', vec![direct('G')]),
1391            ('G', vec![direct('D'), direct('E')]),
1392            ('F', vec![direct('C')]),
1393            ('E', vec![direct('B')]),
1394            ('D', vec![direct('B')]),
1395            ('C', vec![direct('A')]),
1396            ('B', vec![direct('A')]),
1397            ('A', vec![]),
1398        ]
1399        .map(Ok);
1400        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1401        J  direct(F)
14021403        │ I  direct(E)
1404        │ │
1405        │ │ H  direct(G)
1406        │ │ │
1407        │ │ G  direct(D), direct(E)
1408        │ ╭─┤
1409        F │ │  direct(C)
1410        │ │ │
1411        │ E │  direct(B)
1412        │ │ │
1413        │ │ D  direct(B)
1414        │ ├─╯
1415        C │  direct(A)
1416        │ │
1417        │ B  direct(A)
1418        ├─╯
1419        A
1420        ");
1421        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1422        J  direct(F)
14231424        F  direct(C)
14251426        C  direct(A)
14271428        │ I  direct(E)
1429        │ │
1430        │ │ H  direct(G)
1431        │ │ │
1432        │ │ G  direct(D), direct(E)
1433        │ ╭─┤
1434        │ E │  direct(B)
1435        │ │ │
1436        │ │ D  direct(B)
1437        │ ├─╯
1438        │ B  direct(A)
1439        ├─╯
1440        A
1441        ");
1442        Ok(())
1443    }
1444
1445    #[test]
1446    fn test_topo_grouped_merge_and_fork_multiple_roots() -> TestResult {
1447        let graph = [
1448            ('J', vec![direct('F')]),
1449            ('I', vec![direct('G')]),
1450            ('H', vec![direct('E')]),
1451            ('G', vec![direct('E'), direct('B')]),
1452            ('F', vec![direct('D')]),
1453            ('E', vec![direct('C')]),
1454            ('D', vec![direct('A')]),
1455            ('C', vec![direct('A')]),
1456            ('B', vec![missing('X')]),
1457            ('A', vec![]),
1458        ]
1459        .map(Ok);
1460        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1461        J  direct(F)
14621463        │ I  direct(G)
1464        │ │
1465        │ │ H  direct(E)
1466        │ │ │
1467        │ G │  direct(E), direct(B)
1468        │ ├─╮
1469        F │ │  direct(D)
1470        │ │ │
1471        │ │ E  direct(C)
1472        │ │ │
1473        D │ │  direct(A)
1474        │ │ │
1475        │ │ C  direct(A)
1476        ├───╯
1477        │ B  missing(X)
1478        │ │
1479        │ ~
14801481        A
1482        ");
1483        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1484        J  direct(F)
14851486        F  direct(D)
14871488        D  direct(A)
14891490        │ I  direct(G)
1491        │ │
1492        │ G    direct(E), direct(B)
1493        │ ├─╮
1494        │ │ B  missing(X)
1495        │ │ │
1496        │ │ ~
1497        │ │
1498        │ │ H  direct(E)
1499        │ ├─╯
1500        │ E  direct(C)
1501        │ │
1502        │ C  direct(A)
1503        ├─╯
1504        A
1505        ");
1506        Ok(())
1507    }
1508
1509    #[test]
1510    fn test_topo_grouped_parallel_interleaved() -> TestResult {
1511        let graph = [
1512            ('E', vec![direct('C')]),
1513            ('D', vec![direct('B')]),
1514            ('C', vec![direct('A')]),
1515            ('B', vec![missing('X')]),
1516            ('A', vec![]),
1517        ]
1518        .map(Ok);
1519        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1520        E  direct(C)
15211522        │ D  direct(B)
1523        │ │
1524        C │  direct(A)
1525        │ │
1526        │ B  missing(X)
1527        │ │
1528        │ ~
15291530        A
1531        ");
1532        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1533        E  direct(C)
15341535        C  direct(A)
15361537        A
1538
1539        D  direct(B)
15401541        B  missing(X)
15421543        ~
1544        ");
1545        Ok(())
1546    }
1547
1548    #[test]
1549    fn test_topo_grouped_multiple_child_dependencies() -> TestResult {
1550        let graph = [
1551            ('I', vec![direct('H'), direct('G')]),
1552            ('H', vec![direct('D')]),
1553            ('G', vec![direct('B')]),
1554            ('F', vec![direct('E'), direct('C')]),
1555            ('E', vec![direct('D')]),
1556            ('D', vec![direct('B')]),
1557            ('C', vec![direct('B')]),
1558            ('B', vec![direct('A')]),
1559            ('A', vec![]),
1560        ]
1561        .map(Ok);
1562        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1563        I    direct(H), direct(G)
1564        ├─╮
1565        H │  direct(D)
1566        │ │
1567        │ G  direct(B)
1568        │ │
1569        │ │ F    direct(E), direct(C)
1570        │ │ ├─╮
1571        │ │ E │  direct(D)
1572        ├───╯ │
1573        D │   │  direct(B)
1574        ├─╯   │
1575        │     C  direct(B)
1576        ├─────╯
1577        B  direct(A)
15781579        A
1580        ");
1581        // Topological order must be preserved. Depending on the implementation,
1582        // E might be requested more than once by paths D->E and B->D->E.
1583        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1584        I    direct(H), direct(G)
1585        ├─╮
1586        │ G  direct(B)
1587        │ │
1588        H │  direct(D)
1589        │ │
1590        │ │ F    direct(E), direct(C)
1591        │ │ ├─╮
1592        │ │ │ C  direct(B)
1593        │ ├───╯
1594        │ │ E  direct(D)
1595        ├───╯
1596        D │  direct(B)
1597        ├─╯
1598        B  direct(A)
15991600        A
1601        ");
1602        Ok(())
1603    }
1604
1605    #[test]
1606    fn test_topo_grouped_prioritized_branches_trivial_fork() -> TestResult {
1607        // The same setup as test_topo_grouped_trivial_fork()
1608        let graph = [
1609            ('E', vec![direct('B')]),
1610            ('D', vec![direct('A')]),
1611            ('C', vec![direct('B')]),
1612            ('B', vec![direct('A')]),
1613            ('A', vec![]),
1614        ]
1615        .map(Ok);
1616        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1617        E  direct(B)
16181619        │ D  direct(A)
1620        │ │
1621        │ │ C  direct(B)
1622        ├───╯
1623        B │  direct(A)
1624        ├─╯
1625        A
1626        ");
1627
1628        // Emit the branch C first
1629        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['C']);
1630        insta::assert_snapshot!(format_graph(iter), @"
1631        C  direct(B)
16321633        │ E  direct(B)
1634        ├─╯
1635        B  direct(A)
16361637        │ D  direct(A)
1638        ├─╯
1639        A
1640        ");
1641
1642        // Emit the branch D first
1643        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['D']);
1644        insta::assert_snapshot!(format_graph(iter), @"
1645        D  direct(A)
16461647        │ E  direct(B)
1648        │ │
1649        │ │ C  direct(B)
1650        │ ├─╯
1651        │ B  direct(A)
1652        ├─╯
1653        A
1654        ");
1655
1656        // Emit the branch C first, then D. E is emitted earlier than D because
1657        // E belongs to the branch C compared to the branch D.
1658        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['C', 'D']);
1659        insta::assert_snapshot!(format_graph(iter), @"
1660        C  direct(B)
16611662        │ E  direct(B)
1663        ├─╯
1664        B  direct(A)
16651666        │ D  direct(A)
1667        ├─╯
1668        A
1669        ");
1670
1671        // Non-head node can be prioritized
1672        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['B']);
1673        insta::assert_snapshot!(format_graph(iter), @"
1674        E  direct(B)
16751676        │ C  direct(B)
1677        ├─╯
1678        B  direct(A)
16791680        │ D  direct(A)
1681        ├─╯
1682        A
1683        ");
1684
1685        // Root node can be prioritized
1686        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['A']);
1687        insta::assert_snapshot!(format_graph(iter), @"
1688        D  direct(A)
16891690        │ E  direct(B)
1691        │ │
1692        │ │ C  direct(B)
1693        │ ├─╯
1694        │ B  direct(A)
1695        ├─╯
1696        A
1697        ");
1698        Ok(())
1699    }
1700
1701    #[test]
1702    fn test_topo_grouped_prioritized_branches_fork_multiple_heads() -> TestResult {
1703        // The same setup as test_topo_grouped_fork_multiple_heads()
1704        let graph = [
1705            ('I', vec![direct('E')]),
1706            ('H', vec![direct('C')]),
1707            ('G', vec![direct('A')]),
1708            ('F', vec![direct('E')]),
1709            ('E', vec![direct('C')]),
1710            ('D', vec![direct('C')]),
1711            ('C', vec![direct('A')]),
1712            ('B', vec![direct('A')]),
1713            ('A', vec![]),
1714        ]
1715        .map(Ok);
1716        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1717        I  direct(E)
17181719        │ H  direct(C)
1720        │ │
1721        │ │ G  direct(A)
1722        │ │ │
1723        │ │ │ F  direct(E)
1724        ├─────╯
1725        E │ │  direct(C)
1726        ├─╯ │
1727        │ D │  direct(C)
1728        ├─╯ │
1729        C   │  direct(A)
1730        ├───╯
1731        │ B  direct(A)
1732        ├─╯
1733        A
1734        ");
1735
1736        // Emit B, G, then remainders
1737        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['B', 'G']);
1738        insta::assert_snapshot!(format_graph(iter), @"
1739        B  direct(A)
17401741        │ G  direct(A)
1742        ├─╯
1743        │ I  direct(E)
1744        │ │
1745        │ │ F  direct(E)
1746        │ ├─╯
1747        │ E  direct(C)
1748        │ │
1749        │ │ H  direct(C)
1750        │ ├─╯
1751        │ │ D  direct(C)
1752        │ ├─╯
1753        │ C  direct(A)
1754        ├─╯
1755        A
1756        ");
1757
1758        // Emit D, H, then descendants of C. The order of B and G is not
1759        // respected because G can be found earlier through C->A->G. At this
1760        // point, B is not populated yet, so A is blocked only by {G}. This is
1761        // a limitation of the current node reordering logic.
1762        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['D', 'H', 'B', 'G']);
1763        insta::assert_snapshot!(format_graph(iter), @"
1764        D  direct(C)
17651766        │ H  direct(C)
1767        ├─╯
1768        │ I  direct(E)
1769        │ │
1770        │ │ F  direct(E)
1771        │ ├─╯
1772        │ E  direct(C)
1773        ├─╯
1774        C  direct(A)
17751776        │ G  direct(A)
1777        ├─╯
1778        │ B  direct(A)
1779        ├─╯
1780        A
1781        ");
1782        Ok(())
1783    }
1784
1785    #[test]
1786    fn test_topo_grouped_prioritized_branches_fork_parallel() -> TestResult {
1787        // The same setup as test_topo_grouped_fork_parallel()
1788        let graph = [
1789            // Pull all sub graphs in reverse order:
1790            ('I', vec![direct('A')]),
1791            ('H', vec![direct('C')]),
1792            ('G', vec![direct('E')]),
1793            // Orphan sub graph G,F-E:
1794            ('F', vec![direct('E')]),
1795            ('E', vec![missing('Y')]),
1796            // Orphan sub graph H,D-C:
1797            ('D', vec![direct('C')]),
1798            ('C', vec![missing('X')]),
1799            // Orphan sub graph I,B-A:
1800            ('B', vec![direct('A')]),
1801            ('A', vec![]),
1802        ]
1803        .map(Ok);
1804        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1805        I  direct(A)
18061807        │ H  direct(C)
1808        │ │
1809        │ │ G  direct(E)
1810        │ │ │
1811        │ │ │ F  direct(E)
1812        │ │ ├─╯
1813        │ │ E  missing(Y)
1814        │ │ │
1815        │ │ ~
1816        │ │
1817        │ │ D  direct(C)
1818        │ ├─╯
1819        │ C  missing(X)
1820        │ │
1821        │ ~
18221823        │ B  direct(A)
1824        ├─╯
1825        A
1826        ");
1827
1828        // Emit the sub graph G first
1829        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['G']);
1830        insta::assert_snapshot!(format_graph(iter), @"
1831        G  direct(E)
18321833        │ F  direct(E)
1834        ├─╯
1835        E  missing(Y)
18361837        ~
1838
1839        I  direct(A)
18401841        │ B  direct(A)
1842        ├─╯
1843        A
1844
1845        H  direct(C)
18461847        │ D  direct(C)
1848        ├─╯
1849        C  missing(X)
18501851        ~
1852        ");
1853
1854        // Emit sub graphs in reverse order by selecting roots
1855        let iter = topo_grouped_with_prioritization(graph.iter().cloned(), &['E', 'C', 'A']);
1856        insta::assert_snapshot!(format_graph(iter), @"
1857        G  direct(E)
18581859        │ F  direct(E)
1860        ├─╯
1861        E  missing(Y)
18621863        ~
1864
1865        H  direct(C)
18661867        │ D  direct(C)
1868        ├─╯
1869        C  missing(X)
18701871        ~
1872
1873        I  direct(A)
18741875        │ B  direct(A)
1876        ├─╯
1877        A
1878        ");
1879        Ok(())
1880    }
1881
1882    #[test]
1883    fn test_topo_grouped_requeue_unpopulated() -> TestResult {
1884        let graph = [
1885            ('C', vec![direct('A'), direct('B')]),
1886            ('B', vec![direct('A')]),
1887            ('A', vec![]),
1888        ]
1889        .map(Ok);
1890        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1891        C    direct(A), direct(B)
1892        ├─╮
1893        │ B  direct(A)
1894        ├─╯
1895        A
1896        ");
1897        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1898        C    direct(A), direct(B)
1899        ├─╮
1900        │ B  direct(A)
1901        ├─╯
1902        A
1903        ");
1904
1905        // A is queued once by C-A because B isn't populated at this point. Since
1906        // B is the second parent, B-A is processed next and A is queued again. So
1907        // one of them in the queue has to be ignored.
1908        let mut iter = topo_grouped(graph.iter().cloned());
1909        assert_eq!(iter.next().unwrap()?.0, 'C');
1910        assert_eq!(iter.next().unwrap()?.0, 'B');
1911        assert_eq!(iter.next().unwrap()?.0, 'A');
1912        assert!(iter.next().is_none());
1913        Ok(())
1914    }
1915
1916    #[test]
1917    fn test_topo_grouped_duplicated_edges() -> TestResult {
1918        // The graph shouldn't have duplicated parent->child edges, but topo-grouped
1919        // iterator can handle it anyway.
1920        let graph = [('B', vec![direct('A'), direct('A')]), ('A', vec![])].map(Ok);
1921        insta::assert_snapshot!(format_graph(graph.iter().cloned()), @"
1922        B  direct(A), direct(A)
19231924        A
1925        ");
1926        insta::assert_snapshot!(format_graph(topo_grouped(graph.iter().cloned())), @"
1927        B  direct(A), direct(A)
19281929        A
1930        ");
1931
1932        let mut iter = topo_grouped(graph.iter().cloned());
1933        assert_eq!(iter.next().unwrap()?.0, 'B');
1934        assert_eq!(iter.next().unwrap()?.0, 'A');
1935        assert!(iter.next().is_none());
1936        Ok(())
1937    }
1938}