1use std::borrow::Borrow;
18use std::cmp::max;
19use std::collections::BTreeMap;
20use std::collections::HashSet;
21use std::iter;
22use std::iter::zip;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::Context;
26use std::task::Poll;
27use std::task::ready;
28use std::vec;
29
30use either::Either;
31use futures::FutureExt as _;
32use futures::Stream;
33use futures::StreamExt as _;
34use futures::future::BoxFuture;
35use futures::future::try_join;
36use futures::future::try_join_all;
37use futures::stream::BoxStream;
38use futures::stream::FuturesUnordered;
39use itertools::EitherOrBoth;
40use itertools::Itertools as _;
41use pollster::FutureExt as _;
42
43use crate::backend;
44use crate::backend::BackendResult;
45use crate::backend::MergedTreeId;
46use crate::backend::TreeId;
47use crate::backend::TreeValue;
48use crate::copies::CopiesTreeDiffEntry;
49use crate::copies::CopiesTreeDiffStream;
50use crate::copies::CopyRecords;
51use crate::matchers::EverythingMatcher;
52use crate::matchers::Matcher;
53use crate::merge::Merge;
54use crate::merge::MergeBuilder;
55use crate::merge::MergedTreeVal;
56use crate::merge::MergedTreeValue;
57use crate::repo_path::RepoPath;
58use crate::repo_path::RepoPathBuf;
59use crate::repo_path::RepoPathComponent;
60use crate::repo_path::RepoPathComponentBuf;
61use crate::store::Store;
62use crate::tree::Tree;
63use crate::tree::try_resolve_file_conflict;
64use crate::tree_builder::TreeBuilder;
65
66#[derive(PartialEq, Eq, Clone, Debug)]
68pub struct MergedTree {
69 trees: Merge<Tree>,
70}
71
72impl MergedTree {
73 pub fn resolved(tree: Tree) -> Self {
75 Self::new(Merge::resolved(tree))
76 }
77
78 pub fn new(trees: Merge<Tree>) -> Self {
81 debug_assert!(!trees.iter().any(|t| t.has_conflict()));
82 debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
83 debug_assert!(
84 trees
85 .iter()
86 .map(|tree| Arc::as_ptr(tree.store()))
87 .all_equal()
88 );
89 Self { trees }
90 }
91
92 pub fn from_legacy_tree(tree: Tree) -> BackendResult<Self> {
96 let conflict_ids = tree.conflicts();
97 if conflict_ids.is_empty() {
98 return Ok(Self::resolved(tree));
99 }
100
101 let mut max_tree_count = 1;
103 let store = tree.store();
104 let mut conflicts: Vec<(&RepoPath, MergedTreeValue)> = vec![];
105 for (path, conflict_id) in &conflict_ids {
106 let conflict = store.read_conflict(path, conflict_id)?;
107 max_tree_count = max(max_tree_count, conflict.iter().len());
108 conflicts.push((path, conflict));
109 }
110 let mut tree_builders = Vec::new();
111 tree_builders.resize_with(max_tree_count, || {
112 TreeBuilder::new(store.clone(), tree.id().clone())
113 });
114 for (path, conflict) in conflicts {
115 let terms_padded = conflict.into_iter().chain(iter::repeat(None));
119 for (builder, term) in zip(&mut tree_builders, terms_padded) {
120 builder.set_or_remove(path.to_owned(), term);
121 }
122 }
123
124 let new_trees: Vec<_> = tree_builders
125 .into_iter()
126 .map(|builder| {
127 let tree_id = builder.write_tree()?;
128 store.get_tree(RepoPathBuf::root(), &tree_id)
129 })
130 .try_collect()?;
131 Ok(Self {
132 trees: Merge::from_vec(new_trees),
133 })
134 }
135
136 pub fn as_merge(&self) -> &Merge<Tree> {
138 &self.trees
139 }
140
141 pub fn take(self) -> Merge<Tree> {
143 self.trees
144 }
145
146 pub fn dir(&self) -> &RepoPath {
148 self.trees.first().dir()
149 }
150
151 pub fn store(&self) -> &Arc<Store> {
153 self.trees.first().store()
154 }
155
156 pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
158 Box::new(all_tree_basenames(&self.trees))
159 }
160
161 pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal<'_> {
166 trees_value(&self.trees, basename)
167 }
168
169 pub async fn resolve(self) -> BackendResult<Self> {
172 let merged = merge_trees(self.trees).await?;
173 let simplified = merged.simplify();
178 if cfg!(debug_assertions) {
182 let re_merged = merge_trees(simplified.clone()).await.unwrap();
183 debug_assert_eq!(re_merged, simplified);
184 }
185 Ok(Self { trees: simplified })
186 }
187
188 pub fn conflicts(
194 &self,
195 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196 ConflictIterator::new(self)
197 }
198
199 pub fn has_conflict(&self) -> bool {
201 !self.trees.is_resolved()
202 }
203
204 pub async fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<Self>> {
208 match self.value(name).into_resolved() {
209 Ok(Some(TreeValue::Tree(sub_tree_id))) => {
210 let subdir = self.dir().join(name);
211 Ok(Some(Self::resolved(
212 self.store().get_tree_async(subdir, sub_tree_id).await?,
213 )))
214 }
215 Ok(_) => Ok(None),
216 Err(merge) => {
217 if !merge.is_tree() {
218 return Ok(None);
219 }
220 let trees = merge
221 .try_map_async(async |value| match value {
222 Some(TreeValue::Tree(sub_tree_id)) => {
223 let subdir = self.dir().join(name);
224 self.store().get_tree_async(subdir, sub_tree_id).await
225 }
226 Some(_) => unreachable!(),
227 None => {
228 let subdir = self.dir().join(name);
229 Ok(Tree::empty(self.store().clone(), subdir))
230 }
231 })
232 .await?;
233 Ok(Some(Self { trees }))
234 }
235 }
236 }
237
238 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
242 self.path_value_async(path).block_on()
243 }
244
245 pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
247 assert_eq!(self.dir(), RepoPath::root());
248 match path.split() {
249 Some((dir, basename)) => match self.sub_tree_recursive(dir).await? {
250 None => Ok(Merge::absent()),
251 Some(tree) => Ok(tree.value(basename).cloned()),
252 },
253 None => Ok(self
254 .trees
255 .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
256 }
257 }
258
259 pub fn id(&self) -> MergedTreeId {
261 MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
262 }
263
264 pub async fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<Self>> {
266 let mut current_tree = self.clone();
267 for name in path.components() {
268 match current_tree.sub_tree(name).await? {
269 None => {
270 return Ok(None);
271 }
272 Some(sub_tree) => {
273 current_tree = sub_tree;
274 }
275 }
276 }
277 Ok(Some(current_tree))
278 }
279
280 pub fn entries(&self) -> TreeEntriesIterator<'static> {
290 self.entries_matching(&EverythingMatcher)
291 }
292
293 pub fn entries_matching<'matcher>(
295 &self,
296 matcher: &'matcher dyn Matcher,
297 ) -> TreeEntriesIterator<'matcher> {
298 TreeEntriesIterator::new(&self.trees, matcher)
299 }
300
301 fn diff_stream_internal<'matcher>(
306 &self,
307 other: &Self,
308 matcher: &'matcher dyn Matcher,
309 ) -> TreeDiffStream<'matcher> {
310 let concurrency = self.store().concurrency();
311 if concurrency <= 1 {
312 Box::pin(futures::stream::iter(TreeDiffIterator::new(
313 &self.trees,
314 &other.trees,
315 matcher,
316 )))
317 } else {
318 Box::pin(TreeDiffStreamImpl::new(
319 &self.trees,
320 &other.trees,
321 matcher,
322 concurrency,
323 ))
324 }
325 }
326
327 pub fn diff_stream<'matcher>(
329 &self,
330 other: &Self,
331 matcher: &'matcher dyn Matcher,
332 ) -> TreeDiffStream<'matcher> {
333 stream_without_trees(self.diff_stream_internal(other, matcher))
334 }
335
336 pub fn diff_stream_for_file_system<'matcher>(
339 &self,
340 other: &Self,
341 matcher: &'matcher dyn Matcher,
342 ) -> TreeDiffStream<'matcher> {
343 Box::pin(DiffStreamForFileSystem::new(
344 self.diff_stream_internal(other, matcher),
345 ))
346 }
347
348 pub fn diff_stream_with_copies<'a>(
350 &self,
351 other: &Self,
352 matcher: &'a dyn Matcher,
353 copy_records: &'a CopyRecords,
354 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
355 let stream = self.diff_stream(other, matcher);
356 Box::pin(CopiesTreeDiffStream::new(
357 stream,
358 self.clone(),
359 other.clone(),
360 copy_records,
361 ))
362 }
363
364 pub async fn merge(self, base: Self, other: Self) -> BackendResult<Self> {
367 self.merge_no_resolve(base, other).resolve().await
368 }
369
370 pub fn merge_no_resolve(self, base: Self, other: Self) -> Self {
373 let nested = Merge::from_vec(vec![self.trees, base.trees, other.trees]);
374 Self {
375 trees: nested.flatten().simplify(),
376 }
377 }
378}
379
380pub struct TreeDiffEntry {
382 pub path: RepoPathBuf,
384 pub values: BackendResult<(MergedTreeValue, MergedTreeValue)>,
386}
387
388pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
392
393fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
394 trees
395 .iter()
396 .map(|tree| tree.data().names())
397 .kmerge()
398 .dedup()
399}
400
401fn all_tree_entries(
402 trees: &Merge<Tree>,
403) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
404 if let Some(tree) = trees.as_resolved() {
405 let iter = tree
406 .entries_non_recursive()
407 .map(|entry| (entry.name(), Merge::normal(entry.value())));
408 Either::Left(iter)
409 } else {
410 let iter = all_merged_tree_entries(trees).map(|(name, values)| {
411 let values = match values.resolve_trivial() {
413 Some(resolved) => Merge::resolved(*resolved),
414 None => values,
415 };
416 (name, values)
417 });
418 Either::Right(iter)
419 }
420}
421
422fn all_merged_tree_entries(
426 trees: &Merge<Tree>,
427) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
428 let mut entries_iters = trees
429 .iter()
430 .map(|tree| tree.entries_non_recursive().peekable())
431 .collect_vec();
432 iter::from_fn(move || {
433 let next_name = entries_iters
434 .iter_mut()
435 .filter_map(|iter| iter.peek())
436 .map(|entry| entry.name())
437 .min()?;
438 let values: MergeBuilder<_> = entries_iters
439 .iter_mut()
440 .map(|iter| {
441 let entry = iter.next_if(|entry| entry.name() == next_name)?;
442 Some(entry.value())
443 })
444 .collect();
445 Some((next_name, values.build()))
446 })
447}
448
449fn merged_tree_entry_diff<'a>(
450 trees1: &'a Merge<Tree>,
451 trees2: &'a Merge<Tree>,
452) -> impl Iterator<Item = (&'a RepoPathComponent, MergedTreeVal<'a>, MergedTreeVal<'a>)> {
453 itertools::merge_join_by(
454 all_tree_entries(trees1),
455 all_tree_entries(trees2),
456 |(name1, _), (name2, _)| name1.cmp(name2),
457 )
458 .map(|entry| match entry {
459 EitherOrBoth::Both((name, value1), (_, value2)) => (name, value1, value2),
460 EitherOrBoth::Left((name, value1)) => (name, value1, Merge::absent()),
461 EitherOrBoth::Right((name, value2)) => (name, Merge::absent(), value2),
462 })
463 .filter(|(_, value1, value2)| value1 != value2)
464}
465
466fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
467 if let Some(tree) = trees.as_resolved() {
468 return Merge::resolved(tree.value(basename));
469 }
470 let value = trees.map(|tree| tree.value(basename));
471 if let Some(resolved) = value.resolve_trivial() {
472 return Merge::resolved(*resolved);
473 }
474 value
475}
476
477struct MergedTreeInput {
478 resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
479 pending_lookup: HashSet<RepoPathComponentBuf>,
482 conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
483}
484
485impl MergedTreeInput {
486 fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
487 Self {
488 resolved,
489 pending_lookup: HashSet::new(),
490 conflicts: BTreeMap::new(),
491 }
492 }
493
494 fn mark_completed(&mut self, basename: RepoPathComponentBuf, value: MergedTreeValue) {
495 let was_pending = self.pending_lookup.remove(&basename);
496 assert!(was_pending, "No pending lookup for {basename:?}");
497 if let Some(resolved) = value.resolve_trivial() {
498 if let Some(resolved) = resolved.as_ref() {
499 self.resolved.insert(basename, resolved.clone());
500 }
501 } else {
502 self.conflicts.insert(basename, value);
503 }
504 }
505
506 fn into_backend_trees(self) -> Merge<backend::Tree> {
507 assert!(self.pending_lookup.is_empty());
508
509 fn by_name(
510 (name1, _): &(RepoPathComponentBuf, TreeValue),
511 (name2, _): &(RepoPathComponentBuf, TreeValue),
512 ) -> bool {
513 name1 < name2
514 }
515
516 if self.conflicts.is_empty() {
517 let all_entries = self.resolved.into_iter().collect();
518 Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
519 } else {
520 let mut conflict_entries = self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
522 for (basename, value) in self.conflicts {
523 assert_eq!(value.num_sides(), conflict_entries.num_sides());
524 for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
525 if let Some(value) = value {
526 entries.push((basename.clone(), value));
527 }
528 }
529 }
530
531 let mut backend_trees = vec![];
532 for entries in conflict_entries.into_iter() {
533 let backend_tree = backend::Tree::from_sorted_entries(
534 self.resolved
535 .iter()
536 .map(|(name, value)| (name.clone(), value.clone()))
537 .merge_by(entries, by_name)
538 .collect(),
539 );
540 backend_trees.push(backend_tree);
541 }
542 Merge::from_vec(backend_trees)
543 }
544 }
545}
546
547enum TreeMergerWorkOutput {
549 ReadTrees {
551 dir: RepoPathBuf,
552 result: BackendResult<Merge<Tree>>,
553 },
554 WrittenTrees {
555 dir: RepoPathBuf,
556 result: BackendResult<Merge<Tree>>,
557 },
558 MergedFiles {
559 path: RepoPathBuf,
560 result: BackendResult<MergedTreeValue>,
561 },
562}
563
564#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
565enum TreeMergeWorkItemKey {
566 MergeFiles { path: RepoPathBuf },
569 ReadTrees { dir: RepoPathBuf },
570}
571
572struct TreeMerger {
573 store: Arc<Store>,
574 trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
576 work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
578 unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
580}
581
582impl TreeMerger {
583 async fn merge(mut self) -> BackendResult<Merge<Tree>> {
584 while let Some(work_item) = self.work.next().await {
585 match work_item {
586 TreeMergerWorkOutput::ReadTrees { dir, result } => {
587 let tree = result?;
588 self.process_tree(dir, tree);
589 }
590 TreeMergerWorkOutput::WrittenTrees { dir, result } => {
591 let tree = result?;
592 if dir.is_root() {
593 assert!(self.trees_to_resolve.is_empty());
594 assert!(self.work.is_empty());
595 assert!(self.unstarted_work.is_empty());
596 return Ok(tree);
597 }
598 let new_value = tree.map(|tree| {
600 (tree.id() != self.store.empty_tree_id())
601 .then(|| TreeValue::Tree(tree.id().clone()))
602 });
603 self.mark_completed(&dir, new_value);
604 }
605 TreeMergerWorkOutput::MergedFiles { path, result } => {
606 let value = result?;
607 self.mark_completed(&path, value);
608 }
609 }
610
611 while self.work.len() < self.store.concurrency() {
612 if let Some((_key, work)) = self.unstarted_work.pop_first() {
613 self.work.push(work);
614 } else {
615 break;
616 }
617 }
618 }
619
620 unreachable!("There was no work item for writing the root tree");
621 }
622
623 fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
624 let mut resolved = vec![];
627 let mut non_trivial = vec![];
628 for (basename, path_merge) in all_merged_tree_entries(&tree) {
629 if let Some(value) = path_merge.resolve_trivial() {
630 if let Some(value) = value.cloned() {
631 resolved.push((basename.to_owned(), value));
632 }
633 } else {
634 non_trivial.push((basename.to_owned(), path_merge.cloned()));
635 }
636 }
637
638 if non_trivial.is_empty() {
640 let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
641 self.enqueue_tree_write(dir, backend_trees);
642 return;
643 }
644
645 let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
646 for (basename, value) in non_trivial {
647 let path = dir.join(&basename);
648 unmerged_tree.pending_lookup.insert(basename);
649 if value.is_tree() {
650 self.enqueue_tree_read(path, value);
651 } else {
652 self.enqueue_file_merge(path, value);
656 }
657 }
658
659 self.trees_to_resolve.insert(dir, unmerged_tree);
660 }
661
662 fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
663 let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
664 let work_fut = read_trees(self.store.clone(), dir.clone(), value)
665 .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
666 if self.work.len() < self.store.concurrency() {
667 self.work.push(Box::pin(work_fut));
668 } else {
669 self.unstarted_work.insert(key, Box::pin(work_fut));
670 }
671 }
672
673 fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
674 let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
675 .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
676 self.work.push(Box::pin(work_fut));
679 }
680
681 fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
682 let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
683 let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
684 .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
685 if self.work.len() < self.store.concurrency() {
686 self.work.push(Box::pin(work_fut));
687 } else {
688 self.unstarted_work.insert(key, Box::pin(work_fut));
689 }
690 }
691
692 fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
693 let (dir, basename) = path.split().unwrap();
694 let tree = self.trees_to_resolve.get_mut(dir).unwrap();
695 tree.mark_completed(basename.to_owned(), value);
696 if tree.pending_lookup.is_empty() {
699 let tree = self.trees_to_resolve.remove(dir).unwrap();
700 self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
701 }
702 }
703}
704
705async fn read_trees(
706 store: Arc<Store>,
707 dir: RepoPathBuf,
708 value: MergedTreeValue,
709) -> BackendResult<Merge<Tree>> {
710 let trees = value
711 .to_tree_merge(&store, &dir)
712 .await?
713 .expect("Should be tree merge");
714 Ok(trees)
715}
716
717async fn write_trees(
718 store: Arc<Store>,
719 dir: RepoPathBuf,
720 backend_trees: Merge<backend::Tree>,
721) -> BackendResult<Merge<Tree>> {
722 let trees = try_join_all(
725 backend_trees
726 .into_iter()
727 .map(|backend_tree| store.write_tree(&dir, backend_tree)),
728 )
729 .await?;
730 Ok(Merge::from_vec(trees))
731}
732
733async fn resolve_file_values_owned(
734 store: Arc<Store>,
735 path: RepoPathBuf,
736 values: MergedTreeValue,
737) -> BackendResult<MergedTreeValue> {
738 let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
739 Ok(maybe_resolved.unwrap_or(values))
740}
741
742async fn merge_trees(merge: Merge<Tree>) -> BackendResult<Merge<Tree>> {
745 let merge = match merge.into_resolved() {
746 Ok(tree) => return Ok(Merge::resolved(tree)),
747 Err(merge) => merge,
748 };
749
750 let store = merge.first().store().clone();
751 let merger = TreeMerger {
752 store,
753 trees_to_resolve: BTreeMap::new(),
754 work: FuturesUnordered::new(),
755 unstarted_work: BTreeMap::new(),
756 };
757 merger.work.push(Box::pin(std::future::ready(
758 TreeMergerWorkOutput::ReadTrees {
759 dir: RepoPathBuf::root(),
760 result: Ok(merge),
761 },
762 )));
763 merger.merge().await
764}
765
766pub async fn resolve_file_values(
770 store: &Arc<Store>,
771 path: &RepoPath,
772 values: MergedTreeValue,
773) -> BackendResult<MergedTreeValue> {
774 if let Some(resolved) = values.resolve_trivial() {
775 return Ok(Merge::resolved(resolved.clone()));
776 }
777
778 let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
779 Ok(maybe_resolved.unwrap_or(values))
780}
781
782async fn try_resolve_file_values<T: Borrow<TreeValue>>(
783 store: &Arc<Store>,
784 path: &RepoPath,
785 values: &Merge<Option<T>>,
786) -> BackendResult<Option<MergedTreeValue>> {
787 let simplified = values
790 .map(|value| value.as_ref().map(Borrow::borrow))
791 .simplify();
792 if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
795 Ok(Some(Merge::normal(resolved)))
796 } else {
797 Ok(None)
799 }
800}
801
802pub struct TreeEntriesIterator<'matcher> {
804 store: Arc<Store>,
805 stack: Vec<TreeEntriesDirItem>,
806 matcher: &'matcher dyn Matcher,
807}
808
809struct TreeEntriesDirItem {
810 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
811}
812
813impl TreeEntriesDirItem {
814 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
815 let mut entries = vec![];
816 let dir = trees.first().dir();
817 for (name, value) in all_tree_entries(trees) {
818 let path = dir.join(name);
819 if value.is_tree() {
820 if matcher.visit(&path).is_nothing() {
822 continue;
823 }
824 } else if !matcher.matches(&path) {
825 continue;
826 }
827 entries.push((path, value.cloned()));
828 }
829 entries.reverse();
830 Self { entries }
831 }
832}
833
834impl<'matcher> TreeEntriesIterator<'matcher> {
835 fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
836 Self {
837 store: trees.first().store().clone(),
838 stack: vec![TreeEntriesDirItem::new(trees, matcher)],
839 matcher,
840 }
841 }
842}
843
844impl Iterator for TreeEntriesIterator<'_> {
845 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
846
847 fn next(&mut self) -> Option<Self::Item> {
848 while let Some(top) = self.stack.last_mut() {
849 if let Some((path, value)) = top.entries.pop() {
850 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
851 Ok(maybe_trees) => maybe_trees,
852 Err(err) => return Some((path, Err(err))),
853 };
854 if let Some(trees) = maybe_trees {
855 self.stack
856 .push(TreeEntriesDirItem::new(&trees, self.matcher));
857 } else {
858 return Some((path, Ok(value)));
859 }
860 } else {
861 self.stack.pop();
862 }
863 }
864 None
865 }
866}
867
868struct ConflictsDirItem {
871 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
872}
873
874impl From<&Merge<Tree>> for ConflictsDirItem {
875 fn from(trees: &Merge<Tree>) -> Self {
876 let dir = trees.first().dir();
877 if trees.is_resolved() {
878 return Self { entries: vec![] };
879 }
880
881 let mut entries = vec![];
882 for (basename, value) in all_tree_entries(trees) {
883 if !value.is_resolved() {
884 entries.push((dir.join(basename), value.cloned()));
885 }
886 }
887 entries.reverse();
888 Self { entries }
889 }
890}
891
892struct ConflictIterator {
893 store: Arc<Store>,
894 stack: Vec<ConflictsDirItem>,
895}
896
897impl ConflictIterator {
898 fn new(tree: &MergedTree) -> Self {
899 Self {
900 store: tree.store().clone(),
901 stack: vec![ConflictsDirItem::from(&tree.trees)],
902 }
903 }
904}
905
906impl Iterator for ConflictIterator {
907 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
908
909 fn next(&mut self) -> Option<Self::Item> {
910 while let Some(top) = self.stack.last_mut() {
911 if let Some((path, tree_values)) = top.entries.pop() {
912 match tree_values.to_tree_merge(&self.store, &path).block_on() {
913 Ok(Some(trees)) => {
914 self.stack.push(ConflictsDirItem::from(&trees));
916 }
917 Ok(None) => {
918 return Some((path, Ok(tree_values)));
922 }
923 Err(err) => {
924 return Some((path, Err(err)));
925 }
926 }
927 } else {
928 self.stack.pop();
929 }
930 }
931 None
932 }
933}
934
935pub struct TreeDiffIterator<'matcher> {
940 store: Arc<Store>,
941 stack: Vec<TreeDiffDir>,
942 matcher: &'matcher dyn Matcher,
943}
944
945struct TreeDiffDir {
946 entries: Vec<(RepoPathBuf, MergedTreeValue, MergedTreeValue)>,
947}
948
949impl<'matcher> TreeDiffIterator<'matcher> {
950 pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
952 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
953 let root_dir = RepoPath::root();
954 let mut stack = Vec::new();
955 if !matcher.visit(root_dir).is_nothing() {
956 stack.push(TreeDiffDir::from_trees(root_dir, trees1, trees2, matcher));
957 };
958 Self {
959 store: trees1.first().store().clone(),
960 stack,
961 matcher,
962 }
963 }
964
965 fn trees(
967 store: &Arc<Store>,
968 dir: &RepoPath,
969 values: &MergedTreeValue,
970 ) -> BackendResult<Merge<Tree>> {
971 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
972 Ok(trees)
973 } else {
974 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
975 }
976 }
977}
978
979impl TreeDiffDir {
980 fn from_trees(
981 dir: &RepoPath,
982 trees1: &Merge<Tree>,
983 trees2: &Merge<Tree>,
984 matcher: &dyn Matcher,
985 ) -> Self {
986 let mut entries = vec![];
987 for (name, before, after) in merged_tree_entry_diff(trees1, trees2) {
988 let path = dir.join(name);
989 let tree_before = before.is_tree();
990 let tree_after = after.is_tree();
991 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
994 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
995
996 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
998 before
999 } else {
1000 Merge::absent()
1001 };
1002 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
1003 after
1004 } else {
1005 Merge::absent()
1006 };
1007 if before.is_absent() && after.is_absent() {
1008 continue;
1009 }
1010 entries.push((path, before.cloned(), after.cloned()));
1011 }
1012 entries.reverse();
1013 Self { entries }
1014 }
1015}
1016
1017impl Iterator for TreeDiffIterator<'_> {
1018 type Item = TreeDiffEntry;
1019
1020 fn next(&mut self) -> Option<Self::Item> {
1021 while let Some(top) = self.stack.last_mut() {
1022 let (path, before, after) = match top.entries.pop() {
1023 Some(entry) => entry,
1024 None => {
1025 self.stack.pop().unwrap();
1026 continue;
1027 }
1028 };
1029
1030 if before.is_tree() || after.is_tree() {
1031 let (before_tree, after_tree) = match (
1032 Self::trees(&self.store, &path, &before),
1033 Self::trees(&self.store, &path, &after),
1034 ) {
1035 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
1036 (Err(before_err), _) => {
1037 return Some(TreeDiffEntry {
1038 path,
1039 values: Err(before_err),
1040 });
1041 }
1042 (_, Err(after_err)) => {
1043 return Some(TreeDiffEntry {
1044 path,
1045 values: Err(after_err),
1046 });
1047 }
1048 };
1049 let subdir =
1050 TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
1051 self.stack.push(subdir);
1052 };
1053 if before.is_file_like() || after.is_file_like() {
1054 return Some(TreeDiffEntry {
1055 path,
1056 values: Ok((before, after)),
1057 });
1058 }
1059 }
1060 None
1061 }
1062}
1063
1064pub struct TreeDiffStreamImpl<'matcher> {
1069 store: Arc<Store>,
1070 matcher: &'matcher dyn Matcher,
1071 items: BTreeMap<RepoPathBuf, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
1076 #[expect(clippy::type_complexity)]
1078 pending_trees:
1079 BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
1080 max_concurrent_reads: usize,
1085 max_queued_items: usize,
1091}
1092
1093impl<'matcher> TreeDiffStreamImpl<'matcher> {
1094 pub fn new(
1097 trees1: &Merge<Tree>,
1098 trees2: &Merge<Tree>,
1099 matcher: &'matcher dyn Matcher,
1100 max_concurrent_reads: usize,
1101 ) -> Self {
1102 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
1103 let mut stream = Self {
1104 store: trees1.first().store().clone(),
1105 matcher,
1106 items: BTreeMap::new(),
1107 pending_trees: BTreeMap::new(),
1108 max_concurrent_reads,
1109 max_queued_items: 10000,
1110 };
1111 stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
1112 stream
1113 }
1114
1115 async fn single_tree(
1116 store: &Arc<Store>,
1117 dir: RepoPathBuf,
1118 value: Option<&TreeValue>,
1119 ) -> BackendResult<Tree> {
1120 match value {
1121 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
1122 _ => Ok(Tree::empty(store.clone(), dir.clone())),
1123 }
1124 }
1125
1126 async fn trees(
1128 store: Arc<Store>,
1129 dir: RepoPathBuf,
1130 values: MergedTreeValue,
1131 ) -> BackendResult<Merge<Tree>> {
1132 if values.is_tree() {
1133 values
1134 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
1135 .await
1136 } else {
1137 Ok(Merge::resolved(Tree::empty(store, dir)))
1138 }
1139 }
1140
1141 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
1142 for (basename, before, after) in merged_tree_entry_diff(trees1, trees2) {
1143 let path = dir.join(basename);
1144 let tree_before = before.is_tree();
1145 let tree_after = after.is_tree();
1146 let tree_matches =
1149 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
1150 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
1151
1152 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
1154 before
1155 } else {
1156 Merge::absent()
1157 };
1158 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
1159 after
1160 } else {
1161 Merge::absent()
1162 };
1163 if before.is_absent() && after.is_absent() {
1164 continue;
1165 }
1166
1167 if tree_matches {
1169 let before_tree_future =
1170 Self::trees(self.store.clone(), path.clone(), before.cloned());
1171 let after_tree_future =
1172 Self::trees(self.store.clone(), path.clone(), after.cloned());
1173 let both_trees_future = try_join(before_tree_future, after_tree_future);
1174 self.pending_trees
1175 .insert(path.clone(), Box::pin(both_trees_future));
1176 }
1177
1178 if before.is_file_like() || after.is_file_like() {
1179 self.items
1180 .insert(path, Ok((before.cloned(), after.cloned())));
1181 }
1182 }
1183 }
1184
1185 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
1186 loop {
1187 let mut tree_diffs = vec![];
1188 let mut some_pending = false;
1189 let mut all_pending = true;
1190 for (dir, future) in self
1191 .pending_trees
1192 .iter_mut()
1193 .take(self.max_concurrent_reads)
1194 {
1195 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
1196 all_pending = false;
1197 tree_diffs.push((dir.clone(), tree_diff));
1198 } else {
1199 some_pending = true;
1200 }
1201 }
1202
1203 for (dir, tree_diff) in tree_diffs {
1204 let _ = self.pending_trees.remove_entry(&dir).unwrap();
1205 match tree_diff {
1206 Ok((trees1, trees2)) => {
1207 self.add_dir_diff_items(&dir, &trees1, &trees2);
1208 }
1209 Err(err) => {
1210 self.items.insert(dir, Err(err));
1211 }
1212 }
1213 }
1214
1215 if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
1219 return;
1220 }
1221 }
1222 }
1223}
1224
1225impl Stream for TreeDiffStreamImpl<'_> {
1226 type Item = TreeDiffEntry;
1227
1228 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1229 self.poll_tree_futures(cx);
1231
1232 if let Some((path, _)) = self.items.first_key_value() {
1234 if let Some((dir, _)) = self.pending_trees.first_key_value() {
1237 if dir < path {
1238 return Poll::Pending;
1239 }
1240 }
1241
1242 let (path, values) = self.items.pop_first().unwrap();
1243 Poll::Ready(Some(TreeDiffEntry { path, values }))
1244 } else if self.pending_trees.is_empty() {
1245 Poll::Ready(None)
1246 } else {
1247 Poll::Pending
1248 }
1249 }
1250}
1251
1252fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
1253 Box::pin(stream.map(|mut entry| {
1254 let skip_tree = |merge: MergedTreeValue| {
1255 if merge.is_tree() {
1256 Merge::absent()
1257 } else {
1258 merge
1259 }
1260 };
1261 entry.values = entry
1262 .values
1263 .map(|(before, after)| (skip_tree(before), skip_tree(after)));
1264 entry
1265 }))
1266}
1267
1268struct DiffStreamForFileSystem<'a> {
1271 inner: TreeDiffStream<'a>,
1272 next_item: Option<TreeDiffEntry>,
1273 held_file: Option<TreeDiffEntry>,
1274}
1275
1276impl<'a> DiffStreamForFileSystem<'a> {
1277 fn new(inner: TreeDiffStream<'a>) -> Self {
1278 Self {
1279 inner,
1280 next_item: None,
1281 held_file: None,
1282 }
1283 }
1284}
1285
1286impl Stream for DiffStreamForFileSystem<'_> {
1287 type Item = TreeDiffEntry;
1288
1289 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1290 while let Some(next) = match self.next_item.take() {
1291 Some(next) => Some(next),
1292 None => ready!(self.inner.as_mut().poll_next(cx)),
1293 } {
1294 if let Some(held_entry) = self
1298 .held_file
1299 .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
1300 {
1301 self.next_item = Some(next);
1302 return Poll::Ready(Some(held_entry));
1303 }
1304
1305 match next.values {
1306 Ok((before, after)) if before.is_tree() => {
1307 assert!(after.is_present());
1308 assert!(self.held_file.is_none());
1309 self.held_file = Some(TreeDiffEntry {
1310 path: next.path,
1311 values: Ok((Merge::absent(), after)),
1312 });
1313 }
1314 Ok((before, after)) if after.is_tree() => {
1315 assert!(before.is_present());
1316 return Poll::Ready(Some(TreeDiffEntry {
1317 path: next.path,
1318 values: Ok((before, Merge::absent())),
1319 }));
1320 }
1321 _ => {
1322 return Poll::Ready(Some(next));
1323 }
1324 }
1325 }
1326 Poll::Ready(self.held_file.take())
1327 }
1328}
1329
1330pub struct MergedTreeBuilder {
1338 base_tree_id: MergedTreeId,
1339 overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
1340}
1341
1342impl MergedTreeBuilder {
1343 pub fn new(base_tree_id: MergedTreeId) -> Self {
1345 Self {
1346 base_tree_id,
1347 overrides: BTreeMap::new(),
1348 }
1349 }
1350
1351 pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
1358 if let MergedTreeId::Merge(_) = &self.base_tree_id {
1359 assert!(
1360 !values
1361 .iter()
1362 .flatten()
1363 .any(|value| matches!(value, TreeValue::Conflict(_)))
1364 );
1365 }
1366 self.overrides.insert(path, values);
1367 }
1368
1369 pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
1371 let base_tree_ids = match self.base_tree_id.clone() {
1372 MergedTreeId::Legacy(base_tree_id) => {
1373 let legacy_base_tree = store.get_tree(RepoPathBuf::root(), &base_tree_id)?;
1374 let base_tree = MergedTree::from_legacy_tree(legacy_base_tree)?;
1375 base_tree.id().to_merge()
1376 }
1377 MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
1378 };
1379 let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
1380 match new_tree_ids.simplify().into_resolved() {
1381 Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
1382 Err(tree_id) => {
1383 let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
1384 let resolved = tree.resolve().block_on()?;
1385 Ok(resolved.id())
1386 }
1387 }
1388 }
1389
1390 fn write_merged_trees(
1391 self,
1392 mut base_tree_ids: Merge<TreeId>,
1393 store: &Arc<Store>,
1394 ) -> BackendResult<Merge<TreeId>> {
1395 let num_sides = self
1396 .overrides
1397 .values()
1398 .map(|value| value.num_sides())
1399 .max()
1400 .unwrap_or(0);
1401 base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1402 let mut tree_builders =
1404 base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1405 for (path, values) in self.overrides {
1406 match values.into_resolved() {
1407 Ok(value) => {
1408 for builder in tree_builders.iter_mut() {
1411 builder.set_or_remove(path.clone(), value.clone());
1412 }
1413 }
1414 Err(mut values) => {
1415 values.pad_to(num_sides, &None);
1416 for (builder, value) in zip(tree_builders.iter_mut(), values) {
1419 builder.set_or_remove(path.clone(), value);
1420 }
1421 }
1422 }
1423 }
1424 let merge_builder: MergeBuilder<TreeId> = tree_builders
1428 .into_iter()
1429 .map(|builder| builder.write_tree())
1430 .try_collect()?;
1431 Ok(merge_builder.build())
1432 }
1433}