Skip to main content

jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::TreeId;
39use crate::backend::TreeValue;
40use crate::conflict_labels::ConflictLabels;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_merge::merge_trees;
57
58/// Presents a view of a merged set of trees at the root directory, as well as
59/// conflict labels.
60#[derive(Clone)]
61pub struct MergedTree {
62    store: Arc<Store>,
63    tree_ids: Merge<TreeId>,
64    labels: ConflictLabels,
65}
66
67impl fmt::Debug for MergedTree {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("MergedTree")
70            .field("tree_ids", &self.tree_ids)
71            .field("labels", &self.labels)
72            .finish_non_exhaustive()
73    }
74}
75
76impl MergedTree {
77    /// Creates a `MergedTree` with the given resolved tree ID.
78    pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
79        Self {
80            store,
81            tree_ids: Merge::resolved(tree_id),
82            labels: ConflictLabels::unlabeled(),
83        }
84    }
85
86    /// Creates a `MergedTree` with the given tree IDs.
87    pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
88        if let Some(num_sides) = labels.num_sides() {
89            assert_eq!(tree_ids.num_sides(), num_sides);
90        }
91        Self {
92            store,
93            tree_ids,
94            labels,
95        }
96    }
97
98    /// The `Store` associated with this tree.
99    pub fn store(&self) -> &Arc<Store> {
100        &self.store
101    }
102
103    /// The underlying tree IDs for this `MergedTree`. If there are file changes
104    /// between two trees, then the tree IDs will be different.
105    pub fn tree_ids(&self) -> &Merge<TreeId> {
106        &self.tree_ids
107    }
108
109    /// Extracts the underlying tree IDs for this `MergedTree`, discarding any
110    /// conflict labels.
111    pub fn into_tree_ids(self) -> Merge<TreeId> {
112        self.tree_ids
113    }
114
115    /// Returns this merge's conflict labels, if any.
116    pub fn labels(&self) -> &ConflictLabels {
117        &self.labels
118    }
119
120    /// Returns both the underlying tree IDs and any conflict labels. This can
121    /// be used to check whether there are changes in files to be materialized
122    /// in the working copy.
123    pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
124        (&self.tree_ids, &self.labels)
125    }
126
127    /// Extracts the underlying tree IDs and conflict labels.
128    pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
129        (self.tree_ids, self.labels)
130    }
131
132    /// Reads the merge of tree objects represented by this `MergedTree`.
133    pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
134        self.tree_ids
135            .try_map_async(|id| self.store.get_tree_async(RepoPathBuf::root(), id))
136            .await
137    }
138
139    /// Returns a label for each term in a merge. Resolved merges use the
140    /// provided label, while conflicted merges keep their original labels.
141    /// Missing labels are indicated by empty strings.
142    pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
143        if self.tree_ids.is_resolved() {
144            assert!(!self.labels.has_labels());
145            Merge::resolved(label)
146        } else if self.labels.has_labels() {
147            // If the merge is conflicted and it already has labels, then we want to use
148            // those labels instead of the provided label. This ensures that rebasing
149            // conflicted commits keeps meaningful labels.
150            let labels = self.labels.as_merge();
151            assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
152            labels.map(|label| label.as_str())
153        } else {
154            // If the merge is conflicted but it doesn't have labels (e.g. conflicts created
155            // before labels were added), then we use empty strings to indicate missing
156            // labels. We could consider using `label` for all the sides instead, but it
157            // might be confusing.
158            Merge::repeated("", self.tree_ids.num_sides())
159        }
160    }
161
162    /// Tries to resolve any conflicts, resolving any conflicts that can be
163    /// automatically resolved and leaving the rest unresolved.
164    pub async fn resolve(self) -> BackendResult<Self> {
165        let merged = merge_trees(&self.store, self.tree_ids).await?;
166        // If the result can be resolved, then `merge_trees()` above would have returned
167        // a resolved merge. However, that function will always preserve the arity of
168        // conflicts it cannot resolve. So we simplify the conflict again
169        // here to possibly reduce a complex conflict to a simpler one.
170        let (simplified_labels, simplified) = if merged.is_resolved() {
171            (ConflictLabels::unlabeled(), merged)
172        } else {
173            self.labels.simplify_with(&merged)
174        };
175        // If debug assertions are enabled, check that the merge was idempotent. In
176        // particular, that this last simplification doesn't enable further automatic
177        // resolutions
178        if cfg!(debug_assertions) {
179            let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
180            debug_assert_eq!(re_merged, simplified);
181        }
182        Ok(Self {
183            store: self.store,
184            tree_ids: simplified,
185            labels: simplified_labels,
186        })
187    }
188
189    /// An iterator over the conflicts in this tree, including subtrees.
190    /// Recurses into subtrees and yields conflicts in those, but only if
191    /// all sides are trees, so tree/file conflicts will be reported as a single
192    /// conflict, not one for each path in the tree.
193    pub fn conflicts(
194        &self,
195    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196        self.conflicts_matching(&EverythingMatcher)
197    }
198
199    /// Like `conflicts()` but restricted by a matcher.
200    pub fn conflicts_matching<'matcher>(
201        &self,
202        matcher: &'matcher dyn Matcher,
203    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
204        ConflictIterator::new(self, matcher)
205    }
206
207    /// Whether this tree has conflicts.
208    pub fn has_conflict(&self) -> bool {
209        !self.tree_ids.is_resolved()
210    }
211
212    /// The value at the given path. The value can be `Resolved` even if
213    /// `self` is a `Conflict`, which happens if the value at the path can be
214    /// trivially merged.
215    pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
216        self.path_value_async(path).block_on()
217    }
218
219    /// Async version of `path_value()`.
220    pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
221        match path.split() {
222            Some((dir, basename)) => {
223                let trees = self.trees().await?;
224                match trees.sub_tree_recursive(dir).await? {
225                    None => Ok(Merge::absent()),
226                    Some(tree) => Ok(tree.value(basename).cloned()),
227                }
228            }
229            None => Ok(self.to_merged_tree_value()),
230        }
231    }
232
233    fn to_merged_tree_value(&self) -> MergedTreeValue {
234        self.tree_ids
235            .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
236    }
237
238    /// Iterator over the entries matching the given matcher. Subtrees are
239    /// visited recursively. Subtrees that differ between the current
240    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
241    /// empty directories. Subtrees that conflict with non-trees are not
242    /// visited. For example, if current tree is a merge of 3 trees, and the
243    /// entry for 'foo' is a conflict between a change subtree and a symlink
244    /// (i.e. the subdirectory was replaced by symlink in one side of the
245    /// conflict), then the entry for `foo` itself will be emitted, but no
246    /// entries from inside `foo/` from either of the trees will be.
247    pub fn entries(&self) -> TreeEntriesIterator<'static> {
248        self.entries_matching(&EverythingMatcher)
249    }
250
251    /// Like `entries()` but restricted by a matcher.
252    pub fn entries_matching<'matcher>(
253        &self,
254        matcher: &'matcher dyn Matcher,
255    ) -> TreeEntriesIterator<'matcher> {
256        TreeEntriesIterator::new(self, matcher)
257    }
258
259    /// Stream of the differences between this tree and another tree.
260    fn diff_stream_internal<'matcher>(
261        &self,
262        other: &Self,
263        matcher: &'matcher dyn Matcher,
264    ) -> TreeDiffStream<'matcher> {
265        let concurrency = self.store().concurrency();
266        if concurrency <= 1 {
267            Box::pin(futures::stream::iter(TreeDiffIterator::new(
268                self, other, matcher,
269            )))
270        } else {
271            Box::pin(TreeDiffStreamImpl::new(self, other, matcher, concurrency))
272        }
273    }
274
275    /// Stream of the differences between this tree and another tree.
276    pub fn diff_stream<'matcher>(
277        &self,
278        other: &Self,
279        matcher: &'matcher dyn Matcher,
280    ) -> TreeDiffStream<'matcher> {
281        stream_without_trees(self.diff_stream_internal(other, matcher))
282    }
283
284    /// Like `diff_stream()` but trees with diffs themselves are also included.
285    pub fn diff_stream_with_trees<'matcher>(
286        &self,
287        other: &Self,
288        matcher: &'matcher dyn Matcher,
289    ) -> TreeDiffStream<'matcher> {
290        self.diff_stream_internal(other, matcher)
291    }
292
293    /// Like `diff_stream()` but files in a removed tree will be returned before
294    /// a file that replaces it.
295    pub fn diff_stream_for_file_system<'matcher>(
296        &self,
297        other: &Self,
298        matcher: &'matcher dyn Matcher,
299    ) -> TreeDiffStream<'matcher> {
300        Box::pin(DiffStreamForFileSystem::new(
301            self.diff_stream_internal(other, matcher),
302        ))
303    }
304
305    /// Like `diff_stream()` but takes the given copy records into account.
306    pub fn diff_stream_with_copies<'a>(
307        &self,
308        other: &Self,
309        matcher: &'a dyn Matcher,
310        copy_records: &'a CopyRecords,
311    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
312        let stream = self.diff_stream(other, matcher);
313        Box::pin(CopiesTreeDiffStream::new(
314            stream,
315            self.clone(),
316            other.clone(),
317            copy_records,
318        ))
319    }
320
321    /// Merges the provided trees into a single `MergedTree`. Any conflicts will
322    /// be resolved recursively if possible. The provided labels are used if a
323    /// conflict arises. However, if one of the input trees is already
324    /// conflicted, the corresponding label will be ignored, and its existing
325    /// labels will be used instead.
326    pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
327        Self::merge_no_resolve(merge).resolve().await
328    }
329
330    /// Merges the provided trees into a single `MergedTree`, without attempting
331    /// to resolve file conflicts.
332    pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
333        debug_assert!(
334            merge
335                .iter()
336                .map(|(tree, _)| Arc::as_ptr(tree.store()))
337                .all_equal()
338        );
339        let store = merge.first().0.store().clone();
340        let flattened_labels = ConflictLabels::from_merge(
341            merge
342                .map(|(tree, label)| tree.labels_by_term(label))
343                .flatten()
344                .map(|&label| label.to_owned()),
345        );
346        let flattened_tree_ids: Merge<TreeId> = merge
347            .into_iter()
348            .map(|(tree, _label)| tree.into_tree_ids())
349            .collect::<MergeBuilder<_>>()
350            .build()
351            .flatten();
352
353        let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
354        Self::new(store, tree_ids, labels)
355    }
356}
357
358/// A single entry in a tree diff.
359#[derive(Debug)]
360pub struct TreeDiffEntry {
361    /// The path.
362    pub path: RepoPathBuf,
363    /// The resolved tree values if available.
364    pub values: BackendResult<Diff<MergedTreeValue>>,
365}
366
367/// Type alias for the result from `MergedTree::diff_stream()`. We use a
368/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
369/// ones) can fetch trees asynchronously.
370pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
371
372fn all_tree_entries(
373    trees: &Merge<Tree>,
374) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
375    if let Some(tree) = trees.as_resolved() {
376        let iter = tree
377            .entries_non_recursive()
378            .map(|entry| (entry.name(), Merge::normal(entry.value())));
379        Either::Left(iter)
380    } else {
381        let same_change = trees.first().store().merge_options().same_change;
382        let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
383            // TODO: move resolve_trivial() to caller?
384            let values = match values.resolve_trivial(same_change) {
385                Some(resolved) => Merge::resolved(*resolved),
386                None => values,
387            };
388            (name, values)
389        });
390        Either::Right(iter)
391    }
392}
393
394/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
395/// non-recursively. This also works if `trees` are resolved, but is more costly
396/// than `tree.entries_non_recursive()`.
397pub fn all_merged_tree_entries(
398    trees: &Merge<Tree>,
399) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
400    let mut entries_iters = trees
401        .iter()
402        .map(|tree| tree.entries_non_recursive().peekable())
403        .collect_vec();
404    iter::from_fn(move || {
405        let next_name = entries_iters
406            .iter_mut()
407            .filter_map(|iter| iter.peek())
408            .map(|entry| entry.name())
409            .min()?;
410        let values: MergeBuilder<_> = entries_iters
411            .iter_mut()
412            .map(|iter| {
413                let entry = iter.next_if(|entry| entry.name() == next_name)?;
414                Some(entry.value())
415            })
416            .collect();
417        Some((next_name, values.build()))
418    })
419}
420
421fn merged_tree_entry_diff<'a>(
422    trees1: &'a Merge<Tree>,
423    trees2: &'a Merge<Tree>,
424) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
425    itertools::merge_join_by(
426        all_tree_entries(trees1),
427        all_tree_entries(trees2),
428        |(name1, _), (name2, _)| name1.cmp(name2),
429    )
430    .map(|entry| match entry {
431        EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
432        EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
433        EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
434    })
435    .filter(|(_, diff)| diff.is_changed())
436}
437
438/// Recursive iterator over the entries in a tree.
439pub struct TreeEntriesIterator<'matcher> {
440    store: Arc<Store>,
441    stack: Vec<TreeEntriesDirItem>,
442    matcher: &'matcher dyn Matcher,
443}
444
445struct TreeEntriesDirItem {
446    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
447}
448
449impl TreeEntriesDirItem {
450    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
451        let mut entries = vec![];
452        let dir = trees.first().dir();
453        for (name, value) in all_tree_entries(trees) {
454            let path = dir.join(name);
455            if value.is_tree() {
456                // TODO: Handle the other cases (specific files and trees)
457                if matcher.visit(&path).is_nothing() {
458                    continue;
459                }
460            } else if !matcher.matches(&path) {
461                continue;
462            }
463            entries.push((path, value.cloned()));
464        }
465        entries.reverse();
466        Self { entries }
467    }
468}
469
470impl<'matcher> TreeEntriesIterator<'matcher> {
471    fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
472        Self {
473            store: trees.store.clone(),
474            stack: vec![TreeEntriesDirItem {
475                entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
476            }],
477            matcher,
478        }
479    }
480}
481
482impl Iterator for TreeEntriesIterator<'_> {
483    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
484
485    fn next(&mut self) -> Option<Self::Item> {
486        while let Some(top) = self.stack.last_mut() {
487            if let Some((path, value)) = top.entries.pop() {
488                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
489                    Ok(maybe_trees) => maybe_trees,
490                    Err(err) => return Some((path, Err(err))),
491                };
492                if let Some(trees) = maybe_trees {
493                    self.stack
494                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
495                } else {
496                    return Some((path, Ok(value)));
497                }
498            } else {
499                self.stack.pop();
500            }
501        }
502        None
503    }
504}
505
506/// The state for the non-recursive iteration over the conflicted entries in a
507/// single directory.
508struct ConflictsDirItem {
509    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
510}
511
512impl ConflictsDirItem {
513    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
514        if trees.is_resolved() {
515            return Self { entries: vec![] };
516        }
517
518        let dir = trees.first().dir();
519        let mut entries = vec![];
520        for (basename, value) in all_tree_entries(trees) {
521            if value.is_resolved() {
522                continue;
523            }
524            let path = dir.join(basename);
525            if value.is_tree() {
526                if matcher.visit(&path).is_nothing() {
527                    continue;
528                }
529            } else if !matcher.matches(&path) {
530                continue;
531            }
532            entries.push((path, value.cloned()));
533        }
534        entries.reverse();
535        Self { entries }
536    }
537}
538
539struct ConflictIterator<'matcher> {
540    store: Arc<Store>,
541    stack: Vec<ConflictsDirItem>,
542    matcher: &'matcher dyn Matcher,
543}
544
545impl<'matcher> ConflictIterator<'matcher> {
546    fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
547        Self {
548            store: tree.store().clone(),
549            stack: vec![ConflictsDirItem {
550                entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
551            }],
552            matcher,
553        }
554    }
555}
556
557impl Iterator for ConflictIterator<'_> {
558    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
559
560    fn next(&mut self) -> Option<Self::Item> {
561        while let Some(top) = self.stack.last_mut() {
562            if let Some((path, tree_values)) = top.entries.pop() {
563                match tree_values.to_tree_merge(&self.store, &path).block_on() {
564                    Ok(Some(trees)) => {
565                        // If all sides are trees or missing, descend into the merged tree
566                        self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
567                    }
568                    Ok(None) => {
569                        // Otherwise this is a conflict between files, trees, etc. If they could
570                        // be automatically resolved, they should have been when the top-level
571                        // tree conflict was written, so we assume that they can't be.
572                        return Some((path, Ok(tree_values)));
573                    }
574                    Err(err) => {
575                        return Some((path, Err(err)));
576                    }
577                }
578            } else {
579                self.stack.pop();
580            }
581        }
582        None
583    }
584}
585
586/// Iterator over the differences between two trees.
587pub struct TreeDiffIterator<'matcher> {
588    store: Arc<Store>,
589    stack: Vec<TreeDiffDir>,
590    matcher: &'matcher dyn Matcher,
591}
592
593struct TreeDiffDir {
594    entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
595}
596
597impl<'matcher> TreeDiffIterator<'matcher> {
598    /// Creates a iterator over the differences between two trees.
599    pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
600        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
601        let root_dir = RepoPath::root();
602        let mut stack = Vec::new();
603        let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
604        if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
605            stack.push(TreeDiffDir {
606                entries: vec![(root_dir.to_owned(), root_diff)],
607            });
608        }
609        Self {
610            store: tree1.store().clone(),
611            stack,
612            matcher,
613        }
614    }
615
616    /// Gets the given trees if `values` are trees, otherwise an empty tree.
617    fn trees(
618        store: &Arc<Store>,
619        dir: &RepoPath,
620        values: &MergedTreeValue,
621    ) -> BackendResult<Merge<Tree>> {
622        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
623            Ok(trees)
624        } else {
625            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
626        }
627    }
628}
629
630impl TreeDiffDir {
631    fn from_trees(
632        dir: &RepoPath,
633        trees1: &Merge<Tree>,
634        trees2: &Merge<Tree>,
635        matcher: &dyn Matcher,
636    ) -> Self {
637        let mut entries = vec![];
638        for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
639            let path = dir.join(name);
640            let tree_before = diff.before.is_tree();
641            let tree_after = diff.after.is_tree();
642            // Check if trees and files match, but only if either side is a tree or a file
643            // (don't query the matcher unnecessarily).
644            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
645            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
646
647            // Replace trees or files that don't match by `Merge::absent()`
648            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
649                diff.before
650            } else {
651                Merge::absent()
652            };
653            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
654                diff.after
655            } else {
656                Merge::absent()
657            };
658            if before.is_absent() && after.is_absent() {
659                continue;
660            }
661            entries.push((path, Diff::new(before.cloned(), after.cloned())));
662        }
663        entries.reverse();
664        Self { entries }
665    }
666}
667
668impl Iterator for TreeDiffIterator<'_> {
669    type Item = TreeDiffEntry;
670
671    fn next(&mut self) -> Option<Self::Item> {
672        while let Some(top) = self.stack.last_mut() {
673            let Some((path, diff)) = top.entries.pop() else {
674                self.stack.pop().unwrap();
675                continue;
676            };
677
678            if diff.before.is_tree() || diff.after.is_tree() {
679                let (before_tree, after_tree) = match (
680                    Self::trees(&self.store, &path, &diff.before),
681                    Self::trees(&self.store, &path, &diff.after),
682                ) {
683                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
684                    (Err(before_err), _) => {
685                        return Some(TreeDiffEntry {
686                            path,
687                            values: Err(before_err),
688                        });
689                    }
690                    (_, Err(after_err)) => {
691                        return Some(TreeDiffEntry {
692                            path,
693                            values: Err(after_err),
694                        });
695                    }
696                };
697                let subdir =
698                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
699                self.stack.push(subdir);
700            }
701            if diff.before.is_file_like()
702                || diff.after.is_file_like()
703                || self.matcher.matches(&path)
704            {
705                return Some(TreeDiffEntry {
706                    path,
707                    values: Ok(diff),
708                });
709            }
710        }
711        None
712    }
713}
714
715/// Stream of differences between two trees.
716pub struct TreeDiffStreamImpl<'matcher> {
717    store: Arc<Store>,
718    matcher: &'matcher dyn Matcher,
719    /// Pairs of tree values that may or may not be ready to emit, sorted in the
720    /// order we want to emit them. If either side is a tree, there will be
721    /// a corresponding entry in `pending_trees`. The item is ready to emit
722    /// unless there's a smaller or equal path in `pending_trees`.
723    items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
724    // TODO: Is it better to combine this and `items` into a single map?
725    #[expect(clippy::type_complexity)]
726    pending_trees:
727        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
728    /// The maximum number of trees to request concurrently. However, we do the
729    /// accounting per path, so there will often be twice as many pending
730    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
731    /// conflicts, there will be even more.
732    max_concurrent_reads: usize,
733    /// The maximum number of items in `items`. However, we will always add the
734    /// full differences from a particular pair of trees, so it may temporarily
735    /// go over the limit (until we emit those items). It may also go over the
736    /// limit because we have a file item that's blocked by pending subdirectory
737    /// items.
738    max_queued_items: usize,
739}
740
741impl<'matcher> TreeDiffStreamImpl<'matcher> {
742    /// Creates a iterator over the differences between two trees. Generally
743    /// prefer `MergedTree::diff_stream()` of calling this directly.
744    pub fn new(
745        tree1: &MergedTree,
746        tree2: &MergedTree,
747        matcher: &'matcher dyn Matcher,
748        max_concurrent_reads: usize,
749    ) -> Self {
750        assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
751        let store = tree1.store().clone();
752        let mut stream = Self {
753            store: store.clone(),
754            matcher,
755            items: BTreeMap::new(),
756            pending_trees: BTreeMap::new(),
757            max_concurrent_reads,
758            max_queued_items: 10000,
759        };
760        let dir = RepoPathBuf::root();
761        let merged_tree1 = tree1.to_merged_tree_value();
762        let merged_tree2 = tree2.to_merged_tree_value();
763        let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
764        if root_diff.is_changed() && matcher.matches(&dir) {
765            stream.items.insert(dir.clone(), Ok(root_diff));
766        }
767        let root_tree_fut = Box::pin(try_join(
768            Self::trees(store.clone(), dir.clone(), merged_tree1),
769            Self::trees(store, dir.clone(), merged_tree2),
770        ));
771        stream.pending_trees.insert(dir, root_tree_fut);
772        stream
773    }
774
775    async fn single_tree(
776        store: &Arc<Store>,
777        dir: RepoPathBuf,
778        value: Option<&TreeValue>,
779    ) -> BackendResult<Tree> {
780        match value {
781            Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
782            _ => Ok(Tree::empty(store.clone(), dir.clone())),
783        }
784    }
785
786    /// Gets the given trees if `values` are trees, otherwise an empty tree.
787    async fn trees(
788        store: Arc<Store>,
789        dir: RepoPathBuf,
790        values: MergedTreeValue,
791    ) -> BackendResult<Merge<Tree>> {
792        if values.is_tree() {
793            values
794                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
795                .await
796        } else {
797            Ok(Merge::resolved(Tree::empty(store, dir)))
798        }
799    }
800
801    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
802        for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
803            let path = dir.join(basename);
804            let tree_before = diff.before.is_tree();
805            let tree_after = diff.after.is_tree();
806            // Check if trees and files match, but only if either side is a tree or a file
807            // (don't query the matcher unnecessarily).
808            let tree_matches =
809                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
810            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
811
812            // Replace trees or files that don't match by `Merge::absent()`
813            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
814                diff.before
815            } else {
816                Merge::absent()
817            };
818            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
819                diff.after
820            } else {
821                Merge::absent()
822            };
823            if before.is_absent() && after.is_absent() {
824                continue;
825            }
826
827            // If the path was a tree on either side of the diff, read those trees.
828            if tree_matches {
829                let before_tree_future =
830                    Self::trees(self.store.clone(), path.clone(), before.cloned());
831                let after_tree_future =
832                    Self::trees(self.store.clone(), path.clone(), after.cloned());
833                let both_trees_future = try_join(before_tree_future, after_tree_future);
834                self.pending_trees
835                    .insert(path.clone(), Box::pin(both_trees_future));
836            }
837
838            if file_matches || self.matcher.matches(&path) {
839                self.items
840                    .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
841            }
842        }
843    }
844
845    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
846        loop {
847            let mut tree_diffs = vec![];
848            let mut some_pending = false;
849            let mut all_pending = true;
850            for (dir, future) in self
851                .pending_trees
852                .iter_mut()
853                .take(self.max_concurrent_reads)
854            {
855                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
856                    all_pending = false;
857                    tree_diffs.push((dir.clone(), tree_diff));
858                } else {
859                    some_pending = true;
860                }
861            }
862
863            for (dir, tree_diff) in tree_diffs {
864                drop(self.pending_trees.remove_entry(&dir).unwrap());
865                match tree_diff {
866                    Ok((trees1, trees2)) => {
867                        self.add_dir_diff_items(&dir, &trees1, &trees2);
868                    }
869                    Err(err) => {
870                        self.items.insert(dir, Err(err));
871                    }
872                }
873            }
874
875            // If none of the futures have been polled and returned `Poll::Pending`, we must
876            // not return. If we did, nothing would call the waker so we might never get
877            // polled again.
878            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
879                return;
880            }
881        }
882    }
883}
884
885impl Stream for TreeDiffStreamImpl<'_> {
886    type Item = TreeDiffEntry;
887
888    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
889        // Go through all pending tree futures and poll them.
890        self.poll_tree_futures(cx);
891
892        // Now emit the first file, or the first tree that completed with an error
893        if let Some((path, _)) = self.items.first_key_value() {
894            // Check if there are any pending trees before this item that we need to finish
895            // polling before we can emit this item.
896            if let Some((dir, _)) = self.pending_trees.first_key_value()
897                && dir < path
898            {
899                return Poll::Pending;
900            }
901
902            let (path, values) = self.items.pop_first().unwrap();
903            Poll::Ready(Some(TreeDiffEntry { path, values }))
904        } else if self.pending_trees.is_empty() {
905            Poll::Ready(None)
906        } else {
907            Poll::Pending
908        }
909    }
910}
911
912fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
913    Box::pin(stream.filter_map(|mut entry| async move {
914        let skip_tree = |merge: MergedTreeValue| {
915            if merge.is_tree() {
916                Merge::absent()
917            } else {
918                merge
919            }
920        };
921        entry.values = entry.values.map(|diff| diff.map(skip_tree));
922
923        // Filter out entries where neither side is present.
924        let any_present = entry.values.as_ref().map_or(true, |diff| {
925            diff.before.is_present() || diff.after.is_present()
926        });
927        any_present.then_some(entry)
928    }))
929}
930
931/// Adapts a `TreeDiffStream` to emit a added file at a given path after a
932/// removed directory at the same path.
933struct DiffStreamForFileSystem<'a> {
934    inner: TreeDiffStream<'a>,
935    next_item: Option<TreeDiffEntry>,
936    held_file: Option<TreeDiffEntry>,
937}
938
939impl<'a> DiffStreamForFileSystem<'a> {
940    fn new(inner: TreeDiffStream<'a>) -> Self {
941        Self {
942            inner,
943            next_item: None,
944            held_file: None,
945        }
946    }
947}
948
949impl Stream for DiffStreamForFileSystem<'_> {
950    type Item = TreeDiffEntry;
951
952    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
953        while let Some(next) = match self.next_item.take() {
954            Some(next) => Some(next),
955            None => ready!(self.inner.as_mut().poll_next(cx)),
956        } {
957            // Filter out changes where neither side (before or after) is_file_like.
958            // This ensures we only process file-level changes and transitions.
959            if let Ok(diff) = &next.values
960                && !diff.before.is_file_like()
961                && !diff.after.is_file_like()
962            {
963                continue;
964            }
965
966            // If there's a held file "foo" and the next item to emit is not "foo/...", then
967            // we must be done with the "foo/" directory and it's time to emit "foo" as a
968            // removed file.
969            if let Some(held_entry) = self
970                .held_file
971                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
972            {
973                self.next_item = Some(next);
974                return Poll::Ready(Some(held_entry));
975            }
976
977            match next.values {
978                Ok(diff) if diff.before.is_tree() => {
979                    assert!(diff.after.is_present());
980                    assert!(self.held_file.is_none());
981                    self.held_file = Some(TreeDiffEntry {
982                        path: next.path,
983                        values: Ok(Diff::new(Merge::absent(), diff.after)),
984                    });
985                }
986                Ok(diff) if diff.after.is_tree() => {
987                    assert!(diff.before.is_present());
988                    return Poll::Ready(Some(TreeDiffEntry {
989                        path: next.path,
990                        values: Ok(Diff::new(diff.before, Merge::absent())),
991                    }));
992                }
993                _ => {
994                    return Poll::Ready(Some(next));
995                }
996            }
997        }
998        Poll::Ready(self.held_file.take())
999    }
1000}