jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::iter::zip;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Context;
24use std::task::Poll;
25use std::task::ready;
26use std::vec;
27
28use either::Either;
29use futures::Stream;
30use futures::StreamExt as _;
31use futures::future::BoxFuture;
32use futures::future::try_join;
33use futures::stream::BoxStream;
34use itertools::EitherOrBoth;
35use itertools::Itertools as _;
36use pollster::FutureExt as _;
37
38use crate::backend::BackendResult;
39use crate::backend::TreeId;
40use crate::backend::TreeValue;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_builder::TreeBuilder;
57use crate::tree_merge::merge_trees;
58
/// Presents a view of a merged set of trees at the root directory. In the
/// future, this may store additional metadata like conflict labels, so tree IDs
/// should be compared instead when checking for file changes.
#[derive(Clone)]
pub struct MergedTree {
    /// The store that the root trees are read from.
    store: Arc<Store>,
    /// The root tree IDs of each term of the merge. The tree has a conflict
    /// when this merge is not resolved.
    tree_ids: Merge<TreeId>,
}
67
68impl fmt::Debug for MergedTree {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("MergedTree")
71            .field("tree_ids", &self.tree_ids)
72            .finish_non_exhaustive()
73    }
74}
75
76impl MergedTree {
77    /// Creates a `MergedTree` with the given resolved tree ID.
78    pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
79        Self::new(store, Merge::resolved(tree_id))
80    }
81
82    /// Creates a `MergedTree` with the given tree IDs.
83    pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>) -> Self {
84        Self { store, tree_ids }
85    }
86
87    /// The `Store` associated with this tree.
88    pub fn store(&self) -> &Arc<Store> {
89        &self.store
90    }
91
92    /// The underlying tree IDs for this `MergedTree`. If there are file changes
93    /// between two trees, then the tree IDs will be different.
94    pub fn tree_ids(&self) -> &Merge<TreeId> {
95        &self.tree_ids
96    }
97
98    /// Extracts the underlying tree IDs for this `MergedTree`.
99    pub fn into_tree_ids(self) -> Merge<TreeId> {
100        self.tree_ids
101    }
102
103    /// Reads the merge of tree objects represented by this `MergedTree`.
104    pub fn trees(&self) -> BackendResult<Merge<Tree>> {
105        self.trees_async().block_on()
106    }
107
108    /// Async version of `trees()`.
109    pub async fn trees_async(&self) -> BackendResult<Merge<Tree>> {
110        self.tree_ids
111            .try_map_async(|id| self.store.get_tree_async(RepoPathBuf::root(), id))
112            .await
113    }
114
115    /// Tries to resolve any conflicts, resolving any conflicts that can be
116    /// automatically resolved and leaving the rest unresolved.
117    pub async fn resolve(self) -> BackendResult<Self> {
118        let merged = merge_trees(&self.store, self.tree_ids).await?;
119        // If the result can be resolved, then `merge_trees()` above would have returned
120        // a resolved merge. However, that function will always preserve the arity of
121        // conflicts it cannot resolve. So we simplify the conflict again
122        // here to possibly reduce a complex conflict to a simpler one.
123        let simplified = merged.simplify();
124        // If debug assertions are enabled, check that the merge was idempotent. In
125        // particular,  that this last simplification doesn't enable further automatic
126        // resolutions
127        if cfg!(debug_assertions) {
128            let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
129            debug_assert_eq!(re_merged, simplified);
130        }
131        Ok(Self::new(self.store, simplified))
132    }
133
134    /// An iterator over the conflicts in this tree, including subtrees.
135    /// Recurses into subtrees and yields conflicts in those, but only if
136    /// all sides are trees, so tree/file conflicts will be reported as a single
137    /// conflict, not one for each path in the tree.
138    // TODO: Restrict this by a matcher (or add a separate method for that).
139    pub fn conflicts(
140        &self,
141    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
142        ConflictIterator::new(self)
143    }
144
145    /// Whether this tree has conflicts.
146    pub fn has_conflict(&self) -> bool {
147        !self.tree_ids.is_resolved()
148    }
149
150    /// The value at the given path. The value can be `Resolved` even if
151    /// `self` is a `Conflict`, which happens if the value at the path can be
152    /// trivially merged.
153    pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
154        self.path_value_async(path).block_on()
155    }
156
157    /// Async version of `path_value()`.
158    pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
159        match path.split() {
160            Some((dir, basename)) => {
161                let trees = self.trees_async().await?;
162                match trees.sub_tree_recursive(dir).await? {
163                    None => Ok(Merge::absent()),
164                    Some(tree) => Ok(tree.value(basename).cloned()),
165                }
166            }
167            None => Ok(self.to_merged_tree_value()),
168        }
169    }
170
171    fn to_merged_tree_value(&self) -> MergedTreeValue {
172        self.tree_ids
173            .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
174    }
175
176    /// Iterator over the entries matching the given matcher. Subtrees are
177    /// visited recursively. Subtrees that differ between the current
178    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
179    /// empty directories. Subtrees that conflict with non-trees are not
180    /// visited. For example, if current tree is a merge of 3 trees, and the
181    /// entry for 'foo' is a conflict between a change subtree and a symlink
182    /// (i.e. the subdirectory was replaced by symlink in one side of the
183    /// conflict), then the entry for `foo` itself will be emitted, but no
184    /// entries from inside `foo/` from either of the trees will be.
185    pub fn entries(&self) -> TreeEntriesIterator<'static> {
186        self.entries_matching(&EverythingMatcher)
187    }
188
189    /// Like `entries()` but restricted by a matcher.
190    pub fn entries_matching<'matcher>(
191        &self,
192        matcher: &'matcher dyn Matcher,
193    ) -> TreeEntriesIterator<'matcher> {
194        TreeEntriesIterator::new(self, matcher)
195    }
196
197    /// Stream of the differences between this tree and another tree.
198    ///
199    /// Tree entries (`MergedTreeValue::is_tree()`) are included only if the
200    /// other side is present and not a tree.
201    fn diff_stream_internal<'matcher>(
202        &self,
203        other: &Self,
204        matcher: &'matcher dyn Matcher,
205    ) -> TreeDiffStream<'matcher> {
206        let concurrency = self.store().concurrency();
207        if concurrency <= 1 {
208            Box::pin(futures::stream::iter(TreeDiffIterator::new(
209                self, other, matcher,
210            )))
211        } else {
212            Box::pin(TreeDiffStreamImpl::new(self, other, matcher, concurrency))
213        }
214    }
215
216    /// Stream of the differences between this tree and another tree.
217    pub fn diff_stream<'matcher>(
218        &self,
219        other: &Self,
220        matcher: &'matcher dyn Matcher,
221    ) -> TreeDiffStream<'matcher> {
222        stream_without_trees(self.diff_stream_internal(other, matcher))
223    }
224
225    /// Like `diff_stream()` but files in a removed tree will be returned before
226    /// a file that replaces it.
227    pub fn diff_stream_for_file_system<'matcher>(
228        &self,
229        other: &Self,
230        matcher: &'matcher dyn Matcher,
231    ) -> TreeDiffStream<'matcher> {
232        Box::pin(DiffStreamForFileSystem::new(
233            self.diff_stream_internal(other, matcher),
234        ))
235    }
236
237    /// Like `diff_stream()` but takes the given copy records into account.
238    pub fn diff_stream_with_copies<'a>(
239        &self,
240        other: &Self,
241        matcher: &'a dyn Matcher,
242        copy_records: &'a CopyRecords,
243    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
244        let stream = self.diff_stream(other, matcher);
245        Box::pin(CopiesTreeDiffStream::new(
246            stream,
247            self.clone(),
248            other.clone(),
249            copy_records,
250        ))
251    }
252
253    /// Merges this tree with `other`, using `base` as base. Any conflicts will
254    /// be resolved recursively if possible.
255    pub async fn merge(self, base: Self, other: Self) -> BackendResult<Self> {
256        self.merge_no_resolve(base, other).resolve().await
257    }
258
259    /// Merges this tree with `other`, using `base` as base, without attempting
260    /// to resolve file conflicts.
261    pub fn merge_no_resolve(self, base: Self, other: Self) -> Self {
262        debug_assert!(Arc::ptr_eq(&base.store, &self.store));
263        debug_assert!(Arc::ptr_eq(&other.store, &self.store));
264        let nested = Merge::from_vec(vec![self.tree_ids, base.tree_ids, other.tree_ids]);
265        Self::new(self.store, nested.flatten().simplify())
266    }
267}
268
/// A single entry in a tree diff.
#[derive(Debug)]
pub struct TreeDiffEntry {
    /// The path.
    pub path: RepoPathBuf,
    /// The before/after values at `path`, or the backend error hit while
    /// reading the trees needed to diff it.
    pub values: BackendResult<Diff<MergedTreeValue>>,
}
277
/// Type alias for the result from `MergedTree::diff_stream()`. We use a
/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
/// ones) can fetch trees asynchronously.
///
/// The stream is boxed and borrows the matcher for `'matcher`.
pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
282
283fn all_tree_entries(
284    trees: &Merge<Tree>,
285) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
286    if let Some(tree) = trees.as_resolved() {
287        let iter = tree
288            .entries_non_recursive()
289            .map(|entry| (entry.name(), Merge::normal(entry.value())));
290        Either::Left(iter)
291    } else {
292        let same_change = trees.first().store().merge_options().same_change;
293        let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
294            // TODO: move resolve_trivial() to caller?
295            let values = match values.resolve_trivial(same_change) {
296                Some(resolved) => Merge::resolved(*resolved),
297                None => values,
298            };
299            (name, values)
300        });
301        Either::Right(iter)
302    }
303}
304
305/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
306/// non-recursively. This also works if `trees` are resolved, but is more costly
307/// than `tree.entries_non_recursive()`.
308pub fn all_merged_tree_entries(
309    trees: &Merge<Tree>,
310) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
311    let mut entries_iters = trees
312        .iter()
313        .map(|tree| tree.entries_non_recursive().peekable())
314        .collect_vec();
315    iter::from_fn(move || {
316        let next_name = entries_iters
317            .iter_mut()
318            .filter_map(|iter| iter.peek())
319            .map(|entry| entry.name())
320            .min()?;
321        let values: MergeBuilder<_> = entries_iters
322            .iter_mut()
323            .map(|iter| {
324                let entry = iter.next_if(|entry| entry.name() == next_name)?;
325                Some(entry.value())
326            })
327            .collect();
328        Some((next_name, values.build()))
329    })
330}
331
332fn merged_tree_entry_diff<'a>(
333    trees1: &'a Merge<Tree>,
334    trees2: &'a Merge<Tree>,
335) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
336    itertools::merge_join_by(
337        all_tree_entries(trees1),
338        all_tree_entries(trees2),
339        |(name1, _), (name2, _)| name1.cmp(name2),
340    )
341    .map(|entry| match entry {
342        EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
343        EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
344        EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
345    })
346    .filter(|(_, diff)| diff.is_changed())
347}
348
/// Recursive iterator over the entries in a tree.
pub struct TreeEntriesIterator<'matcher> {
    /// Store used to read subtrees while descending.
    store: Arc<Store>,
    /// One item per directory currently being visited; the innermost
    /// directory is last.
    stack: Vec<TreeEntriesDirItem>,
    /// Restricts which files are yielded and which subtrees are visited.
    matcher: &'matcher dyn Matcher,
}
355
/// The state for the non-recursive iteration over the matching entries in a
/// single directory.
struct TreeEntriesDirItem {
    /// Remaining entries, stored in reverse order so that `pop()` yields them
    /// in iteration order.
    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
}
359
360impl TreeEntriesDirItem {
361    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
362        let mut entries = vec![];
363        let dir = trees.first().dir();
364        for (name, value) in all_tree_entries(trees) {
365            let path = dir.join(name);
366            if value.is_tree() {
367                // TODO: Handle the other cases (specific files and trees)
368                if matcher.visit(&path).is_nothing() {
369                    continue;
370                }
371            } else if !matcher.matches(&path) {
372                continue;
373            }
374            entries.push((path, value.cloned()));
375        }
376        entries.reverse();
377        Self { entries }
378    }
379}
380
381impl<'matcher> TreeEntriesIterator<'matcher> {
382    fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
383        Self {
384            store: trees.store.clone(),
385            stack: vec![TreeEntriesDirItem {
386                entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
387            }],
388            matcher,
389        }
390    }
391}
392
393impl Iterator for TreeEntriesIterator<'_> {
394    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
395
396    fn next(&mut self) -> Option<Self::Item> {
397        while let Some(top) = self.stack.last_mut() {
398            if let Some((path, value)) = top.entries.pop() {
399                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
400                    Ok(maybe_trees) => maybe_trees,
401                    Err(err) => return Some((path, Err(err))),
402                };
403                if let Some(trees) = maybe_trees {
404                    self.stack
405                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
406                } else {
407                    return Some((path, Ok(value)));
408                }
409            } else {
410                self.stack.pop();
411            }
412        }
413        None
414    }
415}
416
/// The state for the non-recursive iteration over the conflicted entries in a
/// single directory.
struct ConflictsDirItem {
    /// Remaining unresolved entries, stored in reverse order so that `pop()`
    /// yields them in iteration order.
    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
}
422
423impl From<&Merge<Tree>> for ConflictsDirItem {
424    fn from(trees: &Merge<Tree>) -> Self {
425        let dir = trees.first().dir();
426        if trees.is_resolved() {
427            return Self { entries: vec![] };
428        }
429
430        let mut entries = vec![];
431        for (basename, value) in all_tree_entries(trees) {
432            if !value.is_resolved() {
433                entries.push((dir.join(basename), value.cloned()));
434            }
435        }
436        entries.reverse();
437        Self { entries }
438    }
439}
440
/// Recursive iterator over the conflicted entries in a tree.
struct ConflictIterator {
    /// Store used to read subtrees while descending.
    store: Arc<Store>,
    /// One item per directory currently being visited; the innermost
    /// directory is last.
    stack: Vec<ConflictsDirItem>,
}
445
446impl ConflictIterator {
447    fn new(tree: &MergedTree) -> Self {
448        Self {
449            store: tree.store().clone(),
450            stack: vec![ConflictsDirItem {
451                entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
452            }],
453        }
454    }
455}
456
457impl Iterator for ConflictIterator {
458    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
459
460    fn next(&mut self) -> Option<Self::Item> {
461        while let Some(top) = self.stack.last_mut() {
462            if let Some((path, tree_values)) = top.entries.pop() {
463                match tree_values.to_tree_merge(&self.store, &path).block_on() {
464                    Ok(Some(trees)) => {
465                        // If all sides are trees or missing, descend into the merged tree
466                        self.stack.push(ConflictsDirItem::from(&trees));
467                    }
468                    Ok(None) => {
469                        // Otherwise this is a conflict between files, trees, etc. If they could
470                        // be automatically resolved, they should have been when the top-level
471                        // tree conflict was written, so we assume that they can't be.
472                        return Some((path, Ok(tree_values)));
473                    }
474                    Err(err) => {
475                        return Some((path, Err(err)));
476                    }
477                }
478            } else {
479                self.stack.pop();
480            }
481        }
482        None
483    }
484}
485
/// Iterator over the differences between two trees.
///
/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
/// side is present and not a tree.
pub struct TreeDiffIterator<'matcher> {
    /// Store used to read subtrees while descending.
    store: Arc<Store>,
    /// One item per directory currently being diffed; the innermost directory
    /// is last.
    stack: Vec<TreeDiffDir>,
    /// Restricts which paths are diffed.
    matcher: &'matcher dyn Matcher,
}
495
/// The state for the non-recursive iteration over the changed entries in a
/// single directory.
struct TreeDiffDir {
    /// Remaining changed entries, stored in reverse order so that `pop()`
    /// yields them in iteration order.
    entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
}
499
500impl<'matcher> TreeDiffIterator<'matcher> {
501    /// Creates a iterator over the differences between two trees.
502    pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
503        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
504        let root_dir = RepoPath::root();
505        let mut stack = Vec::new();
506        if !matcher.visit(root_dir).is_nothing() {
507            stack.push(TreeDiffDir {
508                entries: vec![(
509                    root_dir.to_owned(),
510                    Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value()),
511                )],
512            });
513        };
514        Self {
515            store: tree1.store().clone(),
516            stack,
517            matcher,
518        }
519    }
520
521    /// Gets the given trees if `values` are trees, otherwise an empty tree.
522    fn trees(
523        store: &Arc<Store>,
524        dir: &RepoPath,
525        values: &MergedTreeValue,
526    ) -> BackendResult<Merge<Tree>> {
527        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
528            Ok(trees)
529        } else {
530            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
531        }
532    }
533}
534
535impl TreeDiffDir {
536    fn from_trees(
537        dir: &RepoPath,
538        trees1: &Merge<Tree>,
539        trees2: &Merge<Tree>,
540        matcher: &dyn Matcher,
541    ) -> Self {
542        let mut entries = vec![];
543        for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
544            let path = dir.join(name);
545            let tree_before = diff.before.is_tree();
546            let tree_after = diff.after.is_tree();
547            // Check if trees and files match, but only if either side is a tree or a file
548            // (don't query the matcher unnecessarily).
549            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
550            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
551
552            // Replace trees or files that don't match by `Merge::absent()`
553            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
554                diff.before
555            } else {
556                Merge::absent()
557            };
558            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
559                diff.after
560            } else {
561                Merge::absent()
562            };
563            if before.is_absent() && after.is_absent() {
564                continue;
565            }
566            entries.push((path, Diff::new(before.cloned(), after.cloned())));
567        }
568        entries.reverse();
569        Self { entries }
570    }
571}
572
573impl Iterator for TreeDiffIterator<'_> {
574    type Item = TreeDiffEntry;
575
576    fn next(&mut self) -> Option<Self::Item> {
577        while let Some(top) = self.stack.last_mut() {
578            let (path, diff) = match top.entries.pop() {
579                Some(entry) => entry,
580                None => {
581                    self.stack.pop().unwrap();
582                    continue;
583                }
584            };
585
586            if diff.before.is_tree() || diff.after.is_tree() {
587                let (before_tree, after_tree) = match (
588                    Self::trees(&self.store, &path, &diff.before),
589                    Self::trees(&self.store, &path, &diff.after),
590                ) {
591                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
592                    (Err(before_err), _) => {
593                        return Some(TreeDiffEntry {
594                            path,
595                            values: Err(before_err),
596                        });
597                    }
598                    (_, Err(after_err)) => {
599                        return Some(TreeDiffEntry {
600                            path,
601                            values: Err(after_err),
602                        });
603                    }
604                };
605                let subdir =
606                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
607                self.stack.push(subdir);
608            };
609            if diff.before.is_file_like() || diff.after.is_file_like() {
610                return Some(TreeDiffEntry {
611                    path,
612                    values: Ok(diff),
613                });
614            }
615        }
616        None
617    }
618}
619
/// Stream of differences between two trees.
///
/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
/// side is present and not a tree.
pub struct TreeDiffStreamImpl<'matcher> {
    /// Store used to read subtrees while descending.
    store: Arc<Store>,
    /// Restricts which paths are diffed.
    matcher: &'matcher dyn Matcher,
    /// Pairs of tree values that may or may not be ready to emit, sorted in the
    /// order we want to emit them. If either side is a tree, there will be
    /// a corresponding entry in `pending_trees`. The item is ready to emit
    /// unless there's a smaller path in `pending_trees`; `poll_next()` does
    /// not wait for a pending tree at the same path.
    items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
    // TODO: Is it better to combine this and `items` into a single map?
    /// Directories whose "before" and "after" trees are still being read,
    /// keyed by directory path.
    #[expect(clippy::type_complexity)]
    pending_trees:
        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
    /// The maximum number of trees to request concurrently. However, we do the
    /// accounting per path, so there will often be twice as many pending
    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
    /// conflicts, there will be even more.
    max_concurrent_reads: usize,
    /// The maximum number of items in `items`. However, we will always add the
    /// full differences from a particular pair of trees, so it may temporarily
    /// go over the limit (until we emit those items). It may also go over the
    /// limit because we have a file item that's blocked by pending subdirectory
    /// items.
    max_queued_items: usize,
}
648
649impl<'matcher> TreeDiffStreamImpl<'matcher> {
650    /// Creates a iterator over the differences between two trees. Generally
651    /// prefer `MergedTree::diff_stream()` of calling this directly.
652    pub fn new(
653        tree1: &MergedTree,
654        tree2: &MergedTree,
655        matcher: &'matcher dyn Matcher,
656        max_concurrent_reads: usize,
657    ) -> Self {
658        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
659        let store = tree1.store().clone();
660        let mut stream = Self {
661            store: store.clone(),
662            matcher,
663            items: BTreeMap::new(),
664            pending_trees: BTreeMap::new(),
665            max_concurrent_reads,
666            max_queued_items: 10000,
667        };
668        let dir = RepoPathBuf::root();
669        let root_tree_fut = Box::pin(try_join(
670            Self::trees(store.clone(), dir.clone(), tree1.to_merged_tree_value()),
671            Self::trees(store, dir.clone(), tree2.to_merged_tree_value()),
672        ));
673        stream.pending_trees.insert(dir, root_tree_fut);
674        stream
675    }
676
677    async fn single_tree(
678        store: &Arc<Store>,
679        dir: RepoPathBuf,
680        value: Option<&TreeValue>,
681    ) -> BackendResult<Tree> {
682        match value {
683            Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
684            _ => Ok(Tree::empty(store.clone(), dir.clone())),
685        }
686    }
687
688    /// Gets the given trees if `values` are trees, otherwise an empty tree.
689    async fn trees(
690        store: Arc<Store>,
691        dir: RepoPathBuf,
692        values: MergedTreeValue,
693    ) -> BackendResult<Merge<Tree>> {
694        if values.is_tree() {
695            values
696                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
697                .await
698        } else {
699            Ok(Merge::resolved(Tree::empty(store, dir)))
700        }
701    }
702
703    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
704        for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
705            let path = dir.join(basename);
706            let tree_before = diff.before.is_tree();
707            let tree_after = diff.after.is_tree();
708            // Check if trees and files match, but only if either side is a tree or a file
709            // (don't query the matcher unnecessarily).
710            let tree_matches =
711                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
712            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
713
714            // Replace trees or files that don't match by `Merge::absent()`
715            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
716                diff.before
717            } else {
718                Merge::absent()
719            };
720            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
721                diff.after
722            } else {
723                Merge::absent()
724            };
725            if before.is_absent() && after.is_absent() {
726                continue;
727            }
728
729            // If the path was a tree on either side of the diff, read those trees.
730            if tree_matches {
731                let before_tree_future =
732                    Self::trees(self.store.clone(), path.clone(), before.cloned());
733                let after_tree_future =
734                    Self::trees(self.store.clone(), path.clone(), after.cloned());
735                let both_trees_future = try_join(before_tree_future, after_tree_future);
736                self.pending_trees
737                    .insert(path.clone(), Box::pin(both_trees_future));
738            }
739
740            if before.is_file_like() || after.is_file_like() {
741                self.items
742                    .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
743            }
744        }
745    }
746
747    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
748        loop {
749            let mut tree_diffs = vec![];
750            let mut some_pending = false;
751            let mut all_pending = true;
752            for (dir, future) in self
753                .pending_trees
754                .iter_mut()
755                .take(self.max_concurrent_reads)
756            {
757                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
758                    all_pending = false;
759                    tree_diffs.push((dir.clone(), tree_diff));
760                } else {
761                    some_pending = true;
762                }
763            }
764
765            for (dir, tree_diff) in tree_diffs {
766                let _ = self.pending_trees.remove_entry(&dir).unwrap();
767                match tree_diff {
768                    Ok((trees1, trees2)) => {
769                        self.add_dir_diff_items(&dir, &trees1, &trees2);
770                    }
771                    Err(err) => {
772                        self.items.insert(dir, Err(err));
773                    }
774                }
775            }
776
777            // If none of the futures have been polled and returned `Poll::Pending`, we must
778            // not return. If we did, nothing would call the waker so we might never get
779            // polled again.
780            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
781                return;
782            }
783        }
784    }
785}
786
787impl Stream for TreeDiffStreamImpl<'_> {
788    type Item = TreeDiffEntry;
789
790    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
791        // Go through all pending tree futures and poll them.
792        self.poll_tree_futures(cx);
793
794        // Now emit the first file, or the first tree that completed with an error
795        if let Some((path, _)) = self.items.first_key_value() {
796            // Check if there are any pending trees before this item that we need to finish
797            // polling before we can emit this item.
798            if let Some((dir, _)) = self.pending_trees.first_key_value()
799                && dir < path
800            {
801                return Poll::Pending;
802            }
803
804            let (path, values) = self.items.pop_first().unwrap();
805            Poll::Ready(Some(TreeDiffEntry { path, values }))
806        } else if self.pending_trees.is_empty() {
807            Poll::Ready(None)
808        } else {
809            Poll::Pending
810        }
811    }
812}
813
814fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
815    Box::pin(stream.map(|mut entry| {
816        let skip_tree = |merge: MergedTreeValue| {
817            if merge.is_tree() {
818                Merge::absent()
819            } else {
820                merge
821            }
822        };
823        entry.values = entry.values.map(|diff| diff.map(skip_tree));
824        entry
825    }))
826}
827
828/// Adapts a `TreeDiffStream` to emit a added file at a given path after a
829/// removed directory at the same path.
830struct DiffStreamForFileSystem<'a> {
831    inner: TreeDiffStream<'a>,
832    next_item: Option<TreeDiffEntry>,
833    held_file: Option<TreeDiffEntry>,
834}
835
836impl<'a> DiffStreamForFileSystem<'a> {
837    fn new(inner: TreeDiffStream<'a>) -> Self {
838        Self {
839            inner,
840            next_item: None,
841            held_file: None,
842        }
843    }
844}
845
846impl Stream for DiffStreamForFileSystem<'_> {
847    type Item = TreeDiffEntry;
848
849    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
850        while let Some(next) = match self.next_item.take() {
851            Some(next) => Some(next),
852            None => ready!(self.inner.as_mut().poll_next(cx)),
853        } {
854            // If there's a held file "foo" and the next item to emit is not "foo/...", then
855            // we must be done with the "foo/" directory and it's time to emit "foo" as a
856            // removed file.
857            if let Some(held_entry) = self
858                .held_file
859                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
860            {
861                self.next_item = Some(next);
862                return Poll::Ready(Some(held_entry));
863            }
864
865            match next.values {
866                Ok(diff) if diff.before.is_tree() => {
867                    assert!(diff.after.is_present());
868                    assert!(self.held_file.is_none());
869                    self.held_file = Some(TreeDiffEntry {
870                        path: next.path,
871                        values: Ok(Diff::new(Merge::absent(), diff.after)),
872                    });
873                }
874                Ok(diff) if diff.after.is_tree() => {
875                    assert!(diff.before.is_present());
876                    return Poll::Ready(Some(TreeDiffEntry {
877                        path: next.path,
878                        values: Ok(Diff::new(diff.before, Merge::absent())),
879                    }));
880                }
881                _ => {
882                    return Poll::Ready(Some(next));
883                }
884            }
885        }
886        Poll::Ready(self.held_file.take())
887    }
888}
889
890/// Helper for writing trees with conflicts.
891///
892/// You start by creating an instance of this type with one or more
893/// base trees. You then add overrides on top. The overrides may be
894/// conflicts. Then you can write the result as a merge of trees.
895#[derive(Debug)]
896pub struct MergedTreeBuilder {
897    base_tree: MergedTree,
898    overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
899}
900
901impl MergedTreeBuilder {
902    /// Create a new builder with the given trees as base.
903    pub fn new(base_tree: MergedTree) -> Self {
904        Self {
905            base_tree,
906            overrides: BTreeMap::new(),
907        }
908    }
909
910    /// Set an override compared to  the base tree. The `values` merge must
911    /// either be resolved (i.e. have 1 side) or have the same number of
912    /// sides as the `base_tree_ids` used to construct this builder. Use
913    /// `Merge::absent()` to remove a value from the tree.
914    pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
915        self.overrides.insert(path, values);
916    }
917
918    /// Create new tree(s) from the base tree(s) and overrides.
919    pub fn write_tree(self) -> BackendResult<MergedTree> {
920        let store = self.base_tree.store.clone();
921        let new_tree_ids = self.write_merged_trees()?;
922        match new_tree_ids.simplify().into_resolved() {
923            Ok(single_tree_id) => Ok(MergedTree::resolved(store, single_tree_id)),
924            Err(tree_ids) => {
925                let tree = MergedTree::new(store, tree_ids);
926                tree.resolve().block_on()
927            }
928        }
929    }
930
931    fn write_merged_trees(self) -> BackendResult<Merge<TreeId>> {
932        let store = self.base_tree.store;
933        let mut base_tree_ids = self.base_tree.tree_ids;
934        let num_sides = self
935            .overrides
936            .values()
937            .map(|value| value.num_sides())
938            .max()
939            .unwrap_or(0);
940        base_tree_ids.pad_to(num_sides, store.empty_tree_id());
941        // Create a single-tree builder for each base tree
942        let mut tree_builders =
943            base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
944        for (path, values) in self.overrides {
945            match values.into_resolved() {
946                Ok(value) => {
947                    // This path was overridden with a resolved value. Apply that to all
948                    // builders.
949                    for builder in tree_builders.iter_mut() {
950                        builder.set_or_remove(path.clone(), value.clone());
951                    }
952                }
953                Err(mut values) => {
954                    values.pad_to(num_sides, &None);
955                    // This path was overridden with a conflicted value. Apply each term to
956                    // its corresponding builder.
957                    for (builder, value) in zip(tree_builders.iter_mut(), values) {
958                        builder.set_or_remove(path.clone(), value);
959                    }
960                }
961            }
962        }
963        // TODO: This can be made more efficient. If there's a single resolved conflict
964        // in `dir/file`, we shouldn't have to write the `dir/` and root trees more than
965        // once.
966        let merge_builder: MergeBuilder<TreeId> = tree_builders
967            .into_iter()
968            .map(|builder| builder.write_tree())
969            .try_collect()?;
970        Ok(merge_builder.build())
971    }
972}