1use std::borrow::Borrow;
18use std::cmp::max;
19use std::cmp::Ordering;
20use std::collections::BTreeMap;
21use std::collections::VecDeque;
22use std::iter;
23use std::iter::zip;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27use std::task::Poll;
28use std::vec;
29
30use either::Either;
31use futures::future::BoxFuture;
32use futures::stream::BoxStream;
33use futures::Stream;
34use itertools::EitherOrBoth;
35use itertools::Itertools as _;
36use pollster::FutureExt as _;
37
38use crate::backend;
39use crate::backend::BackendResult;
40use crate::backend::MergedTreeId;
41use crate::backend::TreeId;
42use crate::backend::TreeValue;
43use crate::copies::CopiesTreeDiffEntry;
44use crate::copies::CopiesTreeDiffStream;
45use crate::copies::CopyRecords;
46use crate::matchers::EverythingMatcher;
47use crate::matchers::Matcher;
48use crate::merge::Merge;
49use crate::merge::MergeBuilder;
50use crate::merge::MergedTreeVal;
51use crate::merge::MergedTreeValue;
52use crate::repo_path::RepoPath;
53use crate::repo_path::RepoPathBuf;
54use crate::repo_path::RepoPathComponent;
55use crate::store::Store;
56use crate::tree::try_resolve_file_conflict;
57use crate::tree::Tree;
58use crate::tree_builder::TreeBuilder;
59
60#[derive(PartialEq, Eq, Clone, Debug)]
62pub struct MergedTree {
63 trees: Merge<Tree>,
64}
65
66impl MergedTree {
67 pub fn resolved(tree: Tree) -> Self {
69 MergedTree::new(Merge::resolved(tree))
70 }
71
72 pub fn new(trees: Merge<Tree>) -> Self {
75 debug_assert!(!trees.iter().any(|t| t.has_conflict()));
76 debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
77 debug_assert!(trees
78 .iter()
79 .map(|tree| Arc::as_ptr(tree.store()))
80 .all_equal());
81 MergedTree { trees }
82 }
83
84 pub fn from_legacy_tree(tree: Tree) -> BackendResult<Self> {
88 let conflict_ids = tree.conflicts();
89 if conflict_ids.is_empty() {
90 return Ok(MergedTree::resolved(tree));
91 }
92
93 let mut max_tree_count = 1;
95 let store = tree.store();
96 let mut conflicts: Vec<(&RepoPath, MergedTreeValue)> = vec![];
97 for (path, conflict_id) in &conflict_ids {
98 let conflict = store.read_conflict(path, conflict_id)?;
99 max_tree_count = max(max_tree_count, conflict.iter().len());
100 conflicts.push((path, conflict));
101 }
102 let mut tree_builders = Vec::new();
103 tree_builders.resize_with(max_tree_count, || store.tree_builder(tree.id().clone()));
104 for (path, conflict) in conflicts {
105 let terms_padded = conflict.into_iter().chain(iter::repeat(None));
109 for (builder, term) in zip(&mut tree_builders, terms_padded) {
110 builder.set_or_remove(path.to_owned(), term);
111 }
112 }
113
114 let new_trees: Vec<_> = tree_builders
115 .into_iter()
116 .map(|builder| {
117 let tree_id = builder.write_tree()?;
118 store.get_tree(RepoPathBuf::root(), &tree_id)
119 })
120 .try_collect()?;
121 Ok(MergedTree {
122 trees: Merge::from_vec(new_trees),
123 })
124 }
125
126 pub fn as_merge(&self) -> &Merge<Tree> {
128 &self.trees
129 }
130
131 pub fn take(self) -> Merge<Tree> {
133 self.trees
134 }
135
136 pub fn dir(&self) -> &RepoPath {
138 self.trees.first().dir()
139 }
140
141 pub fn store(&self) -> &Arc<Store> {
143 self.trees.first().store()
144 }
145
146 pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
148 Box::new(all_tree_basenames(&self.trees))
149 }
150
151 pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal {
156 trees_value(&self.trees, basename)
157 }
158
159 pub fn resolve(&self) -> BackendResult<MergedTree> {
162 let merged = merge_trees(&self.trees).block_on()?;
163 let simplified = merged.simplify();
168 if cfg!(debug_assertions) {
172 let re_merged = merge_trees(&simplified).block_on().unwrap();
173 debug_assert_eq!(re_merged, simplified);
174 }
175 Ok(MergedTree { trees: simplified })
176 }
177
178 pub fn conflicts(
184 &self,
185 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
186 ConflictIterator::new(self)
187 }
188
189 pub fn has_conflict(&self) -> bool {
191 !self.trees.is_resolved()
192 }
193
194 pub fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<MergedTree>> {
198 match self.value(name).into_resolved() {
199 Ok(Some(TreeValue::Tree(sub_tree_id))) => {
200 let subdir = self.dir().join(name);
201 Ok(Some(MergedTree::resolved(
202 self.store().get_tree(subdir, sub_tree_id)?,
203 )))
204 }
205 Ok(_) => Ok(None),
206 Err(merge) => {
207 let trees = merge.try_map(|value| match value {
208 Some(TreeValue::Tree(sub_tree_id)) => {
209 let subdir = self.dir().join(name);
210 self.store().get_tree(subdir, sub_tree_id)
211 }
212 _ => {
213 let subdir = self.dir().join(name);
214 Ok(Tree::empty(self.store().clone(), subdir))
215 }
216 })?;
217 Ok(Some(MergedTree { trees }))
218 }
219 }
220 }
221
222 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
226 assert_eq!(self.dir(), RepoPath::root());
227 match path.split() {
228 Some((dir, basename)) => match self.sub_tree_recursive(dir)? {
229 None => Ok(Merge::absent()),
230 Some(tree) => Ok(tree.value(basename).cloned()),
231 },
232 None => Ok(self
233 .trees
234 .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
235 }
236 }
237
238 pub fn id(&self) -> MergedTreeId {
240 MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
241 }
242
243 pub fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<MergedTree>> {
245 let mut current_tree = self.clone();
246 for name in path.components() {
247 match current_tree.sub_tree(name)? {
248 None => {
249 return Ok(None);
250 }
251 Some(sub_tree) => {
252 current_tree = sub_tree;
253 }
254 }
255 }
256 Ok(Some(current_tree))
257 }
258
259 pub fn entries(&self) -> TreeEntriesIterator<'static> {
269 self.entries_matching(&EverythingMatcher)
270 }
271
272 pub fn entries_matching<'matcher>(
274 &self,
275 matcher: &'matcher dyn Matcher,
276 ) -> TreeEntriesIterator<'matcher> {
277 TreeEntriesIterator::new(&self.trees, matcher)
278 }
279
280 pub fn diff_stream<'matcher>(
285 &self,
286 other: &MergedTree,
287 matcher: &'matcher dyn Matcher,
288 ) -> TreeDiffStream<'matcher> {
289 let concurrency = self.store().concurrency();
290 if concurrency <= 1 {
291 Box::pin(futures::stream::iter(TreeDiffIterator::new(
292 &self.trees,
293 &other.trees,
294 matcher,
295 )))
296 } else {
297 Box::pin(TreeDiffStreamImpl::new(
298 &self.trees,
299 &other.trees,
300 matcher,
301 concurrency,
302 ))
303 }
304 }
305
306 pub fn diff_stream_with_copies<'a>(
308 &self,
309 other: &MergedTree,
310 matcher: &'a dyn Matcher,
311 copy_records: &'a CopyRecords,
312 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
313 let stream = self.diff_stream(other, matcher);
314 Box::pin(CopiesTreeDiffStream::new(
315 stream,
316 self.clone(),
317 other.clone(),
318 copy_records,
319 ))
320 }
321
322 pub fn merge(&self, base: &MergedTree, other: &MergedTree) -> BackendResult<MergedTree> {
325 self.merge_no_resolve(base, other).resolve()
326 }
327
328 pub fn merge_no_resolve(&self, base: &MergedTree, other: &MergedTree) -> MergedTree {
331 let nested = Merge::from_vec(vec![
332 self.trees.clone(),
333 base.trees.clone(),
334 other.trees.clone(),
335 ]);
336 MergedTree {
337 trees: nested.flatten().simplify(),
338 }
339 }
340}
341
342pub struct TreeDiffEntry {
344 pub path: RepoPathBuf,
346 pub values: BackendResult<(MergedTreeValue, MergedTreeValue)>,
348}
349
350pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
354
355fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
356 trees
357 .iter()
358 .map(|tree| tree.data().names())
359 .kmerge()
360 .dedup()
361}
362
363fn all_tree_entries(
364 trees: &Merge<Tree>,
365) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
366 if let Some(tree) = trees.as_resolved() {
367 let iter = tree
368 .entries_non_recursive()
369 .map(|entry| (entry.name(), Merge::normal(entry.value())));
370 Either::Left(iter)
371 } else {
372 let iter = all_merged_tree_entries(trees).map(|(name, values)| {
373 let values = match values.resolve_trivial() {
375 Some(resolved) => Merge::resolved(*resolved),
376 None => values,
377 };
378 (name, values)
379 });
380 Either::Right(iter)
381 }
382}
383
384fn all_merged_tree_entries(
388 trees: &Merge<Tree>,
389) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
390 let mut entries_iters = trees
391 .iter()
392 .map(|tree| tree.entries_non_recursive().peekable())
393 .collect_vec();
394 iter::from_fn(move || {
395 let next_name = entries_iters
396 .iter_mut()
397 .filter_map(|iter| iter.peek())
398 .map(|entry| entry.name())
399 .min()?;
400 let values: MergeBuilder<_> = entries_iters
401 .iter_mut()
402 .map(|iter| {
403 let entry = iter.next_if(|entry| entry.name() == next_name)?;
404 Some(entry.value())
405 })
406 .collect();
407 Some((next_name, values.build()))
408 })
409}
410
411fn merged_tree_entry_diff<'a>(
412 trees1: &'a Merge<Tree>,
413 trees2: &'a Merge<Tree>,
414) -> impl Iterator<Item = (&'a RepoPathComponent, MergedTreeVal<'a>, MergedTreeVal<'a>)> {
415 itertools::merge_join_by(
416 all_tree_entries(trees1),
417 all_tree_entries(trees2),
418 |(name1, _), (name2, _)| name1.cmp(name2),
419 )
420 .map(|entry| match entry {
421 EitherOrBoth::Both((name, value1), (_, value2)) => (name, value1, value2),
422 EitherOrBoth::Left((name, value1)) => (name, value1, Merge::absent()),
423 EitherOrBoth::Right((name, value2)) => (name, Merge::absent(), value2),
424 })
425 .filter(|(_, value1, value2)| value1 != value2)
426}
427
428fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
429 if let Some(tree) = trees.as_resolved() {
430 return Merge::resolved(tree.value(basename));
431 }
432 let value = trees.map(|tree| tree.value(basename));
433 if let Some(resolved) = value.resolve_trivial() {
434 return Merge::resolved(*resolved);
435 }
436 value
437}
438
439async fn merge_trees(merge: &Merge<Tree>) -> BackendResult<Merge<Tree>> {
442 if let Some(tree) = merge.resolve_trivial() {
443 return Ok(Merge::resolved(tree.clone()));
444 }
445
446 let base_tree = merge.first();
447 let store = base_tree.store();
448 let dir = base_tree.dir();
449 let mut new_tree = backend::Tree::default();
453 let mut conflicts = vec![];
454 for (basename, path_merge) in all_merged_tree_entries(merge) {
456 let path = dir.join(basename);
457 let path_merge = merge_tree_values(store, &path, &path_merge).await?;
458 match path_merge.into_resolved() {
459 Ok(value) => {
460 new_tree.set_or_remove(basename, value);
461 }
462 Err(path_merge) => {
463 conflicts.push((basename, path_merge.into_iter()));
464 }
465 };
466 }
467 if conflicts.is_empty() {
468 let new_tree_id = store.write_tree(dir, new_tree).await?;
469 Ok(Merge::resolved(new_tree_id))
470 } else {
471 let tree_count = merge.iter().len();
475 let mut new_trees = Vec::with_capacity(tree_count);
476 for _ in 0..tree_count {
477 for (basename, path_conflict) in &mut conflicts {
478 new_tree.set_or_remove(basename, path_conflict.next().unwrap());
479 }
480 let tree = store.write_tree(dir, new_tree.clone()).await?;
481 new_trees.push(tree);
482 }
483 Ok(Merge::from_vec(new_trees))
484 }
485}
486
487async fn merge_tree_values(
492 store: &Arc<Store>,
493 path: &RepoPath,
494 values: &MergedTreeVal<'_>,
495) -> BackendResult<MergedTreeValue> {
496 if let Some(resolved) = values.resolve_trivial() {
497 return Ok(Merge::resolved(resolved.cloned()));
498 }
499
500 if let Some(trees) = values.to_tree_merge(store, path).await? {
501 let empty_tree_id = store.empty_tree_id();
504 let merged_tree = Box::pin(merge_trees(&trees)).await?;
505 Ok(merged_tree
506 .map(|tree| (tree.id() != empty_tree_id).then(|| TreeValue::Tree(tree.id().clone()))))
507 } else {
508 let maybe_resolved = try_resolve_file_values(store, path, values).await?;
509 Ok(maybe_resolved.unwrap_or_else(|| values.cloned()))
510 }
511}
512
513pub async fn resolve_file_values(
517 store: &Arc<Store>,
518 path: &RepoPath,
519 values: MergedTreeValue,
520) -> BackendResult<MergedTreeValue> {
521 if let Some(resolved) = values.resolve_trivial() {
522 return Ok(Merge::resolved(resolved.clone()));
523 }
524
525 let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
526 Ok(maybe_resolved.unwrap_or(values))
527}
528
529async fn try_resolve_file_values<T: Borrow<TreeValue>>(
530 store: &Arc<Store>,
531 path: &RepoPath,
532 values: &Merge<Option<T>>,
533) -> BackendResult<Option<MergedTreeValue>> {
534 let simplified = values
537 .map(|value| value.as_ref().map(Borrow::borrow))
538 .simplify();
539 if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
542 Ok(Some(Merge::normal(resolved)))
543 } else {
544 Ok(None)
546 }
547}
548
549pub struct TreeEntriesIterator<'matcher> {
551 store: Arc<Store>,
552 stack: Vec<TreeEntriesDirItem>,
553 matcher: &'matcher dyn Matcher,
554}
555
556struct TreeEntriesDirItem {
557 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
558}
559
560impl TreeEntriesDirItem {
561 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
562 let mut entries = vec![];
563 let dir = trees.first().dir();
564 for (name, value) in all_tree_entries(trees) {
565 let path = dir.join(name);
566 if value.is_tree() {
567 if matcher.visit(&path).is_nothing() {
569 continue;
570 }
571 } else if !matcher.matches(&path) {
572 continue;
573 }
574 entries.push((path, value.cloned()));
575 }
576 entries.reverse();
577 TreeEntriesDirItem { entries }
578 }
579}
580
581impl<'matcher> TreeEntriesIterator<'matcher> {
582 fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
583 Self {
584 store: trees.first().store().clone(),
585 stack: vec![TreeEntriesDirItem::new(trees, matcher)],
586 matcher,
587 }
588 }
589}
590
591impl Iterator for TreeEntriesIterator<'_> {
592 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
593
594 fn next(&mut self) -> Option<Self::Item> {
595 while let Some(top) = self.stack.last_mut() {
596 if let Some((path, value)) = top.entries.pop() {
597 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
598 Ok(maybe_trees) => maybe_trees,
599 Err(err) => return Some((path, Err(err))),
600 };
601 if let Some(trees) = maybe_trees {
602 self.stack
603 .push(TreeEntriesDirItem::new(&trees, self.matcher));
604 } else {
605 return Some((path, Ok(value)));
606 }
607 } else {
608 self.stack.pop();
609 }
610 }
611 None
612 }
613}
614
615struct ConflictsDirItem {
618 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
619}
620
621impl From<&Merge<Tree>> for ConflictsDirItem {
622 fn from(trees: &Merge<Tree>) -> Self {
623 let dir = trees.first().dir();
624 if trees.is_resolved() {
625 return ConflictsDirItem { entries: vec![] };
626 }
627
628 let mut entries = vec![];
629 for (basename, value) in all_tree_entries(trees) {
630 if !value.is_resolved() {
631 entries.push((dir.join(basename), value.cloned()));
632 }
633 }
634 entries.reverse();
635 ConflictsDirItem { entries }
636 }
637}
638
639struct ConflictIterator {
640 store: Arc<Store>,
641 stack: Vec<ConflictsDirItem>,
642}
643
644impl ConflictIterator {
645 fn new(tree: &MergedTree) -> Self {
646 ConflictIterator {
647 store: tree.store().clone(),
648 stack: vec![ConflictsDirItem::from(&tree.trees)],
649 }
650 }
651}
652
653impl Iterator for ConflictIterator {
654 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
655
656 fn next(&mut self) -> Option<Self::Item> {
657 while let Some(top) = self.stack.last_mut() {
658 if let Some((path, tree_values)) = top.entries.pop() {
659 match tree_values.to_tree_merge(&self.store, &path).block_on() {
660 Ok(Some(trees)) => {
661 self.stack.push(ConflictsDirItem::from(&trees));
663 }
664 Ok(None) => {
665 return Some((path, Ok(tree_values)));
669 }
670 Err(err) => {
671 return Some((path, Err(err)));
672 }
673 }
674 } else {
675 self.stack.pop();
676 }
677 }
678 None
679 }
680}
681
682pub struct TreeDiffIterator<'matcher> {
684 store: Arc<Store>,
685 stack: Vec<TreeDiffItem>,
686 matcher: &'matcher dyn Matcher,
687}
688
689struct TreeDiffDirItem {
690 entries: Vec<(RepoPathBuf, MergedTreeValue, MergedTreeValue)>,
691}
692
693enum TreeDiffItem {
694 Dir(TreeDiffDirItem),
695 File(RepoPathBuf, MergedTreeValue, MergedTreeValue),
699}
700
701impl<'matcher> TreeDiffIterator<'matcher> {
702 pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
705 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
706 let root_dir = RepoPath::root();
707 let mut stack = Vec::new();
708 if !matcher.visit(root_dir).is_nothing() {
709 stack.push(TreeDiffItem::Dir(TreeDiffDirItem::from_trees(
710 root_dir, trees1, trees2, matcher,
711 )));
712 };
713 Self {
714 store: trees1.first().store().clone(),
715 stack,
716 matcher,
717 }
718 }
719
720 fn trees(
722 store: &Arc<Store>,
723 dir: &RepoPath,
724 values: &MergedTreeValue,
725 ) -> BackendResult<Merge<Tree>> {
726 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
727 Ok(trees)
728 } else {
729 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
730 }
731 }
732}
733
734impl TreeDiffDirItem {
735 fn from_trees(
736 dir: &RepoPath,
737 trees1: &Merge<Tree>,
738 trees2: &Merge<Tree>,
739 matcher: &dyn Matcher,
740 ) -> Self {
741 let mut entries = vec![];
742 for (name, before, after) in merged_tree_entry_diff(trees1, trees2) {
743 let path = dir.join(name);
744 let tree_before = before.is_tree();
745 let tree_after = after.is_tree();
746 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
749 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
750
751 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
753 before
754 } else {
755 Merge::absent()
756 };
757 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
758 after
759 } else {
760 Merge::absent()
761 };
762 if before.is_absent() && after.is_absent() {
763 continue;
764 }
765 entries.push((path, before.cloned(), after.cloned()));
766 }
767 entries.reverse();
768 Self { entries }
769 }
770}
771
772impl Iterator for TreeDiffIterator<'_> {
773 type Item = TreeDiffEntry;
774
775 fn next(&mut self) -> Option<Self::Item> {
776 while let Some(top) = self.stack.last_mut() {
777 let (path, before, after) = match top {
778 TreeDiffItem::Dir(dir) => match dir.entries.pop() {
779 Some(entry) => entry,
780 None => {
781 self.stack.pop().unwrap();
782 continue;
783 }
784 },
785 TreeDiffItem::File(..) => {
786 if let TreeDiffItem::File(path, before, after) = self.stack.pop().unwrap() {
787 return Some(TreeDiffEntry {
788 path,
789 values: Ok((before, after)),
790 });
791 } else {
792 unreachable!();
793 }
794 }
795 };
796
797 let tree_before = before.is_tree();
798 let tree_after = after.is_tree();
799 let post_subdir = if tree_before || tree_after {
800 let (before_tree, after_tree) = match (
801 Self::trees(&self.store, &path, &before),
802 Self::trees(&self.store, &path, &after),
803 ) {
804 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
805 (Err(before_err), _) => {
806 return Some(TreeDiffEntry {
807 path,
808 values: Err(before_err),
809 });
810 }
811 (_, Err(after_err)) => {
812 return Some(TreeDiffEntry {
813 path,
814 values: Err(after_err),
815 });
816 }
817 };
818 let subdir =
819 TreeDiffDirItem::from_trees(&path, &before_tree, &after_tree, self.matcher);
820 self.stack.push(TreeDiffItem::Dir(subdir));
821 self.stack.len() - 1
822 } else {
823 self.stack.len()
824 };
825 if !tree_before && tree_after {
826 if before.is_present() {
827 return Some(TreeDiffEntry {
828 path,
829 values: Ok((before, Merge::absent())),
830 });
831 }
832 } else if tree_before && !tree_after {
833 if after.is_present() {
834 self.stack.insert(
835 post_subdir,
836 TreeDiffItem::File(path, Merge::absent(), after),
837 );
838 }
839 } else if !tree_before && !tree_after {
840 return Some(TreeDiffEntry {
841 path,
842 values: Ok((before, after)),
843 });
844 }
845 }
846 None
847 }
848}
849
850pub struct TreeDiffStreamImpl<'matcher> {
852 store: Arc<Store>,
853 matcher: &'matcher dyn Matcher,
854 items: BTreeMap<DiffStreamKey, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
858 #[expect(clippy::type_complexity)]
860 pending_trees: VecDeque<(
861 RepoPathBuf,
862 BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>,
863 )>,
864 max_concurrent_reads: usize,
869 max_queued_items: usize,
875}
876
877#[derive(PartialEq, Eq, Clone, Debug)]
880struct DiffStreamKey {
881 path: RepoPathBuf,
882 file_after_dir: bool,
883}
884
885impl DiffStreamKey {
886 fn normal(path: RepoPathBuf) -> Self {
887 DiffStreamKey {
888 path,
889 file_after_dir: false,
890 }
891 }
892
893 fn file_after_dir(path: RepoPathBuf) -> Self {
894 DiffStreamKey {
895 path,
896 file_after_dir: true,
897 }
898 }
899}
900
901impl PartialOrd for DiffStreamKey {
902 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
903 Some(self.cmp(other))
904 }
905}
906
907impl Ord for DiffStreamKey {
908 fn cmp(&self, other: &Self) -> Ordering {
909 if self == other {
910 Ordering::Equal
911 } else if self.file_after_dir && other.path.starts_with(&self.path) {
912 Ordering::Greater
913 } else if other.file_after_dir && self.path.starts_with(&other.path) {
914 Ordering::Less
915 } else {
916 self.path.cmp(&other.path)
917 }
918 }
919}
920
921impl<'matcher> TreeDiffStreamImpl<'matcher> {
922 pub fn new(
925 trees1: &Merge<Tree>,
926 trees2: &Merge<Tree>,
927 matcher: &'matcher dyn Matcher,
928 max_concurrent_reads: usize,
929 ) -> Self {
930 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
931 let mut stream = Self {
932 store: trees1.first().store().clone(),
933 matcher,
934 items: BTreeMap::new(),
935 pending_trees: VecDeque::new(),
936 max_concurrent_reads,
937 max_queued_items: 10000,
938 };
939 stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
940 stream
941 }
942
943 async fn single_tree(
944 store: &Arc<Store>,
945 dir: RepoPathBuf,
946 value: Option<&TreeValue>,
947 ) -> BackendResult<Tree> {
948 match value {
949 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
950 _ => Ok(Tree::empty(store.clone(), dir.clone())),
951 }
952 }
953
954 async fn trees(
956 store: Arc<Store>,
957 dir: RepoPathBuf,
958 values: MergedTreeValue,
959 ) -> BackendResult<Merge<Tree>> {
960 if values.is_tree() {
961 values
962 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
963 .await
964 } else {
965 Ok(Merge::resolved(Tree::empty(store, dir)))
966 }
967 }
968
969 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
970 for (basename, before, after) in merged_tree_entry_diff(trees1, trees2) {
971 let path = dir.join(basename);
972 let tree_before = before.is_tree();
973 let tree_after = after.is_tree();
974 let tree_matches =
977 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
978 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
979
980 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
982 before
983 } else {
984 Merge::absent()
985 };
986 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
987 after
988 } else {
989 Merge::absent()
990 };
991 if before.is_absent() && after.is_absent() {
992 continue;
993 }
994
995 if tree_matches {
997 let before_tree_future =
998 Self::trees(self.store.clone(), path.clone(), before.cloned());
999 let after_tree_future =
1000 Self::trees(self.store.clone(), path.clone(), after.cloned());
1001 let both_trees_future =
1002 async { futures::try_join!(before_tree_future, after_tree_future) };
1003 self.pending_trees
1004 .push_back((path.clone(), Box::pin(both_trees_future)));
1005 }
1006
1007 self.items.insert(
1008 DiffStreamKey::normal(path),
1009 Ok((before.cloned(), after.cloned())),
1010 );
1011 }
1012 }
1013
1014 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
1015 let mut pending_index = 0;
1016 while pending_index < self.pending_trees.len()
1017 && (pending_index < self.max_concurrent_reads
1018 || self.items.len() < self.max_queued_items)
1019 {
1020 let (_, future) = &mut self.pending_trees[pending_index];
1021 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
1022 let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
1023 let key = DiffStreamKey::normal(dir);
1024 let (before, after) = self.items.remove(&key).unwrap().unwrap();
1027 if before.is_present() && !before.is_tree() {
1030 self.items
1031 .insert(key.clone(), Ok((before, Merge::absent())));
1032 } else if after.is_present() && !after.is_tree() {
1033 self.items.insert(
1034 DiffStreamKey::file_after_dir(key.path.clone()),
1035 Ok((Merge::absent(), after)),
1036 );
1037 }
1038 match tree_diff {
1039 Ok((trees1, trees2)) => {
1040 self.add_dir_diff_items(&key.path, &trees1, &trees2);
1041 }
1042 Err(err) => {
1043 self.items.insert(DiffStreamKey::normal(key.path), Err(err));
1044 }
1045 }
1046 } else {
1047 pending_index += 1;
1048 }
1049 }
1050 }
1051}
1052
1053impl Stream for TreeDiffStreamImpl<'_> {
1054 type Item = TreeDiffEntry;
1055
1056 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1057 self.poll_tree_futures(cx);
1059
1060 if let Some(entry) = self.items.first_entry() {
1062 match entry.get() {
1063 Err(_) => {
1064 let (key, result) = entry.remove_entry();
1066 Poll::Ready(Some(match result {
1067 Err(err) => TreeDiffEntry {
1068 path: key.path,
1069 values: Err(err),
1070 },
1071 Ok((before, after)) => TreeDiffEntry {
1072 path: key.path,
1073 values: Ok((before, after)),
1074 },
1075 }))
1076 }
1077 Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
1078 let (key, result) = entry.remove_entry();
1080 Poll::Ready(Some(match result {
1081 Err(err) => TreeDiffEntry {
1082 path: key.path,
1083 values: Err(err),
1084 },
1085 Ok((before, after)) => TreeDiffEntry {
1086 path: key.path,
1087 values: Ok((before, after)),
1088 },
1089 }))
1090 }
1091 _ => {
1092 assert!(!self.pending_trees.is_empty());
1095 Poll::Pending
1096 }
1097 }
1098 } else {
1099 Poll::Ready(None)
1100 }
1101 }
1102}
1103
1104pub struct MergedTreeBuilder {
1112 base_tree_id: MergedTreeId,
1113 overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
1114}
1115
1116impl MergedTreeBuilder {
1117 pub fn new(base_tree_id: MergedTreeId) -> Self {
1119 MergedTreeBuilder {
1120 base_tree_id,
1121 overrides: BTreeMap::new(),
1122 }
1123 }
1124
1125 pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
1132 if let MergedTreeId::Merge(_) = &self.base_tree_id {
1133 assert!(!values
1134 .iter()
1135 .flatten()
1136 .any(|value| matches!(value, TreeValue::Conflict(_))));
1137 }
1138 self.overrides.insert(path, values);
1139 }
1140
1141 pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
1143 let base_tree_ids = match self.base_tree_id.clone() {
1144 MergedTreeId::Legacy(base_tree_id) => {
1145 let legacy_base_tree = store.get_tree(RepoPathBuf::root(), &base_tree_id)?;
1146 let base_tree = MergedTree::from_legacy_tree(legacy_base_tree)?;
1147 base_tree.id().to_merge()
1148 }
1149 MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
1150 };
1151 let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
1152 match new_tree_ids.simplify().into_resolved() {
1153 Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
1154 Err(tree_id) => {
1155 let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
1156 let resolved = tree.resolve()?;
1157 Ok(resolved.id())
1158 }
1159 }
1160 }
1161
1162 fn write_merged_trees(
1163 self,
1164 mut base_tree_ids: Merge<TreeId>,
1165 store: &Arc<Store>,
1166 ) -> BackendResult<Merge<TreeId>> {
1167 let num_sides = self
1168 .overrides
1169 .values()
1170 .map(|value| value.num_sides())
1171 .max()
1172 .unwrap_or(0);
1173 base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1174 let mut tree_builders =
1176 base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1177 for (path, values) in self.overrides {
1178 match values.into_resolved() {
1179 Ok(value) => {
1180 for builder in tree_builders.iter_mut() {
1183 builder.set_or_remove(path.clone(), value.clone());
1184 }
1185 }
1186 Err(mut values) => {
1187 values.pad_to(num_sides, &None);
1188 for (builder, value) in zip(tree_builders.iter_mut(), values) {
1191 builder.set_or_remove(path.clone(), value);
1192 }
1193 }
1194 }
1195 }
1196 let merge_builder: MergeBuilder<TreeId> = tree_builders
1200 .into_iter()
1201 .map(|builder| builder.write_tree())
1202 .try_collect()?;
1203 Ok(merge_builder.build())
1204 }
1205}