1use std::borrow::Borrow;
18use std::cmp::max;
19use std::cmp::Ordering;
20use std::collections::BTreeMap;
21use std::collections::VecDeque;
22use std::iter;
23use std::iter::zip;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27use std::task::Poll;
28use std::vec;
29
30use either::Either;
31use futures::future::BoxFuture;
32use futures::stream::BoxStream;
33use futures::Stream;
34use itertools::EitherOrBoth;
35use itertools::Itertools as _;
36use pollster::FutureExt as _;
37
38use crate::backend;
39use crate::backend::BackendResult;
40use crate::backend::MergedTreeId;
41use crate::backend::TreeId;
42use crate::backend::TreeValue;
43use crate::copies::CopiesTreeDiffEntry;
44use crate::copies::CopiesTreeDiffStream;
45use crate::copies::CopyRecords;
46use crate::matchers::EverythingMatcher;
47use crate::matchers::Matcher;
48use crate::merge::Merge;
49use crate::merge::MergeBuilder;
50use crate::merge::MergedTreeVal;
51use crate::merge::MergedTreeValue;
52use crate::repo_path::RepoPath;
53use crate::repo_path::RepoPathBuf;
54use crate::repo_path::RepoPathComponent;
55use crate::store::Store;
56use crate::tree::try_resolve_file_conflict;
57use crate::tree::Tree;
58use crate::tree_builder::TreeBuilder;
59
60#[derive(PartialEq, Eq, Clone, Debug)]
62pub struct MergedTree {
63 trees: Merge<Tree>,
64}
65
66impl MergedTree {
67 pub fn resolved(tree: Tree) -> Self {
69 MergedTree::new(Merge::resolved(tree))
70 }
71
72 pub fn new(trees: Merge<Tree>) -> Self {
75 debug_assert!(!trees.iter().any(|t| t.has_conflict()));
76 debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
77 debug_assert!(trees
78 .iter()
79 .map(|tree| Arc::as_ptr(tree.store()))
80 .all_equal());
81 MergedTree { trees }
82 }
83
84 pub fn from_legacy_tree(tree: Tree) -> BackendResult<Self> {
88 let conflict_ids = tree.conflicts();
89 if conflict_ids.is_empty() {
90 return Ok(MergedTree::resolved(tree));
91 }
92
93 let mut max_tree_count = 1;
95 let store = tree.store();
96 let mut conflicts: Vec<(&RepoPath, MergedTreeValue)> = vec![];
97 for (path, conflict_id) in &conflict_ids {
98 let conflict = store.read_conflict(path, conflict_id)?;
99 max_tree_count = max(max_tree_count, conflict.iter().len());
100 conflicts.push((path, conflict));
101 }
102 let mut tree_builders = Vec::new();
103 tree_builders.resize_with(max_tree_count, || store.tree_builder(tree.id().clone()));
104 for (path, conflict) in conflicts {
105 let terms_padded = conflict.into_iter().chain(iter::repeat(None));
109 for (builder, term) in zip(&mut tree_builders, terms_padded) {
110 builder.set_or_remove(path.to_owned(), term);
111 }
112 }
113
114 let new_trees: Vec<_> = tree_builders
115 .into_iter()
116 .map(|builder| {
117 let tree_id = builder.write_tree()?;
118 store.get_tree(RepoPathBuf::root(), &tree_id)
119 })
120 .try_collect()?;
121 Ok(MergedTree {
122 trees: Merge::from_vec(new_trees),
123 })
124 }
125
126 pub fn as_merge(&self) -> &Merge<Tree> {
128 &self.trees
129 }
130
131 pub fn take(self) -> Merge<Tree> {
133 self.trees
134 }
135
136 pub fn dir(&self) -> &RepoPath {
138 self.trees.first().dir()
139 }
140
141 pub fn store(&self) -> &Arc<Store> {
143 self.trees.first().store()
144 }
145
146 pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
148 Box::new(all_tree_basenames(&self.trees))
149 }
150
151 pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal<'_> {
156 trees_value(&self.trees, basename)
157 }
158
159 pub fn resolve(&self) -> BackendResult<MergedTree> {
162 let merged = merge_trees(&self.trees).block_on()?;
163 let simplified = merged.simplify();
168 if cfg!(debug_assertions) {
172 let re_merged = merge_trees(&simplified).block_on().unwrap();
173 debug_assert_eq!(re_merged, simplified);
174 }
175 Ok(MergedTree { trees: simplified })
176 }
177
178 pub fn conflicts(
184 &self,
185 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
186 ConflictIterator::new(self)
187 }
188
189 pub fn has_conflict(&self) -> bool {
191 !self.trees.is_resolved()
192 }
193
194 pub async fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<MergedTree>> {
198 match self.value(name).into_resolved() {
199 Ok(Some(TreeValue::Tree(sub_tree_id))) => {
200 let subdir = self.dir().join(name);
201 Ok(Some(MergedTree::resolved(
202 self.store().get_tree_async(subdir, sub_tree_id).await?,
203 )))
204 }
205 Ok(_) => Ok(None),
206 Err(merge) => {
207 let trees = merge
208 .try_map_async(|value| async move {
209 match value {
210 Some(TreeValue::Tree(sub_tree_id)) => {
211 let subdir = self.dir().join(name);
212 self.store().get_tree_async(subdir, sub_tree_id).await
213 }
214 _ => {
215 let subdir = self.dir().join(name);
216 Ok(Tree::empty(self.store().clone(), subdir))
217 }
218 }
219 })
220 .await?;
221 Ok(Some(MergedTree { trees }))
222 }
223 }
224 }
225
226 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
230 self.path_value_async(path).block_on()
231 }
232
233 pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
235 assert_eq!(self.dir(), RepoPath::root());
236 match path.split() {
237 Some((dir, basename)) => match self.sub_tree_recursive(dir).await? {
238 None => Ok(Merge::absent()),
239 Some(tree) => Ok(tree.value(basename).cloned()),
240 },
241 None => Ok(self
242 .trees
243 .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
244 }
245 }
246
247 pub fn id(&self) -> MergedTreeId {
249 MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
250 }
251
252 pub async fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<MergedTree>> {
254 let mut current_tree = self.clone();
255 for name in path.components() {
256 match current_tree.sub_tree(name).await? {
257 None => {
258 return Ok(None);
259 }
260 Some(sub_tree) => {
261 current_tree = sub_tree;
262 }
263 }
264 }
265 Ok(Some(current_tree))
266 }
267
268 pub fn entries(&self) -> TreeEntriesIterator<'static> {
278 self.entries_matching(&EverythingMatcher)
279 }
280
281 pub fn entries_matching<'matcher>(
283 &self,
284 matcher: &'matcher dyn Matcher,
285 ) -> TreeEntriesIterator<'matcher> {
286 TreeEntriesIterator::new(&self.trees, matcher)
287 }
288
289 pub fn diff_stream<'matcher>(
294 &self,
295 other: &MergedTree,
296 matcher: &'matcher dyn Matcher,
297 ) -> TreeDiffStream<'matcher> {
298 let concurrency = self.store().concurrency();
299 if concurrency <= 1 {
300 Box::pin(futures::stream::iter(TreeDiffIterator::new(
301 &self.trees,
302 &other.trees,
303 matcher,
304 )))
305 } else {
306 Box::pin(TreeDiffStreamImpl::new(
307 &self.trees,
308 &other.trees,
309 matcher,
310 concurrency,
311 ))
312 }
313 }
314
315 pub fn diff_stream_with_copies<'a>(
317 &self,
318 other: &MergedTree,
319 matcher: &'a dyn Matcher,
320 copy_records: &'a CopyRecords,
321 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
322 let stream = self.diff_stream(other, matcher);
323 Box::pin(CopiesTreeDiffStream::new(
324 stream,
325 self.clone(),
326 other.clone(),
327 copy_records,
328 ))
329 }
330
331 pub fn merge(&self, base: &MergedTree, other: &MergedTree) -> BackendResult<MergedTree> {
334 self.merge_no_resolve(base, other).resolve()
335 }
336
337 pub fn merge_no_resolve(&self, base: &MergedTree, other: &MergedTree) -> MergedTree {
340 let nested = Merge::from_vec(vec![
341 self.trees.clone(),
342 base.trees.clone(),
343 other.trees.clone(),
344 ]);
345 MergedTree {
346 trees: nested.flatten().simplify(),
347 }
348 }
349}
350
351pub struct TreeDiffEntry {
353 pub path: RepoPathBuf,
355 pub values: BackendResult<(MergedTreeValue, MergedTreeValue)>,
357}
358
359pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
363
364fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
365 trees
366 .iter()
367 .map(|tree| tree.data().names())
368 .kmerge()
369 .dedup()
370}
371
372fn all_tree_entries(
373 trees: &Merge<Tree>,
374) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
375 if let Some(tree) = trees.as_resolved() {
376 let iter = tree
377 .entries_non_recursive()
378 .map(|entry| (entry.name(), Merge::normal(entry.value())));
379 Either::Left(iter)
380 } else {
381 let iter = all_merged_tree_entries(trees).map(|(name, values)| {
382 let values = match values.resolve_trivial() {
384 Some(resolved) => Merge::resolved(*resolved),
385 None => values,
386 };
387 (name, values)
388 });
389 Either::Right(iter)
390 }
391}
392
393fn all_merged_tree_entries(
397 trees: &Merge<Tree>,
398) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
399 let mut entries_iters = trees
400 .iter()
401 .map(|tree| tree.entries_non_recursive().peekable())
402 .collect_vec();
403 iter::from_fn(move || {
404 let next_name = entries_iters
405 .iter_mut()
406 .filter_map(|iter| iter.peek())
407 .map(|entry| entry.name())
408 .min()?;
409 let values: MergeBuilder<_> = entries_iters
410 .iter_mut()
411 .map(|iter| {
412 let entry = iter.next_if(|entry| entry.name() == next_name)?;
413 Some(entry.value())
414 })
415 .collect();
416 Some((next_name, values.build()))
417 })
418}
419
420fn merged_tree_entry_diff<'a>(
421 trees1: &'a Merge<Tree>,
422 trees2: &'a Merge<Tree>,
423) -> impl Iterator<Item = (&'a RepoPathComponent, MergedTreeVal<'a>, MergedTreeVal<'a>)> {
424 itertools::merge_join_by(
425 all_tree_entries(trees1),
426 all_tree_entries(trees2),
427 |(name1, _), (name2, _)| name1.cmp(name2),
428 )
429 .map(|entry| match entry {
430 EitherOrBoth::Both((name, value1), (_, value2)) => (name, value1, value2),
431 EitherOrBoth::Left((name, value1)) => (name, value1, Merge::absent()),
432 EitherOrBoth::Right((name, value2)) => (name, Merge::absent(), value2),
433 })
434 .filter(|(_, value1, value2)| value1 != value2)
435}
436
437fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
438 if let Some(tree) = trees.as_resolved() {
439 return Merge::resolved(tree.value(basename));
440 }
441 let value = trees.map(|tree| tree.value(basename));
442 if let Some(resolved) = value.resolve_trivial() {
443 return Merge::resolved(*resolved);
444 }
445 value
446}
447
448async fn merge_trees(merge: &Merge<Tree>) -> BackendResult<Merge<Tree>> {
451 if let Some(tree) = merge.resolve_trivial() {
452 return Ok(Merge::resolved(tree.clone()));
453 }
454
455 let base_tree = merge.first();
456 let store = base_tree.store();
457 let dir = base_tree.dir();
458 let mut new_tree = backend::Tree::default();
462 let mut conflicts = vec![];
463 for (basename, path_merge) in all_merged_tree_entries(merge) {
465 let path = dir.join(basename);
466 let path_merge = merge_tree_values(store, &path, &path_merge).await?;
467 match path_merge.into_resolved() {
468 Ok(value) => {
469 new_tree.set_or_remove(basename, value);
470 }
471 Err(path_merge) => {
472 conflicts.push((basename, path_merge.into_iter()));
473 }
474 };
475 }
476 if conflicts.is_empty() {
477 let new_tree_id = store.write_tree(dir, new_tree).await?;
478 Ok(Merge::resolved(new_tree_id))
479 } else {
480 let tree_count = merge.iter().len();
484 let mut new_trees = Vec::with_capacity(tree_count);
485 for _ in 0..tree_count {
486 for (basename, path_conflict) in &mut conflicts {
487 new_tree.set_or_remove(basename, path_conflict.next().unwrap());
488 }
489 let tree = store.write_tree(dir, new_tree.clone()).await?;
490 new_trees.push(tree);
491 }
492 Ok(Merge::from_vec(new_trees))
493 }
494}
495
496async fn merge_tree_values(
501 store: &Arc<Store>,
502 path: &RepoPath,
503 values: &MergedTreeVal<'_>,
504) -> BackendResult<MergedTreeValue> {
505 if let Some(resolved) = values.resolve_trivial() {
506 return Ok(Merge::resolved(resolved.cloned()));
507 }
508
509 if let Some(trees) = values.to_tree_merge(store, path).await? {
510 let empty_tree_id = store.empty_tree_id();
513 let merged_tree = Box::pin(merge_trees(&trees)).await?;
514 Ok(merged_tree
515 .map(|tree| (tree.id() != empty_tree_id).then(|| TreeValue::Tree(tree.id().clone()))))
516 } else {
517 let maybe_resolved = try_resolve_file_values(store, path, values).await?;
518 Ok(maybe_resolved.unwrap_or_else(|| values.cloned()))
519 }
520}
521
522pub async fn resolve_file_values(
526 store: &Arc<Store>,
527 path: &RepoPath,
528 values: MergedTreeValue,
529) -> BackendResult<MergedTreeValue> {
530 if let Some(resolved) = values.resolve_trivial() {
531 return Ok(Merge::resolved(resolved.clone()));
532 }
533
534 let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
535 Ok(maybe_resolved.unwrap_or(values))
536}
537
538async fn try_resolve_file_values<T: Borrow<TreeValue>>(
539 store: &Arc<Store>,
540 path: &RepoPath,
541 values: &Merge<Option<T>>,
542) -> BackendResult<Option<MergedTreeValue>> {
543 let simplified = values
546 .map(|value| value.as_ref().map(Borrow::borrow))
547 .simplify();
548 if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
551 Ok(Some(Merge::normal(resolved)))
552 } else {
553 Ok(None)
555 }
556}
557
558pub struct TreeEntriesIterator<'matcher> {
560 store: Arc<Store>,
561 stack: Vec<TreeEntriesDirItem>,
562 matcher: &'matcher dyn Matcher,
563}
564
565struct TreeEntriesDirItem {
566 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
567}
568
569impl TreeEntriesDirItem {
570 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
571 let mut entries = vec![];
572 let dir = trees.first().dir();
573 for (name, value) in all_tree_entries(trees) {
574 let path = dir.join(name);
575 if value.is_tree() {
576 if matcher.visit(&path).is_nothing() {
578 continue;
579 }
580 } else if !matcher.matches(&path) {
581 continue;
582 }
583 entries.push((path, value.cloned()));
584 }
585 entries.reverse();
586 TreeEntriesDirItem { entries }
587 }
588}
589
590impl<'matcher> TreeEntriesIterator<'matcher> {
591 fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
592 Self {
593 store: trees.first().store().clone(),
594 stack: vec![TreeEntriesDirItem::new(trees, matcher)],
595 matcher,
596 }
597 }
598}
599
600impl Iterator for TreeEntriesIterator<'_> {
601 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
602
603 fn next(&mut self) -> Option<Self::Item> {
604 while let Some(top) = self.stack.last_mut() {
605 if let Some((path, value)) = top.entries.pop() {
606 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
607 Ok(maybe_trees) => maybe_trees,
608 Err(err) => return Some((path, Err(err))),
609 };
610 if let Some(trees) = maybe_trees {
611 self.stack
612 .push(TreeEntriesDirItem::new(&trees, self.matcher));
613 } else {
614 return Some((path, Ok(value)));
615 }
616 } else {
617 self.stack.pop();
618 }
619 }
620 None
621 }
622}
623
624struct ConflictsDirItem {
627 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
628}
629
630impl From<&Merge<Tree>> for ConflictsDirItem {
631 fn from(trees: &Merge<Tree>) -> Self {
632 let dir = trees.first().dir();
633 if trees.is_resolved() {
634 return ConflictsDirItem { entries: vec![] };
635 }
636
637 let mut entries = vec![];
638 for (basename, value) in all_tree_entries(trees) {
639 if !value.is_resolved() {
640 entries.push((dir.join(basename), value.cloned()));
641 }
642 }
643 entries.reverse();
644 ConflictsDirItem { entries }
645 }
646}
647
648struct ConflictIterator {
649 store: Arc<Store>,
650 stack: Vec<ConflictsDirItem>,
651}
652
653impl ConflictIterator {
654 fn new(tree: &MergedTree) -> Self {
655 ConflictIterator {
656 store: tree.store().clone(),
657 stack: vec![ConflictsDirItem::from(&tree.trees)],
658 }
659 }
660}
661
662impl Iterator for ConflictIterator {
663 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
664
665 fn next(&mut self) -> Option<Self::Item> {
666 while let Some(top) = self.stack.last_mut() {
667 if let Some((path, tree_values)) = top.entries.pop() {
668 match tree_values.to_tree_merge(&self.store, &path).block_on() {
669 Ok(Some(trees)) => {
670 self.stack.push(ConflictsDirItem::from(&trees));
672 }
673 Ok(None) => {
674 return Some((path, Ok(tree_values)));
678 }
679 Err(err) => {
680 return Some((path, Err(err)));
681 }
682 }
683 } else {
684 self.stack.pop();
685 }
686 }
687 None
688 }
689}
690
691pub struct TreeDiffIterator<'matcher> {
693 store: Arc<Store>,
694 stack: Vec<TreeDiffItem>,
695 matcher: &'matcher dyn Matcher,
696}
697
698struct TreeDiffDirItem {
699 entries: Vec<(RepoPathBuf, MergedTreeValue, MergedTreeValue)>,
700}
701
702enum TreeDiffItem {
703 Dir(TreeDiffDirItem),
704 File(RepoPathBuf, MergedTreeValue, MergedTreeValue),
708}
709
710impl<'matcher> TreeDiffIterator<'matcher> {
711 pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
714 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
715 let root_dir = RepoPath::root();
716 let mut stack = Vec::new();
717 if !matcher.visit(root_dir).is_nothing() {
718 stack.push(TreeDiffItem::Dir(TreeDiffDirItem::from_trees(
719 root_dir, trees1, trees2, matcher,
720 )));
721 };
722 Self {
723 store: trees1.first().store().clone(),
724 stack,
725 matcher,
726 }
727 }
728
729 fn trees(
731 store: &Arc<Store>,
732 dir: &RepoPath,
733 values: &MergedTreeValue,
734 ) -> BackendResult<Merge<Tree>> {
735 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
736 Ok(trees)
737 } else {
738 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
739 }
740 }
741}
742
743impl TreeDiffDirItem {
744 fn from_trees(
745 dir: &RepoPath,
746 trees1: &Merge<Tree>,
747 trees2: &Merge<Tree>,
748 matcher: &dyn Matcher,
749 ) -> Self {
750 let mut entries = vec![];
751 for (name, before, after) in merged_tree_entry_diff(trees1, trees2) {
752 let path = dir.join(name);
753 let tree_before = before.is_tree();
754 let tree_after = after.is_tree();
755 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
758 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
759
760 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
762 before
763 } else {
764 Merge::absent()
765 };
766 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
767 after
768 } else {
769 Merge::absent()
770 };
771 if before.is_absent() && after.is_absent() {
772 continue;
773 }
774 entries.push((path, before.cloned(), after.cloned()));
775 }
776 entries.reverse();
777 Self { entries }
778 }
779}
780
781impl Iterator for TreeDiffIterator<'_> {
782 type Item = TreeDiffEntry;
783
784 fn next(&mut self) -> Option<Self::Item> {
785 while let Some(top) = self.stack.last_mut() {
786 let (path, before, after) = match top {
787 TreeDiffItem::Dir(dir) => match dir.entries.pop() {
788 Some(entry) => entry,
789 None => {
790 self.stack.pop().unwrap();
791 continue;
792 }
793 },
794 TreeDiffItem::File(..) => {
795 if let TreeDiffItem::File(path, before, after) = self.stack.pop().unwrap() {
796 return Some(TreeDiffEntry {
797 path,
798 values: Ok((before, after)),
799 });
800 } else {
801 unreachable!();
802 }
803 }
804 };
805
806 let tree_before = before.is_tree();
807 let tree_after = after.is_tree();
808 let post_subdir = if tree_before || tree_after {
809 let (before_tree, after_tree) = match (
810 Self::trees(&self.store, &path, &before),
811 Self::trees(&self.store, &path, &after),
812 ) {
813 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
814 (Err(before_err), _) => {
815 return Some(TreeDiffEntry {
816 path,
817 values: Err(before_err),
818 });
819 }
820 (_, Err(after_err)) => {
821 return Some(TreeDiffEntry {
822 path,
823 values: Err(after_err),
824 });
825 }
826 };
827 let subdir =
828 TreeDiffDirItem::from_trees(&path, &before_tree, &after_tree, self.matcher);
829 self.stack.push(TreeDiffItem::Dir(subdir));
830 self.stack.len() - 1
831 } else {
832 self.stack.len()
833 };
834 if !tree_before && tree_after {
835 if before.is_present() {
836 return Some(TreeDiffEntry {
837 path,
838 values: Ok((before, Merge::absent())),
839 });
840 }
841 } else if tree_before && !tree_after {
842 if after.is_present() {
843 self.stack.insert(
844 post_subdir,
845 TreeDiffItem::File(path, Merge::absent(), after),
846 );
847 }
848 } else if !tree_before && !tree_after {
849 return Some(TreeDiffEntry {
850 path,
851 values: Ok((before, after)),
852 });
853 }
854 }
855 None
856 }
857}
858
859pub struct TreeDiffStreamImpl<'matcher> {
861 store: Arc<Store>,
862 matcher: &'matcher dyn Matcher,
863 items: BTreeMap<DiffStreamKey, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
867 #[expect(clippy::type_complexity)]
869 pending_trees: VecDeque<(
870 RepoPathBuf,
871 BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>,
872 )>,
873 max_concurrent_reads: usize,
878 max_queued_items: usize,
884}
885
886#[derive(PartialEq, Eq, Clone, Debug)]
889struct DiffStreamKey {
890 path: RepoPathBuf,
891 file_after_dir: bool,
892}
893
894impl DiffStreamKey {
895 fn normal(path: RepoPathBuf) -> Self {
896 DiffStreamKey {
897 path,
898 file_after_dir: false,
899 }
900 }
901
902 fn file_after_dir(path: RepoPathBuf) -> Self {
903 DiffStreamKey {
904 path,
905 file_after_dir: true,
906 }
907 }
908}
909
910impl PartialOrd for DiffStreamKey {
911 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
912 Some(self.cmp(other))
913 }
914}
915
916impl Ord for DiffStreamKey {
917 fn cmp(&self, other: &Self) -> Ordering {
918 if self == other {
919 Ordering::Equal
920 } else if self.file_after_dir && other.path.starts_with(&self.path) {
921 Ordering::Greater
922 } else if other.file_after_dir && self.path.starts_with(&other.path) {
923 Ordering::Less
924 } else {
925 self.path.cmp(&other.path)
926 }
927 }
928}
929
930impl<'matcher> TreeDiffStreamImpl<'matcher> {
931 pub fn new(
934 trees1: &Merge<Tree>,
935 trees2: &Merge<Tree>,
936 matcher: &'matcher dyn Matcher,
937 max_concurrent_reads: usize,
938 ) -> Self {
939 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
940 let mut stream = Self {
941 store: trees1.first().store().clone(),
942 matcher,
943 items: BTreeMap::new(),
944 pending_trees: VecDeque::new(),
945 max_concurrent_reads,
946 max_queued_items: 10000,
947 };
948 stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
949 stream
950 }
951
952 async fn single_tree(
953 store: &Arc<Store>,
954 dir: RepoPathBuf,
955 value: Option<&TreeValue>,
956 ) -> BackendResult<Tree> {
957 match value {
958 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
959 _ => Ok(Tree::empty(store.clone(), dir.clone())),
960 }
961 }
962
963 async fn trees(
965 store: Arc<Store>,
966 dir: RepoPathBuf,
967 values: MergedTreeValue,
968 ) -> BackendResult<Merge<Tree>> {
969 if values.is_tree() {
970 values
971 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
972 .await
973 } else {
974 Ok(Merge::resolved(Tree::empty(store, dir)))
975 }
976 }
977
978 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
979 for (basename, before, after) in merged_tree_entry_diff(trees1, trees2) {
980 let path = dir.join(basename);
981 let tree_before = before.is_tree();
982 let tree_after = after.is_tree();
983 let tree_matches =
986 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
987 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
988
989 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
991 before
992 } else {
993 Merge::absent()
994 };
995 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
996 after
997 } else {
998 Merge::absent()
999 };
1000 if before.is_absent() && after.is_absent() {
1001 continue;
1002 }
1003
1004 if tree_matches {
1006 let before_tree_future =
1007 Self::trees(self.store.clone(), path.clone(), before.cloned());
1008 let after_tree_future =
1009 Self::trees(self.store.clone(), path.clone(), after.cloned());
1010 let both_trees_future =
1011 async { futures::try_join!(before_tree_future, after_tree_future) };
1012 self.pending_trees
1013 .push_back((path.clone(), Box::pin(both_trees_future)));
1014 }
1015
1016 self.items.insert(
1017 DiffStreamKey::normal(path),
1018 Ok((before.cloned(), after.cloned())),
1019 );
1020 }
1021 }
1022
1023 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
1024 let mut pending_index = 0;
1025 while pending_index < self.pending_trees.len()
1026 && (pending_index < self.max_concurrent_reads
1027 || self.items.len() < self.max_queued_items)
1028 {
1029 let (_, future) = &mut self.pending_trees[pending_index];
1030 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
1031 let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
1032 let key = DiffStreamKey::normal(dir);
1033 let (before, after) = self.items.remove(&key).unwrap().unwrap();
1036 if before.is_present() && !before.is_tree() {
1039 self.items
1040 .insert(key.clone(), Ok((before, Merge::absent())));
1041 } else if after.is_present() && !after.is_tree() {
1042 self.items.insert(
1043 DiffStreamKey::file_after_dir(key.path.clone()),
1044 Ok((Merge::absent(), after)),
1045 );
1046 }
1047 match tree_diff {
1048 Ok((trees1, trees2)) => {
1049 self.add_dir_diff_items(&key.path, &trees1, &trees2);
1050 }
1051 Err(err) => {
1052 self.items.insert(DiffStreamKey::normal(key.path), Err(err));
1053 }
1054 }
1055 } else {
1056 pending_index += 1;
1057 }
1058 }
1059 }
1060}
1061
1062impl Stream for TreeDiffStreamImpl<'_> {
1063 type Item = TreeDiffEntry;
1064
1065 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1066 self.poll_tree_futures(cx);
1068
1069 if let Some(entry) = self.items.first_entry() {
1071 match entry.get() {
1072 Err(_) => {
1073 let (key, result) = entry.remove_entry();
1075 Poll::Ready(Some(match result {
1076 Err(err) => TreeDiffEntry {
1077 path: key.path,
1078 values: Err(err),
1079 },
1080 Ok((before, after)) => TreeDiffEntry {
1081 path: key.path,
1082 values: Ok((before, after)),
1083 },
1084 }))
1085 }
1086 Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
1087 let (key, result) = entry.remove_entry();
1089 Poll::Ready(Some(match result {
1090 Err(err) => TreeDiffEntry {
1091 path: key.path,
1092 values: Err(err),
1093 },
1094 Ok((before, after)) => TreeDiffEntry {
1095 path: key.path,
1096 values: Ok((before, after)),
1097 },
1098 }))
1099 }
1100 _ => {
1101 assert!(!self.pending_trees.is_empty());
1104 Poll::Pending
1105 }
1106 }
1107 } else {
1108 Poll::Ready(None)
1109 }
1110 }
1111}
1112
1113pub struct MergedTreeBuilder {
1121 base_tree_id: MergedTreeId,
1122 overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
1123}
1124
1125impl MergedTreeBuilder {
1126 pub fn new(base_tree_id: MergedTreeId) -> Self {
1128 MergedTreeBuilder {
1129 base_tree_id,
1130 overrides: BTreeMap::new(),
1131 }
1132 }
1133
1134 pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
1141 if let MergedTreeId::Merge(_) = &self.base_tree_id {
1142 assert!(!values
1143 .iter()
1144 .flatten()
1145 .any(|value| matches!(value, TreeValue::Conflict(_))));
1146 }
1147 self.overrides.insert(path, values);
1148 }
1149
1150 pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
1152 let base_tree_ids = match self.base_tree_id.clone() {
1153 MergedTreeId::Legacy(base_tree_id) => {
1154 let legacy_base_tree = store.get_tree(RepoPathBuf::root(), &base_tree_id)?;
1155 let base_tree = MergedTree::from_legacy_tree(legacy_base_tree)?;
1156 base_tree.id().to_merge()
1157 }
1158 MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
1159 };
1160 let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
1161 match new_tree_ids.simplify().into_resolved() {
1162 Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
1163 Err(tree_id) => {
1164 let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
1165 let resolved = tree.resolve()?;
1166 Ok(resolved.id())
1167 }
1168 }
1169 }
1170
1171 fn write_merged_trees(
1172 self,
1173 mut base_tree_ids: Merge<TreeId>,
1174 store: &Arc<Store>,
1175 ) -> BackendResult<Merge<TreeId>> {
1176 let num_sides = self
1177 .overrides
1178 .values()
1179 .map(|value| value.num_sides())
1180 .max()
1181 .unwrap_or(0);
1182 base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1183 let mut tree_builders =
1185 base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1186 for (path, values) in self.overrides {
1187 match values.into_resolved() {
1188 Ok(value) => {
1189 for builder in tree_builders.iter_mut() {
1192 builder.set_or_remove(path.clone(), value.clone());
1193 }
1194 }
1195 Err(mut values) => {
1196 values.pad_to(num_sides, &None);
1197 for (builder, value) in zip(tree_builders.iter_mut(), values) {
1200 builder.set_or_remove(path.clone(), value);
1201 }
1202 }
1203 }
1204 }
1205 let merge_builder: MergeBuilder<TreeId> = tree_builders
1209 .into_iter()
1210 .map(|builder| builder.write_tree())
1211 .try_collect()?;
1212 Ok(merge_builder.build())
1213 }
1214}