jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::collections::BTreeMap;
18use std::iter;
19use std::iter::zip;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::MergedTreeId;
39use crate::backend::TreeId;
40use crate::backend::TreeValue;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_builder::TreeBuilder;
57use crate::tree_merge::merge_trees;
58
59/// Presents a view of a merged set of trees.
60#[derive(PartialEq, Eq, Clone, Debug)]
61pub struct MergedTree {
62    trees: Merge<Tree>,
63}
64
65impl MergedTree {
66    /// Creates a new `MergedTree` representing a single tree without conflicts.
67    pub fn resolved(tree: Tree) -> Self {
68        Self::new(Merge::resolved(tree))
69    }
70
71    /// Creates a new `MergedTree` representing a merge of a set of trees. The
72    /// individual trees must not have any conflicts.
73    pub fn new(trees: Merge<Tree>) -> Self {
74        debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
75        debug_assert!(
76            trees
77                .iter()
78                .map(|tree| Arc::as_ptr(tree.store()))
79                .all_equal()
80        );
81        Self { trees }
82    }
83
84    /// Returns the underlying `Merge<Tree>`.
85    pub fn as_merge(&self) -> &Merge<Tree> {
86        &self.trees
87    }
88
89    /// Extracts the underlying `Merge<Tree>`.
90    pub fn take(self) -> Merge<Tree> {
91        self.trees
92    }
93
94    /// This tree's directory
95    pub fn dir(&self) -> &RepoPath {
96        self.trees.first().dir()
97    }
98
99    /// The `Store` associated with this tree.
100    pub fn store(&self) -> &Arc<Store> {
101        self.trees.first().store()
102    }
103
104    /// Base names of entries in this directory.
105    pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
106        Box::new(all_tree_basenames(&self.trees))
107    }
108
109    /// The value at the given basename. The value can be `Resolved` even if
110    /// `self` is a `Merge`, which happens if the value at the path can be
111    /// trivially merged. Does not recurse, so if `basename` refers to a Tree,
112    /// then a `TreeValue::Tree` will be returned.
113    pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal<'_> {
114        trees_value(&self.trees, basename)
115    }
116
117    /// Tries to resolve any conflicts, resolving any conflicts that can be
118    /// automatically resolved and leaving the rest unresolved.
119    pub async fn resolve(self) -> BackendResult<Self> {
120        let merged = merge_trees(self.trees).await?;
121        // If the result can be resolved, then `merge_trees()` above would have returned
122        // a resolved merge. However, that function will always preserve the arity of
123        // conflicts it cannot resolve. So we simplify the conflict again
124        // here to possibly reduce a complex conflict to a simpler one.
125        let simplified = merged.simplify();
126        // If debug assertions are enabled, check that the merge was idempotent. In
127        // particular,  that this last simplification doesn't enable further automatic
128        // resolutions
129        if cfg!(debug_assertions) {
130            let re_merged = merge_trees(simplified.clone()).await.unwrap();
131            debug_assert_eq!(re_merged, simplified);
132        }
133        Ok(Self { trees: simplified })
134    }
135
136    /// An iterator over the conflicts in this tree, including subtrees.
137    /// Recurses into subtrees and yields conflicts in those, but only if
138    /// all sides are trees, so tree/file conflicts will be reported as a single
139    /// conflict, not one for each path in the tree.
140    // TODO: Restrict this by a matcher (or add a separate method for that).
141    pub fn conflicts(
142        &self,
143    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
144        ConflictIterator::new(self)
145    }
146
147    /// Whether this tree has conflicts.
148    pub fn has_conflict(&self) -> bool {
149        !self.trees.is_resolved()
150    }
151
152    /// Gets the `MergeTree` in a subdirectory of the current tree. If the path
153    /// doesn't correspond to a tree in any of the inputs to the merge, then
154    /// that entry will be replace by an empty tree in the result.
155    pub async fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<Self>> {
156        match self.value(name).into_resolved() {
157            Ok(Some(TreeValue::Tree(sub_tree_id))) => {
158                let subdir = self.dir().join(name);
159                Ok(Some(Self::resolved(
160                    self.store().get_tree_async(subdir, sub_tree_id).await?,
161                )))
162            }
163            Ok(_) => Ok(None),
164            Err(merge) => {
165                if !merge.is_tree() {
166                    return Ok(None);
167                }
168                let trees = merge
169                    .try_map_async(async |value| match value {
170                        Some(TreeValue::Tree(sub_tree_id)) => {
171                            let subdir = self.dir().join(name);
172                            self.store().get_tree_async(subdir, sub_tree_id).await
173                        }
174                        Some(_) => unreachable!(),
175                        None => {
176                            let subdir = self.dir().join(name);
177                            Ok(Tree::empty(self.store().clone(), subdir))
178                        }
179                    })
180                    .await?;
181                Ok(Some(Self { trees }))
182            }
183        }
184    }
185
186    /// The value at the given path. The value can be `Resolved` even if
187    /// `self` is a `Conflict`, which happens if the value at the path can be
188    /// trivially merged.
189    pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
190        self.path_value_async(path).block_on()
191    }
192
193    /// Async version of `path_value()`.
194    pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
195        assert_eq!(self.dir(), RepoPath::root());
196        match path.split() {
197            Some((dir, basename)) => match self.sub_tree_recursive(dir).await? {
198                None => Ok(Merge::absent()),
199                Some(tree) => Ok(tree.value(basename).cloned()),
200            },
201            None => Ok(self
202                .trees
203                .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
204        }
205    }
206
207    /// The tree's id
208    pub fn id(&self) -> MergedTreeId {
209        MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
210    }
211
212    /// Look up the tree at the given path.
213    pub async fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<Self>> {
214        let mut current_tree = self.clone();
215        for name in path.components() {
216            match current_tree.sub_tree(name).await? {
217                None => {
218                    return Ok(None);
219                }
220                Some(sub_tree) => {
221                    current_tree = sub_tree;
222                }
223            }
224        }
225        Ok(Some(current_tree))
226    }
227
228    /// Iterator over the entries matching the given matcher. Subtrees are
229    /// visited recursively. Subtrees that differ between the current
230    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
231    /// empty directories. Subtrees that conflict with non-trees are not
232    /// visited. For example, if current tree is a merge of 3 trees, and the
233    /// entry for 'foo' is a conflict between a change subtree and a symlink
234    /// (i.e. the subdirectory was replaced by symlink in one side of the
235    /// conflict), then the entry for `foo` itself will be emitted, but no
236    /// entries from inside `foo/` from either of the trees will be.
237    pub fn entries(&self) -> TreeEntriesIterator<'static> {
238        self.entries_matching(&EverythingMatcher)
239    }
240
241    /// Like `entries()` but restricted by a matcher.
242    pub fn entries_matching<'matcher>(
243        &self,
244        matcher: &'matcher dyn Matcher,
245    ) -> TreeEntriesIterator<'matcher> {
246        TreeEntriesIterator::new(&self.trees, matcher)
247    }
248
249    /// Stream of the differences between this tree and another tree.
250    ///
251    /// Tree entries (`MergedTreeValue::is_tree()`) are included only if the
252    /// other side is present and not a tree.
253    fn diff_stream_internal<'matcher>(
254        &self,
255        other: &Self,
256        matcher: &'matcher dyn Matcher,
257    ) -> TreeDiffStream<'matcher> {
258        let concurrency = self.store().concurrency();
259        if concurrency <= 1 {
260            Box::pin(futures::stream::iter(TreeDiffIterator::new(
261                &self.trees,
262                &other.trees,
263                matcher,
264            )))
265        } else {
266            Box::pin(TreeDiffStreamImpl::new(
267                &self.trees,
268                &other.trees,
269                matcher,
270                concurrency,
271            ))
272        }
273    }
274
275    /// Stream of the differences between this tree and another tree.
276    pub fn diff_stream<'matcher>(
277        &self,
278        other: &Self,
279        matcher: &'matcher dyn Matcher,
280    ) -> TreeDiffStream<'matcher> {
281        stream_without_trees(self.diff_stream_internal(other, matcher))
282    }
283
284    /// Like `diff_stream()` but files in a removed tree will be returned before
285    /// a file that replaces it.
286    pub fn diff_stream_for_file_system<'matcher>(
287        &self,
288        other: &Self,
289        matcher: &'matcher dyn Matcher,
290    ) -> TreeDiffStream<'matcher> {
291        Box::pin(DiffStreamForFileSystem::new(
292            self.diff_stream_internal(other, matcher),
293        ))
294    }
295
296    /// Like `diff_stream()` but takes the given copy records into account.
297    pub fn diff_stream_with_copies<'a>(
298        &self,
299        other: &Self,
300        matcher: &'a dyn Matcher,
301        copy_records: &'a CopyRecords,
302    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
303        let stream = self.diff_stream(other, matcher);
304        Box::pin(CopiesTreeDiffStream::new(
305            stream,
306            self.clone(),
307            other.clone(),
308            copy_records,
309        ))
310    }
311
312    /// Merges this tree with `other`, using `base` as base. Any conflicts will
313    /// be resolved recursively if possible.
314    pub async fn merge(self, base: Self, other: Self) -> BackendResult<Self> {
315        self.merge_no_resolve(base, other).resolve().await
316    }
317
318    /// Merges this tree with `other`, using `base` as base, without attempting
319    /// to resolve file conflicts.
320    pub fn merge_no_resolve(self, base: Self, other: Self) -> Self {
321        let nested = Merge::from_vec(vec![self.trees, base.trees, other.trees]);
322        Self {
323            trees: nested.flatten().simplify(),
324        }
325    }
326}
327
328/// A single entry in a tree diff.
329pub struct TreeDiffEntry {
330    /// The path.
331    pub path: RepoPathBuf,
332    /// The resolved tree values if available.
333    pub values: BackendResult<Diff<MergedTreeValue>>,
334}
335
336/// Type alias for the result from `MergedTree::diff_stream()`. We use a
337/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
338/// ones) can fetch trees asynchronously.
339pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
340
341fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
342    trees
343        .iter()
344        .map(|tree| tree.data().names())
345        .kmerge()
346        .dedup()
347}
348
349fn all_tree_entries(
350    trees: &Merge<Tree>,
351) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
352    if let Some(tree) = trees.as_resolved() {
353        let iter = tree
354            .entries_non_recursive()
355            .map(|entry| (entry.name(), Merge::normal(entry.value())));
356        Either::Left(iter)
357    } else {
358        let same_change = trees.first().store().merge_options().same_change;
359        let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
360            // TODO: move resolve_trivial() to caller?
361            let values = match values.resolve_trivial(same_change) {
362                Some(resolved) => Merge::resolved(*resolved),
363                None => values,
364            };
365            (name, values)
366        });
367        Either::Right(iter)
368    }
369}
370
371/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
372/// non-recursively. This also works if `trees` are resolved, but is more costly
373/// than `tree.entries_non_recursive()`.
374pub fn all_merged_tree_entries(
375    trees: &Merge<Tree>,
376) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
377    let mut entries_iters = trees
378        .iter()
379        .map(|tree| tree.entries_non_recursive().peekable())
380        .collect_vec();
381    iter::from_fn(move || {
382        let next_name = entries_iters
383            .iter_mut()
384            .filter_map(|iter| iter.peek())
385            .map(|entry| entry.name())
386            .min()?;
387        let values: MergeBuilder<_> = entries_iters
388            .iter_mut()
389            .map(|iter| {
390                let entry = iter.next_if(|entry| entry.name() == next_name)?;
391                Some(entry.value())
392            })
393            .collect();
394        Some((next_name, values.build()))
395    })
396}
397
398fn merged_tree_entry_diff<'a>(
399    trees1: &'a Merge<Tree>,
400    trees2: &'a Merge<Tree>,
401) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
402    itertools::merge_join_by(
403        all_tree_entries(trees1),
404        all_tree_entries(trees2),
405        |(name1, _), (name2, _)| name1.cmp(name2),
406    )
407    .map(|entry| match entry {
408        EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
409        EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
410        EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
411    })
412    .filter(|(_, diff)| diff.is_changed())
413}
414
415fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
416    if let Some(tree) = trees.as_resolved() {
417        return Merge::resolved(tree.value(basename));
418    }
419    let same_change = trees.first().store().merge_options().same_change;
420    let value = trees.map(|tree| tree.value(basename));
421    if let Some(resolved) = value.resolve_trivial(same_change) {
422        return Merge::resolved(*resolved);
423    }
424    value
425}
426
427/// Recursive iterator over the entries in a tree.
428pub struct TreeEntriesIterator<'matcher> {
429    store: Arc<Store>,
430    stack: Vec<TreeEntriesDirItem>,
431    matcher: &'matcher dyn Matcher,
432}
433
434struct TreeEntriesDirItem {
435    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
436}
437
438impl TreeEntriesDirItem {
439    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
440        let mut entries = vec![];
441        let dir = trees.first().dir();
442        for (name, value) in all_tree_entries(trees) {
443            let path = dir.join(name);
444            if value.is_tree() {
445                // TODO: Handle the other cases (specific files and trees)
446                if matcher.visit(&path).is_nothing() {
447                    continue;
448                }
449            } else if !matcher.matches(&path) {
450                continue;
451            }
452            entries.push((path, value.cloned()));
453        }
454        entries.reverse();
455        Self { entries }
456    }
457}
458
459impl<'matcher> TreeEntriesIterator<'matcher> {
460    fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
461        Self {
462            store: trees.first().store().clone(),
463            stack: vec![TreeEntriesDirItem::new(trees, matcher)],
464            matcher,
465        }
466    }
467}
468
469impl Iterator for TreeEntriesIterator<'_> {
470    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
471
472    fn next(&mut self) -> Option<Self::Item> {
473        while let Some(top) = self.stack.last_mut() {
474            if let Some((path, value)) = top.entries.pop() {
475                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
476                    Ok(maybe_trees) => maybe_trees,
477                    Err(err) => return Some((path, Err(err))),
478                };
479                if let Some(trees) = maybe_trees {
480                    self.stack
481                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
482                } else {
483                    return Some((path, Ok(value)));
484                }
485            } else {
486                self.stack.pop();
487            }
488        }
489        None
490    }
491}
492
493/// The state for the non-recursive iteration over the conflicted entries in a
494/// single directory.
495struct ConflictsDirItem {
496    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
497}
498
499impl From<&Merge<Tree>> for ConflictsDirItem {
500    fn from(trees: &Merge<Tree>) -> Self {
501        let dir = trees.first().dir();
502        if trees.is_resolved() {
503            return Self { entries: vec![] };
504        }
505
506        let mut entries = vec![];
507        for (basename, value) in all_tree_entries(trees) {
508            if !value.is_resolved() {
509                entries.push((dir.join(basename), value.cloned()));
510            }
511        }
512        entries.reverse();
513        Self { entries }
514    }
515}
516
517struct ConflictIterator {
518    store: Arc<Store>,
519    stack: Vec<ConflictsDirItem>,
520}
521
522impl ConflictIterator {
523    fn new(tree: &MergedTree) -> Self {
524        Self {
525            store: tree.store().clone(),
526            stack: vec![ConflictsDirItem::from(&tree.trees)],
527        }
528    }
529}
530
531impl Iterator for ConflictIterator {
532    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
533
534    fn next(&mut self) -> Option<Self::Item> {
535        while let Some(top) = self.stack.last_mut() {
536            if let Some((path, tree_values)) = top.entries.pop() {
537                match tree_values.to_tree_merge(&self.store, &path).block_on() {
538                    Ok(Some(trees)) => {
539                        // If all sides are trees or missing, descend into the merged tree
540                        self.stack.push(ConflictsDirItem::from(&trees));
541                    }
542                    Ok(None) => {
543                        // Otherwise this is a conflict between files, trees, etc. If they could
544                        // be automatically resolved, they should have been when the top-level
545                        // tree conflict was written, so we assume that they can't be.
546                        return Some((path, Ok(tree_values)));
547                    }
548                    Err(err) => {
549                        return Some((path, Err(err)));
550                    }
551                }
552            } else {
553                self.stack.pop();
554            }
555        }
556        None
557    }
558}
559
560/// Iterator over the differences between two trees.
561///
562/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
563/// side is present and not a tree.
564pub struct TreeDiffIterator<'matcher> {
565    store: Arc<Store>,
566    stack: Vec<TreeDiffDir>,
567    matcher: &'matcher dyn Matcher,
568}
569
570struct TreeDiffDir {
571    entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
572}
573
574impl<'matcher> TreeDiffIterator<'matcher> {
575    /// Creates a iterator over the differences between two trees.
576    pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
577        assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
578        let root_dir = RepoPath::root();
579        let mut stack = Vec::new();
580        if !matcher.visit(root_dir).is_nothing() {
581            stack.push(TreeDiffDir::from_trees(root_dir, trees1, trees2, matcher));
582        };
583        Self {
584            store: trees1.first().store().clone(),
585            stack,
586            matcher,
587        }
588    }
589
590    /// Gets the given trees if `values` are trees, otherwise an empty tree.
591    fn trees(
592        store: &Arc<Store>,
593        dir: &RepoPath,
594        values: &MergedTreeValue,
595    ) -> BackendResult<Merge<Tree>> {
596        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
597            Ok(trees)
598        } else {
599            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
600        }
601    }
602}
603
604impl TreeDiffDir {
605    fn from_trees(
606        dir: &RepoPath,
607        trees1: &Merge<Tree>,
608        trees2: &Merge<Tree>,
609        matcher: &dyn Matcher,
610    ) -> Self {
611        let mut entries = vec![];
612        for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
613            let path = dir.join(name);
614            let tree_before = diff.before.is_tree();
615            let tree_after = diff.after.is_tree();
616            // Check if trees and files match, but only if either side is a tree or a file
617            // (don't query the matcher unnecessarily).
618            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
619            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
620
621            // Replace trees or files that don't match by `Merge::absent()`
622            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
623                diff.before
624            } else {
625                Merge::absent()
626            };
627            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
628                diff.after
629            } else {
630                Merge::absent()
631            };
632            if before.is_absent() && after.is_absent() {
633                continue;
634            }
635            entries.push((path, Diff::new(before.cloned(), after.cloned())));
636        }
637        entries.reverse();
638        Self { entries }
639    }
640}
641
642impl Iterator for TreeDiffIterator<'_> {
643    type Item = TreeDiffEntry;
644
645    fn next(&mut self) -> Option<Self::Item> {
646        while let Some(top) = self.stack.last_mut() {
647            let (path, diff) = match top.entries.pop() {
648                Some(entry) => entry,
649                None => {
650                    self.stack.pop().unwrap();
651                    continue;
652                }
653            };
654
655            if diff.before.is_tree() || diff.after.is_tree() {
656                let (before_tree, after_tree) = match (
657                    Self::trees(&self.store, &path, &diff.before),
658                    Self::trees(&self.store, &path, &diff.after),
659                ) {
660                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
661                    (Err(before_err), _) => {
662                        return Some(TreeDiffEntry {
663                            path,
664                            values: Err(before_err),
665                        });
666                    }
667                    (_, Err(after_err)) => {
668                        return Some(TreeDiffEntry {
669                            path,
670                            values: Err(after_err),
671                        });
672                    }
673                };
674                let subdir =
675                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
676                self.stack.push(subdir);
677            };
678            if diff.before.is_file_like() || diff.after.is_file_like() {
679                return Some(TreeDiffEntry {
680                    path,
681                    values: Ok(diff),
682                });
683            }
684        }
685        None
686    }
687}
688
689/// Stream of differences between two trees.
690///
691/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
692/// side is present and not a tree.
693pub struct TreeDiffStreamImpl<'matcher> {
694    store: Arc<Store>,
695    matcher: &'matcher dyn Matcher,
696    /// Pairs of tree values that may or may not be ready to emit, sorted in the
697    /// order we want to emit them. If either side is a tree, there will be
698    /// a corresponding entry in `pending_trees`. The item is ready to emit
699    /// unless there's a smaller or equal path in `pending_trees`.
700    items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
701    // TODO: Is it better to combine this and `items` into a single map?
702    #[expect(clippy::type_complexity)]
703    pending_trees:
704        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
705    /// The maximum number of trees to request concurrently. However, we do the
706    /// accounting per path, so there will often be twice as many pending
707    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
708    /// conflicts, there will be even more.
709    max_concurrent_reads: usize,
710    /// The maximum number of items in `items`. However, we will always add the
711    /// full differences from a particular pair of trees, so it may temporarily
712    /// go over the limit (until we emit those items). It may also go over the
713    /// limit because we have a file item that's blocked by pending subdirectory
714    /// items.
715    max_queued_items: usize,
716}
717
718impl<'matcher> TreeDiffStreamImpl<'matcher> {
719    /// Creates a iterator over the differences between two trees. Generally
720    /// prefer `MergedTree::diff_stream()` of calling this directly.
721    pub fn new(
722        trees1: &Merge<Tree>,
723        trees2: &Merge<Tree>,
724        matcher: &'matcher dyn Matcher,
725        max_concurrent_reads: usize,
726    ) -> Self {
727        assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
728        let mut stream = Self {
729            store: trees1.first().store().clone(),
730            matcher,
731            items: BTreeMap::new(),
732            pending_trees: BTreeMap::new(),
733            max_concurrent_reads,
734            max_queued_items: 10000,
735        };
736        stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
737        stream
738    }
739
740    async fn single_tree(
741        store: &Arc<Store>,
742        dir: RepoPathBuf,
743        value: Option<&TreeValue>,
744    ) -> BackendResult<Tree> {
745        match value {
746            Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
747            _ => Ok(Tree::empty(store.clone(), dir.clone())),
748        }
749    }
750
751    /// Gets the given trees if `values` are trees, otherwise an empty tree.
752    async fn trees(
753        store: Arc<Store>,
754        dir: RepoPathBuf,
755        values: MergedTreeValue,
756    ) -> BackendResult<Merge<Tree>> {
757        if values.is_tree() {
758            values
759                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
760                .await
761        } else {
762            Ok(Merge::resolved(Tree::empty(store, dir)))
763        }
764    }
765
766    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
767        for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
768            let path = dir.join(basename);
769            let tree_before = diff.before.is_tree();
770            let tree_after = diff.after.is_tree();
771            // Check if trees and files match, but only if either side is a tree or a file
772            // (don't query the matcher unnecessarily).
773            let tree_matches =
774                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
775            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
776
777            // Replace trees or files that don't match by `Merge::absent()`
778            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
779                diff.before
780            } else {
781                Merge::absent()
782            };
783            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
784                diff.after
785            } else {
786                Merge::absent()
787            };
788            if before.is_absent() && after.is_absent() {
789                continue;
790            }
791
792            // If the path was a tree on either side of the diff, read those trees.
793            if tree_matches {
794                let before_tree_future =
795                    Self::trees(self.store.clone(), path.clone(), before.cloned());
796                let after_tree_future =
797                    Self::trees(self.store.clone(), path.clone(), after.cloned());
798                let both_trees_future = try_join(before_tree_future, after_tree_future);
799                self.pending_trees
800                    .insert(path.clone(), Box::pin(both_trees_future));
801            }
802
803            if before.is_file_like() || after.is_file_like() {
804                self.items
805                    .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
806            }
807        }
808    }
809
810    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
811        loop {
812            let mut tree_diffs = vec![];
813            let mut some_pending = false;
814            let mut all_pending = true;
815            for (dir, future) in self
816                .pending_trees
817                .iter_mut()
818                .take(self.max_concurrent_reads)
819            {
820                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
821                    all_pending = false;
822                    tree_diffs.push((dir.clone(), tree_diff));
823                } else {
824                    some_pending = true;
825                }
826            }
827
828            for (dir, tree_diff) in tree_diffs {
829                let _ = self.pending_trees.remove_entry(&dir).unwrap();
830                match tree_diff {
831                    Ok((trees1, trees2)) => {
832                        self.add_dir_diff_items(&dir, &trees1, &trees2);
833                    }
834                    Err(err) => {
835                        self.items.insert(dir, Err(err));
836                    }
837                }
838            }
839
840            // If none of the futures have been polled and returned `Poll::Pending`, we must
841            // not return. If we did, nothing would call the waker so we might never get
842            // polled again.
843            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
844                return;
845            }
846        }
847    }
848}
849
850impl Stream for TreeDiffStreamImpl<'_> {
851    type Item = TreeDiffEntry;
852
853    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
854        // Go through all pending tree futures and poll them.
855        self.poll_tree_futures(cx);
856
857        // Now emit the first file, or the first tree that completed with an error
858        if let Some((path, _)) = self.items.first_key_value() {
859            // Check if there are any pending trees before this item that we need to finish
860            // polling before we can emit this item.
861            if let Some((dir, _)) = self.pending_trees.first_key_value()
862                && dir < path
863            {
864                return Poll::Pending;
865            }
866
867            let (path, values) = self.items.pop_first().unwrap();
868            Poll::Ready(Some(TreeDiffEntry { path, values }))
869        } else if self.pending_trees.is_empty() {
870            Poll::Ready(None)
871        } else {
872            Poll::Pending
873        }
874    }
875}
876
877fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
878    Box::pin(stream.map(|mut entry| {
879        let skip_tree = |merge: MergedTreeValue| {
880            if merge.is_tree() {
881                Merge::absent()
882            } else {
883                merge
884            }
885        };
886        entry.values = entry.values.map(|diff| diff.map(skip_tree));
887        entry
888    }))
889}
890
891/// Adapts a `TreeDiffStream` to emit a added file at a given path after a
892/// removed directory at the same path.
893struct DiffStreamForFileSystem<'a> {
894    inner: TreeDiffStream<'a>,
895    next_item: Option<TreeDiffEntry>,
896    held_file: Option<TreeDiffEntry>,
897}
898
899impl<'a> DiffStreamForFileSystem<'a> {
900    fn new(inner: TreeDiffStream<'a>) -> Self {
901        Self {
902            inner,
903            next_item: None,
904            held_file: None,
905        }
906    }
907}
908
909impl Stream for DiffStreamForFileSystem<'_> {
910    type Item = TreeDiffEntry;
911
912    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
913        while let Some(next) = match self.next_item.take() {
914            Some(next) => Some(next),
915            None => ready!(self.inner.as_mut().poll_next(cx)),
916        } {
917            // If there's a held file "foo" and the next item to emit is not "foo/...", then
918            // we must be done with the "foo/" directory and it's time to emit "foo" as a
919            // removed file.
920            if let Some(held_entry) = self
921                .held_file
922                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
923            {
924                self.next_item = Some(next);
925                return Poll::Ready(Some(held_entry));
926            }
927
928            match next.values {
929                Ok(diff) if diff.before.is_tree() => {
930                    assert!(diff.after.is_present());
931                    assert!(self.held_file.is_none());
932                    self.held_file = Some(TreeDiffEntry {
933                        path: next.path,
934                        values: Ok(Diff::new(Merge::absent(), diff.after)),
935                    });
936                }
937                Ok(diff) if diff.after.is_tree() => {
938                    assert!(diff.before.is_present());
939                    return Poll::Ready(Some(TreeDiffEntry {
940                        path: next.path,
941                        values: Ok(Diff::new(diff.before, Merge::absent())),
942                    }));
943                }
944                _ => {
945                    return Poll::Ready(Some(next));
946                }
947            }
948        }
949        Poll::Ready(self.held_file.take())
950    }
951}
952
953/// Helper for writing trees with conflicts.
954///
955/// You start by creating an instance of this type with one or more
956/// base trees. You then add overrides on top. The overrides may be
957/// conflicts. Then you can write the result as a legacy tree
958/// (allowing path-level conflicts) or as multiple conflict-free
959/// trees.
960pub struct MergedTreeBuilder {
961    base_tree_id: MergedTreeId,
962    overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
963}
964
965impl MergedTreeBuilder {
966    /// Create a new builder with the given trees as base.
967    pub fn new(base_tree_id: MergedTreeId) -> Self {
968        Self {
969            base_tree_id,
970            overrides: BTreeMap::new(),
971        }
972    }
973
974    /// Set an override compared to  the base tree. The `values` merge must
975    /// either be resolved (i.e. have 1 side) or have the same number of
976    /// sides as the `base_tree_ids` used to construct this builder. Use
977    /// `Merge::absent()` to remove a value from the tree.
978    pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
979        self.overrides.insert(path, values);
980    }
981
982    /// Create new tree(s) from the base tree(s) and overrides.
983    pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
984        let base_tree_ids = match self.base_tree_id.clone() {
985            MergedTreeId::Legacy(base_tree_id) => Merge::resolved(base_tree_id),
986            MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
987        };
988        let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
989        match new_tree_ids.simplify().into_resolved() {
990            Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
991            Err(tree_id) => {
992                let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
993                let resolved = tree.resolve().block_on()?;
994                Ok(resolved.id())
995            }
996        }
997    }
998
999    fn write_merged_trees(
1000        self,
1001        mut base_tree_ids: Merge<TreeId>,
1002        store: &Arc<Store>,
1003    ) -> BackendResult<Merge<TreeId>> {
1004        let num_sides = self
1005            .overrides
1006            .values()
1007            .map(|value| value.num_sides())
1008            .max()
1009            .unwrap_or(0);
1010        base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1011        // Create a single-tree builder for each base tree
1012        let mut tree_builders =
1013            base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1014        for (path, values) in self.overrides {
1015            match values.into_resolved() {
1016                Ok(value) => {
1017                    // This path was overridden with a resolved value. Apply that to all
1018                    // builders.
1019                    for builder in tree_builders.iter_mut() {
1020                        builder.set_or_remove(path.clone(), value.clone());
1021                    }
1022                }
1023                Err(mut values) => {
1024                    values.pad_to(num_sides, &None);
1025                    // This path was overridden with a conflicted value. Apply each term to
1026                    // its corresponding builder.
1027                    for (builder, value) in zip(tree_builders.iter_mut(), values) {
1028                        builder.set_or_remove(path.clone(), value);
1029                    }
1030                }
1031            }
1032        }
1033        // TODO: This can be made more efficient. If there's a single resolved conflict
1034        // in `dir/file`, we shouldn't have to write the `dir/` and root trees more than
1035        // once.
1036        let merge_builder: MergeBuilder<TreeId> = tree_builders
1037            .into_iter()
1038            .map(|builder| builder.write_tree())
1039            .try_collect()?;
1040        Ok(merge_builder.build())
1041    }
1042}