jj_lib/
merged_tree.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A lazily merged view of a set of trees.
16
17use std::borrow::Borrow;
18use std::cmp::max;
19use std::collections::BTreeMap;
20use std::collections::HashSet;
21use std::iter;
22use std::iter::zip;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::Context;
26use std::task::Poll;
27use std::task::ready;
28use std::vec;
29
30use either::Either;
31use futures::FutureExt as _;
32use futures::Stream;
33use futures::StreamExt as _;
34use futures::future::BoxFuture;
35use futures::future::try_join;
36use futures::future::try_join_all;
37use futures::stream::BoxStream;
38use futures::stream::FuturesUnordered;
39use itertools::EitherOrBoth;
40use itertools::Itertools as _;
41use pollster::FutureExt as _;
42
43use crate::backend;
44use crate::backend::BackendResult;
45use crate::backend::MergedTreeId;
46use crate::backend::TreeId;
47use crate::backend::TreeValue;
48use crate::copies::CopiesTreeDiffEntry;
49use crate::copies::CopiesTreeDiffStream;
50use crate::copies::CopyRecords;
51use crate::matchers::EverythingMatcher;
52use crate::matchers::Matcher;
53use crate::merge::Merge;
54use crate::merge::MergeBuilder;
55use crate::merge::MergedTreeVal;
56use crate::merge::MergedTreeValue;
57use crate::repo_path::RepoPath;
58use crate::repo_path::RepoPathBuf;
59use crate::repo_path::RepoPathComponent;
60use crate::repo_path::RepoPathComponentBuf;
61use crate::store::Store;
62use crate::tree::Tree;
63use crate::tree::try_resolve_file_conflict;
64use crate::tree_builder::TreeBuilder;
65
66/// Presents a view of a merged set of trees.
67#[derive(PartialEq, Eq, Clone, Debug)]
68pub struct MergedTree {
69    trees: Merge<Tree>,
70}
71
72impl MergedTree {
73    /// Creates a new `MergedTree` representing a single tree without conflicts.
74    pub fn resolved(tree: Tree) -> Self {
75        Self::new(Merge::resolved(tree))
76    }
77
78    /// Creates a new `MergedTree` representing a merge of a set of trees. The
79    /// individual trees must not have any conflicts.
80    pub fn new(trees: Merge<Tree>) -> Self {
81        debug_assert!(!trees.iter().any(|t| t.has_conflict()));
82        debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
83        debug_assert!(
84            trees
85                .iter()
86                .map(|tree| Arc::as_ptr(tree.store()))
87                .all_equal()
88        );
89        Self { trees }
90    }
91
92    /// Takes a tree in the legacy format (with path-level conflicts in the
93    /// tree) and returns a `MergedTree` with any conflicts converted to
94    /// tree-level conflicts.
95    pub fn from_legacy_tree(tree: Tree) -> BackendResult<Self> {
96        let conflict_ids = tree.conflicts();
97        if conflict_ids.is_empty() {
98            return Ok(Self::resolved(tree));
99        }
100
101        // Find the number of removes and adds in the most complex conflict.
102        let mut max_tree_count = 1;
103        let store = tree.store();
104        let mut conflicts: Vec<(&RepoPath, MergedTreeValue)> = vec![];
105        for (path, conflict_id) in &conflict_ids {
106            let conflict = store.read_conflict(path, conflict_id)?;
107            max_tree_count = max(max_tree_count, conflict.iter().len());
108            conflicts.push((path, conflict));
109        }
110        let mut tree_builders = Vec::new();
111        tree_builders.resize_with(max_tree_count, || {
112            TreeBuilder::new(store.clone(), tree.id().clone())
113        });
114        for (path, conflict) in conflicts {
115            // If there are fewer terms in this conflict than in some other conflict, we can
116            // add canceling removes and adds of any value. The simplest value is an absent
117            // value, so we use that.
118            let terms_padded = conflict.into_iter().chain(iter::repeat(None));
119            for (builder, term) in zip(&mut tree_builders, terms_padded) {
120                builder.set_or_remove(path.to_owned(), term);
121            }
122        }
123
124        let new_trees: Vec<_> = tree_builders
125            .into_iter()
126            .map(|builder| {
127                let tree_id = builder.write_tree()?;
128                store.get_tree(RepoPathBuf::root(), &tree_id)
129            })
130            .try_collect()?;
131        Ok(Self {
132            trees: Merge::from_vec(new_trees),
133        })
134    }
135
136    /// Returns the underlying `Merge<Tree>`.
137    pub fn as_merge(&self) -> &Merge<Tree> {
138        &self.trees
139    }
140
141    /// Extracts the underlying `Merge<Tree>`.
142    pub fn take(self) -> Merge<Tree> {
143        self.trees
144    }
145
146    /// This tree's directory
147    pub fn dir(&self) -> &RepoPath {
148        self.trees.first().dir()
149    }
150
151    /// The `Store` associated with this tree.
152    pub fn store(&self) -> &Arc<Store> {
153        self.trees.first().store()
154    }
155
156    /// Base names of entries in this directory.
157    pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
158        Box::new(all_tree_basenames(&self.trees))
159    }
160
161    /// The value at the given basename. The value can be `Resolved` even if
162    /// `self` is a `Merge`, which happens if the value at the path can be
163    /// trivially merged. Does not recurse, so if `basename` refers to a Tree,
164    /// then a `TreeValue::Tree` will be returned.
165    pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal<'_> {
166        trees_value(&self.trees, basename)
167    }
168
169    /// Tries to resolve any conflicts, resolving any conflicts that can be
170    /// automatically resolved and leaving the rest unresolved.
171    pub async fn resolve(self) -> BackendResult<Self> {
172        let merged = merge_trees(self.trees).await?;
173        // If the result can be resolved, then `merge_trees()` above would have returned
174        // a resolved merge. However, that function will always preserve the arity of
175        // conflicts it cannot resolve. So we simplify the conflict again
176        // here to possibly reduce a complex conflict to a simpler one.
177        let simplified = merged.simplify();
178        // If debug assertions are enabled, check that the merge was idempotent. In
179        // particular,  that this last simplification doesn't enable further automatic
180        // resolutions
181        if cfg!(debug_assertions) {
182            let re_merged = merge_trees(simplified.clone()).await.unwrap();
183            debug_assert_eq!(re_merged, simplified);
184        }
185        Ok(Self { trees: simplified })
186    }
187
188    /// An iterator over the conflicts in this tree, including subtrees.
189    /// Recurses into subtrees and yields conflicts in those, but only if
190    /// all sides are trees, so tree/file conflicts will be reported as a single
191    /// conflict, not one for each path in the tree.
192    // TODO: Restrict this by a matcher (or add a separate method for that).
193    pub fn conflicts(
194        &self,
195    ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196        ConflictIterator::new(self)
197    }
198
199    /// Whether this tree has conflicts.
200    pub fn has_conflict(&self) -> bool {
201        !self.trees.is_resolved()
202    }
203
204    /// Gets the `MergeTree` in a subdirectory of the current tree. If the path
205    /// doesn't correspond to a tree in any of the inputs to the merge, then
206    /// that entry will be replace by an empty tree in the result.
207    pub async fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<Self>> {
208        match self.value(name).into_resolved() {
209            Ok(Some(TreeValue::Tree(sub_tree_id))) => {
210                let subdir = self.dir().join(name);
211                Ok(Some(Self::resolved(
212                    self.store().get_tree_async(subdir, sub_tree_id).await?,
213                )))
214            }
215            Ok(_) => Ok(None),
216            Err(merge) => {
217                if !merge.is_tree() {
218                    return Ok(None);
219                }
220                let trees = merge
221                    .try_map_async(async |value| match value {
222                        Some(TreeValue::Tree(sub_tree_id)) => {
223                            let subdir = self.dir().join(name);
224                            self.store().get_tree_async(subdir, sub_tree_id).await
225                        }
226                        Some(_) => unreachable!(),
227                        None => {
228                            let subdir = self.dir().join(name);
229                            Ok(Tree::empty(self.store().clone(), subdir))
230                        }
231                    })
232                    .await?;
233                Ok(Some(Self { trees }))
234            }
235        }
236    }
237
238    /// The value at the given path. The value can be `Resolved` even if
239    /// `self` is a `Conflict`, which happens if the value at the path can be
240    /// trivially merged.
241    pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
242        self.path_value_async(path).block_on()
243    }
244
245    /// Async version of `path_value()`.
246    pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
247        assert_eq!(self.dir(), RepoPath::root());
248        match path.split() {
249            Some((dir, basename)) => match self.sub_tree_recursive(dir).await? {
250                None => Ok(Merge::absent()),
251                Some(tree) => Ok(tree.value(basename).cloned()),
252            },
253            None => Ok(self
254                .trees
255                .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
256        }
257    }
258
259    /// The tree's id
260    pub fn id(&self) -> MergedTreeId {
261        MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
262    }
263
264    /// Look up the tree at the given path.
265    pub async fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<Self>> {
266        let mut current_tree = self.clone();
267        for name in path.components() {
268            match current_tree.sub_tree(name).await? {
269                None => {
270                    return Ok(None);
271                }
272                Some(sub_tree) => {
273                    current_tree = sub_tree;
274                }
275            }
276        }
277        Ok(Some(current_tree))
278    }
279
280    /// Iterator over the entries matching the given matcher. Subtrees are
281    /// visited recursively. Subtrees that differ between the current
282    /// `MergedTree`'s terms are merged on the fly. Missing terms are treated as
283    /// empty directories. Subtrees that conflict with non-trees are not
284    /// visited. For example, if current tree is a merge of 3 trees, and the
285    /// entry for 'foo' is a conflict between a change subtree and a symlink
286    /// (i.e. the subdirectory was replaced by symlink in one side of the
287    /// conflict), then the entry for `foo` itself will be emitted, but no
288    /// entries from inside `foo/` from either of the trees will be.
289    pub fn entries(&self) -> TreeEntriesIterator<'static> {
290        self.entries_matching(&EverythingMatcher)
291    }
292
293    /// Like `entries()` but restricted by a matcher.
294    pub fn entries_matching<'matcher>(
295        &self,
296        matcher: &'matcher dyn Matcher,
297    ) -> TreeEntriesIterator<'matcher> {
298        TreeEntriesIterator::new(&self.trees, matcher)
299    }
300
301    /// Stream of the differences between this tree and another tree.
302    ///
303    /// Tree entries (`MergedTreeValue::is_tree()`) are included only if the
304    /// other side is present and not a tree.
305    fn diff_stream_internal<'matcher>(
306        &self,
307        other: &Self,
308        matcher: &'matcher dyn Matcher,
309    ) -> TreeDiffStream<'matcher> {
310        let concurrency = self.store().concurrency();
311        if concurrency <= 1 {
312            Box::pin(futures::stream::iter(TreeDiffIterator::new(
313                &self.trees,
314                &other.trees,
315                matcher,
316            )))
317        } else {
318            Box::pin(TreeDiffStreamImpl::new(
319                &self.trees,
320                &other.trees,
321                matcher,
322                concurrency,
323            ))
324        }
325    }
326
327    /// Stream of the differences between this tree and another tree.
328    pub fn diff_stream<'matcher>(
329        &self,
330        other: &Self,
331        matcher: &'matcher dyn Matcher,
332    ) -> TreeDiffStream<'matcher> {
333        stream_without_trees(self.diff_stream_internal(other, matcher))
334    }
335
336    /// Like `diff_stream()` but files in a removed tree will be returned before
337    /// a file that replaces it.
338    pub fn diff_stream_for_file_system<'matcher>(
339        &self,
340        other: &Self,
341        matcher: &'matcher dyn Matcher,
342    ) -> TreeDiffStream<'matcher> {
343        Box::pin(DiffStreamForFileSystem::new(
344            self.diff_stream_internal(other, matcher),
345        ))
346    }
347
348    /// Like `diff_stream()` but takes the given copy records into account.
349    pub fn diff_stream_with_copies<'a>(
350        &self,
351        other: &Self,
352        matcher: &'a dyn Matcher,
353        copy_records: &'a CopyRecords,
354    ) -> BoxStream<'a, CopiesTreeDiffEntry> {
355        let stream = self.diff_stream(other, matcher);
356        Box::pin(CopiesTreeDiffStream::new(
357            stream,
358            self.clone(),
359            other.clone(),
360            copy_records,
361        ))
362    }
363
364    /// Merges this tree with `other`, using `base` as base. Any conflicts will
365    /// be resolved recursively if possible.
366    pub async fn merge(self, base: Self, other: Self) -> BackendResult<Self> {
367        self.merge_no_resolve(base, other).resolve().await
368    }
369
370    /// Merges this tree with `other`, using `base` as base, without attempting
371    /// to resolve file conflicts.
372    pub fn merge_no_resolve(self, base: Self, other: Self) -> Self {
373        let nested = Merge::from_vec(vec![self.trees, base.trees, other.trees]);
374        Self {
375            trees: nested.flatten().simplify(),
376        }
377    }
378}
379
380/// A single entry in a tree diff.
381pub struct TreeDiffEntry {
382    /// The path.
383    pub path: RepoPathBuf,
384    /// The resolved tree values if available.
385    pub values: BackendResult<(MergedTreeValue, MergedTreeValue)>,
386}
387
388/// Type alias for the result from `MergedTree::diff_stream()`. We use a
389/// `Stream` instead of an `Iterator` so high-latency backends (e.g. cloud-based
390/// ones) can fetch trees asynchronously.
391pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
392
393fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
394    trees
395        .iter()
396        .map(|tree| tree.data().names())
397        .kmerge()
398        .dedup()
399}
400
401fn all_tree_entries(
402    trees: &Merge<Tree>,
403) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
404    if let Some(tree) = trees.as_resolved() {
405        let iter = tree
406            .entries_non_recursive()
407            .map(|entry| (entry.name(), Merge::normal(entry.value())));
408        Either::Left(iter)
409    } else {
410        let iter = all_merged_tree_entries(trees).map(|(name, values)| {
411            // TODO: move resolve_trivial() to caller?
412            let values = match values.resolve_trivial() {
413                Some(resolved) => Merge::resolved(*resolved),
414                None => values,
415            };
416            (name, values)
417        });
418        Either::Right(iter)
419    }
420}
421
422/// Suppose the given `trees` aren't resolved, iterates `(name, values)` pairs
423/// non-recursively. This also works if `trees` are resolved, but is more costly
424/// than `tree.entries_non_recursive()`.
425fn all_merged_tree_entries(
426    trees: &Merge<Tree>,
427) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
428    let mut entries_iters = trees
429        .iter()
430        .map(|tree| tree.entries_non_recursive().peekable())
431        .collect_vec();
432    iter::from_fn(move || {
433        let next_name = entries_iters
434            .iter_mut()
435            .filter_map(|iter| iter.peek())
436            .map(|entry| entry.name())
437            .min()?;
438        let values: MergeBuilder<_> = entries_iters
439            .iter_mut()
440            .map(|iter| {
441                let entry = iter.next_if(|entry| entry.name() == next_name)?;
442                Some(entry.value())
443            })
444            .collect();
445        Some((next_name, values.build()))
446    })
447}
448
449fn merged_tree_entry_diff<'a>(
450    trees1: &'a Merge<Tree>,
451    trees2: &'a Merge<Tree>,
452) -> impl Iterator<Item = (&'a RepoPathComponent, MergedTreeVal<'a>, MergedTreeVal<'a>)> {
453    itertools::merge_join_by(
454        all_tree_entries(trees1),
455        all_tree_entries(trees2),
456        |(name1, _), (name2, _)| name1.cmp(name2),
457    )
458    .map(|entry| match entry {
459        EitherOrBoth::Both((name, value1), (_, value2)) => (name, value1, value2),
460        EitherOrBoth::Left((name, value1)) => (name, value1, Merge::absent()),
461        EitherOrBoth::Right((name, value2)) => (name, Merge::absent(), value2),
462    })
463    .filter(|(_, value1, value2)| value1 != value2)
464}
465
466fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
467    if let Some(tree) = trees.as_resolved() {
468        return Merge::resolved(tree.value(basename));
469    }
470    let value = trees.map(|tree| tree.value(basename));
471    if let Some(resolved) = value.resolve_trivial() {
472        return Merge::resolved(*resolved);
473    }
474    value
475}
476
477struct MergedTreeInput {
478    resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
479    /// Entries that we're currently waiting for data for in order to resolve
480    /// them. When this set becomes empty, we're ready to write the tree(s).
481    pending_lookup: HashSet<RepoPathComponentBuf>,
482    conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
483}
484
485impl MergedTreeInput {
486    fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
487        Self {
488            resolved,
489            pending_lookup: HashSet::new(),
490            conflicts: BTreeMap::new(),
491        }
492    }
493
494    fn mark_completed(&mut self, basename: RepoPathComponentBuf, value: MergedTreeValue) {
495        let was_pending = self.pending_lookup.remove(&basename);
496        assert!(was_pending, "No pending lookup for {basename:?}");
497        if let Some(resolved) = value.resolve_trivial() {
498            if let Some(resolved) = resolved.as_ref() {
499                self.resolved.insert(basename, resolved.clone());
500            }
501        } else {
502            self.conflicts.insert(basename, value);
503        }
504    }
505
506    fn into_backend_trees(self) -> Merge<backend::Tree> {
507        assert!(self.pending_lookup.is_empty());
508
509        fn by_name(
510            (name1, _): &(RepoPathComponentBuf, TreeValue),
511            (name2, _): &(RepoPathComponentBuf, TreeValue),
512        ) -> bool {
513            name1 < name2
514        }
515
516        if self.conflicts.is_empty() {
517            let all_entries = self.resolved.into_iter().collect();
518            Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
519        } else {
520            // Create a Merge with the conflict entries for each side.
521            let mut conflict_entries = self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
522            for (basename, value) in self.conflicts {
523                assert_eq!(value.num_sides(), conflict_entries.num_sides());
524                for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
525                    if let Some(value) = value {
526                        entries.push((basename.clone(), value));
527                    }
528                }
529            }
530
531            let mut backend_trees = vec![];
532            for entries in conflict_entries.into_iter() {
533                let backend_tree = backend::Tree::from_sorted_entries(
534                    self.resolved
535                        .iter()
536                        .map(|(name, value)| (name.clone(), value.clone()))
537                        .merge_by(entries, by_name)
538                        .collect(),
539                );
540                backend_trees.push(backend_tree);
541            }
542            Merge::from_vec(backend_trees)
543        }
544    }
545}
546
547/// The result from an asynchronously scheduled work item.
548enum TreeMergerWorkOutput {
549    /// Trees that have been read (i.e. `Read` is past tense)
550    ReadTrees {
551        dir: RepoPathBuf,
552        result: BackendResult<Merge<Tree>>,
553    },
554    WrittenTrees {
555        dir: RepoPathBuf,
556        result: BackendResult<Merge<Tree>>,
557    },
558    MergedFiles {
559        path: RepoPathBuf,
560        result: BackendResult<MergedTreeValue>,
561    },
562}
563
564#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
565enum TreeMergeWorkItemKey {
566    // `MergeFiles` variant before `ReadTrees` so files are polled before trees because they
567    // typically take longer to process.
568    MergeFiles { path: RepoPathBuf },
569    ReadTrees { dir: RepoPathBuf },
570}
571
572struct TreeMerger {
573    store: Arc<Store>,
574    // Trees we're currently working on.
575    trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
576    // Futures we're currently processing. In order to respect the backend's concurrency limit.
577    work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
578    // Futures we haven't started polling yet, in order to respect the backend's concurrency limit.
579    unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
580}
581
582impl TreeMerger {
583    async fn merge(mut self) -> BackendResult<Merge<Tree>> {
584        while let Some(work_item) = self.work.next().await {
585            match work_item {
586                TreeMergerWorkOutput::ReadTrees { dir, result } => {
587                    let tree = result?;
588                    self.process_tree(dir, tree);
589                }
590                TreeMergerWorkOutput::WrittenTrees { dir, result } => {
591                    let tree = result?;
592                    if dir.is_root() {
593                        assert!(self.trees_to_resolve.is_empty());
594                        assert!(self.work.is_empty());
595                        assert!(self.unstarted_work.is_empty());
596                        return Ok(tree);
597                    }
598                    // Propagate the write to the parent tree, replacing empty trees by `None`.
599                    let new_value = tree.map(|tree| {
600                        (tree.id() != self.store.empty_tree_id())
601                            .then(|| TreeValue::Tree(tree.id().clone()))
602                    });
603                    self.mark_completed(&dir, new_value);
604                }
605                TreeMergerWorkOutput::MergedFiles { path, result } => {
606                    let value = result?;
607                    self.mark_completed(&path, value);
608                }
609            }
610
611            while self.work.len() < self.store.concurrency() {
612                if let Some((_key, work)) = self.unstarted_work.pop_first() {
613                    self.work.push(work);
614                } else {
615                    break;
616                }
617            }
618        }
619
620        unreachable!("There was no work item for writing the root tree");
621    }
622
623    fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
624        // First resolve trivial merges (those that we don't need to load any more data
625        // for)
626        let mut resolved = vec![];
627        let mut non_trivial = vec![];
628        for (basename, path_merge) in all_merged_tree_entries(&tree) {
629            if let Some(value) = path_merge.resolve_trivial() {
630                if let Some(value) = value.cloned() {
631                    resolved.push((basename.to_owned(), value));
632                }
633            } else {
634                non_trivial.push((basename.to_owned(), path_merge.cloned()));
635            }
636        }
637
638        // If there are no non-trivial merges, we can write the tree now.
639        if non_trivial.is_empty() {
640            let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
641            self.enqueue_tree_write(dir, backend_trees);
642            return;
643        }
644
645        let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
646        for (basename, value) in non_trivial {
647            let path = dir.join(&basename);
648            unmerged_tree.pending_lookup.insert(basename);
649            if value.is_tree() {
650                self.enqueue_tree_read(path, value);
651            } else {
652                // TODO: If it's e.g. a dir/file conflict, there's no need to try to
653                // resolve it as a file. We should mark them to
654                // `unmerged_tree.conflicts` instead.
655                self.enqueue_file_merge(path, value);
656            }
657        }
658
659        self.trees_to_resolve.insert(dir, unmerged_tree);
660    }
661
662    fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
663        let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
664        let work_fut = read_trees(self.store.clone(), dir.clone(), value)
665            .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
666        if self.work.len() < self.store.concurrency() {
667            self.work.push(Box::pin(work_fut));
668        } else {
669            self.unstarted_work.insert(key, Box::pin(work_fut));
670        }
671    }
672
673    fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
674        let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
675            .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
676        // Bypass the `unstarted_work` queue because writing trees usually results in
677        // saving memory (each tree gets replaced by a `TreeValue::Tree`)
678        self.work.push(Box::pin(work_fut));
679    }
680
681    fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
682        let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
683        let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
684            .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
685        if self.work.len() < self.store.concurrency() {
686            self.work.push(Box::pin(work_fut));
687        } else {
688            self.unstarted_work.insert(key, Box::pin(work_fut));
689        }
690    }
691
692    fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
693        let (dir, basename) = path.split().unwrap();
694        let tree = self.trees_to_resolve.get_mut(dir).unwrap();
695        tree.mark_completed(basename.to_owned(), value);
696        // If all entries in this tree have been processed (either resolved or still a
697        // conflict), schedule the writing of the tree(s) to the backend.
698        if tree.pending_lookup.is_empty() {
699            let tree = self.trees_to_resolve.remove(dir).unwrap();
700            self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
701        }
702    }
703}
704
705async fn read_trees(
706    store: Arc<Store>,
707    dir: RepoPathBuf,
708    value: MergedTreeValue,
709) -> BackendResult<Merge<Tree>> {
710    let trees = value
711        .to_tree_merge(&store, &dir)
712        .await?
713        .expect("Should be tree merge");
714    Ok(trees)
715}
716
717async fn write_trees(
718    store: Arc<Store>,
719    dir: RepoPathBuf,
720    backend_trees: Merge<backend::Tree>,
721) -> BackendResult<Merge<Tree>> {
722    // TODO: Could use `backend_trees.try_map_async()` here if it took `self` by
723    // value or if `Backend::write_tree()` to an `Arc<backend::Tree>`.
724    let trees = try_join_all(
725        backend_trees
726            .into_iter()
727            .map(|backend_tree| store.write_tree(&dir, backend_tree)),
728    )
729    .await?;
730    Ok(Merge::from_vec(trees))
731}
732
733async fn resolve_file_values_owned(
734    store: Arc<Store>,
735    path: RepoPathBuf,
736    values: MergedTreeValue,
737) -> BackendResult<MergedTreeValue> {
738    let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
739    Ok(maybe_resolved.unwrap_or(values))
740}
741
742/// The returned conflict will either be resolved or have the same number of
743/// sides as the input.
744async fn merge_trees(merge: Merge<Tree>) -> BackendResult<Merge<Tree>> {
745    let merge = match merge.into_resolved() {
746        Ok(tree) => return Ok(Merge::resolved(tree)),
747        Err(merge) => merge,
748    };
749
750    let store = merge.first().store().clone();
751    let merger = TreeMerger {
752        store,
753        trees_to_resolve: BTreeMap::new(),
754        work: FuturesUnordered::new(),
755        unstarted_work: BTreeMap::new(),
756    };
757    merger.work.push(Box::pin(std::future::ready(
758        TreeMergerWorkOutput::ReadTrees {
759            dir: RepoPathBuf::root(),
760            result: Ok(merge),
761        },
762    )));
763    merger.merge().await
764}
765
766/// Tries to resolve file conflicts by merging the file contents. Treats missing
767/// files as empty. If the file conflict cannot be resolved, returns the passed
768/// `values` unmodified.
769pub async fn resolve_file_values(
770    store: &Arc<Store>,
771    path: &RepoPath,
772    values: MergedTreeValue,
773) -> BackendResult<MergedTreeValue> {
774    if let Some(resolved) = values.resolve_trivial() {
775        return Ok(Merge::resolved(resolved.clone()));
776    }
777
778    let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
779    Ok(maybe_resolved.unwrap_or(values))
780}
781
782async fn try_resolve_file_values<T: Borrow<TreeValue>>(
783    store: &Arc<Store>,
784    path: &RepoPath,
785    values: &Merge<Option<T>>,
786) -> BackendResult<Option<MergedTreeValue>> {
787    // The values may contain trees canceling each other (notably padded absent
788    // trees), so we need to simplify them first.
789    let simplified = values
790        .map(|value| value.as_ref().map(Borrow::borrow))
791        .simplify();
792    // No fast path for simplified.is_resolved(). If it could be resolved, it would
793    // have been caught by values.resolve_trivial() above.
794    if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
795        Ok(Some(Merge::normal(resolved)))
796    } else {
797        // Failed to merge the files, or the paths are not files
798        Ok(None)
799    }
800}
801
802/// Recursive iterator over the entries in a tree.
803pub struct TreeEntriesIterator<'matcher> {
804    store: Arc<Store>,
805    stack: Vec<TreeEntriesDirItem>,
806    matcher: &'matcher dyn Matcher,
807}
808
809struct TreeEntriesDirItem {
810    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
811}
812
813impl TreeEntriesDirItem {
814    fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
815        let mut entries = vec![];
816        let dir = trees.first().dir();
817        for (name, value) in all_tree_entries(trees) {
818            let path = dir.join(name);
819            if value.is_tree() {
820                // TODO: Handle the other cases (specific files and trees)
821                if matcher.visit(&path).is_nothing() {
822                    continue;
823                }
824            } else if !matcher.matches(&path) {
825                continue;
826            }
827            entries.push((path, value.cloned()));
828        }
829        entries.reverse();
830        Self { entries }
831    }
832}
833
834impl<'matcher> TreeEntriesIterator<'matcher> {
835    fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
836        Self {
837            store: trees.first().store().clone(),
838            stack: vec![TreeEntriesDirItem::new(trees, matcher)],
839            matcher,
840        }
841    }
842}
843
844impl Iterator for TreeEntriesIterator<'_> {
845    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
846
847    fn next(&mut self) -> Option<Self::Item> {
848        while let Some(top) = self.stack.last_mut() {
849            if let Some((path, value)) = top.entries.pop() {
850                let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
851                    Ok(maybe_trees) => maybe_trees,
852                    Err(err) => return Some((path, Err(err))),
853                };
854                if let Some(trees) = maybe_trees {
855                    self.stack
856                        .push(TreeEntriesDirItem::new(&trees, self.matcher));
857                } else {
858                    return Some((path, Ok(value)));
859                }
860            } else {
861                self.stack.pop();
862            }
863        }
864        None
865    }
866}
867
868/// The state for the non-recursive iteration over the conflicted entries in a
869/// single directory.
870struct ConflictsDirItem {
871    entries: Vec<(RepoPathBuf, MergedTreeValue)>,
872}
873
874impl From<&Merge<Tree>> for ConflictsDirItem {
875    fn from(trees: &Merge<Tree>) -> Self {
876        let dir = trees.first().dir();
877        if trees.is_resolved() {
878            return Self { entries: vec![] };
879        }
880
881        let mut entries = vec![];
882        for (basename, value) in all_tree_entries(trees) {
883            if !value.is_resolved() {
884                entries.push((dir.join(basename), value.cloned()));
885            }
886        }
887        entries.reverse();
888        Self { entries }
889    }
890}
891
892struct ConflictIterator {
893    store: Arc<Store>,
894    stack: Vec<ConflictsDirItem>,
895}
896
897impl ConflictIterator {
898    fn new(tree: &MergedTree) -> Self {
899        Self {
900            store: tree.store().clone(),
901            stack: vec![ConflictsDirItem::from(&tree.trees)],
902        }
903    }
904}
905
906impl Iterator for ConflictIterator {
907    type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
908
909    fn next(&mut self) -> Option<Self::Item> {
910        while let Some(top) = self.stack.last_mut() {
911            if let Some((path, tree_values)) = top.entries.pop() {
912                match tree_values.to_tree_merge(&self.store, &path).block_on() {
913                    Ok(Some(trees)) => {
914                        // If all sides are trees or missing, descend into the merged tree
915                        self.stack.push(ConflictsDirItem::from(&trees));
916                    }
917                    Ok(None) => {
918                        // Otherwise this is a conflict between files, trees, etc. If they could
919                        // be automatically resolved, they should have been when the top-level
920                        // tree conflict was written, so we assume that they can't be.
921                        return Some((path, Ok(tree_values)));
922                    }
923                    Err(err) => {
924                        return Some((path, Err(err)));
925                    }
926                }
927            } else {
928                self.stack.pop();
929            }
930        }
931        None
932    }
933}
934
935/// Iterator over the differences between two trees.
936///
937/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
938/// side is present and not a tree.
939pub struct TreeDiffIterator<'matcher> {
940    store: Arc<Store>,
941    stack: Vec<TreeDiffDir>,
942    matcher: &'matcher dyn Matcher,
943}
944
945struct TreeDiffDir {
946    entries: Vec<(RepoPathBuf, MergedTreeValue, MergedTreeValue)>,
947}
948
949impl<'matcher> TreeDiffIterator<'matcher> {
950    /// Creates a iterator over the differences between two trees.
951    pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
952        assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
953        let root_dir = RepoPath::root();
954        let mut stack = Vec::new();
955        if !matcher.visit(root_dir).is_nothing() {
956            stack.push(TreeDiffDir::from_trees(root_dir, trees1, trees2, matcher));
957        };
958        Self {
959            store: trees1.first().store().clone(),
960            stack,
961            matcher,
962        }
963    }
964
965    /// Gets the given trees if `values` are trees, otherwise an empty tree.
966    fn trees(
967        store: &Arc<Store>,
968        dir: &RepoPath,
969        values: &MergedTreeValue,
970    ) -> BackendResult<Merge<Tree>> {
971        if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
972            Ok(trees)
973        } else {
974            Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
975        }
976    }
977}
978
979impl TreeDiffDir {
980    fn from_trees(
981        dir: &RepoPath,
982        trees1: &Merge<Tree>,
983        trees2: &Merge<Tree>,
984        matcher: &dyn Matcher,
985    ) -> Self {
986        let mut entries = vec![];
987        for (name, before, after) in merged_tree_entry_diff(trees1, trees2) {
988            let path = dir.join(name);
989            let tree_before = before.is_tree();
990            let tree_after = after.is_tree();
991            // Check if trees and files match, but only if either side is a tree or a file
992            // (don't query the matcher unnecessarily).
993            let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
994            let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
995
996            // Replace trees or files that don't match by `Merge::absent()`
997            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
998                before
999            } else {
1000                Merge::absent()
1001            };
1002            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
1003                after
1004            } else {
1005                Merge::absent()
1006            };
1007            if before.is_absent() && after.is_absent() {
1008                continue;
1009            }
1010            entries.push((path, before.cloned(), after.cloned()));
1011        }
1012        entries.reverse();
1013        Self { entries }
1014    }
1015}
1016
1017impl Iterator for TreeDiffIterator<'_> {
1018    type Item = TreeDiffEntry;
1019
1020    fn next(&mut self) -> Option<Self::Item> {
1021        while let Some(top) = self.stack.last_mut() {
1022            let (path, before, after) = match top.entries.pop() {
1023                Some(entry) => entry,
1024                None => {
1025                    self.stack.pop().unwrap();
1026                    continue;
1027                }
1028            };
1029
1030            if before.is_tree() || after.is_tree() {
1031                let (before_tree, after_tree) = match (
1032                    Self::trees(&self.store, &path, &before),
1033                    Self::trees(&self.store, &path, &after),
1034                ) {
1035                    (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
1036                    (Err(before_err), _) => {
1037                        return Some(TreeDiffEntry {
1038                            path,
1039                            values: Err(before_err),
1040                        });
1041                    }
1042                    (_, Err(after_err)) => {
1043                        return Some(TreeDiffEntry {
1044                            path,
1045                            values: Err(after_err),
1046                        });
1047                    }
1048                };
1049                let subdir =
1050                    TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
1051                self.stack.push(subdir);
1052            };
1053            if before.is_file_like() || after.is_file_like() {
1054                return Some(TreeDiffEntry {
1055                    path,
1056                    values: Ok((before, after)),
1057                });
1058            }
1059        }
1060        None
1061    }
1062}
1063
1064/// Stream of differences between two trees.
1065///
1066/// Tree entries (`MergedTreeValue::is_tree()`) are included only if the other
1067/// side is present and not a tree.
1068pub struct TreeDiffStreamImpl<'matcher> {
1069    store: Arc<Store>,
1070    matcher: &'matcher dyn Matcher,
1071    /// Pairs of tree values that may or may not be ready to emit, sorted in the
1072    /// order we want to emit them. If either side is a tree, there will be
1073    /// a corresponding entry in `pending_trees`. The item is ready to emit
1074    /// unless there's a smaller or equal path in `pending_trees`.
1075    items: BTreeMap<RepoPathBuf, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
1076    // TODO: Is it better to combine this and `items` into a single map?
1077    #[expect(clippy::type_complexity)]
1078    pending_trees:
1079        BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
1080    /// The maximum number of trees to request concurrently. However, we do the
1081    /// accounting per path, so there will often be twice as many pending
1082    /// `Backend::read_tree()` calls - for the "before" and "after" sides. For
1083    /// conflicts, there will be even more.
1084    max_concurrent_reads: usize,
1085    /// The maximum number of items in `items`. However, we will always add the
1086    /// full differences from a particular pair of trees, so it may temporarily
1087    /// go over the limit (until we emit those items). It may also go over the
1088    /// limit because we have a file item that's blocked by pending subdirectory
1089    /// items.
1090    max_queued_items: usize,
1091}
1092
1093impl<'matcher> TreeDiffStreamImpl<'matcher> {
1094    /// Creates a iterator over the differences between two trees. Generally
1095    /// prefer `MergedTree::diff_stream()` of calling this directly.
1096    pub fn new(
1097        trees1: &Merge<Tree>,
1098        trees2: &Merge<Tree>,
1099        matcher: &'matcher dyn Matcher,
1100        max_concurrent_reads: usize,
1101    ) -> Self {
1102        assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
1103        let mut stream = Self {
1104            store: trees1.first().store().clone(),
1105            matcher,
1106            items: BTreeMap::new(),
1107            pending_trees: BTreeMap::new(),
1108            max_concurrent_reads,
1109            max_queued_items: 10000,
1110        };
1111        stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
1112        stream
1113    }
1114
1115    async fn single_tree(
1116        store: &Arc<Store>,
1117        dir: RepoPathBuf,
1118        value: Option<&TreeValue>,
1119    ) -> BackendResult<Tree> {
1120        match value {
1121            Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
1122            _ => Ok(Tree::empty(store.clone(), dir.clone())),
1123        }
1124    }
1125
1126    /// Gets the given trees if `values` are trees, otherwise an empty tree.
1127    async fn trees(
1128        store: Arc<Store>,
1129        dir: RepoPathBuf,
1130        values: MergedTreeValue,
1131    ) -> BackendResult<Merge<Tree>> {
1132        if values.is_tree() {
1133            values
1134                .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
1135                .await
1136        } else {
1137            Ok(Merge::resolved(Tree::empty(store, dir)))
1138        }
1139    }
1140
1141    fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
1142        for (basename, before, after) in merged_tree_entry_diff(trees1, trees2) {
1143            let path = dir.join(basename);
1144            let tree_before = before.is_tree();
1145            let tree_after = after.is_tree();
1146            // Check if trees and files match, but only if either side is a tree or a file
1147            // (don't query the matcher unnecessarily).
1148            let tree_matches =
1149                (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
1150            let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
1151
1152            // Replace trees or files that don't match by `Merge::absent()`
1153            let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
1154                before
1155            } else {
1156                Merge::absent()
1157            };
1158            let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
1159                after
1160            } else {
1161                Merge::absent()
1162            };
1163            if before.is_absent() && after.is_absent() {
1164                continue;
1165            }
1166
1167            // If the path was a tree on either side of the diff, read those trees.
1168            if tree_matches {
1169                let before_tree_future =
1170                    Self::trees(self.store.clone(), path.clone(), before.cloned());
1171                let after_tree_future =
1172                    Self::trees(self.store.clone(), path.clone(), after.cloned());
1173                let both_trees_future = try_join(before_tree_future, after_tree_future);
1174                self.pending_trees
1175                    .insert(path.clone(), Box::pin(both_trees_future));
1176            }
1177
1178            if before.is_file_like() || after.is_file_like() {
1179                self.items
1180                    .insert(path, Ok((before.cloned(), after.cloned())));
1181            }
1182        }
1183    }
1184
1185    fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
1186        loop {
1187            let mut tree_diffs = vec![];
1188            let mut some_pending = false;
1189            let mut all_pending = true;
1190            for (dir, future) in self
1191                .pending_trees
1192                .iter_mut()
1193                .take(self.max_concurrent_reads)
1194            {
1195                if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
1196                    all_pending = false;
1197                    tree_diffs.push((dir.clone(), tree_diff));
1198                } else {
1199                    some_pending = true;
1200                }
1201            }
1202
1203            for (dir, tree_diff) in tree_diffs {
1204                let _ = self.pending_trees.remove_entry(&dir).unwrap();
1205                match tree_diff {
1206                    Ok((trees1, trees2)) => {
1207                        self.add_dir_diff_items(&dir, &trees1, &trees2);
1208                    }
1209                    Err(err) => {
1210                        self.items.insert(dir, Err(err));
1211                    }
1212                }
1213            }
1214
1215            // If none of the futures have been polled and returned `Poll::Pending`, we must
1216            // not return. If we did, nothing would call the waker so we might never get
1217            // polled again.
1218            if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
1219                return;
1220            }
1221        }
1222    }
1223}
1224
1225impl Stream for TreeDiffStreamImpl<'_> {
1226    type Item = TreeDiffEntry;
1227
1228    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1229        // Go through all pending tree futures and poll them.
1230        self.poll_tree_futures(cx);
1231
1232        // Now emit the first file, or the first tree that completed with an error
1233        if let Some((path, _)) = self.items.first_key_value() {
1234            // Check if there are any pending trees before this item that we need to finish
1235            // polling before we can emit this item.
1236            if let Some((dir, _)) = self.pending_trees.first_key_value() {
1237                if dir < path {
1238                    return Poll::Pending;
1239                }
1240            }
1241
1242            let (path, values) = self.items.pop_first().unwrap();
1243            Poll::Ready(Some(TreeDiffEntry { path, values }))
1244        } else if self.pending_trees.is_empty() {
1245            Poll::Ready(None)
1246        } else {
1247            Poll::Pending
1248        }
1249    }
1250}
1251
1252fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
1253    Box::pin(stream.map(|mut entry| {
1254        let skip_tree = |merge: MergedTreeValue| {
1255            if merge.is_tree() {
1256                Merge::absent()
1257            } else {
1258                merge
1259            }
1260        };
1261        entry.values = entry
1262            .values
1263            .map(|(before, after)| (skip_tree(before), skip_tree(after)));
1264        entry
1265    }))
1266}
1267
1268/// Adapts a `TreeDiffStream` to emit a added file at a given path after a
1269/// removed directory at the same path.
1270struct DiffStreamForFileSystem<'a> {
1271    inner: TreeDiffStream<'a>,
1272    next_item: Option<TreeDiffEntry>,
1273    held_file: Option<TreeDiffEntry>,
1274}
1275
1276impl<'a> DiffStreamForFileSystem<'a> {
1277    fn new(inner: TreeDiffStream<'a>) -> Self {
1278        Self {
1279            inner,
1280            next_item: None,
1281            held_file: None,
1282        }
1283    }
1284}
1285
1286impl Stream for DiffStreamForFileSystem<'_> {
1287    type Item = TreeDiffEntry;
1288
1289    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1290        while let Some(next) = match self.next_item.take() {
1291            Some(next) => Some(next),
1292            None => ready!(self.inner.as_mut().poll_next(cx)),
1293        } {
1294            // If there's a held file "foo" and the next item to emit is not "foo/...", then
1295            // we must be done with the "foo/" directory and it's time to emit "foo" as a
1296            // removed file.
1297            if let Some(held_entry) = self
1298                .held_file
1299                .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
1300            {
1301                self.next_item = Some(next);
1302                return Poll::Ready(Some(held_entry));
1303            }
1304
1305            match next.values {
1306                Ok((before, after)) if before.is_tree() => {
1307                    assert!(after.is_present());
1308                    assert!(self.held_file.is_none());
1309                    self.held_file = Some(TreeDiffEntry {
1310                        path: next.path,
1311                        values: Ok((Merge::absent(), after)),
1312                    });
1313                }
1314                Ok((before, after)) if after.is_tree() => {
1315                    assert!(before.is_present());
1316                    return Poll::Ready(Some(TreeDiffEntry {
1317                        path: next.path,
1318                        values: Ok((before, Merge::absent())),
1319                    }));
1320                }
1321                _ => {
1322                    return Poll::Ready(Some(next));
1323                }
1324            }
1325        }
1326        Poll::Ready(self.held_file.take())
1327    }
1328}
1329
1330/// Helper for writing trees with conflicts.
1331///
1332/// You start by creating an instance of this type with one or more
1333/// base trees. You then add overrides on top. The overrides may be
1334/// conflicts. Then you can write the result as a legacy tree
1335/// (allowing path-level conflicts) or as multiple conflict-free
1336/// trees.
1337pub struct MergedTreeBuilder {
1338    base_tree_id: MergedTreeId,
1339    overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
1340}
1341
1342impl MergedTreeBuilder {
1343    /// Create a new builder with the given trees as base.
1344    pub fn new(base_tree_id: MergedTreeId) -> Self {
1345        Self {
1346            base_tree_id,
1347            overrides: BTreeMap::new(),
1348        }
1349    }
1350
1351    /// Set an override compared to  the base tree. The `values` merge must
1352    /// either be resolved (i.e. have 1 side) or have the same number of
1353    /// sides as the `base_tree_ids` used to construct this builder. Use
1354    /// `Merge::absent()` to remove a value from the tree. When the base tree is
1355    /// a legacy tree, conflicts can be written either as a multi-way `Merge`
1356    /// value or as a resolved `Merge` value using `TreeValue::Conflict`.
1357    pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
1358        if let MergedTreeId::Merge(_) = &self.base_tree_id {
1359            assert!(
1360                !values
1361                    .iter()
1362                    .flatten()
1363                    .any(|value| matches!(value, TreeValue::Conflict(_)))
1364            );
1365        }
1366        self.overrides.insert(path, values);
1367    }
1368
1369    /// Create new tree(s) from the base tree(s) and overrides.
1370    pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
1371        let base_tree_ids = match self.base_tree_id.clone() {
1372            MergedTreeId::Legacy(base_tree_id) => {
1373                let legacy_base_tree = store.get_tree(RepoPathBuf::root(), &base_tree_id)?;
1374                let base_tree = MergedTree::from_legacy_tree(legacy_base_tree)?;
1375                base_tree.id().to_merge()
1376            }
1377            MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
1378        };
1379        let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
1380        match new_tree_ids.simplify().into_resolved() {
1381            Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
1382            Err(tree_id) => {
1383                let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
1384                let resolved = tree.resolve().block_on()?;
1385                Ok(resolved.id())
1386            }
1387        }
1388    }
1389
1390    fn write_merged_trees(
1391        self,
1392        mut base_tree_ids: Merge<TreeId>,
1393        store: &Arc<Store>,
1394    ) -> BackendResult<Merge<TreeId>> {
1395        let num_sides = self
1396            .overrides
1397            .values()
1398            .map(|value| value.num_sides())
1399            .max()
1400            .unwrap_or(0);
1401        base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1402        // Create a single-tree builder for each base tree
1403        let mut tree_builders =
1404            base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1405        for (path, values) in self.overrides {
1406            match values.into_resolved() {
1407                Ok(value) => {
1408                    // This path was overridden with a resolved value. Apply that to all
1409                    // builders.
1410                    for builder in tree_builders.iter_mut() {
1411                        builder.set_or_remove(path.clone(), value.clone());
1412                    }
1413                }
1414                Err(mut values) => {
1415                    values.pad_to(num_sides, &None);
1416                    // This path was overridden with a conflicted value. Apply each term to
1417                    // its corresponding builder.
1418                    for (builder, value) in zip(tree_builders.iter_mut(), values) {
1419                        builder.set_or_remove(path.clone(), value);
1420                    }
1421                }
1422            }
1423        }
1424        // TODO: This can be made more efficient. If there's a single resolved conflict
1425        // in `dir/file`, we shouldn't have to write the `dir/` and root trees more than
1426        // once.
1427        let merge_builder: MergeBuilder<TreeId> = tree_builders
1428            .into_iter()
1429            .map(|builder| builder.write_tree())
1430            .try_collect()?;
1431        Ok(merge_builder.build())
1432    }
1433}