Skip to main content

jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::CopyId;
39use crate::backend::TreeId;
40use crate::backend::TreeValue;
41use crate::conflict_labels::ConflictLabels;
42use crate::copies::CopiesTreeDiffEntry;
43use crate::copies::CopiesTreeDiffStream;
44use crate::copies::CopyHistoryDiffStream;
45use crate::copies::CopyHistoryTreeDiffEntry;
46use crate::copies::CopyRecords;
47use crate::matchers::EverythingMatcher;
48use crate::matchers::Matcher;
49use crate::merge::Diff;
50use crate::merge::Merge;
51use crate::merge::MergeBuilder;
52use crate::merge::MergedTreeVal;
53use crate::merge::MergedTreeValue;
54use crate::repo_path::RepoPath;
55use crate::repo_path::RepoPathBuf;
56use crate::repo_path::RepoPathComponent;
57use crate::store::Store;
58use crate::tree::Tree;
59use crate::tree_merge::merge_trees;
60
61/// Presents a view of a merged set of trees at the root directory, as well as
62/// conflict labels.
63#[derive(Clone)]
64pub struct MergedTree {
65    store: Arc<Store>,
66    tree_ids: Merge<TreeId>,
67    labels: ConflictLabels,
68}
69
70impl fmt::Debug for MergedTree {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        f.debug_struct("MergedTree")
73            .field("tree_ids", &self.tree_ids)
74            .field("labels", &self.labels)
75            .finish_non_exhaustive()
76    }
77}
78
79impl MergedTree {
80    /// Creates a `MergedTree` with the given resolved tree ID.
81    pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
82        Self {
83            store,
84            tree_ids: Merge::resolved(tree_id),
85            labels: ConflictLabels::unlabeled(),
86        }
87    }
88
89    /// Creates a `MergedTree` with the given tree IDs.
90    pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
91        if let Some(num_sides) = labels.num_sides() {
92            assert_eq!(tree_ids.num_sides(), num_sides);
93        }
94        Self {
95            store,
96            tree_ids,
97            labels,
98        }
99    }
100
101    /// The `Store` associated with this tree.
102    pub fn store(&self) -> &Arc<Store> {
103        &self.store
104    }
105
106    /// The underlying tree IDs for this `MergedTree`. If there are file changes
107    /// between two trees, then the tree IDs will be different.
108    pub fn tree_ids(&self) -> &Merge<TreeId> {
109        &self.tree_ids
110    }
111
112    /// Extracts the underlying tree IDs for this `MergedTree`, discarding any
113    /// conflict labels.
114    pub fn into_tree_ids(self) -> Merge<TreeId> {
115        self.tree_ids
116    }
117
118    /// Returns this merge's conflict labels, if any.
119    pub fn labels(&self) -> &ConflictLabels {
120        &self.labels
121    }
122
123    /// Returns both the underlying tree IDs and any conflict labels. This can
124    /// be used to check whether there are changes in files to be materialized
125    /// in the working copy.
126    pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
127        (&self.tree_ids, &self.labels)
128    }
129
130    /// Extracts the underlying tree IDs and conflict labels.
131    pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
132        (self.tree_ids, self.labels)
133    }
134
135    /// Reads the merge of tree objects represented by this `MergedTree`.
136    pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
137        self.tree_ids
138            .try_map_async(|id| self.store.get_tree(RepoPathBuf::root(), id))
139            .await
140    }
141
142    /// Returns a label for each term in a merge. Resolved merges use the
143    /// provided label, while conflicted merges keep their original labels.
144    /// Missing labels are indicated by empty strings.
145    pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
146        if self.tree_ids.is_resolved() {
147            assert!(!self.labels.has_labels());
148            Merge::resolved(label)
149        } else if self.labels.has_labels() {
150            // If the merge is conflicted and it already has labels, then we want to use
151            // those labels instead of the provided label. This ensures that rebasing
152            // conflicted commits keeps meaningful labels.
153            let labels = self.labels.as_merge();
154            assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
155            labels.map(|label| label.as_str())
156        } else {
157            // If the merge is conflicted but it doesn't have labels (e.g. conflicts created
158            // before labels were added), then we use empty strings to indicate missing
159            // labels. We could consider using `label` for all the sides instead, but it
160            // might be confusing.
161            Merge::repeated("", self.tree_ids.num_sides())
162        }
163    }
164
165    /// Tries to resolve any conflicts, resolving any conflicts that can be
166    /// automatically resolved and leaving the rest unresolved.
167    pub async fn resolve(self) -> BackendResult<Self> {
168        let merged = merge_trees(&self.store, self.tree_ids).await?;
169        // If the result can be resolved, then `merge_trees()` above would have returned
170        // a resolved merge. However, that function will always preserve the arity of
171        // conflicts it cannot resolve. So we simplify the conflict again
172        // here to possibly reduce a complex conflict to a simpler one.
173        let (simplified_labels, simplified) = if merged.is_resolved() {
174            (ConflictLabels::unlabeled(), merged)
175        } else {
176            self.labels.simplify_with(&merged)
177        };
178        // If debug assertions are enabled, check that the merge was idempotent. In
179        // particular, that this last simplification doesn't enable further automatic
180        // resolutions
181        if cfg!(debug_assertions) {
182            let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
183            debug_assert_eq!(re_merged, simplified);
184        }
185        Ok(Self {
186            store: self.store,
187            tree_ids: simplified,
188            labels: simplified_labels,
189        })
190    }
191
192    /// An iterator over the conflicts in this tree, including subtrees.
193    /// Recurses into subtrees and yields conflicts in those, but only if
194    /// all sides are trees, so tree/file conflicts will be reported as a single
195    /// conflict, not one for each path in the tree.
196    pub fn conflicts(
197        &self,
198    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
199        self.conflicts_matching(&EverythingMatcher)
200    }
201
202    /// Like `conflicts()` but restricted by a matcher.
203    pub fn conflicts_matching<'matcher>(
204        &self,
205        matcher: &'matcher dyn Matcher,
206    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
207        ConflictIterator::new(self, matcher)
208    }
209
210    /// Whether this tree has conflicts.
211    pub fn has_conflict(&self) -> bool {
212        !self.tree_ids.is_resolved()
213    }
214
215    /// The value at the given path. The value can be `Resolved` even if
216    /// `self` is a `Conflict`, which happens if the value at the path can be
217    /// trivially merged.
218    pub async fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
219        match path.split() {
220            Some((dir, basename)) => {
221                let trees = self.trees().await?;
222                match trees.sub_tree_recursive(dir).await? {
223                    None => Ok(Merge::absent()),
224                    Some(tree) => Ok(tree.value(basename).cloned()),
225                }
226            }
227            None => Ok(self.to_merged_tree_value()),
228        }
229    }
230
231    /// Returns the `TreeValue` associated with `id` if it exists at the
232    /// expected path and is resolved.
233    pub async fn copy_value(&self, id: &CopyId) -> BackendResult<Option<TreeValue>> {
234        let copy = self.store().backend().read_copy(id).await?;
235        let merged_val = self.path_value(&copy.current_path).await?;
236        match merged_val.into_resolved() {
237            Ok(Some(val)) if val.copy_id() == Some(id) => Ok(Some(val)),
238            _ => Ok(None),
239        }
240    }
241
242    fn to_merged_tree_value(&self) -> MergedTreeValue {
243        self.tree_ids
244            .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
245    }
246
247    /// Iterator over the entries matching the given matcher. Subtrees are
248    /// visited recursively. Subtrees that differ between the current
249    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
250    /// empty directories. Subtrees that conflict with non-trees are not
251    /// visited. For example, if current tree is a merge of 3 trees, and the
252    /// entry for 'foo' is a conflict between a change subtree and a symlink
253    /// (i.e. the subdirectory was replaced by symlink in one side of the
254    /// conflict), then the entry for `foo` itself will be emitted, but no
255    /// entries from inside `foo/` from either of the trees will be.
256    pub fn entries(&self) -> TreeEntriesIterator<'static> {
257        self.entries_matching(&EverythingMatcher)
258    }
259
260    /// Like `entries()` but restricted by a matcher.
261    pub fn entries_matching<'matcher>(
262        &self,
263        matcher: &'matcher dyn Matcher,
264    ) -> TreeEntriesIterator<'matcher> {
265        TreeEntriesIterator::new(self, matcher)
266    }
267
268    /// Stream of the differences between this tree and another tree.
269    fn diff_stream_internal<'matcher>(
270        &self,
271        other: &Self,
272        matcher: &'matcher dyn Matcher,
273    ) -> TreeDiffStream<'matcher> {
274        let concurrency = self.store().concurrency();
275        if concurrency <= 1 {
276            futures::stream::iter(TreeDiffIterator::new(self, other, matcher)).boxed()
277        } else {
278            TreeDiffStreamImpl::new(self, other, matcher, concurrency).boxed()
279        }
280    }
281
282    /// Stream of the differences between this tree and another tree.
283    pub fn diff_stream<'matcher>(
284        &self,
285        other: &Self,
286        matcher: &'matcher dyn Matcher,
287    ) -> TreeDiffStream<'matcher> {
288        stream_without_trees(self.diff_stream_internal(other, matcher))
289    }
290
291    /// Like `diff_stream()` but trees with diffs themselves are also included.
292    pub fn diff_stream_with_trees<'matcher>(
293        &self,
294        other: &Self,
295        matcher: &'matcher dyn Matcher,
296    ) -> TreeDiffStream<'matcher> {
297        self.diff_stream_internal(other, matcher)
298    }
299
300    /// Like `diff_stream()` but files in a removed tree will be returned before
301    /// a file that replaces it.
302    pub fn diff_stream_for_file_system<'matcher>(
303        &self,
304        other: &Self,
305        matcher: &'matcher dyn Matcher,
306    ) -> TreeDiffStream<'matcher> {
307        DiffStreamForFileSystem::new(self.diff_stream_internal(other, matcher)).boxed()
308    }
309
310    /// Like `diff_stream()` but takes the given copy records into account.
311    pub fn diff_stream_with_copies<'a>(
312        &self,
313        other: &Self,
314        matcher: &'a dyn Matcher,
315        copy_records: &'a CopyRecords,
316    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
317        let stream = self.diff_stream(other, matcher);
318        CopiesTreeDiffStream::new(stream, self.clone(), other.clone(), copy_records).boxed()
319    }
320
321    /// Like `diff_stream()` but takes CopyHistory into account.
322    pub fn diff_stream_with_copy_history<'a>(
323        &'a self,
324        other: &'a Self,
325        matcher: &'a dyn Matcher,
326    ) -> BoxStream<'a, CopyHistoryTreeDiffEntry> {
327        let stream = self.diff_stream(other, matcher);
328        CopyHistoryDiffStream::new(stream, self, other).boxed()
329    }
330
331    /// Merges the provided trees into a single `MergedTree`. Any conflicts will
332    /// be resolved recursively if possible. The provided labels are used if a
333    /// conflict arises. However, if one of the input trees is already
334    /// conflicted, the corresponding label will be ignored, and its existing
335    /// labels will be used instead.
336    pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
337        Self::merge_no_resolve(merge).resolve().await
338    }
339
340    /// Merges the provided trees into a single `MergedTree`, without attempting
341    /// to resolve file conflicts.
342    pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
343        debug_assert!(
344            merge
345                .iter()
346                .map(|(tree, _)| Arc::as_ptr(tree.store()))
347                .all_equal()
348        );
349        let store = merge.first().0.store().clone();
350        let flattened_labels = ConflictLabels::from_merge(
351            merge
352                .map(|(tree, label)| tree.labels_by_term(label))
353                .flatten()
354                .map(|&label| label.to_owned()),
355        );
356        let flattened_tree_ids: Merge<TreeId> = merge
357            .into_map(|(tree, _label)| tree.into_tree_ids())
358            .flatten();
359
360        let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
361        Self::new(store, tree_ids, labels)
362    }
363}
364
365/// A single entry in a tree diff.
366#[derive(Debug)]
367pub struct TreeDiffEntry {
368    /// The path.
369    pub path: RepoPathBuf,
370    /// The resolved tree values if available.
371    pub values: BackendResult<Diff<MergedTreeValue>>,
372}
373
374/// Type alias for the result from `MergedTree::diff_stream()`. We use a
375/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
376/// ones) can fetch trees asynchronously.
377pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
378
379fn all_tree_entries(
380    trees: &Merge<Tree>,
381) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
382    if let Some(tree) = trees.as_resolved() {
383        let iter = tree
384            .entries_non_recursive()
385            .map(|entry| (entry.name(), Merge::normal(entry.value())));
386        Either::Left(iter)
387    } else {
388        let same_change = trees.first().store().merge_options().same_change;
389        let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
390            // TODO: move resolve_trivial() to caller?
391            let values = match values.resolve_trivial(same_change) {
392                Some(resolved) => Merge::resolved(*resolved),
393                None => values,
394            };
395            (name, values)
396        });
397        Either::Right(iter)
398    }
399}
400
401/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
402/// non-recursively. This also works if `trees` are resolved, but is more costly
403/// than `tree.entries_non_recursive()`.
404pub fn all_merged_tree_entries(
405    trees: &Merge<Tree>,
406) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
407    let mut entries_iters = trees
408        .iter()
409        .map(|tree| tree.entries_non_recursive().peekable())
410        .collect_vec();
411    iter::from_fn(move || {
412        let next_name = entries_iters
413            .iter_mut()
414            .filter_map(|iter| iter.peek())
415            .map(|entry| entry.name())
416            .min()?;
417        let values: MergeBuilder<_> = entries_iters
418            .iter_mut()
419            .map(|iter| {
420                let entry = iter.next_if(|entry| entry.name() == next_name)?;
421                Some(entry.value())
422            })
423            .collect();
424        Some((next_name, values.build()))
425    })
426}
427
428fn merged_tree_entry_diff<'a>(
429    trees1: &'a Merge<Tree>,
430    trees2: &'a Merge<Tree>,
431) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
432    itertools::merge_join_by(
433        all_tree_entries(trees1),
434        all_tree_entries(trees2),
435        |(name1, _), (name2, _)| name1.cmp(name2),
436    )
437    .map(|entry| match entry {
438        EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
439        EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
440        EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
441    })
442    .filter(|(_, diff)| diff.is_changed())
443}
444
445/// Recursive iterator over the entries in a tree.
446pub struct TreeEntriesIterator<'matcher> {
447    store: Arc<Store>,
448    stack: Vec<TreeEntriesDirItem>,
449    matcher: &'matcher dyn Matcher,
450}
451
452struct TreeEntriesDirItem {
453    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
454}
455
456impl TreeEntriesDirItem {
457    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
458        let mut entries = vec![];
459        let dir = trees.first().dir();
460        for (name, value) in all_tree_entries(trees) {
461            let path = dir.join(name);
462            if value.is_tree() {
463                // TODO: Handle the other cases (specific files and trees)
464                if matcher.visit(&path).is_nothing() {
465                    continue;
466                }
467            } else if !matcher.matches(&path) {
468                continue;
469            }
470            entries.push((path, value.cloned()));
471        }
472        entries.reverse();
473        Self { entries }
474    }
475}
476
477impl<'matcher> TreeEntriesIterator<'matcher> {
478    fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
479        Self {
480            store: trees.store.clone(),
481            stack: vec![TreeEntriesDirItem {
482                entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
483            }],
484            matcher,
485        }
486    }
487}
488
489impl Iterator for TreeEntriesIterator<'_> {
490    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
491
492    fn next(&mut self) -> Option<Self::Item> {
493        while let Some(top) = self.stack.last_mut() {
494            if let Some((path, value)) = top.entries.pop() {
495                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
496                    Ok(maybe_trees) => maybe_trees,
497                    Err(err) => return Some((path, Err(err))),
498                };
499                if let Some(trees) = maybe_trees {
500                    self.stack
501                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
502                } else {
503                    return Some((path, Ok(value)));
504                }
505            } else {
506                self.stack.pop();
507            }
508        }
509        None
510    }
511}
512
513/// The state for the non-recursive iteration over the conflicted entries in a
514/// single directory.
515struct ConflictsDirItem {
516    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
517}
518
519impl ConflictsDirItem {
520    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
521        if trees.is_resolved() {
522            return Self { entries: vec![] };
523        }
524
525        let dir = trees.first().dir();
526        let mut entries = vec![];
527        for (basename, value) in all_tree_entries(trees) {
528            if value.is_resolved() {
529                continue;
530            }
531            let path = dir.join(basename);
532            if value.is_tree() {
533                if matcher.visit(&path).is_nothing() {
534                    continue;
535                }
536            } else if !matcher.matches(&path) {
537                continue;
538            }
539            entries.push((path, value.cloned()));
540        }
541        entries.reverse();
542        Self { entries }
543    }
544}
545
546struct ConflictIterator<'matcher> {
547    store: Arc<Store>,
548    stack: Vec<ConflictsDirItem>,
549    matcher: &'matcher dyn Matcher,
550}
551
552impl<'matcher> ConflictIterator<'matcher> {
553    fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
554        Self {
555            store: tree.store().clone(),
556            stack: vec![ConflictsDirItem {
557                entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
558            }],
559            matcher,
560        }
561    }
562}
563
564impl Iterator for ConflictIterator<'_> {
565    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
566
567    fn next(&mut self) -> Option<Self::Item> {
568        while let Some(top) = self.stack.last_mut() {
569            if let Some((path, tree_values)) = top.entries.pop() {
570                match tree_values.to_tree_merge(&self.store, &path).block_on() {
571                    Ok(Some(trees)) => {
572                        // If all sides are trees or missing, descend into the merged tree
573                        self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
574                    }
575                    Ok(None) => {
576                        // Otherwise this is a conflict between files, trees, etc. If they could
577                        // be automatically resolved, they should have been when the top-level
578                        // tree conflict was written, so we assume that they can't be.
579                        return Some((path, Ok(tree_values)));
580                    }
581                    Err(err) => {
582                        return Some((path, Err(err)));
583                    }
584                }
585            } else {
586                self.stack.pop();
587            }
588        }
589        None
590    }
591}
592
593/// Iterator over the differences between two trees.
594pub struct TreeDiffIterator<'matcher> {
595    store: Arc<Store>,
596    stack: Vec<TreeDiffDir>,
597    matcher: &'matcher dyn Matcher,
598}
599
600struct TreeDiffDir {
601    entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
602}
603
604impl<'matcher> TreeDiffIterator<'matcher> {
605    /// Creates a iterator over the differences between two trees.
606    pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
607        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
608        let root_dir = RepoPath::root();
609        let mut stack = Vec::new();
610        let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
611        if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
612            stack.push(TreeDiffDir {
613                entries: vec![(root_dir.to_owned(), root_diff)],
614            });
615        }
616        Self {
617            store: tree1.store().clone(),
618            stack,
619            matcher,
620        }
621    }
622
623    /// Gets the given trees if `values` are trees, otherwise an empty tree.
624    fn trees(
625        store: &Arc<Store>,
626        dir: &RepoPath,
627        values: &MergedTreeValue,
628    ) -> BackendResult<Merge<Tree>> {
629        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
630            Ok(trees)
631        } else {
632            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
633        }
634    }
635}
636
637impl TreeDiffDir {
638    fn from_trees(
639        dir: &RepoPath,
640        trees1: &Merge<Tree>,
641        trees2: &Merge<Tree>,
642        matcher: &dyn Matcher,
643    ) -> Self {
644        let mut entries = vec![];
645        for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
646            let path = dir.join(name);
647            let tree_before = diff.before.is_tree();
648            let tree_after = diff.after.is_tree();
649            // Check if trees and files match, but only if either side is a tree or a file
650            // (don't query the matcher unnecessarily).
651            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
652            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
653
654            // Replace trees or files that don't match by `Merge::absent()`
655            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
656                diff.before
657            } else {
658                Merge::absent()
659            };
660            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
661                diff.after
662            } else {
663                Merge::absent()
664            };
665            if before.is_absent() && after.is_absent() {
666                continue;
667            }
668            entries.push((path, Diff::new(before.cloned(), after.cloned())));
669        }
670        entries.reverse();
671        Self { entries }
672    }
673}
674
675impl Iterator for TreeDiffIterator<'_> {
676    type Item = TreeDiffEntry;
677
678    fn next(&mut self) -> Option<Self::Item> {
679        while let Some(top) = self.stack.last_mut() {
680            let Some((path, diff)) = top.entries.pop() else {
681                self.stack.pop().unwrap();
682                continue;
683            };
684
685            if diff.before.is_tree() || diff.after.is_tree() {
686                let (before_tree, after_tree) = match (
687                    Self::trees(&self.store, &path, &diff.before),
688                    Self::trees(&self.store, &path, &diff.after),
689                ) {
690                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
691                    (Err(before_err), _) => {
692                        return Some(TreeDiffEntry {
693                            path,
694                            values: Err(before_err),
695                        });
696                    }
697                    (_, Err(after_err)) => {
698                        return Some(TreeDiffEntry {
699                            path,
700                            values: Err(after_err),
701                        });
702                    }
703                };
704                let subdir =
705                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
706                self.stack.push(subdir);
707            }
708            if diff.before.is_file_like()
709                || diff.after.is_file_like()
710                || self.matcher.matches(&path)
711            {
712                return Some(TreeDiffEntry {
713                    path,
714                    values: Ok(diff),
715                });
716            }
717        }
718        None
719    }
720}
721
722/// Stream of differences between two trees.
723pub struct TreeDiffStreamImpl<'matcher> {
724    store: Arc<Store>,
725    matcher: &'matcher dyn Matcher,
726    /// Pairs of tree values that may or may not be ready to emit, sorted in the
727    /// order we want to emit them. If either side is a tree, there will be
728    /// a corresponding entry in `pending_trees`. The item is ready to emit
729    /// unless there's a smaller or equal path in `pending_trees`.
730    items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
731    // TODO: Is it better to combine this and `items` into a single map?
732    #[expect(clippy::type_complexity)]
733    pending_trees:
734        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
735    /// The maximum number of trees to request concurrently. However, we do the
736    /// accounting per path, so there will often be twice as many pending
737    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
738    /// conflicts, there will be even more.
739    max_concurrent_reads: usize,
740    /// The maximum number of items in `items`. However, we will always add the
741    /// full differences from a particular pair of trees, so it may temporarily
742    /// go over the limit (until we emit those items). It may also go over the
743    /// limit because we have a file item that's blocked by pending subdirectory
744    /// items.
745    max_queued_items: usize,
746}
747
748impl<'matcher> TreeDiffStreamImpl<'matcher> {
749    /// Creates a iterator over the differences between two trees. Generally
750    /// prefer `MergedTree::diff_stream()` of calling this directly.
751    pub fn new(
752        tree1: &MergedTree,
753        tree2: &MergedTree,
754        matcher: &'matcher dyn Matcher,
755        max_concurrent_reads: usize,
756    ) -> Self {
757        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
758        let store = tree1.store().clone();
759        let mut stream = Self {
760            store: store.clone(),
761            matcher,
762            items: BTreeMap::new(),
763            pending_trees: BTreeMap::new(),
764            max_concurrent_reads,
765            max_queued_items: 10000,
766        };
767        let dir = RepoPathBuf::root();
768        let merged_tree1 = tree1.to_merged_tree_value();
769        let merged_tree2 = tree2.to_merged_tree_value();
770        let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
771        if root_diff.is_changed() && matcher.matches(&dir) {
772            stream.items.insert(dir.clone(), Ok(root_diff));
773        }
774        let root_tree_fut = Box::pin(try_join(
775            Self::trees(store.clone(), dir.clone(), merged_tree1),
776            Self::trees(store, dir.clone(), merged_tree2),
777        ));
778        stream.pending_trees.insert(dir, root_tree_fut);
779        stream
780    }
781
782    async fn single_tree(
783        store: &Arc<Store>,
784        dir: RepoPathBuf,
785        value: Option<&TreeValue>,
786    ) -> BackendResult<Tree> {
787        match value {
788            Some(TreeValue::Tree(tree_id)) => store.get_tree(dir, tree_id).await,
789            _ => Ok(Tree::empty(store.clone(), dir.clone())),
790        }
791    }
792
793    /// Gets the given trees if `values` are trees, otherwise an empty tree.
794    async fn trees(
795        store: Arc<Store>,
796        dir: RepoPathBuf,
797        values: MergedTreeValue,
798    ) -> BackendResult<Merge<Tree>> {
799        if values.is_tree() {
800            values
801                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
802                .await
803        } else {
804            Ok(Merge::resolved(Tree::empty(store, dir)))
805        }
806    }
807
808    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
809        for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
810            let path = dir.join(basename);
811            let tree_before = diff.before.is_tree();
812            let tree_after = diff.after.is_tree();
813            // Check if trees and files match, but only if either side is a tree or a file
814            // (don't query the matcher unnecessarily).
815            let tree_matches =
816                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
817            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
818
819            // Replace trees or files that don't match by `Merge::absent()`
820            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
821                diff.before
822            } else {
823                Merge::absent()
824            };
825            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
826                diff.after
827            } else {
828                Merge::absent()
829            };
830            if before.is_absent() && after.is_absent() {
831                continue;
832            }
833
834            // If the path was a tree on either side of the diff, read those trees.
835            if tree_matches {
836                let before_tree_future =
837                    Self::trees(self.store.clone(), path.clone(), before.cloned());
838                let after_tree_future =
839                    Self::trees(self.store.clone(), path.clone(), after.cloned());
840                let both_trees_future = try_join(before_tree_future, after_tree_future);
841                self.pending_trees
842                    .insert(path.clone(), Box::pin(both_trees_future));
843            }
844
845            if file_matches || self.matcher.matches(&path) {
846                self.items
847                    .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
848            }
849        }
850    }
851
852    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
853        loop {
854            let mut tree_diffs = vec![];
855            let mut some_pending = false;
856            let mut all_pending = true;
857            for (dir, future) in self
858                .pending_trees
859                .iter_mut()
860                .take(self.max_concurrent_reads)
861            {
862                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
863                    all_pending = false;
864                    tree_diffs.push((dir.clone(), tree_diff));
865                } else {
866                    some_pending = true;
867                }
868            }
869
870            for (dir, tree_diff) in tree_diffs {
871                drop(self.pending_trees.remove_entry(&dir).unwrap());
872                match tree_diff {
873                    Ok((trees1, trees2)) => {
874                        self.add_dir_diff_items(&dir, &trees1, &trees2);
875                    }
876                    Err(err) => {
877                        self.items.insert(dir, Err(err));
878                    }
879                }
880            }
881
882            // If none of the futures have been polled and returned `Poll::Pending`, we must
883            // not return. If we did, nothing would call the waker so we might never get
884            // polled again.
885            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
886                return;
887            }
888        }
889    }
890}
891
892impl Stream for TreeDiffStreamImpl<'_> {
893    type Item = TreeDiffEntry;
894
895    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
896        // Go through all pending tree futures and poll them.
897        self.poll_tree_futures(cx);
898
899        // Now emit the first file, or the first tree that completed with an error
900        if let Some((path, _)) = self.items.first_key_value() {
901            // Check if there are any pending trees before this item that we need to finish
902            // polling before we can emit this item.
903            if let Some((dir, _)) = self.pending_trees.first_key_value()
904                && dir < path
905            {
906                return Poll::Pending;
907            }
908
909            let (path, values) = self.items.pop_first().unwrap();
910            Poll::Ready(Some(TreeDiffEntry { path, values }))
911        } else if self.pending_trees.is_empty() {
912            Poll::Ready(None)
913        } else {
914            Poll::Pending
915        }
916    }
917}
918
919fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
920    stream
921        .filter_map(|mut entry| async move {
922            let skip_tree = |merge: MergedTreeValue| {
923                if merge.is_tree() {
924                    Merge::absent()
925                } else {
926                    merge
927                }
928            };
929            entry.values = entry.values.map(|diff| diff.map(skip_tree));
930
931            // Filter out entries where neither side is present.
932            let any_present = entry.values.as_ref().map_or(true, |diff| {
933                diff.before.is_present() || diff.after.is_present()
934            });
935            any_present.then_some(entry)
936        })
937        .boxed()
938}
939
/// Adapts a `TreeDiffStream` to emit an added file at a given path after a
/// removed directory at the same path.
struct DiffStreamForFileSystem<'a> {
    /// The wrapped stream that entries are pulled from.
    inner: TreeDiffStream<'a>,
    /// An entry already pulled from `inner` that is handled before `inner` is
    /// polled again.
    next_item: Option<TreeDiffEntry>,
    /// An entry for a path whose before side was a tree, stored with that
    /// side replaced by an absent value and held back until an entry outside
    /// that path arrives or `inner` ends.
    held_file: Option<TreeDiffEntry>,
}
947
948impl<'a> DiffStreamForFileSystem<'a> {
949    fn new(inner: TreeDiffStream<'a>) -> Self {
950        Self {
951            inner,
952            next_item: None,
953            held_file: None,
954        }
955    }
956}
957
958impl Stream for DiffStreamForFileSystem<'_> {
959    type Item = TreeDiffEntry;
960
961    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
962        while let Some(next) = match self.next_item.take() {
963            Some(next) => Some(next),
964            None => ready!(self.inner.as_mut().poll_next(cx)),
965        } {
966            // Filter out changes where neither side (before or after) is_file_like.
967            // This ensures we only process file-level changes and transitions.
968            if let Ok(diff) = &next.values
969                && !diff.before.is_file_like()
970                && !diff.after.is_file_like()
971            {
972                continue;
973            }
974
975            // If there's a held file "foo" and the next item to emit is not "foo/...", then
976            // we must be done with the "foo/" directory and it's time to emit "foo" as a
977            // removed file.
978            if let Some(held_entry) = self
979                .held_file
980                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
981            {
982                self.next_item = Some(next);
983                return Poll::Ready(Some(held_entry));
984            }
985
986            match next.values {
987                Ok(diff) if diff.before.is_tree() => {
988                    assert!(diff.after.is_present());
989                    assert!(self.held_file.is_none());
990                    self.held_file = Some(TreeDiffEntry {
991                        path: next.path,
992                        values: Ok(Diff::new(Merge::absent(), diff.after)),
993                    });
994                }
995                Ok(diff) if diff.after.is_tree() => {
996                    assert!(diff.before.is_present());
997                    return Poll::Ready(Some(TreeDiffEntry {
998                        path: next.path,
999                        values: Ok(Diff::new(diff.before, Merge::absent())),
1000                    }));
1001                }
1002                _ => {
1003                    return Poll::Ready(Some(next));
1004                }
1005            }
1006        }
1007        Poll::Ready(self.held_file.take())
1008    }
1009}