1use std::borrow::Borrow;
18use std::cmp::max;
19use std::cmp::Ordering;
20use std::collections::BTreeMap;
21use std::collections::VecDeque;
22use std::iter;
23use std::iter::zip;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27use std::task::Poll;
28use std::vec;
29
30use either::Either;
31use futures::future::BoxFuture;
32use futures::stream::BoxStream;
33use futures::stream::StreamExt as _;
34use futures::Stream;
35use futures::TryStreamExt as _;
36use itertools::EitherOrBoth;
37use itertools::Itertools as _;
38use pollster::FutureExt as _;
39
40use crate::backend;
41use crate::backend::BackendResult;
42use crate::backend::MergedTreeId;
43use crate::backend::TreeId;
44use crate::backend::TreeValue;
45use crate::copies::CopiesTreeDiffEntry;
46use crate::copies::CopiesTreeDiffStream;
47use crate::copies::CopyRecords;
48use crate::matchers::EverythingMatcher;
49use crate::matchers::Matcher;
50use crate::merge::Merge;
51use crate::merge::MergeBuilder;
52use crate::merge::MergedTreeVal;
53use crate::merge::MergedTreeValue;
54use crate::repo_path::RepoPath;
55use crate::repo_path::RepoPathBuf;
56use crate::repo_path::RepoPathComponent;
57use crate::store::Store;
58use crate::tree::try_resolve_file_conflict;
59use crate::tree::Tree;
60use crate::tree_builder::TreeBuilder;
61
62#[derive(PartialEq, Eq, Clone, Debug)]
64pub struct MergedTree {
65 trees: Merge<Tree>,
66}
67
68impl MergedTree {
69 pub fn resolved(tree: Tree) -> Self {
71 MergedTree::new(Merge::resolved(tree))
72 }
73
74 pub fn new(trees: Merge<Tree>) -> Self {
77 debug_assert!(!trees.iter().any(|t| t.has_conflict()));
78 debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
79 debug_assert!(trees
80 .iter()
81 .map(|tree| Arc::as_ptr(tree.store()))
82 .all_equal());
83 MergedTree { trees }
84 }
85
86 pub fn from_legacy_tree(tree: Tree) -> BackendResult<Self> {
90 let conflict_ids = tree.conflicts();
91 if conflict_ids.is_empty() {
92 return Ok(MergedTree::resolved(tree));
93 }
94
95 let mut max_tree_count = 1;
97 let store = tree.store();
98 let mut conflicts: Vec<(&RepoPath, MergedTreeValue)> = vec![];
99 for (path, conflict_id) in &conflict_ids {
100 let conflict = store.read_conflict(path, conflict_id)?;
101 max_tree_count = max(max_tree_count, conflict.iter().len());
102 conflicts.push((path, conflict));
103 }
104 let mut tree_builders = Vec::new();
105 tree_builders.resize_with(max_tree_count, || store.tree_builder(tree.id().clone()));
106 for (path, conflict) in conflicts {
107 let terms_padded = conflict.into_iter().chain(iter::repeat(None));
111 for (builder, term) in zip(&mut tree_builders, terms_padded) {
112 builder.set_or_remove(path.to_owned(), term);
113 }
114 }
115
116 let new_trees: Vec<_> = tree_builders
117 .into_iter()
118 .map(|builder| {
119 let tree_id = builder.write_tree()?;
120 store.get_tree(RepoPathBuf::root(), &tree_id)
121 })
122 .try_collect()?;
123 Ok(MergedTree {
124 trees: Merge::from_vec(new_trees),
125 })
126 }
127
128 pub fn as_merge(&self) -> &Merge<Tree> {
130 &self.trees
131 }
132
133 pub fn take(self) -> Merge<Tree> {
135 self.trees
136 }
137
138 pub fn dir(&self) -> &RepoPath {
140 self.trees.first().dir()
141 }
142
143 pub fn store(&self) -> &Arc<Store> {
145 self.trees.first().store()
146 }
147
148 pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
150 Box::new(all_tree_basenames(&self.trees))
151 }
152
153 pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal {
158 trees_value(&self.trees, basename)
159 }
160
161 pub fn resolve(&self) -> BackendResult<MergedTree> {
164 let merged = merge_trees(&self.trees)?;
165 let simplified = merged.simplify();
170 if cfg!(debug_assertions) {
174 let re_merged = merge_trees(&simplified).unwrap();
175 debug_assert_eq!(re_merged, simplified);
176 }
177 Ok(MergedTree { trees: simplified })
178 }
179
180 pub fn conflicts(
186 &self,
187 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
188 ConflictIterator::new(self)
189 }
190
191 pub fn has_conflict(&self) -> bool {
193 !self.trees.is_resolved()
194 }
195
196 pub fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<MergedTree>> {
200 match self.value(name).into_resolved() {
201 Ok(Some(TreeValue::Tree(sub_tree_id))) => {
202 let subdir = self.dir().join(name);
203 Ok(Some(MergedTree::resolved(
204 self.store().get_tree(subdir, sub_tree_id)?,
205 )))
206 }
207 Ok(_) => Ok(None),
208 Err(merge) => {
209 let trees = merge.try_map(|value| match value {
210 Some(TreeValue::Tree(sub_tree_id)) => {
211 let subdir = self.dir().join(name);
212 self.store().get_tree(subdir, sub_tree_id)
213 }
214 _ => {
215 let subdir = self.dir().join(name);
216 Ok(Tree::empty(self.store().clone(), subdir))
217 }
218 })?;
219 Ok(Some(MergedTree { trees }))
220 }
221 }
222 }
223
224 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
228 assert_eq!(self.dir(), RepoPath::root());
229 match path.split() {
230 Some((dir, basename)) => match self.sub_tree_recursive(dir)? {
231 None => Ok(Merge::absent()),
232 Some(tree) => Ok(tree.value(basename).cloned()),
233 },
234 None => Ok(self
235 .trees
236 .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
237 }
238 }
239
240 pub fn id(&self) -> MergedTreeId {
242 MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
243 }
244
245 pub fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<MergedTree>> {
247 let mut current_tree = self.clone();
248 for name in path.components() {
249 match current_tree.sub_tree(name)? {
250 None => {
251 return Ok(None);
252 }
253 Some(sub_tree) => {
254 current_tree = sub_tree;
255 }
256 }
257 }
258 Ok(Some(current_tree))
259 }
260
261 pub fn entries(&self) -> TreeEntriesIterator<'static> {
271 self.entries_matching(&EverythingMatcher)
272 }
273
274 pub fn entries_matching<'matcher>(
276 &self,
277 matcher: &'matcher dyn Matcher,
278 ) -> TreeEntriesIterator<'matcher> {
279 TreeEntriesIterator::new(&self.trees, matcher)
280 }
281
282 pub fn diff_stream<'matcher>(
287 &self,
288 other: &MergedTree,
289 matcher: &'matcher dyn Matcher,
290 ) -> TreeDiffStream<'matcher> {
291 let concurrency = self.store().concurrency();
292 if concurrency <= 1 {
293 Box::pin(futures::stream::iter(TreeDiffIterator::new(
294 &self.trees,
295 &other.trees,
296 matcher,
297 )))
298 } else {
299 Box::pin(TreeDiffStreamImpl::new(
300 &self.trees,
301 &other.trees,
302 matcher,
303 concurrency,
304 ))
305 }
306 }
307
308 pub fn diff_stream_with_copies<'a>(
310 &self,
311 other: &MergedTree,
312 matcher: &'a dyn Matcher,
313 copy_records: &'a CopyRecords,
314 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
315 let stream = self.diff_stream(other, matcher);
316 Box::pin(CopiesTreeDiffStream::new(
317 stream,
318 self.clone(),
319 other.clone(),
320 copy_records,
321 ))
322 }
323
324 pub fn merge(&self, base: &MergedTree, other: &MergedTree) -> BackendResult<MergedTree> {
327 self.merge_no_resolve(base, other).resolve()
328 }
329
330 pub fn merge_no_resolve(&self, base: &MergedTree, other: &MergedTree) -> MergedTree {
333 let nested = Merge::from_vec(vec![
334 self.trees.clone(),
335 base.trees.clone(),
336 other.trees.clone(),
337 ]);
338 MergedTree {
339 trees: nested.flatten().simplify(),
340 }
341 }
342}
343
344pub struct TreeDiffEntry {
346 pub path: RepoPathBuf,
348 pub values: BackendResult<(MergedTreeValue, MergedTreeValue)>,
350}
351
352pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
356
357fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
358 trees
359 .iter()
360 .map(|tree| tree.data().names())
361 .kmerge()
362 .dedup()
363}
364
365fn all_tree_entries(
366 trees: &Merge<Tree>,
367) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
368 if let Some(tree) = trees.as_resolved() {
369 let iter = tree
370 .entries_non_recursive()
371 .map(|entry| (entry.name(), Merge::normal(entry.value())));
372 Either::Left(iter)
373 } else {
374 let iter = all_merged_tree_entries(trees).map(|(name, values)| {
375 let values = match values.resolve_trivial() {
377 Some(resolved) => Merge::resolved(*resolved),
378 None => values,
379 };
380 (name, values)
381 });
382 Either::Right(iter)
383 }
384}
385
386fn all_merged_tree_entries(
390 trees: &Merge<Tree>,
391) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
392 let mut entries_iters = trees
393 .iter()
394 .map(|tree| tree.entries_non_recursive().peekable())
395 .collect_vec();
396 iter::from_fn(move || {
397 let next_name = entries_iters
398 .iter_mut()
399 .filter_map(|iter| iter.peek())
400 .map(|entry| entry.name())
401 .min()?;
402 let values: MergeBuilder<_> = entries_iters
403 .iter_mut()
404 .map(|iter| {
405 let entry = iter.next_if(|entry| entry.name() == next_name)?;
406 Some(entry.value())
407 })
408 .collect();
409 Some((next_name, values.build()))
410 })
411}
412
413fn merged_tree_entry_diff<'a>(
414 trees1: &'a Merge<Tree>,
415 trees2: &'a Merge<Tree>,
416) -> impl Iterator<Item = (&'a RepoPathComponent, MergedTreeVal<'a>, MergedTreeVal<'a>)> {
417 itertools::merge_join_by(
418 all_tree_entries(trees1),
419 all_tree_entries(trees2),
420 |(name1, _), (name2, _)| name1.cmp(name2),
421 )
422 .map(|entry| match entry {
423 EitherOrBoth::Both((name, value1), (_, value2)) => (name, value1, value2),
424 EitherOrBoth::Left((name, value1)) => (name, value1, Merge::absent()),
425 EitherOrBoth::Right((name, value2)) => (name, Merge::absent(), value2),
426 })
427 .filter(|(_, value1, value2)| value1 != value2)
428}
429
430fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
431 if let Some(tree) = trees.as_resolved() {
432 return Merge::resolved(tree.value(basename));
433 }
434 let value = trees.map(|tree| tree.value(basename));
435 if let Some(resolved) = value.resolve_trivial() {
436 return Merge::resolved(*resolved);
437 }
438 value
439}
440
441fn merge_trees(merge: &Merge<Tree>) -> BackendResult<Merge<Tree>> {
444 if let Some(tree) = merge.resolve_trivial() {
445 return Ok(Merge::resolved(tree.clone()));
446 }
447
448 let base_tree = merge.first();
449 let store = base_tree.store();
450 let dir = base_tree.dir();
451 let mut new_tree = backend::Tree::default();
455 let mut conflicts = vec![];
456 for (basename, path_merge) in all_merged_tree_entries(merge) {
458 let path = dir.join(basename);
459 let path_merge = merge_tree_values(store, &path, &path_merge).block_on()?;
460 match path_merge.into_resolved() {
461 Ok(value) => {
462 new_tree.set_or_remove(basename, value);
463 }
464 Err(path_merge) => {
465 conflicts.push((basename, path_merge.into_iter()));
466 }
467 };
468 }
469 if conflicts.is_empty() {
470 let new_tree_id = store.write_tree(dir, new_tree).block_on()?;
471 Ok(Merge::resolved(new_tree_id))
472 } else {
473 let tree_count = merge.iter().len();
477 let mut new_trees = Vec::with_capacity(tree_count);
478 for _ in 0..tree_count {
479 for (basename, path_conflict) in &mut conflicts {
480 new_tree.set_or_remove(basename, path_conflict.next().unwrap());
481 }
482 let tree = store.write_tree(dir, new_tree.clone()).block_on()?;
483 new_trees.push(tree);
484 }
485 Ok(Merge::from_vec(new_trees))
486 }
487}
488
489async fn merge_tree_values(
494 store: &Arc<Store>,
495 path: &RepoPath,
496 values: &MergedTreeVal<'_>,
497) -> BackendResult<MergedTreeValue> {
498 if let Some(resolved) = values.resolve_trivial() {
499 return Ok(Merge::resolved(resolved.cloned()));
500 }
501
502 if let Some(trees) = values.to_tree_merge(store, path)? {
503 let empty_tree_id = store.empty_tree_id();
506 let merged_tree = merge_trees(&trees)?;
507 Ok(merged_tree
508 .map(|tree| (tree.id() != empty_tree_id).then(|| TreeValue::Tree(tree.id().clone()))))
509 } else {
510 let maybe_resolved = try_resolve_file_values(store, path, values).await?;
511 Ok(maybe_resolved.unwrap_or_else(|| values.cloned()))
512 }
513}
514
515pub async fn resolve_file_values(
519 store: &Arc<Store>,
520 path: &RepoPath,
521 values: MergedTreeValue,
522) -> BackendResult<MergedTreeValue> {
523 if let Some(resolved) = values.resolve_trivial() {
524 return Ok(Merge::resolved(resolved.clone()));
525 }
526
527 let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
528 Ok(maybe_resolved.unwrap_or(values))
529}
530
531async fn try_resolve_file_values<T: Borrow<TreeValue>>(
532 store: &Arc<Store>,
533 path: &RepoPath,
534 values: &Merge<Option<T>>,
535) -> BackendResult<Option<MergedTreeValue>> {
536 let simplified = values
539 .map(|value| value.as_ref().map(Borrow::borrow))
540 .simplify();
541 if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
544 Ok(Some(Merge::normal(resolved)))
545 } else {
546 Ok(None)
548 }
549}
550
551pub struct TreeEntriesIterator<'matcher> {
553 store: Arc<Store>,
554 stack: Vec<TreeEntriesDirItem>,
555 matcher: &'matcher dyn Matcher,
556}
557
558struct TreeEntriesDirItem {
559 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
560}
561
562impl TreeEntriesDirItem {
563 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
564 let mut entries = vec![];
565 let dir = trees.first().dir();
566 for (name, value) in all_tree_entries(trees) {
567 let path = dir.join(name);
568 if value.is_tree() {
569 if matcher.visit(&path).is_nothing() {
571 continue;
572 }
573 } else if !matcher.matches(&path) {
574 continue;
575 }
576 entries.push((path, value.cloned()));
577 }
578 entries.reverse();
579 TreeEntriesDirItem { entries }
580 }
581}
582
583impl<'matcher> TreeEntriesIterator<'matcher> {
584 fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
585 Self {
586 store: trees.first().store().clone(),
587 stack: vec![TreeEntriesDirItem::new(trees, matcher)],
588 matcher,
589 }
590 }
591}
592
593impl Iterator for TreeEntriesIterator<'_> {
594 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
595
596 fn next(&mut self) -> Option<Self::Item> {
597 while let Some(top) = self.stack.last_mut() {
598 if let Some((path, value)) = top.entries.pop() {
599 let maybe_trees = match value.to_tree_merge(&self.store, &path) {
600 Ok(maybe_trees) => maybe_trees,
601 Err(err) => return Some((path, Err(err))),
602 };
603 if let Some(trees) = maybe_trees {
604 self.stack
605 .push(TreeEntriesDirItem::new(&trees, self.matcher));
606 } else {
607 return Some((path, Ok(value)));
608 }
609 } else {
610 self.stack.pop();
611 }
612 }
613 None
614 }
615}
616
617struct ConflictsDirItem {
620 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
621}
622
623impl From<&Merge<Tree>> for ConflictsDirItem {
624 fn from(trees: &Merge<Tree>) -> Self {
625 let dir = trees.first().dir();
626 if trees.is_resolved() {
627 return ConflictsDirItem { entries: vec![] };
628 }
629
630 let mut entries = vec![];
631 for (basename, value) in all_tree_entries(trees) {
632 if !value.is_resolved() {
633 entries.push((dir.join(basename), value.cloned()));
634 }
635 }
636 entries.reverse();
637 ConflictsDirItem { entries }
638 }
639}
640
641struct ConflictIterator {
642 store: Arc<Store>,
643 stack: Vec<ConflictsDirItem>,
644}
645
646impl ConflictIterator {
647 fn new(tree: &MergedTree) -> Self {
648 ConflictIterator {
649 store: tree.store().clone(),
650 stack: vec![ConflictsDirItem::from(&tree.trees)],
651 }
652 }
653}
654
655impl Iterator for ConflictIterator {
656 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
657
658 fn next(&mut self) -> Option<Self::Item> {
659 while let Some(top) = self.stack.last_mut() {
660 if let Some((path, tree_values)) = top.entries.pop() {
661 match tree_values.to_tree_merge(&self.store, &path) {
662 Ok(Some(trees)) => {
663 self.stack.push(ConflictsDirItem::from(&trees));
665 }
666 Ok(None) => {
667 return Some((path, Ok(tree_values)));
671 }
672 Err(err) => {
673 return Some((path, Err(err)));
674 }
675 }
676 } else {
677 self.stack.pop();
678 }
679 }
680 None
681 }
682}
683
684pub struct TreeDiffIterator<'matcher> {
686 store: Arc<Store>,
687 stack: Vec<TreeDiffItem>,
688 matcher: &'matcher dyn Matcher,
689}
690
691struct TreeDiffDirItem {
692 entries: Vec<(RepoPathBuf, MergedTreeValue, MergedTreeValue)>,
693}
694
695enum TreeDiffItem {
696 Dir(TreeDiffDirItem),
697 File(RepoPathBuf, MergedTreeValue, MergedTreeValue),
701}
702
703impl<'matcher> TreeDiffIterator<'matcher> {
704 pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
707 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
708 let root_dir = RepoPath::root();
709 let mut stack = Vec::new();
710 if !matcher.visit(root_dir).is_nothing() {
711 stack.push(TreeDiffItem::Dir(TreeDiffDirItem::from_trees(
712 root_dir, trees1, trees2, matcher,
713 )));
714 };
715 Self {
716 store: trees1.first().store().clone(),
717 stack,
718 matcher,
719 }
720 }
721
722 fn trees(
724 store: &Arc<Store>,
725 dir: &RepoPath,
726 values: &MergedTreeValue,
727 ) -> BackendResult<Merge<Tree>> {
728 if let Some(trees) = values.to_tree_merge(store, dir)? {
729 Ok(trees)
730 } else {
731 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
732 }
733 }
734}
735
736impl TreeDiffDirItem {
737 fn from_trees(
738 dir: &RepoPath,
739 trees1: &Merge<Tree>,
740 trees2: &Merge<Tree>,
741 matcher: &dyn Matcher,
742 ) -> Self {
743 let mut entries = vec![];
744 for (name, before, after) in merged_tree_entry_diff(trees1, trees2) {
745 let path = dir.join(name);
746 let tree_before = before.is_tree();
747 let tree_after = after.is_tree();
748 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
751 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
752
753 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
755 before
756 } else {
757 Merge::absent()
758 };
759 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
760 after
761 } else {
762 Merge::absent()
763 };
764 if before.is_absent() && after.is_absent() {
765 continue;
766 }
767 entries.push((path, before.cloned(), after.cloned()));
768 }
769 entries.reverse();
770 Self { entries }
771 }
772}
773
774impl Iterator for TreeDiffIterator<'_> {
775 type Item = TreeDiffEntry;
776
777 fn next(&mut self) -> Option<Self::Item> {
778 while let Some(top) = self.stack.last_mut() {
779 let (path, before, after) = match top {
780 TreeDiffItem::Dir(dir) => match dir.entries.pop() {
781 Some(entry) => entry,
782 None => {
783 self.stack.pop().unwrap();
784 continue;
785 }
786 },
787 TreeDiffItem::File(..) => {
788 if let TreeDiffItem::File(path, before, after) = self.stack.pop().unwrap() {
789 return Some(TreeDiffEntry {
790 path,
791 values: Ok((before, after)),
792 });
793 } else {
794 unreachable!();
795 }
796 }
797 };
798
799 let tree_before = before.is_tree();
800 let tree_after = after.is_tree();
801 let post_subdir = if tree_before || tree_after {
802 let (before_tree, after_tree) = match (
803 Self::trees(&self.store, &path, &before),
804 Self::trees(&self.store, &path, &after),
805 ) {
806 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
807 (Err(before_err), _) => {
808 return Some(TreeDiffEntry {
809 path,
810 values: Err(before_err),
811 });
812 }
813 (_, Err(after_err)) => {
814 return Some(TreeDiffEntry {
815 path,
816 values: Err(after_err),
817 });
818 }
819 };
820 let subdir =
821 TreeDiffDirItem::from_trees(&path, &before_tree, &after_tree, self.matcher);
822 self.stack.push(TreeDiffItem::Dir(subdir));
823 self.stack.len() - 1
824 } else {
825 self.stack.len()
826 };
827 if !tree_before && tree_after {
828 if before.is_present() {
829 return Some(TreeDiffEntry {
830 path,
831 values: Ok((before, Merge::absent())),
832 });
833 }
834 } else if tree_before && !tree_after {
835 if after.is_present() {
836 self.stack.insert(
837 post_subdir,
838 TreeDiffItem::File(path, Merge::absent(), after),
839 );
840 }
841 } else if !tree_before && !tree_after {
842 return Some(TreeDiffEntry {
843 path,
844 values: Ok((before, after)),
845 });
846 }
847 }
848 None
849 }
850}
851
852pub struct TreeDiffStreamImpl<'matcher> {
854 store: Arc<Store>,
855 matcher: &'matcher dyn Matcher,
856 items: BTreeMap<DiffStreamKey, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
860 #[expect(clippy::type_complexity)]
862 pending_trees: VecDeque<(
863 RepoPathBuf,
864 BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>,
865 )>,
866 max_concurrent_reads: usize,
871 max_queued_items: usize,
877}
878
879#[derive(PartialEq, Eq, Clone, Debug)]
882struct DiffStreamKey {
883 path: RepoPathBuf,
884 file_after_dir: bool,
885}
886
887impl DiffStreamKey {
888 fn normal(path: RepoPathBuf) -> Self {
889 DiffStreamKey {
890 path,
891 file_after_dir: false,
892 }
893 }
894
895 fn file_after_dir(path: RepoPathBuf) -> Self {
896 DiffStreamKey {
897 path,
898 file_after_dir: true,
899 }
900 }
901}
902
903impl PartialOrd for DiffStreamKey {
904 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
905 Some(self.cmp(other))
906 }
907}
908
909impl Ord for DiffStreamKey {
910 fn cmp(&self, other: &Self) -> Ordering {
911 if self == other {
912 Ordering::Equal
913 } else if self.file_after_dir && other.path.starts_with(&self.path) {
914 Ordering::Greater
915 } else if other.file_after_dir && self.path.starts_with(&other.path) {
916 Ordering::Less
917 } else {
918 self.path.cmp(&other.path)
919 }
920 }
921}
922
923impl<'matcher> TreeDiffStreamImpl<'matcher> {
924 pub fn new(
927 trees1: &Merge<Tree>,
928 trees2: &Merge<Tree>,
929 matcher: &'matcher dyn Matcher,
930 max_concurrent_reads: usize,
931 ) -> Self {
932 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
933 let mut stream = Self {
934 store: trees1.first().store().clone(),
935 matcher,
936 items: BTreeMap::new(),
937 pending_trees: VecDeque::new(),
938 max_concurrent_reads,
939 max_queued_items: 10000,
940 };
941 stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
942 stream
943 }
944
945 async fn single_tree(
946 store: &Arc<Store>,
947 dir: RepoPathBuf,
948 value: Option<&TreeValue>,
949 ) -> BackendResult<Tree> {
950 match value {
951 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
952 _ => Ok(Tree::empty(store.clone(), dir.clone())),
953 }
954 }
955
956 async fn trees(
958 store: Arc<Store>,
959 dir: RepoPathBuf,
960 values: MergedTreeValue,
961 ) -> BackendResult<Merge<Tree>> {
962 if values.is_tree() {
963 let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter())
964 .then(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
965 .try_collect()
966 .await?;
967 Ok(builder.build())
968 } else {
969 Ok(Merge::resolved(Tree::empty(store, dir)))
970 }
971 }
972
973 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
974 for (basename, before, after) in merged_tree_entry_diff(trees1, trees2) {
975 let path = dir.join(basename);
976 let tree_before = before.is_tree();
977 let tree_after = after.is_tree();
978 let tree_matches =
981 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
982 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
983
984 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
986 before
987 } else {
988 Merge::absent()
989 };
990 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
991 after
992 } else {
993 Merge::absent()
994 };
995 if before.is_absent() && after.is_absent() {
996 continue;
997 }
998
999 if tree_matches {
1001 let before_tree_future =
1002 Self::trees(self.store.clone(), path.clone(), before.cloned());
1003 let after_tree_future =
1004 Self::trees(self.store.clone(), path.clone(), after.cloned());
1005 let both_trees_future =
1006 async { futures::try_join!(before_tree_future, after_tree_future) };
1007 self.pending_trees
1008 .push_back((path.clone(), Box::pin(both_trees_future)));
1009 }
1010
1011 self.items.insert(
1012 DiffStreamKey::normal(path),
1013 Ok((before.cloned(), after.cloned())),
1014 );
1015 }
1016 }
1017
1018 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
1019 let mut pending_index = 0;
1020 while pending_index < self.pending_trees.len()
1021 && (pending_index < self.max_concurrent_reads
1022 || self.items.len() < self.max_queued_items)
1023 {
1024 let (_, future) = &mut self.pending_trees[pending_index];
1025 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
1026 let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
1027 let key = DiffStreamKey::normal(dir);
1028 let (before, after) = self.items.remove(&key).unwrap().unwrap();
1031 if before.is_present() && !before.is_tree() {
1034 self.items
1035 .insert(key.clone(), Ok((before, Merge::absent())));
1036 } else if after.is_present() && !after.is_tree() {
1037 self.items.insert(
1038 DiffStreamKey::file_after_dir(key.path.clone()),
1039 Ok((Merge::absent(), after)),
1040 );
1041 }
1042 match tree_diff {
1043 Ok((trees1, trees2)) => {
1044 self.add_dir_diff_items(&key.path, &trees1, &trees2);
1045 }
1046 Err(err) => {
1047 self.items.insert(DiffStreamKey::normal(key.path), Err(err));
1048 }
1049 }
1050 } else {
1051 pending_index += 1;
1052 }
1053 }
1054 }
1055}
1056
1057impl Stream for TreeDiffStreamImpl<'_> {
1058 type Item = TreeDiffEntry;
1059
1060 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1061 self.poll_tree_futures(cx);
1063
1064 if let Some(entry) = self.items.first_entry() {
1066 match entry.get() {
1067 Err(_) => {
1068 let (key, result) = entry.remove_entry();
1070 Poll::Ready(Some(match result {
1071 Err(err) => TreeDiffEntry {
1072 path: key.path,
1073 values: Err(err),
1074 },
1075 Ok((before, after)) => TreeDiffEntry {
1076 path: key.path,
1077 values: Ok((before, after)),
1078 },
1079 }))
1080 }
1081 Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
1082 let (key, result) = entry.remove_entry();
1084 Poll::Ready(Some(match result {
1085 Err(err) => TreeDiffEntry {
1086 path: key.path,
1087 values: Err(err),
1088 },
1089 Ok((before, after)) => TreeDiffEntry {
1090 path: key.path,
1091 values: Ok((before, after)),
1092 },
1093 }))
1094 }
1095 _ => {
1096 assert!(!self.pending_trees.is_empty());
1099 Poll::Pending
1100 }
1101 }
1102 } else {
1103 Poll::Ready(None)
1104 }
1105 }
1106}
1107
1108pub struct MergedTreeBuilder {
1116 base_tree_id: MergedTreeId,
1117 overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
1118}
1119
1120impl MergedTreeBuilder {
1121 pub fn new(base_tree_id: MergedTreeId) -> Self {
1123 MergedTreeBuilder {
1124 base_tree_id,
1125 overrides: BTreeMap::new(),
1126 }
1127 }
1128
1129 pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
1136 if let MergedTreeId::Merge(_) = &self.base_tree_id {
1137 assert!(!values
1138 .iter()
1139 .flatten()
1140 .any(|value| matches!(value, TreeValue::Conflict(_))));
1141 }
1142 self.overrides.insert(path, values);
1143 }
1144
1145 pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
1147 let base_tree_ids = match self.base_tree_id.clone() {
1148 MergedTreeId::Legacy(base_tree_id) => {
1149 let legacy_base_tree = store.get_tree(RepoPathBuf::root(), &base_tree_id)?;
1150 let base_tree = MergedTree::from_legacy_tree(legacy_base_tree)?;
1151 base_tree.id().to_merge()
1152 }
1153 MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
1154 };
1155 let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
1156 match new_tree_ids.simplify().into_resolved() {
1157 Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
1158 Err(tree_id) => {
1159 let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
1160 let resolved = tree.resolve()?;
1161 Ok(resolved.id())
1162 }
1163 }
1164 }
1165
1166 fn write_merged_trees(
1167 self,
1168 mut base_tree_ids: Merge<TreeId>,
1169 store: &Arc<Store>,
1170 ) -> BackendResult<Merge<TreeId>> {
1171 let num_sides = self
1172 .overrides
1173 .values()
1174 .map(|value| value.num_sides())
1175 .max()
1176 .unwrap_or(0);
1177 base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1178 let mut tree_builders =
1180 base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1181 for (path, values) in self.overrides {
1182 match values.into_resolved() {
1183 Ok(value) => {
1184 for builder in tree_builders.iter_mut() {
1187 builder.set_or_remove(path.clone(), value.clone());
1188 }
1189 }
1190 Err(mut values) => {
1191 values.pad_to(num_sides, &None);
1192 for (builder, value) in zip(tree_builders.iter_mut(), values) {
1195 builder.set_or_remove(path.clone(), value);
1196 }
1197 }
1198 }
1199 }
1200 let merge_builder: MergeBuilder<TreeId> = tree_builders
1204 .into_iter()
1205 .map(|builder| builder.write_tree())
1206 .try_collect()?;
1207 Ok(merge_builder.build())
1208 }
1209}