Skip to main content

jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::TreeId;
39use crate::backend::TreeValue;
40use crate::conflict_labels::ConflictLabels;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_merge::merge_trees;
57
58/// Presents a view of a merged set of trees at the root directory, as well as
59/// conflict labels.
60#[derive(Clone)]
61pub struct MergedTree {
62    store: Arc<Store>,
63    tree_ids: Merge<TreeId>,
64    labels: ConflictLabels,
65}
66
67impl fmt::Debug for MergedTree {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("MergedTree")
70            .field("tree_ids", &self.tree_ids)
71            .field("labels", &self.labels)
72            .finish_non_exhaustive()
73    }
74}
75
76impl MergedTree {
77    /// Creates a `MergedTree` with the given resolved tree ID.
78    pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
79        Self {
80            store,
81            tree_ids: Merge::resolved(tree_id),
82            labels: ConflictLabels::unlabeled(),
83        }
84    }
85
86    /// Creates a `MergedTree` with the given tree IDs.
87    pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
88        if let Some(num_sides) = labels.num_sides() {
89            assert_eq!(tree_ids.num_sides(), num_sides);
90        }
91        Self {
92            store,
93            tree_ids,
94            labels,
95        }
96    }
97
98    /// The `Store` associated with this tree.
99    pub fn store(&self) -> &Arc<Store> {
100        &self.store
101    }
102
103    /// The underlying tree IDs for this `MergedTree`. If there are file changes
104    /// between two trees, then the tree IDs will be different.
105    pub fn tree_ids(&self) -> &Merge<TreeId> {
106        &self.tree_ids
107    }
108
109    /// Extracts the underlying tree IDs for this `MergedTree`, discarding any
110    /// conflict labels.
111    pub fn into_tree_ids(self) -> Merge<TreeId> {
112        self.tree_ids
113    }
114
115    /// Returns this merge's conflict labels, if any.
116    pub fn labels(&self) -> &ConflictLabels {
117        &self.labels
118    }
119
120    /// Returns both the underlying tree IDs and any conflict labels. This can
121    /// be used to check whether there are changes in files to be materialized
122    /// in the working copy.
123    pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
124        (&self.tree_ids, &self.labels)
125    }
126
127    /// Extracts the underlying tree IDs and conflict labels.
128    pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
129        (self.tree_ids, self.labels)
130    }
131
132    /// Reads the merge of tree objects represented by this `MergedTree`.
133    pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
134        self.tree_ids
135            .try_map_async(|id| self.store.get_tree_async(RepoPathBuf::root(), id))
136            .await
137    }
138
139    /// Returns a label for each term in a merge. Resolved merges use the
140    /// provided label, while conflicted merges keep their original labels.
141    /// Missing labels are indicated by empty strings.
142    pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
143        if self.tree_ids.is_resolved() {
144            assert!(!self.labels.has_labels());
145            Merge::resolved(label)
146        } else if self.labels.has_labels() {
147            // If the merge is conflicted and it already has labels, then we want to use
148            // those labels instead of the provided label. This ensures that rebasing
149            // conflicted commits keeps meaningful labels.
150            let labels = self.labels.as_merge();
151            assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
152            labels.map(|label| label.as_str())
153        } else {
154            // If the merge is conflicted but it doesn't have labels (e.g. conflicts created
155            // before labels were added), then we use empty strings to indicate missing
156            // labels. We could consider using `label` for all the sides instead, but it
157            // might be confusing.
158            Merge::repeated("", self.tree_ids.num_sides())
159        }
160    }
161
162    /// Tries to resolve any conflicts, resolving any conflicts that can be
163    /// automatically resolved and leaving the rest unresolved.
164    pub async fn resolve(self) -> BackendResult<Self> {
165        let merged = merge_trees(&self.store, self.tree_ids).await?;
166        // If the result can be resolved, then `merge_trees()` above would have returned
167        // a resolved merge. However, that function will always preserve the arity of
168        // conflicts it cannot resolve. So we simplify the conflict again
169        // here to possibly reduce a complex conflict to a simpler one.
170        let (simplified_labels, simplified) = if merged.is_resolved() {
171            (ConflictLabels::unlabeled(), merged)
172        } else {
173            self.labels.simplify_with(&merged)
174        };
175        // If debug assertions are enabled, check that the merge was idempotent. In
176        // particular, that this last simplification doesn't enable further automatic
177        // resolutions
178        if cfg!(debug_assertions) {
179            let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
180            debug_assert_eq!(re_merged, simplified);
181        }
182        Ok(Self {
183            store: self.store,
184            tree_ids: simplified,
185            labels: simplified_labels,
186        })
187    }
188
189    /// An iterator over the conflicts in this tree, including subtrees.
190    /// Recurses into subtrees and yields conflicts in those, but only if
191    /// all sides are trees, so tree/file conflicts will be reported as a single
192    /// conflict, not one for each path in the tree.
193    pub fn conflicts(
194        &self,
195    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196        self.conflicts_matching(&EverythingMatcher)
197    }
198
199    /// Like `conflicts()` but restricted by a matcher.
200    pub fn conflicts_matching<'matcher>(
201        &self,
202        matcher: &'matcher dyn Matcher,
203    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
204        ConflictIterator::new(self, matcher)
205    }
206
207    /// Whether this tree has conflicts.
208    pub fn has_conflict(&self) -> bool {
209        !self.tree_ids.is_resolved()
210    }
211
212    /// The value at the given path. The value can be `Resolved` even if
213    /// `self` is a `Conflict`, which happens if the value at the path can be
214    /// trivially merged.
215    pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
216        self.path_value_async(path).block_on()
217    }
218
219    /// Async version of `path_value()`.
220    pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
221        match path.split() {
222            Some((dir, basename)) => {
223                let trees = self.trees().await?;
224                match trees.sub_tree_recursive(dir).await? {
225                    None => Ok(Merge::absent()),
226                    Some(tree) => Ok(tree.value(basename).cloned()),
227                }
228            }
229            None => Ok(self.to_merged_tree_value()),
230        }
231    }
232
233    fn to_merged_tree_value(&self) -> MergedTreeValue {
234        self.tree_ids
235            .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
236    }
237
238    /// Iterator over the entries matching the given matcher. Subtrees are
239    /// visited recursively. Subtrees that differ between the current
240    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
241    /// empty directories. Subtrees that conflict with non-trees are not
242    /// visited. For example, if current tree is a merge of 3 trees, and the
243    /// entry for 'foo' is a conflict between a change subtree and a symlink
244    /// (i.e. the subdirectory was replaced by symlink in one side of the
245    /// conflict), then the entry for `foo` itself will be emitted, but no
246    /// entries from inside `foo/` from either of the trees will be.
247    pub fn entries(&self) -> TreeEntriesIterator<'static> {
248        self.entries_matching(&EverythingMatcher)
249    }
250
251    /// Like `entries()` but restricted by a matcher.
252    pub fn entries_matching<'matcher>(
253        &self,
254        matcher: &'matcher dyn Matcher,
255    ) -> TreeEntriesIterator<'matcher> {
256        TreeEntriesIterator::new(self, matcher)
257    }
258
259    /// Stream of the differences between this tree and another tree.
260    fn diff_stream_internal<'matcher>(
261        &self,
262        other: &Self,
263        matcher: &'matcher dyn Matcher,
264    ) -> TreeDiffStream<'matcher> {
265        let concurrency = self.store().concurrency();
266        if concurrency <= 1 {
267            Box::pin(futures::stream::iter(TreeDiffIterator::new(
268                self, other, matcher,
269            )))
270        } else {
271            Box::pin(TreeDiffStreamImpl::new(self, other, matcher, concurrency))
272        }
273    }
274
275    /// Stream of the differences between this tree and another tree.
276    pub fn diff_stream<'matcher>(
277        &self,
278        other: &Self,
279        matcher: &'matcher dyn Matcher,
280    ) -> TreeDiffStream<'matcher> {
281        stream_without_trees(self.diff_stream_internal(other, matcher))
282    }
283
284    /// Like `diff_stream()` but trees with diffs themselves are also included.
285    pub fn diff_stream_with_trees<'matcher>(
286        &self,
287        other: &Self,
288        matcher: &'matcher dyn Matcher,
289    ) -> TreeDiffStream<'matcher> {
290        self.diff_stream_internal(other, matcher)
291    }
292
293    /// Like `diff_stream()` but files in a removed tree will be returned before
294    /// a file that replaces it.
295    pub fn diff_stream_for_file_system<'matcher>(
296        &self,
297        other: &Self,
298        matcher: &'matcher dyn Matcher,
299    ) -> TreeDiffStream<'matcher> {
300        Box::pin(DiffStreamForFileSystem::new(
301            self.diff_stream_internal(other, matcher),
302        ))
303    }
304
305    /// Like `diff_stream()` but takes the given copy records into account.
306    pub fn diff_stream_with_copies<'a>(
307        &self,
308        other: &Self,
309        matcher: &'a dyn Matcher,
310        copy_records: &'a CopyRecords,
311    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
312        let stream = self.diff_stream(other, matcher);
313        Box::pin(CopiesTreeDiffStream::new(
314            stream,
315            self.clone(),
316            other.clone(),
317            copy_records,
318        ))
319    }
320
321    /// Merges the provided trees into a single `MergedTree`. Any conflicts will
322    /// be resolved recursively if possible. The provided labels are used if a
323    /// conflict arises. However, if one of the input trees is already
324    /// conflicted, the corresponding label will be ignored, and its existing
325    /// labels will be used instead.
326    pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
327        Self::merge_no_resolve(merge).resolve().await
328    }
329
330    /// Merges the provided trees into a single `MergedTree`, without attempting
331    /// to resolve file conflicts.
332    pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
333        debug_assert!(
334            merge
335                .iter()
336                .map(|(tree, _)| Arc::as_ptr(tree.store()))
337                .all_equal()
338        );
339        let store = merge.first().0.store().clone();
340        let flattened_labels = ConflictLabels::from_merge(
341            merge
342                .map(|(tree, label)| tree.labels_by_term(label))
343                .flatten()
344                .map(|&label| label.to_owned()),
345        );
346        let flattened_tree_ids: Merge<TreeId> = merge
347            .into_iter()
348            .map(|(tree, _label)| tree.into_tree_ids())
349            .collect::<MergeBuilder<_>>()
350            .build()
351            .flatten();
352
353        let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
354        Self::new(store, tree_ids, labels)
355    }
356}
357
358/// A single entry in a tree diff.
359#[derive(Debug)]
360pub struct TreeDiffEntry {
361    /// The path.
362    pub path: RepoPathBuf,
363    /// The resolved tree values if available.
364    pub values: BackendResult<Diff<MergedTreeValue>>,
365}
366
367/// Type alias for the result from `MergedTree::diff_stream()`. We use a
368/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
369/// ones) can fetch trees asynchronously.
370pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
371
372fn all_tree_entries(
373    trees: &Merge<Tree>,
374) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
375    if let Some(tree) = trees.as_resolved() {
376        let iter = tree
377            .entries_non_recursive()
378            .map(|entry| (entry.name(), Merge::normal(entry.value())));
379        Either::Left(iter)
380    } else {
381        let same_change = trees.first().store().merge_options().same_change;
382        let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
383            // TODO: move resolve_trivial() to caller?
384            let values = match values.resolve_trivial(same_change) {
385                Some(resolved) => Merge::resolved(*resolved),
386                None => values,
387            };
388            (name, values)
389        });
390        Either::Right(iter)
391    }
392}
393
394/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
395/// non-recursively. This also works if `trees` are resolved, but is more costly
396/// than `tree.entries_non_recursive()`.
397pub fn all_merged_tree_entries(
398    trees: &Merge<Tree>,
399) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
400    let mut entries_iters = trees
401        .iter()
402        .map(|tree| tree.entries_non_recursive().peekable())
403        .collect_vec();
404    iter::from_fn(move || {
405        let next_name = entries_iters
406            .iter_mut()
407            .filter_map(|iter| iter.peek())
408            .map(|entry| entry.name())
409            .min()?;
410        let values: MergeBuilder<_> = entries_iters
411            .iter_mut()
412            .map(|iter| {
413                let entry = iter.next_if(|entry| entry.name() == next_name)?;
414                Some(entry.value())
415            })
416            .collect();
417        Some((next_name, values.build()))
418    })
419}
420
421fn merged_tree_entry_diff<'a>(
422    trees1: &'a Merge<Tree>,
423    trees2: &'a Merge<Tree>,
424) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
425    itertools::merge_join_by(
426        all_tree_entries(trees1),
427        all_tree_entries(trees2),
428        |(name1, _), (name2, _)| name1.cmp(name2),
429    )
430    .map(|entry| match entry {
431        EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
432        EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
433        EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
434    })
435    .filter(|(_, diff)| diff.is_changed())
436}
437
438/// Recursive iterator over the entries in a tree.
439pub struct TreeEntriesIterator<'matcher> {
440    store: Arc<Store>,
441    stack: Vec<TreeEntriesDirItem>,
442    matcher: &'matcher dyn Matcher,
443}
444
445struct TreeEntriesDirItem {
446    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
447}
448
449impl TreeEntriesDirItem {
450    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
451        let mut entries = vec![];
452        let dir = trees.first().dir();
453        for (name, value) in all_tree_entries(trees) {
454            let path = dir.join(name);
455            if value.is_tree() {
456                // TODO: Handle the other cases (specific files and trees)
457                if matcher.visit(&path).is_nothing() {
458                    continue;
459                }
460            } else if !matcher.matches(&path) {
461                continue;
462            }
463            entries.push((path, value.cloned()));
464        }
465        entries.reverse();
466        Self { entries }
467    }
468}
469
470impl<'matcher> TreeEntriesIterator<'matcher> {
471    fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
472        Self {
473            store: trees.store.clone(),
474            stack: vec![TreeEntriesDirItem {
475                entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
476            }],
477            matcher,
478        }
479    }
480}
481
482impl Iterator for TreeEntriesIterator<'_> {
483    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
484
485    fn next(&mut self) -> Option<Self::Item> {
486        while let Some(top) = self.stack.last_mut() {
487            if let Some((path, value)) = top.entries.pop() {
488                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
489                    Ok(maybe_trees) => maybe_trees,
490                    Err(err) => return Some((path, Err(err))),
491                };
492                if let Some(trees) = maybe_trees {
493                    self.stack
494                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
495                } else {
496                    return Some((path, Ok(value)));
497                }
498            } else {
499                self.stack.pop();
500            }
501        }
502        None
503    }
504}
505
506/// The state for the non-recursive iteration over the conflicted entries in a
507/// single directory.
508struct ConflictsDirItem {
509    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
510}
511
512impl ConflictsDirItem {
513    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
514        if trees.is_resolved() {
515            return Self { entries: vec![] };
516        }
517
518        let dir = trees.first().dir();
519        let mut entries = vec![];
520        for (basename, value) in all_tree_entries(trees) {
521            if value.is_resolved() {
522                continue;
523            }
524            let path = dir.join(basename);
525            if value.is_tree() {
526                if matcher.visit(&path).is_nothing() {
527                    continue;
528                }
529            } else if !matcher.matches(&path) {
530                continue;
531            }
532            entries.push((path, value.cloned()));
533        }
534        entries.reverse();
535        Self { entries }
536    }
537}
538
539struct ConflictIterator<'matcher> {
540    store: Arc<Store>,
541    stack: Vec<ConflictsDirItem>,
542    matcher: &'matcher dyn Matcher,
543}
544
545impl<'matcher> ConflictIterator<'matcher> {
546    fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
547        Self {
548            store: tree.store().clone(),
549            stack: vec![ConflictsDirItem {
550                entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
551            }],
552            matcher,
553        }
554    }
555}
556
557impl Iterator for ConflictIterator<'_> {
558    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
559
560    fn next(&mut self) -> Option<Self::Item> {
561        while let Some(top) = self.stack.last_mut() {
562            if let Some((path, tree_values)) = top.entries.pop() {
563                match tree_values.to_tree_merge(&self.store, &path).block_on() {
564                    Ok(Some(trees)) => {
565                        // If all sides are trees or missing, descend into the merged tree
566                        self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
567                    }
568                    Ok(None) => {
569                        // Otherwise this is a conflict between files, trees, etc. If they could
570                        // be automatically resolved, they should have been when the top-level
571                        // tree conflict was written, so we assume that they can't be.
572                        return Some((path, Ok(tree_values)));
573                    }
574                    Err(err) => {
575                        return Some((path, Err(err)));
576                    }
577                }
578            } else {
579                self.stack.pop();
580            }
581        }
582        None
583    }
584}
585
586/// Iterator over the differences between two trees.
587pub struct TreeDiffIterator<'matcher> {
588    store: Arc<Store>,
589    stack: Vec<TreeDiffDir>,
590    matcher: &'matcher dyn Matcher,
591}
592
593struct TreeDiffDir {
594    entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
595}
596
597impl<'matcher> TreeDiffIterator<'matcher> {
598    /// Creates a iterator over the differences between two trees.
599    pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
600        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
601        let root_dir = RepoPath::root();
602        let mut stack = Vec::new();
603        let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
604        if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
605            stack.push(TreeDiffDir {
606                entries: vec![(root_dir.to_owned(), root_diff)],
607            });
608        }
609        Self {
610            store: tree1.store().clone(),
611            stack,
612            matcher,
613        }
614    }
615
616    /// Gets the given trees if `values` are trees, otherwise an empty tree.
617    fn trees(
618        store: &Arc<Store>,
619        dir: &RepoPath,
620        values: &MergedTreeValue,
621    ) -> BackendResult<Merge<Tree>> {
622        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
623            Ok(trees)
624        } else {
625            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
626        }
627    }
628}
629
630impl TreeDiffDir {
631    fn from_trees(
632        dir: &RepoPath,
633        trees1: &Merge<Tree>,
634        trees2: &Merge<Tree>,
635        matcher: &dyn Matcher,
636    ) -> Self {
637        let mut entries = vec![];
638        for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
639            let path = dir.join(name);
640            let tree_before = diff.before.is_tree();
641            let tree_after = diff.after.is_tree();
642            // Check if trees and files match, but only if either side is a tree or a file
643            // (don't query the matcher unnecessarily).
644            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
645            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
646
647            // Replace trees or files that don't match by `Merge::absent()`
648            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
649                diff.before
650            } else {
651                Merge::absent()
652            };
653            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
654                diff.after
655            } else {
656                Merge::absent()
657            };
658            if before.is_absent() && after.is_absent() {
659                continue;
660            }
661            entries.push((path, Diff::new(before.cloned(), after.cloned())));
662        }
663        entries.reverse();
664        Self { entries }
665    }
666}
667
668impl Iterator for TreeDiffIterator<'_> {
669    type Item = TreeDiffEntry;
670
671    fn next(&mut self) -> Option<Self::Item> {
672        while let Some(top) = self.stack.last_mut() {
673            let (path, diff) = match top.entries.pop() {
674                Some(entry) => entry,
675                None => {
676                    self.stack.pop().unwrap();
677                    continue;
678                }
679            };
680
681            if diff.before.is_tree() || diff.after.is_tree() {
682                let (before_tree, after_tree) = match (
683                    Self::trees(&self.store, &path, &diff.before),
684                    Self::trees(&self.store, &path, &diff.after),
685                ) {
686                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
687                    (Err(before_err), _) => {
688                        return Some(TreeDiffEntry {
689                            path,
690                            values: Err(before_err),
691                        });
692                    }
693                    (_, Err(after_err)) => {
694                        return Some(TreeDiffEntry {
695                            path,
696                            values: Err(after_err),
697                        });
698                    }
699                };
700                let subdir =
701                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
702                self.stack.push(subdir);
703            }
704            if diff.before.is_file_like()
705                || diff.after.is_file_like()
706                || self.matcher.matches(&path)
707            {
708                return Some(TreeDiffEntry {
709                    path,
710                    values: Ok(diff),
711                });
712            }
713        }
714        None
715    }
716}
717
718/// Stream of differences between two trees.
719pub struct TreeDiffStreamImpl<'matcher> {
720    store: Arc<Store>,
721    matcher: &'matcher dyn Matcher,
722    /// Pairs of tree values that may or may not be ready to emit, sorted in the
723    /// order we want to emit them. If either side is a tree, there will be
724    /// a corresponding entry in `pending_trees`. The item is ready to emit
725    /// unless there's a smaller or equal path in `pending_trees`.
726    items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
727    // TODO: Is it better to combine this and `items` into a single map?
728    #[expect(clippy::type_complexity)]
729    pending_trees:
730        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
731    /// The maximum number of trees to request concurrently. However, we do the
732    /// accounting per path, so there will often be twice as many pending
733    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
734    /// conflicts, there will be even more.
735    max_concurrent_reads: usize,
736    /// The maximum number of items in `items`. However, we will always add the
737    /// full differences from a particular pair of trees, so it may temporarily
738    /// go over the limit (until we emit those items). It may also go over the
739    /// limit because we have a file item that's blocked by pending subdirectory
740    /// items.
741    max_queued_items: usize,
742}
743
744impl<'matcher> TreeDiffStreamImpl<'matcher> {
745    /// Creates a iterator over the differences between two trees. Generally
746    /// prefer `MergedTree::diff_stream()` of calling this directly.
747    pub fn new(
748        tree1: &MergedTree,
749        tree2: &MergedTree,
750        matcher: &'matcher dyn Matcher,
751        max_concurrent_reads: usize,
752    ) -> Self {
753        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
754        let store = tree1.store().clone();
755        let mut stream = Self {
756            store: store.clone(),
757            matcher,
758            items: BTreeMap::new(),
759            pending_trees: BTreeMap::new(),
760            max_concurrent_reads,
761            max_queued_items: 10000,
762        };
763        let dir = RepoPathBuf::root();
764        let merged_tree1 = tree1.to_merged_tree_value();
765        let merged_tree2 = tree2.to_merged_tree_value();
766        let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
767        if root_diff.is_changed() && matcher.matches(&dir) {
768            stream.items.insert(dir.clone(), Ok(root_diff));
769        }
770        let root_tree_fut = Box::pin(try_join(
771            Self::trees(store.clone(), dir.clone(), merged_tree1),
772            Self::trees(store, dir.clone(), merged_tree2),
773        ));
774        stream.pending_trees.insert(dir, root_tree_fut);
775        stream
776    }
777
778    async fn single_tree(
779        store: &Arc<Store>,
780        dir: RepoPathBuf,
781        value: Option<&TreeValue>,
782    ) -> BackendResult<Tree> {
783        match value {
784            Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
785            _ => Ok(Tree::empty(store.clone(), dir.clone())),
786        }
787    }
788
789    /// Gets the given trees if `values` are trees, otherwise an empty tree.
790    async fn trees(
791        store: Arc<Store>,
792        dir: RepoPathBuf,
793        values: MergedTreeValue,
794    ) -> BackendResult<Merge<Tree>> {
795        if values.is_tree() {
796            values
797                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
798                .await
799        } else {
800            Ok(Merge::resolved(Tree::empty(store, dir)))
801        }
802    }
803
804    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
805        for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
806            let path = dir.join(basename);
807            let tree_before = diff.before.is_tree();
808            let tree_after = diff.after.is_tree();
809            // Check if trees and files match, but only if either side is a tree or a file
810            // (don't query the matcher unnecessarily).
811            let tree_matches =
812                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
813            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
814
815            // Replace trees or files that don't match by `Merge::absent()`
816            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
817                diff.before
818            } else {
819                Merge::absent()
820            };
821            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
822                diff.after
823            } else {
824                Merge::absent()
825            };
826            if before.is_absent() && after.is_absent() {
827                continue;
828            }
829
830            // If the path was a tree on either side of the diff, read those trees.
831            if tree_matches {
832                let before_tree_future =
833                    Self::trees(self.store.clone(), path.clone(), before.cloned());
834                let after_tree_future =
835                    Self::trees(self.store.clone(), path.clone(), after.cloned());
836                let both_trees_future = try_join(before_tree_future, after_tree_future);
837                self.pending_trees
838                    .insert(path.clone(), Box::pin(both_trees_future));
839            }
840
841            if file_matches || self.matcher.matches(&path) {
842                self.items
843                    .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
844            }
845        }
846    }
847
848    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
849        loop {
850            let mut tree_diffs = vec![];
851            let mut some_pending = false;
852            let mut all_pending = true;
853            for (dir, future) in self
854                .pending_trees
855                .iter_mut()
856                .take(self.max_concurrent_reads)
857            {
858                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
859                    all_pending = false;
860                    tree_diffs.push((dir.clone(), tree_diff));
861                } else {
862                    some_pending = true;
863                }
864            }
865
866            for (dir, tree_diff) in tree_diffs {
867                drop(self.pending_trees.remove_entry(&dir).unwrap());
868                match tree_diff {
869                    Ok((trees1, trees2)) => {
870                        self.add_dir_diff_items(&dir, &trees1, &trees2);
871                    }
872                    Err(err) => {
873                        self.items.insert(dir, Err(err));
874                    }
875                }
876            }
877
878            // If none of the futures have been polled and returned `Poll::Pending`, we must
879            // not return. If we did, nothing would call the waker so we might never get
880            // polled again.
881            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
882                return;
883            }
884        }
885    }
886}
887
888impl Stream for TreeDiffStreamImpl<'_> {
889    type Item = TreeDiffEntry;
890
891    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
892        // Go through all pending tree futures and poll them.
893        self.poll_tree_futures(cx);
894
895        // Now emit the first file, or the first tree that completed with an error
896        if let Some((path, _)) = self.items.first_key_value() {
897            // Check if there are any pending trees before this item that we need to finish
898            // polling before we can emit this item.
899            if let Some((dir, _)) = self.pending_trees.first_key_value()
900                && dir < path
901            {
902                return Poll::Pending;
903            }
904
905            let (path, values) = self.items.pop_first().unwrap();
906            Poll::Ready(Some(TreeDiffEntry { path, values }))
907        } else if self.pending_trees.is_empty() {
908            Poll::Ready(None)
909        } else {
910            Poll::Pending
911        }
912    }
913}
914
915fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
916    Box::pin(stream.filter_map(|mut entry| async move {
917        let skip_tree = |merge: MergedTreeValue| {
918            if merge.is_tree() {
919                Merge::absent()
920            } else {
921                merge
922            }
923        };
924        entry.values = entry.values.map(|diff| diff.map(skip_tree));
925
926        // Filter out entries where neither side is present.
927        let any_present = entry.values.as_ref().map_or(true, |diff| {
928            diff.before.is_present() || diff.after.is_present()
929        });
930        any_present.then_some(entry)
931    }))
932}
933
934/// Adapts a `TreeDiffStream` to emit a added file at a given path after a
935/// removed directory at the same path.
936struct DiffStreamForFileSystem<'a> {
937    inner: TreeDiffStream<'a>,
938    next_item: Option<TreeDiffEntry>,
939    held_file: Option<TreeDiffEntry>,
940}
941
942impl<'a> DiffStreamForFileSystem<'a> {
943    fn new(inner: TreeDiffStream<'a>) -> Self {
944        Self {
945            inner,
946            next_item: None,
947            held_file: None,
948        }
949    }
950}
951
952impl Stream for DiffStreamForFileSystem<'_> {
953    type Item = TreeDiffEntry;
954
955    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
956        while let Some(next) = match self.next_item.take() {
957            Some(next) => Some(next),
958            None => ready!(self.inner.as_mut().poll_next(cx)),
959        } {
960            // Filter out changes where neither side (before or after) is_file_like.
961            // This ensures we only process file-level changes and transitions.
962            if let Ok(diff) = &next.values
963                && !diff.before.is_file_like()
964                && !diff.after.is_file_like()
965            {
966                continue;
967            }
968
969            // If there's a held file "foo" and the next item to emit is not "foo/...", then
970            // we must be done with the "foo/" directory and it's time to emit "foo" as a
971            // removed file.
972            if let Some(held_entry) = self
973                .held_file
974                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
975            {
976                self.next_item = Some(next);
977                return Poll::Ready(Some(held_entry));
978            }
979
980            match next.values {
981                Ok(diff) if diff.before.is_tree() => {
982                    assert!(diff.after.is_present());
983                    assert!(self.held_file.is_none());
984                    self.held_file = Some(TreeDiffEntry {
985                        path: next.path,
986                        values: Ok(Diff::new(Merge::absent(), diff.after)),
987                    });
988                }
989                Ok(diff) if diff.after.is_tree() => {
990                    assert!(diff.before.is_present());
991                    return Poll::Ready(Some(TreeDiffEntry {
992                        path: next.path,
993                        values: Ok(Diff::new(diff.before, Merge::absent())),
994                    }));
995                }
996                _ => {
997                    return Poll::Ready(Some(next));
998                }
999            }
1000        }
1001        Poll::Ready(self.held_file.take())
1002    }
1003}